Change headsync logic (ensure space closing on nodes)

This commit is contained in:
mcrakhman 2023-01-02 10:54:17 +01:00 committed by Mikhail Iudin
parent 0f257cbc0c
commit 9ba3c1b34a
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
14 changed files with 156 additions and 96 deletions

View File

@ -38,6 +38,16 @@ func (s *storageService) SpaceStorage(id string) (spacestorage.SpaceStorage, err
return newSpaceStorage(s.db, id) return newSpaceStorage(s.db, id)
} }
func (s *storageService) SpaceExists(id string) bool {
return s.db.View(func(txn *badger.Txn) error {
_, err := getTxn(txn, newSpaceKeys(id).HeaderKey())
if err != nil {
return err
}
return nil
}) == nil
}
func (s *storageService) CreateSpaceStorage(payload spacestorage.SpaceStorageCreatePayload) (spacestorage.SpaceStorage, error) { func (s *storageService) CreateSpaceStorage(payload spacestorage.SpaceStorageCreatePayload) (spacestorage.SpaceStorage, error) {
return createSpaceStorage(s.db, payload) return createSpaceStorage(s.db, payload)
} }

View File

@ -5,7 +5,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" "golang.org/x/exp/slices"
) )
type ConfConnector interface { type ConfConnector interface {
@ -50,7 +50,7 @@ func (s *confConnector) connectOneOrMany(
allNodes = s.conf.NodeIds(spaceId) allNodes = s.conf.NodeIds(spaceId)
) )
for _, id := range allNodes { for _, id := range allNodes {
if slice.FindPos(activeNodeIds, id) == -1 { if !slices.Contains(activeNodeIds, id) {
inactiveNodeIds = append(inactiveNodeIds, id) inactiveNodeIds = append(inactiveNodeIds, id)
} }
} }

View File

@ -9,7 +9,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/syncobjectgetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/object/syncobjectgetter"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectsync/synchandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectsync/synchandler"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/periodicsync"
"go.uber.org/zap" "go.uber.org/zap"
"time" "time"
) )
@ -32,7 +31,6 @@ type objectSync struct {
streamPool StreamPool streamPool StreamPool
checker StreamChecker checker StreamChecker
periodicSync periodicsync.PeriodicSync
objectGetter syncobjectgetter.SyncObjectGetter objectGetter syncobjectgetter.SyncObjectGetter
actionQueue ActionQueue actionQueue ActionQueue
@ -42,8 +40,7 @@ type objectSync struct {
func NewObjectSync( func NewObjectSync(
spaceId string, spaceId string,
confConnector confconnector.ConfConnector, confConnector confconnector.ConfConnector) (objectSync ObjectSync) {
periodicSeconds int) (objectSync ObjectSync) {
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return objectSync.HandleMessage(ctx, senderId, message) return objectSync.HandleMessage(ctx, senderId, message)
}) })
@ -57,14 +54,9 @@ func NewObjectSync(
clientFactory, clientFactory,
syncCtx, syncCtx,
syncLog) syncLog)
periodicSync := periodicsync.NewPeriodicSync(periodicSeconds, 0, func(ctx context.Context) error {
checker.CheckResponsiblePeers()
return nil
}, syncLog)
objectSync = newObjectSync( objectSync = newObjectSync(
spaceId, spaceId,
streamPool, streamPool,
periodicSync,
checker, checker,
syncCtx, syncCtx,
cancel) cancel)
@ -74,31 +66,28 @@ func NewObjectSync(
func newObjectSync( func newObjectSync(
spaceId string, spaceId string,
streamPool StreamPool, streamPool StreamPool,
periodicSync periodicsync.PeriodicSync,
checker StreamChecker, checker StreamChecker,
syncCtx context.Context, syncCtx context.Context,
cancel context.CancelFunc, cancel context.CancelFunc,
) *objectSync { ) *objectSync {
return &objectSync{ return &objectSync{
periodicSync: periodicSync, streamPool: streamPool,
streamPool: streamPool, spaceId: spaceId,
spaceId: spaceId, checker: checker,
checker: checker, syncCtx: syncCtx,
syncCtx: syncCtx, cancelSync: cancel,
cancelSync: cancel, actionQueue: NewDefaultActionQueue(),
actionQueue: NewDefaultActionQueue(),
} }
} }
func (s *objectSync) Init(objectGetter syncobjectgetter.SyncObjectGetter) { func (s *objectSync) Init(objectGetter syncobjectgetter.SyncObjectGetter) {
s.objectGetter = objectGetter s.objectGetter = objectGetter
s.actionQueue.Run() s.actionQueue.Run()
s.periodicSync.Run() go s.checker.CheckResponsiblePeers()
} }
func (s *objectSync) Close() (err error) { func (s *objectSync) Close() (err error) {
s.actionQueue.Close() s.actionQueue.Close()
s.periodicSync.Close()
s.cancelSync() s.cancelSync()
return s.streamPool.Close() return s.streamPool.Close()
} }

View File

@ -118,7 +118,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
} }
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, syncStatus, log) headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, syncStatus, log)
objectSync := objectsync.NewObjectSync(id, confConnector, s.config.SyncPeriod) objectSync := objectsync.NewObjectSync(id, confConnector)
sp := &space{ sp := &space{
id: id, id: id,
objectSync: objectSync, objectSync: objectSync,

View File

@ -47,6 +47,7 @@ type SpaceStorageCreatePayload struct {
type SpaceStorageProvider interface { type SpaceStorageProvider interface {
app.Component app.Component
SpaceStorage(id string) (SpaceStorage, error) SpaceStorage(id string) (SpaceStorage, error)
SpaceExists(id string) bool
CreateSpaceStorage(payload SpaceStorageCreatePayload) (SpaceStorage, error) CreateSpaceStorage(payload SpaceStorageCreatePayload) (SpaceStorage, error)
} }

View File

@ -8,7 +8,8 @@ import (
var ( var (
errGroup = rpcerr.ErrGroup(ErrCodes_ErrorOffset) errGroup = rpcerr.ErrGroup(ErrCodes_ErrorOffset)
ErrUnexpected = errGroup.Register(errors.New("unexpected error"), uint64(ErrCodes_Unexpected)) ErrUnexpected = errGroup.Register(errors.New("unexpected error"), uint64(ErrCodes_Unexpected))
ErrSpaceMissing = errGroup.Register(errors.New("space is missing"), uint64(ErrCodes_SpaceMissing)) ErrSpaceMissing = errGroup.Register(errors.New("space is missing"), uint64(ErrCodes_SpaceMissing))
ErrSpaceExists = errGroup.Register(errors.New("space exists"), uint64(ErrCodes_SpaceExists)) ErrSpaceExists = errGroup.Register(errors.New("space exists"), uint64(ErrCodes_SpaceExists))
ErrSpaceNotInCache = errGroup.Register(errors.New("space not in cache"), uint64(ErrCodes_SpaceNotInCache))
) )

View File

@ -7,6 +7,7 @@ enum ErrCodes {
Unexpected = 0; Unexpected = 0;
SpaceMissing = 1; SpaceMissing = 1;
SpaceExists = 2; SpaceExists = 2;
SpaceNotInCache = 3;
ErrorOffset = 100; ErrorOffset = 100;
} }

View File

@ -25,24 +25,27 @@ const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type ErrCodes int32 type ErrCodes int32
const ( const (
ErrCodes_Unexpected ErrCodes = 0 ErrCodes_Unexpected ErrCodes = 0
ErrCodes_SpaceMissing ErrCodes = 1 ErrCodes_SpaceMissing ErrCodes = 1
ErrCodes_SpaceExists ErrCodes = 2 ErrCodes_SpaceExists ErrCodes = 2
ErrCodes_ErrorOffset ErrCodes = 100 ErrCodes_SpaceNotInCache ErrCodes = 3
ErrCodes_ErrorOffset ErrCodes = 100
) )
var ErrCodes_name = map[int32]string{ var ErrCodes_name = map[int32]string{
0: "Unexpected", 0: "Unexpected",
1: "SpaceMissing", 1: "SpaceMissing",
2: "SpaceExists", 2: "SpaceExists",
3: "SpaceNotInCache",
100: "ErrorOffset", 100: "ErrorOffset",
} }
var ErrCodes_value = map[string]int32{ var ErrCodes_value = map[string]int32{
"Unexpected": 0, "Unexpected": 0,
"SpaceMissing": 1, "SpaceMissing": 1,
"SpaceExists": 2, "SpaceExists": 2,
"ErrorOffset": 100, "SpaceNotInCache": 3,
"ErrorOffset": 100,
} }
func (x ErrCodes) String() string { func (x ErrCodes) String() string {
@ -1071,63 +1074,64 @@ func init() {
} }
var fileDescriptor_80e49f1f4ac27799 = []byte{ var fileDescriptor_80e49f1f4ac27799 = []byte{
// 887 bytes of a gzipped FileDescriptorProto // 903 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x4d, 0x6f, 0x1b, 0x45, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x56, 0x4f, 0x6f, 0x1b, 0x45,
0x18, 0xf6, 0x6e, 0x9c, 0x26, 0x7e, 0xb3, 0x75, 0xdd, 0x21, 0x85, 0xc5, 0x8d, 0x5c, 0x6b, 0x0e, 0x14, 0xf7, 0x6e, 0x9c, 0x26, 0x7e, 0xd9, 0x3a, 0xdb, 0x69, 0x0a, 0x8b, 0x1b, 0xb9, 0xd6, 0x1e,
0x28, 0xe2, 0xd0, 0x8f, 0x14, 0x81, 0x10, 0x70, 0xa0, 0x89, 0x4b, 0x2d, 0x54, 0x52, 0x8d, 0x41, 0x50, 0xc4, 0xa1, 0x7f, 0x52, 0x04, 0x42, 0xc0, 0x81, 0x26, 0x2e, 0x5d, 0xa1, 0x92, 0x6a, 0x0c,
0x48, 0x48, 0x20, 0x4d, 0x77, 0xdf, 0xd8, 0x8b, 0xd6, 0x33, 0xcb, 0xce, 0x98, 0xc6, 0x07, 0x0e, 0x42, 0x42, 0x02, 0x69, 0xba, 0xfb, 0x62, 0x2f, 0x5a, 0xcf, 0x2c, 0x3b, 0x63, 0x1a, 0x1f, 0x38,
0x9c, 0xb8, 0xf2, 0x17, 0xf8, 0x0f, 0xfc, 0x08, 0x8e, 0x3d, 0x72, 0x44, 0xc9, 0x1f, 0x41, 0x33, 0x70, 0xe2, 0xca, 0x57, 0xe0, 0x3b, 0xf0, 0x21, 0x38, 0xf6, 0xc8, 0x11, 0x25, 0x5f, 0x04, 0xcd,
0xfb, 0x1d, 0x6f, 0x7a, 0xe8, 0x65, 0x33, 0xef, 0xd7, 0xf3, 0x7e, 0xcc, 0x3b, 0x4f, 0x0c, 0x8f, 0xec, 0x5f, 0xdb, 0x9b, 0x1c, 0xb8, 0x38, 0x33, 0xef, 0xcf, 0xef, 0xfd, 0xde, 0x9b, 0x99, 0xdf,
0x02, 0xb9, 0x5c, 0x4a, 0xa1, 0x12, 0x1e, 0xe0, 0x03, 0xfb, 0x55, 0x6b, 0x11, 0x24, 0xa9, 0xd4, 0x06, 0x9e, 0x84, 0x62, 0x3e, 0x17, 0x5c, 0xa6, 0x2c, 0xc4, 0x47, 0xe6, 0x57, 0x2e, 0x79, 0x98,
0xf2, 0x81, 0xfd, 0xaa, 0x4a, 0x7b, 0xdf, 0x2a, 0x48, 0xaf, 0x54, 0xd0, 0x29, 0xdc, 0x7c, 0x86, 0x66, 0x42, 0x89, 0x47, 0xe6, 0x57, 0xd6, 0xd6, 0x87, 0xc6, 0x40, 0x7a, 0x95, 0xc1, 0x0f, 0xe0,
0x3c, 0x9c, 0xad, 0x45, 0xc0, 0xb8, 0x98, 0x23, 0x21, 0xd0, 0x3d, 0x4b, 0xe5, 0xd2, 0x77, 0xc6, 0xf6, 0x0b, 0x64, 0xd1, 0x64, 0xc9, 0x43, 0xca, 0xf8, 0x14, 0x09, 0x81, 0xee, 0x79, 0x26, 0xe6,
0xce, 0x61, 0x97, 0xd9, 0x33, 0xe9, 0x83, 0xab, 0xa5, 0xef, 0x5a, 0x8d, 0xab, 0x25, 0xd9, 0x87, 0x9e, 0x35, 0xb2, 0x8e, 0xba, 0xd4, 0xac, 0x49, 0x1f, 0x6c, 0x25, 0x3c, 0xdb, 0x58, 0x6c, 0x25,
0xed, 0x38, 0x5a, 0x46, 0xda, 0xdf, 0x1a, 0x3b, 0x87, 0x37, 0x59, 0x26, 0xd0, 0x73, 0xe8, 0x97, 0xc8, 0x01, 0x6c, 0x27, 0xf1, 0x3c, 0x56, 0xde, 0xd6, 0xc8, 0x3a, 0xba, 0x4d, 0xf3, 0x8d, 0x7f,
0x50, 0xa8, 0x56, 0xb1, 0x36, 0x58, 0x0b, 0xae, 0x16, 0x16, 0xcb, 0x63, 0xf6, 0x4c, 0x3e, 0x87, 0x01, 0xfd, 0x0a, 0x0a, 0xe5, 0x22, 0x51, 0x1a, 0x6b, 0xc6, 0xe4, 0xcc, 0x60, 0x39, 0xd4, 0xac,
0x5d, 0x8c, 0x71, 0x89, 0x42, 0x2b, 0xdf, 0x1d, 0x6f, 0x1d, 0xee, 0x1d, 0x8d, 0xef, 0x57, 0xf5, 0xc9, 0x67, 0xb0, 0x8b, 0x09, 0xce, 0x91, 0x2b, 0xe9, 0xd9, 0xa3, 0xad, 0xa3, 0xbd, 0xe3, 0xd1,
0x35, 0x01, 0x26, 0x99, 0x23, 0x2b, 0x23, 0x4c, 0xe6, 0x40, 0xae, 0x44, 0x99, 0xd9, 0x0a, 0xf4, 0xc3, 0x9a, 0xdf, 0x2a, 0xc0, 0x38, 0x0f, 0xa4, 0x55, 0x86, 0xae, 0x1c, 0x8a, 0x05, 0xaf, 0x2a,
0x33, 0xb8, 0xd3, 0x1a, 0x68, 0x0a, 0x8f, 0x42, 0x9b, 0xbe, 0xc7, 0xdc, 0x28, 0xb4, 0x05, 0x21, 0x9b, 0x8d, 0xff, 0x29, 0xdc, 0x6b, 0x4d, 0xd4, 0xc4, 0xe3, 0xc8, 0x94, 0xef, 0x51, 0x3b, 0x8e,
0x0f, 0x6d, 0x2b, 0x3d, 0x66, 0xcf, 0xf4, 0x47, 0xb8, 0x55, 0x05, 0xff, 0xb2, 0x42, 0xa5, 0x89, 0x0c, 0x21, 0x64, 0x91, 0x69, 0xa5, 0x47, 0xcd, 0xda, 0xff, 0x01, 0xf6, 0xeb, 0xe4, 0x9f, 0x17,
0x0f, 0x3b, 0xb6, 0xa4, 0x69, 0x11, 0x5b, 0x88, 0xe4, 0x21, 0xdc, 0x48, 0xcd, 0x98, 0x8a, 0xda, 0x28, 0x15, 0xf1, 0x60, 0xc7, 0x50, 0x0a, 0xca, 0xdc, 0x72, 0x4b, 0x1e, 0xc3, 0xad, 0x4c, 0x8f,
0xfd, 0xb6, 0xda, 0x8d, 0x03, 0xcb, 0xfd, 0xe8, 0x57, 0x30, 0xa8, 0xd5, 0x96, 0x48, 0xa1, 0x90, 0xa9, 0xe4, 0xee, 0xb5, 0x71, 0xd7, 0x01, 0xb4, 0x88, 0xf3, 0xbf, 0x04, 0xb7, 0xc1, 0x2d, 0x15,
0x3c, 0x86, 0x9d, 0xd4, 0xd6, 0xa9, 0x7c, 0xc7, 0xc2, 0xbc, 0x7f, 0xed, 0x08, 0x58, 0xe1, 0x49, 0x5c, 0x22, 0x79, 0x0a, 0x3b, 0x99, 0xe1, 0x29, 0x3d, 0xcb, 0xc0, 0xbc, 0x77, 0xed, 0x08, 0x68,
0x7f, 0x83, 0xdb, 0xa7, 0x2f, 0x7f, 0xc6, 0x40, 0x1b, 0xe3, 0x73, 0x54, 0x8a, 0xcf, 0xf1, 0x0d, 0x19, 0xe9, 0xff, 0x0a, 0x77, 0xce, 0x5e, 0xff, 0x84, 0xa1, 0xd2, 0xce, 0x97, 0x28, 0x25, 0x9b,
0x95, 0xfa, 0x26, 0x47, 0x12, 0xaf, 0xa7, 0x45, 0xb7, 0x85, 0x68, 0x2c, 0x09, 0x5f, 0xc7, 0x92, 0xe2, 0x0d, 0x4c, 0x3d, 0x5d, 0x23, 0x4d, 0x96, 0x41, 0xd9, 0x6d, 0xb9, 0xd5, 0x9e, 0x94, 0x2d,
0x87, 0x76, 0x8a, 0x1e, 0x2b, 0x44, 0x32, 0x84, 0x5d, 0x69, 0x53, 0x4c, 0x43, 0xbf, 0x6b, 0x83, 0x13, 0xc1, 0x22, 0x33, 0x45, 0x87, 0x96, 0x5b, 0x32, 0x80, 0x5d, 0x61, 0x4a, 0x04, 0x91, 0xd7,
0x4a, 0x99, 0x4e, 0x60, 0x30, 0x33, 0xd0, 0x2f, 0x56, 0x6a, 0x51, 0xcc, 0xe9, 0x51, 0x85, 0x64, 0x35, 0x49, 0xd5, 0xde, 0x1f, 0x83, 0x3b, 0xd1, 0xd0, 0xaf, 0x16, 0x72, 0x56, 0xce, 0xe9, 0x49,
0xb2, 0xef, 0x1d, 0xbd, 0x57, 0xeb, 0x23, 0xf3, 0xce, 0xcc, 0x65, 0x0a, 0xfa, 0x0e, 0xdc, 0xae, 0x8d, 0xa4, 0xab, 0xef, 0x1d, 0xbf, 0xdb, 0xe8, 0x23, 0x8f, 0xce, 0xdd, 0x55, 0x09, 0xff, 0x2e,
0xc1, 0x64, 0xf3, 0xa0, 0xb4, 0xc4, 0x8e, 0xe3, 0x02, 0xfb, 0xca, 0xd5, 0xd1, 0xa7, 0x65, 0xa0, 0xdc, 0x69, 0xc0, 0xe4, 0xf3, 0xf0, 0xfd, 0x0a, 0x3b, 0x49, 0x4a, 0xec, 0xb5, 0xa3, 0xf3, 0x9f,
0xf1, 0xc9, 0x07, 0xf9, 0x16, 0x05, 0xfc, 0xee, 0x82, 0x57, 0xb7, 0x90, 0x2f, 0x61, 0xcf, 0xc6, 0x57, 0x89, 0x3a, 0xa6, 0x18, 0xe4, 0xff, 0x20, 0xf0, 0x9b, 0x0d, 0x4e, 0xd3, 0x43, 0xbe, 0x80,
0x98, 0xb9, 0x63, 0x9a, 0xe3, 0xdc, 0xab, 0xe1, 0x30, 0xfe, 0x6a, 0x56, 0x39, 0x7c, 0x1f, 0xe9, 0x3d, 0x93, 0xa3, 0xe7, 0x8e, 0x59, 0x81, 0xf3, 0xa0, 0x81, 0x43, 0xd9, 0x9b, 0x49, 0x1d, 0xf0,
0xc5, 0x34, 0x64, 0xf5, 0x18, 0x32, 0x02, 0xe0, 0x41, 0x9c, 0x03, 0xda, 0x71, 0x7b, 0xac, 0xa6, 0x5d, 0xac, 0x66, 0x41, 0x44, 0x9b, 0x39, 0x64, 0x08, 0xc0, 0xc2, 0xa4, 0x00, 0x34, 0xe3, 0x76,
0x21, 0x14, 0xbc, 0x4a, 0x9a, 0x66, 0x63, 0xef, 0xb1, 0x86, 0x8e, 0x1c, 0xc1, 0xbe, 0x85, 0x9c, 0x68, 0xc3, 0x42, 0x7c, 0x70, 0xea, 0x5d, 0x90, 0x8f, 0xbd, 0x47, 0x57, 0x6c, 0xe4, 0x18, 0x0e,
0xa1, 0xd6, 0x91, 0x98, 0xab, 0x02, 0xad, 0x6b, 0xd1, 0x5a, 0x6d, 0xe4, 0x63, 0x78, 0xb7, 0x4d, 0x0c, 0xe4, 0x04, 0x95, 0x8a, 0xf9, 0x54, 0x96, 0x68, 0x5d, 0x83, 0xd6, 0xea, 0x23, 0x1f, 0xc1,
0x3f, 0x0d, 0xfd, 0x6d, 0x9b, 0xe1, 0x1a, 0x2b, 0xfd, 0xcb, 0x81, 0xbd, 0x5a, 0x4b, 0xe6, 0xde, 0x3b, 0x6d, 0xf6, 0x20, 0xf2, 0xb6, 0x4d, 0x85, 0x6b, 0xbc, 0xfe, 0x9f, 0x16, 0xec, 0x35, 0x5a,
0xa3, 0x10, 0x85, 0x8e, 0xf4, 0x3a, 0x7f, 0xab, 0xa5, 0x4c, 0x0e, 0xa0, 0xa7, 0xa3, 0x25, 0x2a, 0xd2, 0xe7, 0x1e, 0x47, 0xc8, 0x55, 0xac, 0x96, 0xc5, 0x5b, 0xad, 0xf6, 0xe4, 0x10, 0x7a, 0x2a,
0xcd, 0x97, 0x89, 0x6d, 0x6d, 0x8b, 0x55, 0x0a, 0x63, 0xb5, 0x39, 0xbe, 0x5d, 0x27, 0x98, 0xb7, 0x9e, 0xa3, 0x54, 0x6c, 0x9e, 0x9a, 0xd6, 0xb6, 0x68, 0x6d, 0xd0, 0x5e, 0x53, 0xe3, 0x9b, 0x65,
0x55, 0x29, 0xc8, 0x07, 0xd0, 0x37, 0x4b, 0x17, 0x05, 0x5c, 0x47, 0x52, 0x7c, 0x8d, 0x6b, 0xdb, 0x8a, 0x45, 0x5b, 0xb5, 0x81, 0xbc, 0x0f, 0x7d, 0x7d, 0xe9, 0xe2, 0x90, 0xa9, 0x58, 0xf0, 0xaf,
0x4d, 0x97, 0x5d, 0xd1, 0x9a, 0x67, 0xa9, 0x10, 0xb3, 0xaa, 0x3d, 0x66, 0xcf, 0xf4, 0x05, 0xf4, 0x70, 0x69, 0xba, 0xe9, 0xd2, 0x35, 0xab, 0x7e, 0x96, 0x12, 0x31, 0x67, 0xed, 0x50, 0xb3, 0xf6,
0x9b, 0x83, 0x27, 0xe3, 0xcd, 0x8b, 0xf2, 0x9a, 0xf7, 0x60, 0xaa, 0x89, 0xe6, 0x82, 0xeb, 0x55, 0x5f, 0x41, 0x7f, 0x75, 0xf0, 0x64, 0xb4, 0x79, 0x50, 0xce, 0xea, 0x39, 0x68, 0x36, 0xf1, 0x94,
0x8a, 0xf9, 0x35, 0x54, 0x0a, 0x7a, 0x02, 0xfb, 0x6d, 0x57, 0x69, 0xa2, 0x52, 0xfe, 0xaa, 0x81, 0x33, 0xb5, 0xc8, 0xb0, 0x38, 0x86, 0xda, 0xe0, 0x9f, 0xc2, 0x41, 0xdb, 0x51, 0xea, 0xac, 0x8c,
0x5a, 0x29, 0xf2, 0x3d, 0x74, 0xcb, 0x3d, 0xfc, 0x09, 0xf6, 0x67, 0xf5, 0xa9, 0x1e, 0x4b, 0xa1, 0xbd, 0x59, 0x41, 0xad, 0x0d, 0xc5, 0x3d, 0xb4, 0xab, 0x7b, 0xf8, 0x23, 0x1c, 0x4c, 0x9a, 0x53,
0x0d, 0xd5, 0x7c, 0x01, 0x5e, 0xf6, 0x56, 0x4e, 0x30, 0x46, 0x8d, 0x2d, 0xfb, 0x78, 0x5a, 0x33, 0x3d, 0x11, 0x5c, 0x69, 0xa9, 0xf9, 0x1c, 0x9c, 0xfc, 0xad, 0x9c, 0x62, 0x82, 0x0a, 0x5b, 0xee,
0x3f, 0xeb, 0xb0, 0x86, 0xfb, 0x93, 0x1d, 0xd8, 0xfe, 0x95, 0xc7, 0x2b, 0xa4, 0x23, 0xf0, 0xea, 0xe3, 0x59, 0xc3, 0xfd, 0xa2, 0x43, 0x57, 0xc2, 0x9f, 0xed, 0xc0, 0xf6, 0x2f, 0x2c, 0x59, 0xa0,
0x8e, 0x1b, 0xef, 0xe0, 0x13, 0xb8, 0xd3, 0xc8, 0x3f, 0x13, 0x3c, 0x51, 0x0b, 0xa9, 0xcd, 0x12, 0x3f, 0x04, 0xa7, 0x19, 0xb8, 0xf1, 0x0e, 0x3e, 0x86, 0x7b, 0x2b, 0xf5, 0x27, 0x9c, 0xa5, 0x72,
0x86, 0x36, 0x24, 0x9c, 0x86, 0x19, 0xaf, 0xf4, 0x58, 0x4d, 0x43, 0xff, 0x70, 0xc0, 0x2b, 0x82, 0x26, 0x94, 0xbe, 0x84, 0x91, 0x49, 0x89, 0x82, 0x28, 0xd7, 0x95, 0x1e, 0x6d, 0x58, 0xfc, 0xdf,
0x4e, 0xb8, 0xe6, 0xe4, 0x53, 0xd8, 0x09, 0xb2, 0xe2, 0x73, 0x16, 0xba, 0x77, 0xf5, 0xf1, 0x5c, 0x2d, 0x70, 0xca, 0xa4, 0x53, 0xa6, 0x18, 0xf9, 0x04, 0x76, 0xc2, 0x9c, 0x7c, 0xa1, 0x42, 0x0f,
0xe9, 0x91, 0x15, 0xfe, 0x86, 0xc4, 0x55, 0x9e, 0xd7, 0x8e, 0xa6, 0x49, 0xe2, 0xad, 0xf5, 0xb1, 0xd6, 0x1f, 0xcf, 0x5a, 0x8f, 0xb4, 0x8c, 0xd7, 0x22, 0x2e, 0x8b, 0xba, 0x66, 0x34, 0xab, 0x22,
0x32, 0xe2, 0xc3, 0x6f, 0x60, 0x77, 0x92, 0xa6, 0xc7, 0x32, 0x44, 0x45, 0xfa, 0x00, 0xdf, 0x09, 0xde, 0xca, 0x8f, 0x56, 0x19, 0x1f, 0x84, 0xb0, 0x3b, 0xce, 0xb2, 0x13, 0x11, 0xa1, 0x24, 0x7d,
0x3c, 0x4f, 0x30, 0xd0, 0x18, 0x0e, 0x3a, 0x64, 0x90, 0xbf, 0xce, 0xe7, 0x91, 0x52, 0x91, 0x98, 0x80, 0x6f, 0x39, 0x5e, 0xa4, 0x18, 0x2a, 0x8c, 0xdc, 0x0e, 0x71, 0x8b, 0xd7, 0xf9, 0x32, 0x96,
0x0f, 0x1c, 0x72, 0x2b, 0xdf, 0xd5, 0xc9, 0x79, 0xa4, 0xb4, 0x1a, 0xb8, 0x46, 0x31, 0x49, 0x53, 0x32, 0xe6, 0x53, 0xd7, 0x22, 0xfb, 0xc5, 0x5d, 0x1d, 0x5f, 0xc4, 0x52, 0x49, 0xd7, 0x26, 0x77,
0x99, 0x9e, 0x9e, 0x9d, 0x29, 0xd4, 0x83, 0xf0, 0xe8, 0x6f, 0x17, 0x7a, 0x59, 0xce, 0xb5, 0x08, 0x61, 0xdf, 0x18, 0xbe, 0x16, 0x2a, 0xe0, 0x27, 0x2c, 0x9c, 0xa1, 0xbb, 0xa5, 0xa3, 0xc6, 0x59,
0xc8, 0x31, 0xec, 0x16, 0x14, 0x4a, 0x86, 0xad, 0xbc, 0x6a, 0x09, 0x66, 0x78, 0xb7, 0x9d, 0x73, 0x26, 0xb2, 0xb3, 0xf3, 0x73, 0x89, 0xca, 0x8d, 0x8e, 0xff, 0xb2, 0xa1, 0x97, 0x13, 0x59, 0xf2,
0x33, 0x62, 0x79, 0x9a, 0x23, 0x1a, 0x9a, 0x22, 0x77, 0x37, 0x48, 0xa5, 0xe2, 0xc0, 0xe1, 0x41, 0x90, 0x9c, 0xc0, 0x6e, 0xa9, 0xab, 0x64, 0xd0, 0x2a, 0xb6, 0x46, 0x75, 0x06, 0xf7, 0xdb, 0x85,
0xbb, 0x71, 0x03, 0x27, 0x8e, 0xdb, 0x70, 0x4a, 0xbe, 0x6b, 0xc3, 0xa9, 0x11, 0x1d, 0x83, 0x41, 0x38, 0x57, 0x9b, 0xe7, 0x05, 0xa2, 0xd6, 0x2e, 0x72, 0x7f, 0x43, 0x69, 0x6a, 0x61, 0x1c, 0x1c,
0x45, 0xfe, 0x33, 0x9d, 0x22, 0x5f, 0x92, 0x83, 0x8d, 0xdd, 0xaa, 0xfd, 0x67, 0x18, 0xbe, 0xd1, 0xb6, 0x3b, 0x37, 0x70, 0x92, 0xa4, 0x0d, 0xa7, 0x12, 0xc1, 0x36, 0x9c, 0x86, 0xfa, 0x51, 0x70,
0x7a, 0xe8, 0x3c, 0x74, 0x9e, 0x7c, 0xf4, 0xcf, 0xc5, 0xc8, 0x79, 0x7d, 0x31, 0x72, 0xfe, 0xbb, 0xeb, 0x2f, 0xc2, 0x44, 0x65, 0xc8, 0xe6, 0xe4, 0x70, 0xe3, 0xc2, 0x35, 0x3e, 0x17, 0x83, 0x1b,
0x18, 0x39, 0x7f, 0x5e, 0x8e, 0x3a, 0xaf, 0x2f, 0x47, 0x9d, 0x7f, 0x2f, 0x47, 0x9d, 0x1f, 0x86, 0xbd, 0x47, 0xd6, 0x63, 0xeb, 0xd9, 0x87, 0x7f, 0x5f, 0x0e, 0xad, 0xb7, 0x97, 0x43, 0xeb, 0xdf,
0xd7, 0xff, 0xa4, 0x78, 0x79, 0xc3, 0xfe, 0x79, 0xfc, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x8b, 0xcb, 0xa1, 0xf5, 0xc7, 0xd5, 0xb0, 0xf3, 0xf6, 0x6a, 0xd8, 0xf9, 0xe7, 0x6a, 0xd8, 0xf9, 0x7e,
0xf0, 0x9f, 0x14, 0x77, 0x08, 0x00, 0x00, 0x70, 0xfd, 0xff, 0x19, 0xaf, 0x6f, 0x99, 0x3f, 0x4f, 0xff, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x24,
0x0c, 0x9c, 0xee, 0x8c, 0x08, 0x00, 0x00,
} }
func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) { func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) {

View File

@ -61,7 +61,7 @@ nodes:
- file - file
space: space:
gcTTL: 60 gcTTL: 60
syncPeriod: 600 syncPeriod: 20
storage: storage:
path: db/node/1/data path: db/node/1/data
metric: metric:

View File

@ -61,7 +61,7 @@ nodes:
- file - file
space: space:
gcTTL: 60 gcTTL: 60
syncPeriod: 600 syncPeriod: 20
storage: storage:
path: db/node/2/data path: db/node/2/data
metric: metric:

View File

@ -61,7 +61,7 @@ nodes:
- file - file
space: space:
gcTTL: 60 gcTTL: 60
syncPeriod: 600 syncPeriod: 20
storage: storage:
path: db/node/3/data path: db/node/3/data
metric: metric:

View File

@ -2,6 +2,7 @@ package nodespace
import ( import (
"context" "context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/ocache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
) )
@ -54,10 +55,17 @@ func (r *rpcHandler) SpacePush(ctx context.Context, req *spacesyncproto.SpacePus
return return
} }
func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
sp, err := r.s.GetSpace(ctx, req.SpaceId) sp, err := r.s.GetOrPickSpace(ctx, req.SpaceId)
if err != nil { if err != nil {
return nil, spacesyncproto.ErrSpaceMissing switch err {
case ocache.ErrNotExists:
err = spacesyncproto.ErrSpaceNotInCache
case spacesyncproto.ErrSpaceMissing:
default:
err = spacesyncproto.ErrUnexpected
}
return
} }
return sp.SpaceSyncRpc().HeadSync(ctx, req) return sp.SpaceSyncRpc().HeadSync(ctx, req)
} }

View File

@ -8,7 +8,10 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
peer "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"golang.org/x/exp/slices"
"time" "time"
) )
@ -22,6 +25,7 @@ func New() Service {
type Service interface { type Service interface {
GetSpace(ctx context.Context, id string) (commonspace.Space, error) GetSpace(ctx context.Context, id string) (commonspace.Space, error)
GetOrPickSpace(ctx context.Context, id string) (commonspace.Space, error)
app.ComponentRunnable app.ComponentRunnable
} }
@ -29,12 +33,14 @@ type service struct {
conf commonspace.Config conf commonspace.Config
spaceCache ocache.OCache spaceCache ocache.OCache
commonSpace commonspace.SpaceService commonSpace commonspace.SpaceService
confService nodeconf.Service
spaceStorageProvider spacestorage.SpaceStorageProvider spaceStorageProvider spacestorage.SpaceStorageProvider
} }
func (s *service) Init(a *app.App) (err error) { func (s *service) Init(a *app.App) (err error) {
s.conf = a.MustComponent("config").(commonspace.ConfigGetter).GetSpace() s.conf = a.MustComponent("config").(commonspace.ConfigGetter).GetSpace()
s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.SpaceService) s.commonSpace = a.MustComponent(commonspace.CName).(commonspace.SpaceService)
s.confService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.spaceStorageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.spaceStorageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
s.spaceCache = ocache.New( s.spaceCache = ocache.New(
s.loadSpace, s.loadSpace,
@ -61,6 +67,37 @@ func (s *service) GetSpace(ctx context.Context, id string) (commonspace.Space, e
return v.(commonspace.Space), nil return v.(commonspace.Space), nil
} }
func (s *service) GetOrPickSpace(ctx context.Context, id string) (sp commonspace.Space, err error) {
var v ocache.Object
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return
}
// if we are getting request from our fellow node
// don't try to wake the space, otherwise it can be possible
// that node would be waking up infinitely, depending on
// OCache ttl and HeadSync period
if slices.Contains(s.confService.GetLast().NodeIds(id), peerId) {
v, err = s.spaceCache.Pick(ctx, id)
if err != nil {
// safely checking that we don't have the space storage
// this should not open the database in case of node
if !s.spaceStorageProvider.SpaceExists(id) {
err = spacesyncproto.ErrSpaceMissing
}
return
}
} else {
// if the request is from the client it is safe to wake up the node
v, err = s.spaceCache.Get(ctx, id)
if err != nil {
return
}
}
sp = v.(commonspace.Space)
return
}
func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) { func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) {
cc, err := s.commonSpace.NewSpace(ctx, id) cc, err := s.commonSpace.NewSpace(ctx, id)
if err != nil { if err != nil {

View File

@ -4,6 +4,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacestorage"
"os" "os"
"path"
) )
type storageService struct { type storageService struct {
@ -33,6 +34,14 @@ func (s *storageService) SpaceStorage(id string) (spacestorage.SpaceStorage, err
return newSpaceStorage(s.rootPath, id) return newSpaceStorage(s.rootPath, id)
} }
func (s *storageService) SpaceExists(id string) bool {
dbPath := path.Join(s.rootPath, id)
if _, err := os.Stat(dbPath); err != nil {
return false
}
return true
}
func (s *storageService) CreateSpaceStorage(payload spacestorage.SpaceStorageCreatePayload) (spacestorage.SpaceStorage, error) { func (s *storageService) CreateSpaceStorage(payload spacestorage.SpaceStorageCreatePayload) (spacestorage.SpaceStorage, error) {
return createSpaceStorage(s.rootPath, payload) return createSpaceStorage(s.rootPath, payload)
} }