diff --git a/app/app.go b/app/app.go index b8be83a0..89e7ab46 100644 --- a/app/app.go +++ b/app/app.go @@ -125,14 +125,10 @@ func (app *App) ChildApp() *App { func (app *App) Register(s Component) *App { app.mu.Lock() defer app.mu.Unlock() - current := app - for current != nil { - for _, es := range current.components { - if s.Name() == es.Name() { - panic(fmt.Errorf("component '%s' already registered", s.Name())) - } + for _, es := range app.components { + if s.Name() == es.Name() { + panic(fmt.Errorf("component '%s' already registered", s.Name())) } - current = current.parent } app.components = append(app.components, s) return app diff --git a/app/app_test.go b/app/app_test.go index 7b5678d6..0c122b24 100644 --- a/app/app_test.go +++ b/app/app_test.go @@ -48,6 +48,11 @@ func TestAppServiceRegistry(t *testing.T) { names := app.ComponentNames() assert.Equal(t, names, []string{"x1", "c1", "r1", "s1"}) }) + t.Run("Child override", func(t *testing.T) { + app := app.ChildApp() + app.Register(newTestService(testTypeRunnable, "s1", nil, nil)) + _ = app.MustComponent("s1").(*testRunnable) + }) } func TestAppStart(t *testing.T) { diff --git a/commonspace/deletionstate/deletionstate.go b/commonspace/deletionstate/deletionstate.go index 109a5ea2..692ba627 100644 --- a/commonspace/deletionstate/deletionstate.go +++ b/commonspace/deletionstate/deletionstate.go @@ -35,7 +35,7 @@ type objectDeletionState struct { } func (st *objectDeletionState) Init(a *app.App) (err error) { - st.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) + st.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) return nil } diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index cc5faefd..3f5bcd25 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -73,12 +73,12 @@ func (h *headSync) Init(a *app.App) (err error) { h.syncPeriod = cfg.GetSpace().SyncPeriod h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) h.log = log.With(zap.String("spaceId", h.spaceId)) - h.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) + h.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) h.diff = ldiff.New(16, 16) - h.peerManager = a.MustComponent(peermanager.ManagerName).(peermanager.PeerManager) + h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider) h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) - h.treeManager = app.MustComponent[treemanager.TreeManager](a) + h.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) h.deletionState = a.MustComponent(deletionstate.CName).(deletionstate.ObjectDeletionState) h.syncer = newDiffSyncer(h) sync := func(ctx context.Context) (err error) { diff --git a/commonspace/object/acl/syncacl/syncacl.go b/commonspace/object/acl/syncacl/syncacl.go index f3bc57a5..426b16cd 100644 --- a/commonspace/object/acl/syncacl/syncacl.go +++ b/commonspace/object/acl/syncacl/syncacl.go @@ -19,12 +19,16 @@ type SyncAcl struct { list.AclList } +func (s *SyncAcl) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) { + return nil, nil +} + func (s *SyncAcl) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { return nil } func (s *SyncAcl) Init(a *app.App) (err error) { - storage := a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) + storage := a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) aclStorage, err := storage.AclStorage() if err != nil { return err diff --git a/commonspace/objectsync/syncclient/requestfactory.go b/commonspace/object/tree/synctree/requestfactory.go similarity index 99% rename from commonspace/objectsync/syncclient/requestfactory.go rename to commonspace/object/tree/synctree/requestfactory.go index 0d908179..8d91add8 100644 --- a/commonspace/objectsync/syncclient/requestfactory.go +++ b/commonspace/object/tree/synctree/requestfactory.go @@ -1,4 +1,4 @@ -package syncclient +package synctree import ( "fmt" diff --git a/commonspace/objectsync/syncclient/syncclient.go b/commonspace/object/tree/synctree/syncclient.go similarity index 64% rename from commonspace/objectsync/syncclient/syncclient.go rename to commonspace/object/tree/synctree/syncclient.go index 69cf4224..13909b3b 100644 --- a/commonspace/objectsync/syncclient/syncclient.go +++ b/commonspace/object/tree/synctree/syncclient.go @@ -1,23 +1,15 @@ -package syncclient +package synctree import ( "context" - "github.com/anyproto/any-sync/app" - "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" - "github.com/anyproto/any-sync/commonspace/requestsender" - "github.com/anyproto/any-sync/commonspace/spacestate" + "github.com/anyproto/any-sync/commonspace/peermanager" + "github.com/anyproto/any-sync/commonspace/requestmanager" "github.com/anyproto/any-sync/commonspace/spacesyncproto" - "github.com/anyproto/any-sync/commonspace/streamsender" "go.uber.org/zap" ) -const CName = "common.objectsync.syncclient" - -var log = logger.NewNamed(CName) - type SyncClient interface { - app.Component RequestFactory Broadcast(msg *treechangeproto.TreeSyncMessage) SendUpdate(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) @@ -27,35 +19,25 @@ type SyncClient interface { type syncClient struct { RequestFactory - spaceId string - requestSender requestsender.RequestSender - streamSender streamsender.StreamSender + spaceId string + requestManager requestmanager.RequestManager + peerManager peermanager.PeerManager } -func New() SyncClient { +func NewSyncClient(spaceId string, requestManager requestmanager.RequestManager, peerManager peermanager.PeerManager) SyncClient { return &syncClient{ RequestFactory: &requestFactory{}, + spaceId: spaceId, + requestManager: requestManager, + peerManager: peerManager, } } - -func (s *syncClient) Init(a *app.App) (err error) { - sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) - s.spaceId = sharedState.SpaceId - s.requestSender = a.MustComponent(requestsender.CName).(requestsender.RequestSender) - s.streamSender = a.MustComponent(streamsender.CName).(streamsender.StreamSender) - return nil -} - -func (s *syncClient) Name() (name string) { - return CName -} - func (s *syncClient) Broadcast(msg *treechangeproto.TreeSyncMessage) { objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") if err != nil { return } - err = s.streamSender.Broadcast(objMsg) + err = s.peerManager.Broadcast(context.Background(), objMsg) if err != nil { log.Debug("broadcast error", zap.Error(err)) } @@ -66,7 +48,7 @@ func (s *syncClient) SendUpdate(peerId, objectId string, msg *treechangeproto.Tr if err != nil { return } - return s.streamSender.SendPeer(peerId, objMsg) + return s.peerManager.SendPeer(context.Background(), peerId, objMsg) } func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { @@ -74,7 +56,7 @@ func (s *syncClient) SendRequest(ctx context.Context, peerId, objectId string, m if err != nil { return } - return s.requestSender.SendRequest(ctx, peerId, objMsg) + return s.requestManager.SendRequest(ctx, peerId, objMsg) } func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (err error) { @@ -82,7 +64,7 @@ func (s *syncClient) QueueRequest(peerId, objectId string, msg *treechangeproto. if err != nil { return } - return s.requestSender.QueueRequest(peerId, objMsg) + return s.requestManager.QueueRequest(peerId, objMsg) } func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { diff --git a/commonspace/object/tree/synctree/syncprotocol.go b/commonspace/object/tree/synctree/syncprotocol.go new file mode 100644 index 00000000..be759259 --- /dev/null +++ b/commonspace/object/tree/synctree/syncprotocol.go @@ -0,0 +1,152 @@ +package synctree + +import ( + "context" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anyproto/any-sync/util/slice" + "go.uber.org/zap" +) + +type TreeSyncProtocol interface { + HeadUpdate(ctx context.Context, senderId string, update *treechangeproto.TreeHeadUpdate) (request *treechangeproto.TreeSyncMessage, err error) + FullSyncRequest(ctx context.Context, senderId string, request *treechangeproto.TreeFullSyncRequest) (response *treechangeproto.TreeSyncMessage, err error) + FullSyncResponse(ctx context.Context, senderId string, response *treechangeproto.TreeFullSyncResponse) (err error) +} + +type treeSyncProtocol struct { + log logger.CtxLogger + spaceId string + objTree objecttree.ObjectTree + reqFactory RequestFactory +} + +func newTreeSyncProtocol(spaceId string, objTree objecttree.ObjectTree, reqFactory RequestFactory) *treeSyncProtocol { + return &treeSyncProtocol{ + log: log.With(zap.String("spaceId", spaceId), zap.String("treeId", objTree.Id())), + spaceId: spaceId, + objTree: objTree, + reqFactory: reqFactory, + } +} + +func (t *treeSyncProtocol) HeadUpdate(ctx context.Context, senderId string, update *treechangeproto.TreeHeadUpdate) (fullRequest *treechangeproto.TreeSyncMessage, err error) { + var ( + isEmptyUpdate = len(update.Changes) == 0 + objTree = t.objTree + ) + log := t.log.With( + zap.String("senderId", senderId), + zap.Strings("update heads", update.Heads), + zap.Int("len(update changes)", len(update.Changes))) + log.DebugCtx(ctx, "received head update message") + + defer func() { + if err != nil { + log.ErrorCtx(ctx, "head update finished with error", zap.Error(err)) + } else if fullRequest != nil { + cnt := fullRequest.Content.GetFullSyncRequest() + log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes))) + log.DebugCtx(ctx, "returning full sync request") + } else { + if !isEmptyUpdate { + log.DebugCtx(ctx, "head update finished correctly") + } + } + }() + + // isEmptyUpdate is sent when the tree is brought up from cache + if isEmptyUpdate { + headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads) + log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals)) + if headEquals { + return + } + + // we need to sync in any case + fullRequest, err = t.reqFactory.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) + return + } + + if t.alreadyHasHeads(objTree, update.Heads) { + return + } + + _, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{ + NewHeads: update.Heads, + RawChanges: update.Changes, + }) + if err != nil { + return + } + + if t.alreadyHasHeads(objTree, update.Heads) { + return + } + + fullRequest, err = t.reqFactory.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) + return +} + +func (t *treeSyncProtocol) FullSyncRequest(ctx context.Context, senderId string, request *treechangeproto.TreeFullSyncRequest) (fullResponse *treechangeproto.TreeSyncMessage, err error) { + var ( + objTree = t.objTree + ) + log := t.log.With(zap.String("senderId", senderId), + zap.Strings("request heads", request.Heads), + zap.Int("len(request changes)", len(request.Changes))) + log.DebugCtx(ctx, "received full sync request message") + + defer func() { + if err != nil { + log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err)) + } else if fullResponse != nil { + cnt := fullResponse.Content.GetFullSyncResponse() + log = log.With(zap.Strings("response heads", cnt.Heads), zap.Int("len(response changes)", len(cnt.Changes))) + log.DebugCtx(ctx, "full sync response sent") + } + }() + + if len(request.Changes) != 0 && !t.alreadyHasHeads(objTree, request.Heads) { + _, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{ + NewHeads: request.Heads, + RawChanges: request.Changes, + }) + if err != nil { + return + } + } + fullResponse, err = t.reqFactory.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath) + return +} + +func (t *treeSyncProtocol) FullSyncResponse(ctx context.Context, senderId string, response *treechangeproto.TreeFullSyncResponse) (err error) { + var ( + objTree = t.objTree + ) + log := log.With( + zap.Strings("heads", response.Heads), + zap.Int("len(changes)", len(response.Changes))) + log.DebugCtx(ctx, "received full sync response message") + defer func() { + if err != nil { + log.ErrorCtx(ctx, "full sync response failed", zap.Error(err)) + } else { + log.DebugCtx(ctx, "full sync response succeeded") + } + }() + if t.alreadyHasHeads(objTree, response.Heads) { + return + } + + _, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{ + NewHeads: response.Heads, + RawChanges: response.Changes, + }) + return +} + +func (t *treeSyncProtocol) alreadyHasHeads(ot objecttree.ObjectTree, heads []string) bool { + return slice.UnsortedEquals(ot.Heads(), heads) || ot.HasChanges(heads...) +} diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 1a6d2497..4c694880 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -4,7 +4,6 @@ package synctree import ( "context" "errors" - "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "time" "github.com/anyproto/any-sync/app/logger" @@ -44,7 +43,7 @@ type SyncTree interface { type syncTree struct { objecttree.ObjectTree synchandler.SyncHandler - syncClient syncclient.SyncClient + syncClient SyncClient syncStatus syncstatus.StatusUpdater notifiable HeadNotifiable listener updatelistener.UpdateListener @@ -61,7 +60,7 @@ type ResponsiblePeersGetter interface { type BuildDeps struct { SpaceId string - SyncClient syncclient.SyncClient + SyncClient SyncClient Configuration nodeconf.NodeConf HeadNotifiable HeadNotifiable Listener updatelistener.UpdateListener diff --git a/commonspace/object/tree/synctree/synctree_test.go b/commonspace/object/tree/synctree/synctree_test.go index 5fe9b2e6..0c12bd34 100644 --- a/commonspace/object/tree/synctree/synctree_test.go +++ b/commonspace/object/tree/synctree/synctree_test.go @@ -9,7 +9,6 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" - "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/nodeconf" "github.com/golang/mock/gomock" @@ -19,7 +18,7 @@ import ( type syncTreeMatcher struct { objTree objecttree.ObjectTree - client syncclient.SyncClient + client SyncClient listener updatelistener.UpdateListener } @@ -35,8 +34,8 @@ func (s syncTreeMatcher) String() string { return "" } -func syncClientFuncCreator(client syncclient.SyncClient) func(spaceId string, factory syncclient.RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) syncclient.SyncClient { - return func(spaceId string, factory syncclient.RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) syncclient.SyncClient { +func syncClientFuncCreator(client SyncClient) func(spaceId string, factory RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) SyncClient { + return func(spaceId string, factory RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.NodeConf) SyncClient { return client } } diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index a8f98f62..5953ca97 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -2,40 +2,64 @@ package synctree import ( "context" + "errors" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" - "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" - "github.com/anyproto/any-sync/util/slice" "github.com/gogo/protobuf/proto" - "go.uber.org/zap" "sync" ) +var ( + ErrMessageIsRequest = errors.New("message is request") + ErrMessageIsNotRequest = errors.New("message is not request") +) + type syncTreeHandler struct { - objTree objecttree.ObjectTree - syncClient syncclient.SyncClient - syncStatus syncstatus.StatusUpdater - handlerLock sync.Mutex - spaceId string - queue ReceiveQueue + objTree objecttree.ObjectTree + syncClient SyncClient + syncProtocol TreeSyncProtocol + syncStatus syncstatus.StatusUpdater + handlerLock sync.Mutex + spaceId string + queue ReceiveQueue } const maxQueueSize = 5 -// TODO: Make sync and async message handling -func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient syncclient.SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler { +func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler { return &syncTreeHandler{ - objTree: objTree, - syncClient: syncClient, - syncStatus: syncStatus, - spaceId: spaceId, - queue: newReceiveQueue(maxQueueSize), + objTree: objTree, + syncProtocol: newTreeSyncProtocol(spaceId, objTree, syncClient), + syncClient: syncClient, + syncStatus: syncStatus, + spaceId: spaceId, + queue: newReceiveQueue(maxQueueSize), } } +func (s *syncTreeHandler) HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) { + unmarshalled := &treechangeproto.TreeSyncMessage{} + err = proto.Unmarshal(request.Payload, unmarshalled) + if err != nil { + return + } + fullSyncRequest := unmarshalled.GetContent().GetFullSyncRequest() + if fullSyncRequest == nil { + err = ErrMessageIsNotRequest + return + } + s.syncStatus.HeadsReceive(senderId, request.ObjectId, treechangeproto.GetHeads(unmarshalled)) + treeResp, err := s.syncProtocol.FullSyncRequest(ctx, senderId, fullSyncRequest) + if err != nil { + return + } + response, err = MarshallTreeMessage(treeResp, s.spaceId, request.ObjectId, "") + return +} + func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { unmarshalled := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, unmarshalled) @@ -55,181 +79,27 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) { s.objTree.Lock() defer s.objTree.Unlock() - msg, replyId, err := s.queue.GetMessage(senderId) + msg, _, err := s.queue.GetMessage(senderId) if err != nil { return } defer s.queue.ClearQueue(senderId) + treeId := s.objTree.Id() content := msg.GetContent() switch { case content.GetHeadUpdate() != nil: - return s.handleHeadUpdate(ctx, senderId, content.GetHeadUpdate(), replyId) + var syncReq *treechangeproto.TreeSyncMessage + syncReq, err = s.syncProtocol.HeadUpdate(ctx, senderId, content.GetHeadUpdate()) + if err != nil { + return + } + return s.syncClient.QueueRequest(senderId, treeId, syncReq) case content.GetFullSyncRequest() != nil: - return s.handleFullSyncRequest(ctx, senderId, content.GetFullSyncRequest(), replyId) + return ErrMessageIsRequest case content.GetFullSyncResponse() != nil: - return s.handleFullSyncResponse(ctx, senderId, content.GetFullSyncResponse()) + return s.syncProtocol.FullSyncResponse(ctx, senderId, content.GetFullSyncResponse()) } return } - -func (s *syncTreeHandler) handleHeadUpdate( - ctx context.Context, - senderId string, - update *treechangeproto.TreeHeadUpdate, - replyId string) (err error) { - var ( - fullRequest *treechangeproto.TreeSyncMessage - isEmptyUpdate = len(update.Changes) == 0 - objTree = s.objTree - treeId = objTree.Id() - ) - log := log.With( - zap.Strings("update heads", update.Heads), - zap.String("treeId", treeId), - zap.String("spaceId", s.spaceId), - zap.Int("len(update changes)", len(update.Changes))) - log.DebugCtx(ctx, "received head update message") - - defer func() { - if err != nil { - log.ErrorCtx(ctx, "head update finished with error", zap.Error(err)) - } else if fullRequest != nil { - cnt := fullRequest.Content.GetFullSyncRequest() - log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes))) - log.DebugCtx(ctx, "sending full sync request") - } else { - if !isEmptyUpdate { - log.DebugCtx(ctx, "head update finished correctly") - } - } - }() - - // isEmptyUpdate is sent when the tree is brought up from cache - if isEmptyUpdate { - headEquals := slice.UnsortedEquals(objTree.Heads(), update.Heads) - log.DebugCtx(ctx, "is empty update", zap.String("treeId", objTree.Id()), zap.Bool("headEquals", headEquals)) - if headEquals { - return - } - - // we need to sync in any case - fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) - if err != nil { - return - } - - return s.syncClient.QueueRequest(senderId, treeId, fullRequest) - } - - if s.alreadyHasHeads(objTree, update.Heads) { - return - } - - _, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{ - NewHeads: update.Heads, - RawChanges: update.Changes, - }) - if err != nil { - return - } - - if s.alreadyHasHeads(objTree, update.Heads) { - return - } - - fullRequest, err = s.syncClient.CreateFullSyncRequest(objTree, update.Heads, update.SnapshotPath) - if err != nil { - return - } - - return s.syncClient.QueueRequest(senderId, treeId, fullRequest) -} - -func (s *syncTreeHandler) handleFullSyncRequest( - ctx context.Context, - senderId string, - request *treechangeproto.TreeFullSyncRequest, - replyId string) (err error) { - var ( - fullResponse *treechangeproto.TreeSyncMessage - header = s.objTree.Header() - objTree = s.objTree - treeId = s.objTree.Id() - ) - - log := log.With(zap.String("senderId", senderId), - zap.Strings("request heads", request.Heads), - zap.String("treeId", treeId), - zap.String("replyId", replyId), - zap.String("spaceId", s.spaceId), - zap.Int("len(request changes)", len(request.Changes))) - log.DebugCtx(ctx, "received full sync request message") - - defer func() { - if err != nil { - log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err)) - s.syncClient.QueueRequest(senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header)) - return - } else if fullResponse != nil { - cnt := fullResponse.Content.GetFullSyncResponse() - log = log.With(zap.Strings("response heads", cnt.Heads), zap.Int("len(response changes)", len(cnt.Changes))) - log.DebugCtx(ctx, "full sync response sent") - } - }() - - if len(request.Changes) != 0 && !s.alreadyHasHeads(objTree, request.Heads) { - _, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{ - NewHeads: request.Heads, - RawChanges: request.Changes, - }) - if err != nil { - return - } - } - fullResponse, err = s.syncClient.CreateFullSyncResponse(objTree, request.Heads, request.SnapshotPath) - if err != nil { - return - } - - return s.syncClient.QueueRequest(senderId, treeId, fullResponse) -} - -func (s *syncTreeHandler) handleFullSyncResponse( - ctx context.Context, - senderId string, - response *treechangeproto.TreeFullSyncResponse) (err error) { - var ( - objTree = s.objTree - treeId = s.objTree.Id() - ) - log := log.With( - zap.Strings("heads", response.Heads), - zap.String("treeId", treeId), - zap.String("spaceId", s.spaceId), - zap.Int("len(changes)", len(response.Changes))) - log.DebugCtx(ctx, "received full sync response message") - - defer func() { - if err != nil { - log.ErrorCtx(ctx, "full sync response failed", zap.Error(err)) - } else { - log.DebugCtx(ctx, "full sync response succeeded") - } - }() - - if s.alreadyHasHeads(objTree, response.Heads) { - return - } - - _, err = objTree.AddRawChanges(ctx, objecttree.RawChangesPayload{ - NewHeads: response.Heads, - RawChanges: response.Changes, - }) - return -} - -func (s *syncTreeHandler) alreadyHasHeads(t objecttree.ObjectTree, heads []string) bool { - return slice.UnsortedEquals(t.Heads(), heads) || t.HasChanges(heads...) -} diff --git a/commonspace/object/tree/synctree/synctreehandler_test.go b/commonspace/object/tree/synctree/synctreehandler_test.go index dce08a93..4a65aad5 100644 --- a/commonspace/object/tree/synctree/synctreehandler_test.go +++ b/commonspace/object/tree/synctree/synctreehandler_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" - "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "sync" "testing" @@ -110,7 +109,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).Times(2) @@ -139,7 +138,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes() @@ -172,7 +171,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes() @@ -193,7 +192,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h2"}).AnyTimes() @@ -218,7 +217,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Heads().Return([]string{"h1"}).AnyTimes() @@ -251,7 +250,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT().Header().Return(nil) @@ -284,7 +283,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT(). Id().AnyTimes().Return(treeId) @@ -313,7 +312,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg.RequestId = replyId fx.objectTreeMock.EXPECT(). @@ -340,7 +339,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullRequest(fullSyncRequest, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") fx.objectTreeMock.EXPECT(). Id().AnyTimes().Return(treeId) @@ -381,7 +380,7 @@ func TestSyncHandler_HandleFullSyncResponse(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT(). @@ -414,7 +413,7 @@ func TestSyncHandler_HandleFullSyncResponse(t *testing.T) { SnapshotPath: []string{"h1"}, } treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) - objectMsg, _ := syncclient.MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, replyId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(treeId) fx.objectTreeMock.EXPECT(). diff --git a/commonspace/object/tree/synctree/utils_test.go b/commonspace/object/tree/synctree/utils_test.go index a6d553a3..6c295f9c 100644 --- a/commonspace/object/tree/synctree/utils_test.go +++ b/commonspace/object/tree/synctree/utils_test.go @@ -7,7 +7,6 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" - "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" @@ -89,14 +88,14 @@ type testSyncHandler struct { peerId string aclList list.AclList log *messageLog - syncClient syncclient.SyncClient + syncClient SyncClient builder objecttree.BuildObjectTreeFunc } // createSyncHandler creates a sync handler when a tree is already created func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler { - factory := syncclient.NewRequestFactory() - syncClient := syncclient.New(spaceId, newTestMessagePool(peerId, log), factory) + factory := NewRequestFactory() + syncClient := NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) netTree := &broadcastTree{ ObjectTree: objTree, SyncClient: syncClient, @@ -107,8 +106,8 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo // createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree) func createEmptySyncHandler(peerId, spaceId string, builder objecttree.BuildObjectTreeFunc, aclList list.AclList, log *messageLog) *testSyncHandler { - factory := syncclient.NewRequestFactory() - syncClient := syncclient.New(spaceId, newTestMessagePool(peerId, log), factory) + factory := NewRequestFactory() + syncClient := NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) batcher := mb.New[protocolMsg](0) return &testSyncHandler{ @@ -140,9 +139,9 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re return } if unmarshalled.Content.GetFullSyncResponse() == nil { - newTreeRequest := syncclient.NewRequestFactory().CreateNewTreeRequest() + newTreeRequest := NewRequestFactory().CreateNewTreeRequest() var objMsg *spacesyncproto.ObjectSyncMessage - objMsg, err = syncclient.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") + objMsg, err = MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") if err != nil { return } @@ -167,8 +166,8 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re } h.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, h.syncClient, syncstatus.NewNoOpSyncStatus()) var objMsg *spacesyncproto.ObjectSyncMessage - newTreeRequest := syncclient.NewRequestFactory().CreateHeadUpdate(netTree, res.Added) - objMsg, err = syncclient.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") + newTreeRequest := NewRequestFactory().CreateHeadUpdate(netTree, res.Added) + objMsg, err = MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") if err != nil { return } @@ -278,7 +277,7 @@ func (m *testMessagePool) SendSync(ctx context.Context, peerId string, message * // it is a simplified version of SyncTree which is easier to use in the test environment type broadcastTree struct { objecttree.ObjectTree - syncclient.SyncClient + SyncClient } func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) { diff --git a/commonspace/objectmanager/objectmanager.go b/commonspace/objectmanager/objectmanager.go index 2ce6d3dc..49637818 100644 --- a/commonspace/objectmanager/objectmanager.go +++ b/commonspace/objectmanager/objectmanager.go @@ -17,8 +17,6 @@ var ( ErrSpaceClosed = errors.New("space is closed") ) -const CName = "common.commonspace.objectmanager" - type ObjectManager interface { treemanager.TreeManager AddObject(object syncobjectgetter.SyncObject) @@ -62,7 +60,7 @@ func (o *objectManager) AddObject(object syncobjectgetter.SyncObject) { } func (o *objectManager) Name() string { - return CName + return treemanager.CName } func (o *objectManager) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) { diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index dae786df..44e5308c 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -5,7 +5,6 @@ import ( "context" "fmt" "github.com/anyproto/any-sync/app" - "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/net/peer" @@ -30,6 +29,7 @@ var log = logger.NewNamed(CName) type ObjectSync interface { LastUsage() time.Time HandleMessage(ctx context.Context, hm HandleMessage) (err error) + HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) CloseThread(id string) (err error) app.ComponentRunnable } @@ -56,7 +56,6 @@ func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field { type objectSync struct { spaceId string - syncClient syncclient.SyncClient objectGetter syncobjectgetter.SyncObjectGetter configuration nodeconf.NodeConf spaceStorage spacestorage.SpaceStorage @@ -67,8 +66,7 @@ type objectSync struct { } func (s *objectSync) Init(a *app.App) (err error) { - s.syncClient = a.MustComponent(syncclient.CName).(syncclient.SyncClient) - s.spaceStorage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) + s.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) s.objectGetter = app.MustComponent[syncobjectgetter.SyncObjectGetter](a) s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) sharedData := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) @@ -103,6 +101,10 @@ func (s *objectSync) LastUsage() time.Time { return time.Time{} } +func (s *objectSync) HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) { + +} + func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { threadId := hm.Message.ObjectId hm.ReceiveTime = time.Now() diff --git a/commonspace/objectsync/synchandler/synchhandler.go b/commonspace/objectsync/synchandler/synchhandler.go index 35aebb1e..090118cd 100644 --- a/commonspace/objectsync/synchandler/synchhandler.go +++ b/commonspace/objectsync/synchandler/synchhandler.go @@ -6,5 +6,6 @@ import ( ) type SyncHandler interface { - HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) + HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) + HandleRequest(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) } diff --git a/commonspace/objecttreebuilder/treebuilder.go b/commonspace/objecttreebuilder/treebuilder.go index f77c6ff5..15f9bdbf 100644 --- a/commonspace/objecttreebuilder/treebuilder.go +++ b/commonspace/objecttreebuilder/treebuilder.go @@ -14,8 +14,8 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/objectsync" - "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/peermanager" + "github.com/anyproto/any-sync/commonspace/requestmanager" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/syncstatus" @@ -58,10 +58,11 @@ func New() TreeBuilderComponent { } type treeBuilder struct { - syncClient syncclient.SyncClient + syncClient synctree.SyncClient configuration nodeconf.NodeConf headsNotifiable synctree.HeadNotifiable peerManager peermanager.PeerManager + requestManager requestmanager.RequestManager spaceStorage spacestorage.SpaceStorage syncStatus syncstatus.StatusUpdater objectSync objectsync.ObjectSync @@ -81,14 +82,15 @@ func (t *treeBuilder) Init(a *app.App) (err error) { t.treesUsed = state.TreesUsed t.builder = state.TreeBuilderFunc t.aclList = a.MustComponent(syncacl.CName).(*syncacl.SyncAcl) - t.spaceStorage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) + t.spaceStorage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) t.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) t.headsNotifiable = a.MustComponent(headsync.CName).(headsync.HeadSync) t.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusUpdater) - t.peerManager = a.MustComponent(peermanager.ManagerName).(peermanager.PeerManager) + t.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) + t.requestManager = a.MustComponent(requestmanager.CName).(requestmanager.RequestManager) t.objectSync = a.MustComponent(objectsync.CName).(objectsync.ObjectSync) - t.syncClient = a.MustComponent(syncclient.CName).(syncclient.SyncClient) t.log = log.With(zap.String("spaceId", t.spaceId)) + t.syncClient = synctree.NewSyncClient(t.spaceId, t.requestManager, t.peerManager) return nil } diff --git a/commonspace/peermanager/peermanager.go b/commonspace/peermanager/peermanager.go index c69e59b3..0a5750bf 100644 --- a/commonspace/peermanager/peermanager.go +++ b/commonspace/peermanager/peermanager.go @@ -9,8 +9,7 @@ import ( ) const ( - ProviderName = "common.commonspace.peermanagerprovider" - ManagerName = "common.commonspace.peermanager" + CName = "common.commonspace.peermanager" ) type PeerManager interface { diff --git a/commonspace/requestmanager/requestmanager.go b/commonspace/requestmanager/requestmanager.go new file mode 100644 index 00000000..e77b4bdb --- /dev/null +++ b/commonspace/requestmanager/requestmanager.go @@ -0,0 +1,49 @@ +package requestmanager + +import ( + "context" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" +) + +const CName = "common.commonspace.requestmanager" + +var log = logger.NewNamed(CName) + +type RequestManager interface { + app.ComponentRunnable + SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) + QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) +} + +func New() RequestManager { + return &requestManager{} +} + +type requestManager struct { +} + +func (r *requestManager) Init(a *app.App) (err error) { + return +} + +func (r *requestManager) Name() (name string) { + return CName +} + +func (r *requestManager) Run(ctx context.Context) (err error) { + return nil +} + +func (r *requestManager) Close(ctx context.Context) (err error) { + return nil +} + +func (r *requestManager) SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + return nil, nil +} + +func (r *requestManager) QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + return nil +} diff --git a/commonspace/requestsender/requestsender.go b/commonspace/requestsender/requestsender.go deleted file mode 100644 index d7143b4a..00000000 --- a/commonspace/requestsender/requestsender.go +++ /dev/null @@ -1,49 +0,0 @@ -package requestsender - -import ( - "context" - "github.com/anyproto/any-sync/app" - "github.com/anyproto/any-sync/app/logger" - "github.com/anyproto/any-sync/commonspace/spacesyncproto" -) - -const CName = "common.commonspace.requestsender" - -var log = logger.NewNamed(CName) - -type RequestSender interface { - app.ComponentRunnable - SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) - QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) -} - -func New() RequestSender { - return &requestSender{} -} - -type requestSender struct { -} - -func (r *requestSender) Init(a *app.App) (err error) { - return -} - -func (r *requestSender) Name() (name string) { - return CName -} - -func (r *requestSender) Run(ctx context.Context) (err error) { - return nil -} - -func (r *requestSender) Close(ctx context.Context) (err error) { - return nil -} - -func (r *requestSender) SendRequest(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { - return nil, nil -} - -func (r *requestSender) QueueRequest(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - return nil -} diff --git a/commonspace/settings/settings.go b/commonspace/settings/settings.go index dbc93ccc..eab41709 100644 --- a/commonspace/settings/settings.go +++ b/commonspace/settings/settings.go @@ -55,7 +55,7 @@ func (s *settings) Init(a *app.App) (err error) { s.treeBuilder = a.MustComponent(objecttreebuilder.CName).(objecttreebuilder.TreeBuilderComponent) sharedState := a.MustComponent(spacestate.CName).(*spacestate.SpaceState) - s.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) + s.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) s.spaceIsDeleted = sharedState.SpaceIsDeleted deps := Deps{ diff --git a/commonspace/space.go b/commonspace/space.go index 793e7333..02e6d8a9 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -121,7 +121,7 @@ func (s *space) Init(ctx context.Context) (err error) { s.syncStatus = s.app.MustComponent(syncstatus.CName).(syncstatus.StatusProvider) s.settings = s.app.MustComponent(settings.CName).(settings.Settings) s.objectSync = s.app.MustComponent(objectsync.CName).(objectsync.ObjectSync) - s.storage = s.app.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) + s.storage = s.app.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) return nil } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 49b38963..ad536e80 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -16,15 +16,13 @@ import ( "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/objectmanager" "github.com/anyproto/any-sync/commonspace/objectsync" - "github.com/anyproto/any-sync/commonspace/objectsync/syncclient" "github.com/anyproto/any-sync/commonspace/objecttreebuilder" "github.com/anyproto/any-sync/commonspace/peermanager" - "github.com/anyproto/any-sync/commonspace/requestsender" + "github.com/anyproto/any-sync/commonspace/requestmanager" "github.com/anyproto/any-sync/commonspace/settings" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" - "github.com/anyproto/any-sync/commonspace/streamsender" "github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/pool" @@ -69,10 +67,10 @@ type spaceService struct { func (s *spaceService) Init(a *app.App) (err error) { s.config = a.MustComponent("config").(config.ConfigGetter).GetSpace() s.account = a.MustComponent(accountservice.CName).(accountservice.Service) - s.storageProvider = a.MustComponent(spacestorage.ProviderName).(spacestorage.SpaceStorageProvider) + s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.treeManager = a.MustComponent(treemanager.CName).(treemanager.TreeManager) - s.peermanagerProvider = a.MustComponent(peermanager.ProviderName).(peermanager.PeerManagerProvider) + s.peermanagerProvider = a.MustComponent(peermanager.CName).(peermanager.PeerManagerProvider) s.pool = a.MustComponent(pool.CName).(pool.Pool) s.metric, _ = a.Component(metric.CName).(metric.Metric) s.app = a @@ -172,12 +170,10 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { Register(peerManager). Register(newCommonStorage(st)). Register(syncacl.New()). - Register(streamsender.New()). - Register(requestsender.New()). + Register(requestmanager.New()). Register(deletionstate.New()). Register(settings.New()). Register(objectmanager.New(s.treeManager)). - Register(syncclient.New()). Register(objecttreebuilder.New()). Register(objectsync.New()). Register(headsync.New()) diff --git a/commonspace/spacestorage/inmemoryprovider.go b/commonspace/spacestorage/inmemoryprovider.go index 1cceb53c..3ddfe02a 100644 --- a/commonspace/spacestorage/inmemoryprovider.go +++ b/commonspace/spacestorage/inmemoryprovider.go @@ -22,7 +22,7 @@ func (i *InMemorySpaceStorageProvider) Init(a *app.App) (err error) { } func (i *InMemorySpaceStorageProvider) Name() (name string) { - return ProviderName + return CName } func (i *InMemorySpaceStorageProvider) WaitSpaceStorage(ctx context.Context, id string) (SpaceStorage, error) { diff --git a/commonspace/spacestorage/inmemorystorage.go b/commonspace/spacestorage/inmemorystorage.go index bc2846fb..db1d1166 100644 --- a/commonspace/spacestorage/inmemorystorage.go +++ b/commonspace/spacestorage/inmemorystorage.go @@ -27,7 +27,7 @@ func (i *InMemorySpaceStorage) Init(a *app.App) (err error) { } func (i *InMemorySpaceStorage) Name() (name string) { - return StorageName + return CName } func NewInMemorySpaceStorage(payload SpaceStorageCreatePayload) (SpaceStorage, error) { diff --git a/commonspace/spacestorage/spacestorage.go b/commonspace/spacestorage/spacestorage.go index b6b73fdc..a4ffaecb 100644 --- a/commonspace/spacestorage/spacestorage.go +++ b/commonspace/spacestorage/spacestorage.go @@ -12,10 +12,7 @@ import ( "github.com/anyproto/any-sync/commonspace/spacesyncproto" ) -const ( - ProviderName = "common.commonspace.spacestorageprovider" - StorageName = "common.commonspace.spacestorage" -) +const CName = "common.commonspace.spacestorage" var ( ErrSpaceStorageExists = errors.New("space storage exists") diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index f26f4881..261a48a3 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -136,7 +136,7 @@ func (p *mockPeerManager) Init(a *app.App) (err error) { } func (p *mockPeerManager) Name() (name string) { - return peermanager.ManagerName + return peermanager.CName } func (p *mockPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { @@ -163,7 +163,7 @@ func (m *mockPeerManagerProvider) Init(a *app.App) (err error) { } func (m *mockPeerManagerProvider) Name() (name string) { - return peermanager.ProviderName + return peermanager.CName } func (m *mockPeerManagerProvider) NewPeerManager(ctx context.Context, spaceId string) (sm peermanager.PeerManager, err error) { diff --git a/commonspace/streamsender/streamsender.go b/commonspace/streamsender/streamsender.go deleted file mode 100644 index 99a45f5c..00000000 --- a/commonspace/streamsender/streamsender.go +++ /dev/null @@ -1,37 +0,0 @@ -package streamsender - -import ( - "github.com/anyproto/any-sync/app" - "github.com/anyproto/any-sync/commonspace/spacesyncproto" -) - -const CName = "common.commonspace.streamsender" - -type StreamSender interface { - app.Component - SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) - Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error) -} - -func New() StreamSender { - return &streamSender{} -} - -type streamSender struct { -} - -func (s *streamSender) Init(a *app.App) (err error) { - return -} - -func (s *streamSender) Name() (name string) { - return CName -} - -func (s *streamSender) SendPeer(peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - return nil -} - -func (s *streamSender) Broadcast(msg *spacesyncproto.ObjectSyncMessage) (err error) { - return nil -} diff --git a/commonspace/syncstatus/syncstatus.go b/commonspace/syncstatus/syncstatus.go index acdbc298..2049c42b 100644 --- a/commonspace/syncstatus/syncstatus.go +++ b/commonspace/syncstatus/syncstatus.go @@ -104,7 +104,7 @@ func (s *syncStatusProvider) Init(a *app.App) (err error) { s.updateTimeout = syncTimeout s.spaceId = sharedState.SpaceId s.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) - s.storage = a.MustComponent(spacestorage.StorageName).(spacestorage.SpaceStorage) + s.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) return }