diff --git a/common/commonspace/commongetter.go b/common/commonspace/commongetter.go new file mode 100644 index 00000000..da40bfff --- /dev/null +++ b/common/commonspace/commongetter.go @@ -0,0 +1,35 @@ +package commonspace + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectgetter" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" +) + +type commonSpaceGetter struct { + spaceId string + aclList *syncacl.SyncACL + treeGetter treegetter.TreeGetter +} + +func newCommonSpaceGetter(spaceId string, aclList *syncacl.SyncACL, treeGetter treegetter.TreeGetter) objectgetter.ObjectGetter { + return &commonSpaceGetter{ + spaceId: spaceId, + aclList: aclList, + treeGetter: treeGetter, + } +} + +func (c *commonSpaceGetter) GetObject(ctx context.Context, objectId string) (obj objectgetter.Object, err error) { + if c.aclList.ID() == objectId { + obj = c.aclList + return + } + t, err := c.treeGetter.GetTree(ctx, c.spaceId, objectId) + if err != nil { + return + } + obj = t.(objectgetter.Object) + return +} diff --git a/common/commonspace/objectgetter/objectgetter.go b/common/commonspace/objectgetter/objectgetter.go new file mode 100644 index 00000000..11b9e835 --- /dev/null +++ b/common/commonspace/objectgetter/objectgetter.go @@ -0,0 +1,14 @@ +package objectgetter + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" +) + +type Object interface { + synchandler.SyncHandler +} + +type ObjectGetter interface { + GetObject(ctx context.Context, objectId string) (Object, error) +} diff --git a/common/commonspace/service.go b/common/commonspace/service.go index b6bfee6f..23794ebc 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -87,10 +87,11 @@ func (s *service) GetSpace(ctx context.Context, id string) (Space, error) { if err != nil { return nil, err } + lastConfiguration := s.configurationService.GetLast() confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool) diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.cache, log) - syncService := syncservice.NewSyncService(id, diffService, s.cache, lastConfiguration, confConnector) + syncService := syncservice.NewSyncService(id, diffService, lastConfiguration, confConnector) sp := &space{ id: id, syncService: syncService, diff --git a/common/commonspace/space.go b/common/commonspace/space.go index c84dc1c1..73b6f770 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -8,6 +8,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncacl" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" @@ -73,7 +74,7 @@ type space struct { storage storage.SpaceStorage cache treegetter.TreeGetter account account.Service - aclList list.ACLList + aclList *syncacl.SyncACL isClosed atomic.Bool } @@ -96,12 +97,14 @@ func (s *space) Init(ctx context.Context) (err error) { if err != nil { return } - s.aclList, err = list.BuildACLListWithIdentity(s.account.Account(), aclStorage) + aclList, err := list.BuildACLListWithIdentity(s.account.Account(), aclStorage) if err != nil { return } + s.aclList = syncacl.NewSyncACL(aclList, s.syncService.SyncClient()) + objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache) + s.syncService.Init(objectGetter) s.diffService.Init(initialIds) - s.syncService.Init() return nil } @@ -126,7 +129,14 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay err = ErrSpaceClosed return } - return synctree.DeriveSyncTree(ctx, payload, s.syncService.SyncClient(), listener, s.aclList, s.storage.CreateTreeStorage) + deps := synctree.SyncTreeCreateDeps{ + Payload: payload, + SyncClient: s.syncService.SyncClient(), + Listener: listener, + AclList: s.aclList, + CreateStorage: s.storage.CreateTreeStorage, + } + return synctree.DeriveSyncTree(ctx, deps) } func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePayload, listener updatelistener.UpdateListener) (tr tree.ObjectTree, err error) { @@ -134,7 +144,14 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay err = ErrSpaceClosed return } - return synctree.CreateSyncTree(ctx, payload, s.syncService.SyncClient(), listener, s.aclList, s.storage.CreateTreeStorage) + deps := synctree.SyncTreeCreateDeps{ + Payload: payload, + SyncClient: s.syncService.SyncClient(), + Listener: listener, + AclList: s.aclList, + CreateStorage: s.storage.CreateTreeStorage, + } + return synctree.CreateSyncTree(ctx, deps) } func (s *space) BuildTree(ctx context.Context, id string, listener updatelistener.UpdateListener) (t tree.ObjectTree, err error) { @@ -187,7 +204,13 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene return } } - return synctree.BuildSyncTree(ctx, s.syncService.SyncClient(), store.(aclstorage.TreeStorage), listener, s.aclList, isFirstBuild) + deps := synctree.SyncTreeBuildDeps{ + SyncClient: s.syncService.SyncClient(), + Listener: listener, + AclList: s.aclList, + Storage: store, + } + return synctree.BuildSyncTree(ctx, isFirstBuild, deps) } func (s *space) Close() error { diff --git a/common/commonspace/syncacl/syncacl.go b/common/commonspace/syncacl/syncacl.go new file mode 100644 index 00000000..85d65f1a --- /dev/null +++ b/common/commonspace/syncacl/syncacl.go @@ -0,0 +1,21 @@ +package syncacl + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" +) + +type SyncACL struct { + list.ACLList + syncservice.SyncClient + synchandler.SyncHandler +} + +func NewSyncACL(aclList list.ACLList, syncClient syncservice.SyncClient) *SyncACL { + return &SyncACL{ + ACLList: aclList, + SyncClient: syncClient, + SyncHandler: nil, + } +} diff --git a/common/commonspace/syncservice/synchandler/synchhandler.go b/common/commonspace/syncservice/synchandler/synchhandler.go new file mode 100644 index 00000000..e6bed4bf --- /dev/null +++ b/common/commonspace/syncservice/synchandler/synchhandler.go @@ -0,0 +1,10 @@ +package synchandler + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" +) + +type SyncHandler interface { + HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) +} diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 5142a8cc..d11cf94f 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -4,8 +4,9 @@ package syncservice import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/objectgetter" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" @@ -16,9 +17,10 @@ var log = logger.NewNamed("syncservice").Sugar() type SyncService interface { ocache.ObjectLastUsage + synchandler.SyncHandler SyncClient() SyncClient - Init() + Init(getter objectgetter.ObjectGetter) Close() (err error) } @@ -33,6 +35,7 @@ type syncService struct { syncClient SyncClient clientFactory spacesyncproto.ClientFactory + objectGetter objectgetter.ObjectGetter streamLoopCtx context.Context stopStreamLoop context.CancelFunc @@ -43,25 +46,19 @@ type syncService struct { func NewSyncService( spaceId string, headNotifiable HeadNotifiable, - cache treegetter.TreeGetter, configuration nodeconf.Configuration, - confConnector nodeconf.ConfConnector) SyncService { - var syncHandler SyncHandler + confConnector nodeconf.ConfConnector) (syncService SyncService) { streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { - return syncHandler.HandleMessage(ctx, senderId, message) + return syncService.HandleMessage(ctx, senderId, message) }) factory := newRequestFactory() syncClient := newSyncClient(spaceId, streamPool, headNotifiable, factory, configuration) - syncHandler = newSyncHandler(spaceId, cache, syncClient) - return newSyncService( + syncService = newSyncService( spaceId, syncClient, spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient), confConnector) -} - -func (s *syncService) LastUsage() time.Time { - return s.syncClient.LastUsage() + return } func newSyncService( @@ -78,7 +75,8 @@ func newSyncService( } } -func (s *syncService) Init() { +func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) { + s.objectGetter = objectGetter s.streamLoopCtx, s.stopStreamLoop = context.WithCancel(context.Background()) go s.responsibleStreamCheckLoop(s.streamLoopCtx) } @@ -89,6 +87,18 @@ func (s *syncService) Close() (err error) { return s.syncClient.Close() } +func (s *syncService) LastUsage() time.Time { + return s.syncClient.LastUsage() +} + +func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { + obj, err := s.objectGetter.GetObject(ctx, message.TreeId) + if err != nil { + return + } + return obj.HandleMessage(ctx, senderId, message) +} + func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { defer close(s.streamLoopDone) checkResponsiblePeers := func() { diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 62a33465..74d0b312 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" @@ -17,6 +18,7 @@ var ErrSyncTreeClosed = errors.New("sync tree is closed") // SyncTree sends head updates to sync service and also sends new changes to update listener type SyncTree struct { tree.ObjectTree + synchandler.SyncHandler syncClient syncservice.SyncClient listener updatelistener.UpdateListener isClosed bool @@ -28,85 +30,89 @@ var createDerivedObjectTree = tree.CreateDerivedObjectTree var createObjectTree = tree.CreateObjectTree var buildObjectTree = tree.BuildObjectTree +type SyncTreeCreateDeps struct { + Payload tree.ObjectTreeCreatePayload + SyncClient syncservice.SyncClient + Listener updatelistener.UpdateListener + AclList list.ACLList + CreateStorage storage.TreeStorageCreatorFunc +} + +type SyncTreeBuildDeps struct { + SyncClient syncservice.SyncClient + Listener updatelistener.UpdateListener + AclList list.ACLList + Storage storage.TreeStorage +} + func DeriveSyncTree( ctx context.Context, - payload tree.ObjectTreeCreatePayload, - syncClient syncservice.SyncClient, - listener updatelistener.UpdateListener, - aclList list.ACLList, - createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) { - t, err = createDerivedObjectTree(payload, aclList, createStorage) + deps SyncTreeCreateDeps) (t tree.ObjectTree, err error) { + t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) if err != nil { return } - t = &SyncTree{ + syncTree := &SyncTree{ ObjectTree: t, - syncClient: syncClient, - listener: listener, + syncClient: deps.SyncClient, + listener: deps.Listener, } + syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient) + syncTree.SyncHandler = syncHandler + t = syncTree - headUpdate := syncClient.CreateHeadUpdate(t, nil) - err = syncClient.BroadcastAsync(headUpdate) + headUpdate := deps.SyncClient.CreateHeadUpdate(t, nil) + err = deps.SyncClient.BroadcastAsync(headUpdate) return } func CreateSyncTree( ctx context.Context, - payload tree.ObjectTreeCreatePayload, - syncClient syncservice.SyncClient, - listener updatelistener.UpdateListener, - aclList list.ACLList, - createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) { - t, err = createObjectTree(payload, aclList, createStorage) + deps SyncTreeCreateDeps) (t tree.ObjectTree, err error) { + t, err = createObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) if err != nil { return } - t = &SyncTree{ + syncTree := &SyncTree{ ObjectTree: t, - syncClient: syncClient, - listener: listener, + syncClient: deps.SyncClient, + listener: deps.Listener, } + syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient) + syncTree.SyncHandler = syncHandler + t = syncTree - headUpdate := syncClient.CreateHeadUpdate(t, nil) - err = syncClient.BroadcastAsync(headUpdate) + headUpdate := deps.SyncClient.CreateHeadUpdate(t, nil) + err = deps.SyncClient.BroadcastAsync(headUpdate) return } func BuildSyncTree( ctx context.Context, - syncClient syncservice.SyncClient, - treeStorage storage.TreeStorage, - listener updatelistener.UpdateListener, - aclList list.ACLList, - isFirstBuild bool) (t tree.ObjectTree, err error) { - return buildSyncTree(ctx, syncClient, treeStorage, listener, aclList, isFirstBuild) -} + isFirstBuild bool, + deps SyncTreeBuildDeps) (t tree.ObjectTree, err error) { -func buildSyncTree( - ctx context.Context, - syncClient syncservice.SyncClient, - treeStorage storage.TreeStorage, - listener updatelistener.UpdateListener, - aclList list.ACLList, - isFirstBuild bool) (t tree.ObjectTree, err error) { - t, err = buildObjectTree(treeStorage, aclList) + t, err = buildObjectTree(deps.Storage, deps.AclList) if err != nil { return } - t = &SyncTree{ + syncTree := &SyncTree{ ObjectTree: t, - syncClient: syncClient, - listener: listener, + syncClient: deps.SyncClient, + listener: deps.Listener, } + syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient) + syncTree.SyncHandler = syncHandler + t = syncTree - headUpdate := syncClient.CreateHeadUpdate(t, nil) + headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // here we will have different behaviour based on who is sending this update if isFirstBuild { // send to everybody, because everybody should know that the node or client got new tree - err = syncClient.BroadcastAsync(headUpdate) + err = syncTree.syncClient.BroadcastAsync(headUpdate) } else { // send either to everybody if client or to replica set if node - err = syncClient.BroadcastAsyncOrSendResponsible(headUpdate) + err = syncTree.syncClient.BroadcastAsyncOrSendResponsible(headUpdate) } return } diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/synctree/synctreehandler.go similarity index 80% rename from common/commonspace/syncservice/synchandler.go rename to common/commonspace/synctree/synctreehandler.go index 32c1b00e..fb71f280 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -1,32 +1,27 @@ -package syncservice +package synctree import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" ) -type syncHandler struct { - spaceId string - treeCache treegetter.TreeGetter - syncClient SyncClient +type syncTreeHandler struct { + objTree tree.ObjectTree + syncClient syncservice.SyncClient } -type SyncHandler interface { - HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) -} - -func newSyncHandler(spaceId string, treeCache treegetter.TreeGetter, syncClient SyncClient) *syncHandler { - return &syncHandler{ - spaceId: spaceId, - treeCache: treeCache, +func newSyncTreeHandler(objTree tree.ObjectTree, syncClient syncservice.SyncClient) synchandler.SyncHandler { + return &syncTreeHandler{ + objTree: objTree, syncClient: syncClient, } } -func (s *syncHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) error { +func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) error { content := msg.GetContent() switch { case content.GetFullSyncRequest() != nil: @@ -39,7 +34,7 @@ func (s *syncHandler) HandleMessage(ctx context.Context, senderId string, msg *s return nil } -func (s *syncHandler) handleHeadUpdate( +func (s *syncTreeHandler) handleHeadUpdate( ctx context.Context, senderId string, update *spacesyncproto.ObjectHeadUpdate, @@ -51,11 +46,8 @@ func (s *syncHandler) handleHeadUpdate( var ( fullRequest *spacesyncproto.ObjectSyncMessage isEmptyUpdate = len(update.Changes) == 0 + objTree = s.objTree ) - objTree, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId) - if err != nil { - return - } err = func() error { objTree.Lock() @@ -63,6 +55,7 @@ func (s *syncHandler) handleHeadUpdate( // isEmptyUpdate is sent when the tree is brought up from cache if isEmptyUpdate { + log.With("treeId", msg.TreeId).Debug("is empty update") if slice.UnsortedEquals(objTree.Heads(), update.Heads) { return nil } @@ -102,7 +95,7 @@ func (s *syncHandler) handleHeadUpdate( return } -func (s *syncHandler) handleFullSyncRequest( +func (s *syncTreeHandler) handleFullSyncRequest( ctx context.Context, senderId string, request *spacesyncproto.ObjectFullSyncRequest, @@ -115,6 +108,7 @@ func (s *syncHandler) handleFullSyncRequest( var ( fullResponse *spacesyncproto.ObjectSyncMessage header = msg.RootChange + objTree = s.objTree ) defer func() { if err != nil { @@ -122,11 +116,6 @@ func (s *syncHandler) handleFullSyncRequest( } }() - objTree, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId) - if err != nil { - return - } - err = func() error { objTree.Lock() defer objTree.Unlock() @@ -152,7 +141,7 @@ func (s *syncHandler) handleFullSyncRequest( return s.syncClient.SendAsync([]string{senderId}, fullResponse) } -func (s *syncHandler) handleFullSyncResponse( +func (s *syncTreeHandler) handleFullSyncResponse( ctx context.Context, senderId string, response *spacesyncproto.ObjectFullSyncResponse, @@ -161,7 +150,7 @@ func (s *syncHandler) handleFullSyncResponse( With("heads", response.Heads). With("treeId", msg.TreeId). Debug("received full sync response message") - objTree, err := s.treeCache.GetTree(ctx, s.spaceId, msg.TreeId) + objTree := s.objTree if err != nil { log.With("senderId", senderId). With("heads", response.Heads). @@ -188,6 +177,6 @@ func (s *syncHandler) handleFullSyncResponse( return } -func (s *syncHandler) alreadyHasHeads(t tree.ObjectTree, heads []string) bool { +func (s *syncTreeHandler) alreadyHasHeads(t tree.ObjectTree, heads []string) bool { return slice.UnsortedEquals(t.Heads(), heads) || t.HasChanges(heads...) } diff --git a/common/commonspace/syncservice/synchandler_test.go b/common/commonspace/synctree/synctreehandler_test.go similarity index 98% rename from common/commonspace/syncservice/synchandler_test.go rename to common/commonspace/synctree/synctreehandler_test.go index 1b18be9f..e86aedbb 100644 --- a/common/commonspace/syncservice/synchandler_test.go +++ b/common/commonspace/synctree/synctreehandler_test.go @@ -1,4 +1,4 @@ -package syncservice +package synctree import ( "context" @@ -44,7 +44,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { syncClientMock := mock_syncservice.NewMockSyncClient(ctrl) objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl)) - syncHandler := newSyncHandler(spaceId, cacheMock, syncClientMock) + syncHandler := newSyncTreeHandler(spaceId, cacheMock, syncClientMock) t.Run("head update non empty all heads added", func(t *testing.T) { treeId := "treeId" senderId := "senderId" @@ -196,7 +196,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { syncClientMock := mock_syncservice.NewMockSyncClient(ctrl) objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl)) - syncHandler := newSyncHandler(spaceId, cacheMock, syncClientMock) + syncHandler := newSyncTreeHandler(spaceId, cacheMock, syncClientMock) t.Run("full sync request with change", func(t *testing.T) { treeId := "treeId" senderId := "senderId" @@ -307,7 +307,7 @@ func TestSyncHandler_HandleFullSyncResponse(t *testing.T) { syncClientMock := mock_syncservice.NewMockSyncClient(ctrl) objectTreeMock := newTestObjMock(mock_tree.NewMockObjectTree(ctrl)) - syncHandler := newSyncHandler(spaceId, cacheMock, syncClientMock) + syncHandler := newSyncTreeHandler(spaceId, cacheMock, syncClientMock) t.Run("full sync response with change", func(t *testing.T) { treeId := "treeId" senderId := "senderId"