diff --git a/common/commonspace/diffservice/diffservice.go b/common/commonspace/diffservice/diffservice.go index a6199f04..71fe9bef 100644 --- a/common/commonspace/diffservice/diffservice.go +++ b/common/commonspace/diffservice/diffservice.go @@ -14,8 +14,8 @@ import ( ) type DiffService interface { + HeadNotifiable HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) - UpdateHeads(id string, heads []string) RemoveObject(id string) AllIds() []string diff --git a/common/commonspace/diffservice/headnotifiable.go b/common/commonspace/diffservice/headnotifiable.go new file mode 100644 index 00000000..80819bfe --- /dev/null +++ b/common/commonspace/diffservice/headnotifiable.go @@ -0,0 +1,5 @@ +package diffservice + +type HeadNotifiable interface { + UpdateHeads(id string, heads []string) +} diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 23794ebc..dd9c94d5 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -91,14 +91,15 @@ func (s *service) GetSpace(ctx context.Context, id string) (Space, error) { lastConfiguration := s.configurationService.GetLast() confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool) diffService := diffservice.NewDiffService(id, s.config.SyncPeriod, st, confConnector, s.cache, log) - syncService := syncservice.NewSyncService(id, diffService, lastConfiguration, confConnector) + syncService := syncservice.NewSyncService(id, confConnector) sp := &space{ - id: id, - syncService: syncService, - diffService: diffService, - cache: s.cache, - account: s.account, - storage: st, + id: id, + syncService: syncService, + diffService: diffService, + cache: s.cache, + account: s.account, + configuration: lastConfiguration, + storage: st, } if err := sp.Init(ctx); err != nil { return nil, err diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 73b6f770..b957a515 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -13,6 +13,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/treegetter" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" aclstorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" tree "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" @@ -69,12 +70,13 @@ type space struct { rpc *rpcHandler - syncService syncservice.SyncService - diffService diffservice.DiffService - storage storage.SpaceStorage - cache treegetter.TreeGetter - account account.Service - aclList *syncacl.SyncACL + syncService syncservice.SyncService + diffService diffservice.DiffService + storage storage.SpaceStorage + cache treegetter.TreeGetter + account account.Service + aclList *syncacl.SyncACL + configuration nodeconf.Configuration isClosed atomic.Bool } @@ -101,7 +103,7 @@ func (s *space) Init(ctx context.Context) (err error) { if err != nil { return } - s.aclList = syncacl.NewSyncACL(aclList, s.syncService.SyncClient()) + s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool()) objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache) s.syncService.Init(objectGetter) s.diffService.Init(initialIds) @@ -129,12 +131,15 @@ func (s *space) DeriveTree(ctx context.Context, payload tree.ObjectTreeCreatePay err = ErrSpaceClosed return } - deps := synctree.SyncTreeCreateDeps{ - Payload: payload, - SyncClient: s.syncService.SyncClient(), - Listener: listener, - AclList: s.aclList, - CreateStorage: s.storage.CreateTreeStorage, + deps := synctree.CreateDeps{ + SpaceId: s.id, + Payload: payload, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + CreateStorage: s.storage.CreateTreeStorage, } return synctree.DeriveSyncTree(ctx, deps) } @@ -144,12 +149,15 @@ func (s *space) CreateTree(ctx context.Context, payload tree.ObjectTreeCreatePay err = ErrSpaceClosed return } - deps := synctree.SyncTreeCreateDeps{ - Payload: payload, - SyncClient: s.syncService.SyncClient(), - Listener: listener, - AclList: s.aclList, - CreateStorage: s.storage.CreateTreeStorage, + deps := synctree.CreateDeps{ + SpaceId: s.id, + Payload: payload, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + CreateStorage: s.storage.CreateTreeStorage, } return synctree.CreateSyncTree(ctx, deps) } @@ -165,9 +173,9 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene if err != nil { return nil, err } - return s.syncService.SyncClient().SendSync( + return s.syncService.StreamPool().SendSync( peerId, - s.syncService.SyncClient().CreateNewTreeRequest(id), + synctree.GetRequestFactory().CreateNewTreeRequest(id), ) } @@ -204,11 +212,14 @@ func (s *space) BuildTree(ctx context.Context, id string, listener updatelistene return } } - deps := synctree.SyncTreeBuildDeps{ - SyncClient: s.syncService.SyncClient(), - Listener: listener, - AclList: s.aclList, - Storage: store, + deps := synctree.BuildDeps{ + SpaceId: s.id, + StreamPool: s.syncService.StreamPool(), + Configuration: s.configuration, + HeadNotifiable: s.diffService, + Listener: listener, + AclList: s.aclList, + Storage: store, } return synctree.BuildSyncTree(ctx, isFirstBuild, deps) } diff --git a/common/commonspace/syncacl/syncacl.go b/common/commonspace/syncacl/syncacl.go index 85d65f1a..c7343625 100644 --- a/common/commonspace/syncacl/syncacl.go +++ b/common/commonspace/syncacl/syncacl.go @@ -8,14 +8,14 @@ import ( type SyncACL struct { list.ACLList - syncservice.SyncClient synchandler.SyncHandler + streamPool syncservice.StreamPool } -func NewSyncACL(aclList list.ACLList, syncClient syncservice.SyncClient) *SyncACL { +func NewSyncACL(aclList list.ACLList, streamPool syncservice.StreamPool) *SyncACL { return &SyncACL{ ACLList: aclList, - SyncClient: syncClient, SyncHandler: nil, + streamPool: streamPool, } } diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index d11cf94f..d07cb1b2 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -18,22 +18,18 @@ var log = logger.NewNamed("syncservice").Sugar() type SyncService interface { ocache.ObjectLastUsage synchandler.SyncHandler - SyncClient() SyncClient + StreamPool() StreamPool Init(getter objectgetter.ObjectGetter) Close() (err error) } -type HeadNotifiable interface { - UpdateHeads(id string, heads []string) -} - const respPeersStreamCheckInterval = time.Second * 10 type syncService struct { spaceId string - syncClient SyncClient + streamPool StreamPool clientFactory spacesyncproto.ClientFactory objectGetter objectgetter.ObjectGetter @@ -45,17 +41,13 @@ type syncService struct { func NewSyncService( spaceId string, - headNotifiable HeadNotifiable, - configuration nodeconf.Configuration, confConnector nodeconf.ConfConnector) (syncService SyncService) { streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { return syncService.HandleMessage(ctx, senderId, message) }) - factory := newRequestFactory() - syncClient := newSyncClient(spaceId, streamPool, headNotifiable, factory, configuration) syncService = newSyncService( spaceId, - syncClient, + streamPool, spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient), confConnector) return @@ -63,11 +55,11 @@ func NewSyncService( func newSyncService( spaceId string, - syncClient SyncClient, + streamPool StreamPool, clientFactory spacesyncproto.ClientFactory, connector nodeconf.ConfConnector) *syncService { return &syncService{ - syncClient: syncClient, + streamPool: streamPool, connector: connector, clientFactory: clientFactory, spaceId: spaceId, @@ -84,11 +76,11 @@ func (s *syncService) Init(objectGetter objectgetter.ObjectGetter) { func (s *syncService) Close() (err error) { s.stopStreamLoop() <-s.streamLoopDone - return s.syncClient.Close() + return s.streamPool.Close() } func (s *syncService) LastUsage() time.Time { - return s.syncClient.LastUsage() + return s.streamPool.LastUsage() } func (s *syncService) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { @@ -107,7 +99,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { return } for _, peer := range respPeers { - if s.syncClient.HasActiveStream(peer.Id()) { + if s.streamPool.HasActiveStream(peer.Id()) { continue } stream, err := s.clientFactory.Client(peer).Stream(ctx) @@ -125,7 +117,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err) continue } - s.syncClient.AddAndReadStreamAsync(stream) + s.streamPool.AddAndReadStreamAsync(stream) } } @@ -142,6 +134,6 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { } } -func (s *syncService) SyncClient() SyncClient { - return s.syncClient +func (s *syncService) StreamPool() StreamPool { + return s.streamPool } diff --git a/common/commonspace/syncservice/requestfactory.go b/common/commonspace/synctree/requestfactory.go similarity index 96% rename from common/commonspace/syncservice/requestfactory.go rename to common/commonspace/synctree/requestfactory.go index 08332751..f35133ae 100644 --- a/common/commonspace/syncservice/requestfactory.go +++ b/common/commonspace/synctree/requestfactory.go @@ -1,4 +1,4 @@ -package syncservice +package synctree import ( "fmt" @@ -15,8 +15,10 @@ type RequestFactory interface { CreateFullSyncResponse(t tree.ObjectTree, theirHeads, theirSnapshotPath []string, trackingId string) (*spacesyncproto.ObjectSyncMessage, error) } -func newRequestFactory() RequestFactory { - return &requestFactory{} +var factory = &requestFactory{} + +func GetRequestFactory() RequestFactory { + return factory } type requestFactory struct{} diff --git a/common/commonspace/syncservice/syncclient.go b/common/commonspace/synctree/syncclient.go similarity index 75% rename from common/commonspace/syncservice/syncclient.go rename to common/commonspace/synctree/syncclient.go index d5f0b9f3..842d796b 100644 --- a/common/commonspace/syncservice/syncclient.go +++ b/common/commonspace/synctree/syncclient.go @@ -1,28 +1,35 @@ -package syncservice +package synctree import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" "time" ) type SyncClient interface { - StreamPool + syncservice.StreamPool RequestFactory ocache.ObjectLastUsage BroadcastAsyncOrSendResponsible(message *spacesyncproto.ObjectSyncMessage) (err error) } type syncClient struct { - StreamPool + syncservice.StreamPool RequestFactory spaceId string - notifiable HeadNotifiable + notifiable diffservice.HeadNotifiable configuration nodeconf.Configuration } -func newSyncClient(spaceId string, pool StreamPool, notifiable HeadNotifiable, factory RequestFactory, configuration nodeconf.Configuration) SyncClient { +func newSyncClient( + spaceId string, + pool syncservice.StreamPool, + notifiable diffservice.HeadNotifiable, + factory RequestFactory, + configuration nodeconf.Configuration) SyncClient { return &syncClient{ StreamPool: pool, RequestFactory: factory, diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 74d0b312..fd32a06b 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -4,9 +4,11 @@ import ( "context" "errors" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" @@ -19,7 +21,7 @@ var ErrSyncTreeClosed = errors.New("sync tree is closed") type SyncTree struct { tree.ObjectTree synchandler.SyncHandler - syncClient syncservice.SyncClient + syncClient SyncClient listener updatelistener.UpdateListener isClosed bool } @@ -30,78 +32,102 @@ var createDerivedObjectTree = tree.CreateDerivedObjectTree var createObjectTree = tree.CreateObjectTree var buildObjectTree = tree.BuildObjectTree -type SyncTreeCreateDeps struct { - Payload tree.ObjectTreeCreatePayload - SyncClient syncservice.SyncClient - Listener updatelistener.UpdateListener - AclList list.ACLList - CreateStorage storage.TreeStorageCreatorFunc +type CreateDeps struct { + SpaceId string + Payload tree.ObjectTreeCreatePayload + Configuration nodeconf.Configuration + HeadNotifiable diffservice.HeadNotifiable + StreamPool syncservice.StreamPool + Listener updatelistener.UpdateListener + AclList list.ACLList + CreateStorage storage.TreeStorageCreatorFunc } -type SyncTreeBuildDeps struct { - SyncClient syncservice.SyncClient - Listener updatelistener.UpdateListener - AclList list.ACLList - Storage storage.TreeStorage +type BuildDeps struct { + SpaceId string + StreamPool syncservice.StreamPool + Configuration nodeconf.Configuration + HeadNotifiable diffservice.HeadNotifiable + Listener updatelistener.UpdateListener + AclList list.ACLList + Storage storage.TreeStorage } func DeriveSyncTree( ctx context.Context, - deps SyncTreeCreateDeps) (t tree.ObjectTree, err error) { + deps CreateDeps) (t tree.ObjectTree, err error) { t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) if err != nil { return } + syncClient := newSyncClient( + deps.SpaceId, + deps.StreamPool, + deps.HeadNotifiable, + GetRequestFactory(), + deps.Configuration) syncTree := &SyncTree{ ObjectTree: t, - syncClient: deps.SyncClient, + syncClient: syncClient, listener: deps.Listener, } - syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient) + syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler t = syncTree - headUpdate := deps.SyncClient.CreateHeadUpdate(t, nil) - err = deps.SyncClient.BroadcastAsync(headUpdate) + headUpdate := syncClient.CreateHeadUpdate(t, nil) + err = syncClient.BroadcastAsync(headUpdate) return } func CreateSyncTree( ctx context.Context, - deps SyncTreeCreateDeps) (t tree.ObjectTree, err error) { + deps CreateDeps) (t tree.ObjectTree, err error) { t, err = createObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) if err != nil { return } + syncClient := newSyncClient( + deps.SpaceId, + deps.StreamPool, + deps.HeadNotifiable, + GetRequestFactory(), + deps.Configuration) syncTree := &SyncTree{ ObjectTree: t, - syncClient: deps.SyncClient, + syncClient: syncClient, listener: deps.Listener, } - syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient) + syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler t = syncTree - headUpdate := deps.SyncClient.CreateHeadUpdate(t, nil) - err = deps.SyncClient.BroadcastAsync(headUpdate) + headUpdate := syncClient.CreateHeadUpdate(t, nil) + err = syncClient.BroadcastAsync(headUpdate) return } func BuildSyncTree( ctx context.Context, isFirstBuild bool, - deps SyncTreeBuildDeps) (t tree.ObjectTree, err error) { + deps BuildDeps) (t tree.ObjectTree, err error) { t, err = buildObjectTree(deps.Storage, deps.AclList) if err != nil { return } + syncClient := newSyncClient( + deps.SpaceId, + deps.StreamPool, + deps.HeadNotifiable, + GetRequestFactory(), + deps.Configuration) syncTree := &SyncTree{ ObjectTree: t, - syncClient: deps.SyncClient, + syncClient: syncClient, listener: deps.Listener, } - syncHandler := newSyncTreeHandler(syncTree, deps.SyncClient) + syncHandler := newSyncTreeHandler(syncTree, syncClient) syncTree.SyncHandler = syncHandler t = syncTree diff --git a/common/commonspace/synctree/synctree_test.go b/common/commonspace/synctree/synctree_test.go index 4f9d2ac4..8d62f9b0 100644 --- a/common/commonspace/synctree/synctree_test.go +++ b/common/commonspace/synctree/synctree_test.go @@ -3,7 +3,6 @@ package synctree import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/mock_syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree/updatelistener/mock_updatelistener" @@ -21,7 +20,7 @@ import ( type syncTreeMatcher struct { objTree tree2.ObjectTree - client syncservice.SyncClient + client SyncClient listener updatelistener.UpdateListener } diff --git a/common/commonspace/synctree/synctreehandler.go b/common/commonspace/synctree/synctreehandler.go index fb71f280..441e71de 100644 --- a/common/commonspace/synctree/synctreehandler.go +++ b/common/commonspace/synctree/synctreehandler.go @@ -3,7 +3,6 @@ package synctree import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" @@ -11,10 +10,10 @@ import ( type syncTreeHandler struct { objTree tree.ObjectTree - syncClient syncservice.SyncClient + syncClient SyncClient } -func newSyncTreeHandler(objTree tree.ObjectTree, syncClient syncservice.SyncClient) synchandler.SyncHandler { +func newSyncTreeHandler(objTree tree.ObjectTree, syncClient SyncClient) synchandler.SyncHandler { return &syncTreeHandler{ objTree: objTree, syncClient: syncClient,