diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index f93de528..dc87a3c4 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -55,16 +55,14 @@ type syncTree struct { var log = logger.NewNamed("common.commonspace.synctree") -var createSyncClient = objectsync.NewSyncClient - type ResponsiblePeersGetter interface { GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) } type BuildDeps struct { SpaceId string - ObjectSync objectsync.ObjectSync - Configuration nodeconf.Configuration + SyncClient objectsync.SyncClient + Configuration nodeconf.NodeConf HeadNotifiable HeadNotifiable Listener updatelistener.UpdateListener AclList list.AclList @@ -99,10 +97,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy if err != nil { return } - syncClient := createSyncClient( - deps.SpaceId, - deps.ObjectSync.MessagePool(), - objectsync.GetRequestFactory()) + syncClient := deps.SyncClient syncTree := &syncTree{ ObjectTree: objTree, syncClient: syncClient, diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go index 9f819c41..bcf547b0 100644 --- a/commonspace/object/tree/synctree/treeremotegetter.go +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -48,12 +48,7 @@ func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err e func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) { newTreeRequest := objectsync.GetRequestFactory().CreateNewTreeRequest() - objMsg, err := objectsync.MarshallTreeMessage(newTreeRequest, t.deps.SpaceId, t.treeId, "") - if err != nil { - return - } - - resp, err := t.deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) + resp, err := t.deps.SyncClient.SendSync(ctx, peerId, t.treeId, newTreeRequest) if err != nil { return } diff --git a/commonspace/object/tree/synctree/utils_test.go b/commonspace/object/tree/synctree/utils_test.go index 9d2c4f68..451f7536 100644 --- a/commonspace/object/tree/synctree/utils_test.go +++ b/commonspace/object/tree/synctree/utils_test.go @@ -95,7 +95,7 @@ type testSyncHandler struct { // createSyncHandler creates a sync handler when a tree is already created func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler { factory := objectsync.GetRequestFactory() - syncClient := objectsync.NewSyncClient(spaceId, newTestPeerManager(peerId, log), factory) + syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) netTree := &broadcastTree{ ObjectTree: objTree, SyncClient: syncClient, @@ -107,7 +107,7 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo // createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree) func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *testSyncHandler { factory := objectsync.GetRequestFactory() - syncClient := objectsync.NewSyncClient(spaceId, newTestPeerManager(peerId, log), factory) + syncClient := objectsync.NewSyncClient(spaceId, newTestMessagePool(peerId, log), factory) batcher := mb.New[protocolMsg](0) return &testSyncHandler{ @@ -173,11 +173,11 @@ func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, re return h.manager().Broadcast(context.Background(), objMsg) } -func (h *testSyncHandler) manager() *testPeerManager { +func (h *testSyncHandler) manager() *testMessagePool { if h.SyncHandler != nil { - return h.SyncHandler.(*syncTreeHandler).syncClient.PeerManager().(*testPeerManager) + return h.SyncHandler.(*syncTreeHandler).syncClient.MessagePool().(*testMessagePool) } - return h.syncClient.PeerManager().(*testPeerManager) + return h.syncClient.MessagePool().(*testMessagePool) } func (h *testSyncHandler) tree() *broadcastTree { @@ -218,22 +218,22 @@ func (h *testSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGr }() } -// testPeerManager captures all other handlers and sends messages to them -type testPeerManager struct { +// testMessagePool captures all other handlers and sends messages to them +type testMessagePool struct { peerId string handlers map[string]*testSyncHandler log *messageLog } -func newTestPeerManager(peerId string, log *messageLog) *testPeerManager { - return &testPeerManager{handlers: map[string]*testSyncHandler{}, peerId: peerId, log: log} +func newTestMessagePool(peerId string, log *messageLog) *testMessagePool { + return &testMessagePool{handlers: map[string]*testSyncHandler{}, peerId: peerId, log: log} } -func (m *testPeerManager) addHandler(peerId string, handler *testSyncHandler) { +func (m *testMessagePool) addHandler(peerId string, handler *testSyncHandler) { m.handlers[peerId] = handler } -func (m *testPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { +func (m *testMessagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { pMsg := protocolMsg{ msg: msg, senderId: m.peerId, @@ -243,7 +243,7 @@ func (m *testPeerManager) SendPeer(ctx context.Context, peerId string, msg *spac return m.handlers[peerId].send(context.Background(), pMsg) } -func (m *testPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { +func (m *testMessagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { for _, handler := range m.handlers { pMsg := protocolMsg{ msg: msg, @@ -256,7 +256,19 @@ func (m *testPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.Obj return } -func (m *testPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { +func (m *testMessagePool) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { + panic("should not be called") +} + +func (m *testMessagePool) LastUsage() time.Time { + panic("should not be called") +} + +func (m *testMessagePool) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { + panic("should not be called") +} + +func (m *testMessagePool) SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { panic("should not be called") } diff --git a/commonspace/objectsync/mock_objectsync/mock_objectsync.go b/commonspace/objectsync/mock_objectsync/mock_objectsync.go index aaca46a5..29ea2717 100644 --- a/commonspace/objectsync/mock_objectsync/mock_objectsync.go +++ b/commonspace/objectsync/mock_objectsync/mock_objectsync.go @@ -10,7 +10,8 @@ import ( objecttree "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" treechangeproto "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" - peermanager "github.com/anytypeio/any-sync/commonspace/peermanager" + objectsync "github.com/anytypeio/any-sync/commonspace/objectsync" + spacesyncproto "github.com/anytypeio/any-sync/commonspace/spacesyncproto" gomock "github.com/golang/mock/gomock" ) @@ -109,18 +110,33 @@ func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest)) } -// PeerManager mocks base method. -func (m *MockSyncClient) PeerManager() peermanager.PeerManager { +// MessagePool mocks base method. +func (m *MockSyncClient) MessagePool() objectsync.MessagePool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "PeerManager") - ret0, _ := ret[0].(peermanager.PeerManager) + ret := m.ctrl.Call(m, "MessagePool") + ret0, _ := ret[0].(objectsync.MessagePool) return ret0 } -// PeerManager indicates an expected call of PeerManager. -func (mr *MockSyncClientMockRecorder) PeerManager() *gomock.Call { +// MessagePool indicates an expected call of MessagePool. +func (mr *MockSyncClientMockRecorder) MessagePool() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PeerManager", reflect.TypeOf((*MockSyncClient)(nil).PeerManager)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MessagePool", reflect.TypeOf((*MockSyncClient)(nil).MessagePool)) +} + +// SendSync mocks base method. +func (m *MockSyncClient) SendSync(arg0 context.Context, arg1, arg2 string, arg3 *treechangeproto.TreeSyncMessage) (*spacesyncproto.ObjectSyncMessage, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SendSync", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(*spacesyncproto.ObjectSyncMessage) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendSync indicates an expected call of SendSync. +func (mr *MockSyncClientMockRecorder) SendSync(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendSync", reflect.TypeOf((*MockSyncClient)(nil).SendSync), arg0, arg1, arg2, arg3) } // SendWithReply mocks base method. diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index f3c3fd74..65311e14 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -24,7 +24,7 @@ var log = logger.NewNamed("common.commonspace.objectsync") type ObjectSync interface { LastUsage synchandler.SyncHandler - MessagePool() MessagePool + SyncClient() SyncClient Close() (err error) } @@ -59,9 +59,9 @@ func NewObjectSync( cancelSync: cancel, spaceIsDeleted: spaceIsDeleted, configuration: configuration, - syncClient: NewSyncClient(spaceId, peerManager, GetRequestFactory()), } os.messagePool = newMessagePool(peerManager, os.handleMessage) + os.syncClient = NewSyncClient(spaceId, os.messagePool, GetRequestFactory()) return os } @@ -79,7 +79,10 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message } func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - log := log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId)) + log := log.With( + zap.String("objectId", msg.ObjectId), + zap.String("requestId", msg.RequestId), + zap.String("replyId", msg.ReplyId)) if s.spaceIsDeleted.Load() { log = log.With(zap.Bool("isDeleted", true)) // preventing sync with other clients if they are not just syncing the settings tree @@ -97,11 +100,11 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp treeMsg := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, treeMsg) if err != nil { - return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.ReplyId) + return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.RequestId) } // this means that we don't have the tree locally and therefore can't return it if s.isEmptyFullSyncRequest(treeMsg) { - return s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.ReplyId) + return s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.RequestId) } } obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) @@ -112,8 +115,8 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp return obj.HandleMessage(ctx, senderId, msg) } -func (s *objectSync) MessagePool() MessagePool { - return s.messagePool +func (s *objectSync) SyncClient() SyncClient { + return s.syncClient } func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) (err error) { @@ -122,7 +125,7 @@ func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncprot if err != nil { return } - return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.ReplyId) + return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.RequestId) } func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) (err error) { diff --git a/commonspace/objectsync/syncclient.go b/commonspace/objectsync/syncclient.go index 4a95cae1..e6fd515e 100644 --- a/commonspace/objectsync/syncclient.go +++ b/commonspace/objectsync/syncclient.go @@ -3,7 +3,6 @@ package objectsync import ( "context" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" - "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" ) @@ -11,21 +10,22 @@ type SyncClient interface { RequestFactory Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) - PeerManager() peermanager.PeerManager + SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) + MessagePool() MessagePool } type syncClient struct { RequestFactory spaceId string - peerManager peermanager.PeerManager + messagePool MessagePool } func NewSyncClient( spaceId string, - peerManager peermanager.PeerManager, + messagePool MessagePool, factory RequestFactory) SyncClient { return &syncClient{ - peerManager: peerManager, + messagePool: messagePool, RequestFactory: factory, spaceId: spaceId, } @@ -36,7 +36,15 @@ func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyn if err != nil { return } - return s.peerManager.Broadcast(ctx, objMsg) + return s.messagePool.Broadcast(ctx, objMsg) +} + +func (s *syncClient) SendSync(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, "") + if err != nil { + return + } + return s.messagePool.SendSync(ctx, peerId, objMsg) } func (s *syncClient) SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) { @@ -44,11 +52,11 @@ func (s *syncClient) SendWithReply(ctx context.Context, peerId, objectId string, if err != nil { return } - return s.peerManager.SendPeer(ctx, peerId, objMsg) + return s.messagePool.SendPeer(ctx, peerId, objMsg) } -func (s *syncClient) PeerManager() peermanager.PeerManager { - return s.peerManager +func (s *syncClient) MessagePool() MessagePool { + return s.messagePool } func MarshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { diff --git a/commonspace/space.go b/commonspace/space.go index de8c8f76..24ca069a 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -179,7 +179,7 @@ func (s *space) Init(ctx context.Context) (err error) { if err != nil { return } - s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool()) + s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.SyncClient().MessagePool()) s.treeManager.AddObject(s.aclList) deletionState := settingsstate.NewObjectDeletionState(s.storage) @@ -284,7 +284,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea } deps := synctree.BuildDeps{ SpaceId: s.id, - ObjectSync: s.objectSync, + SyncClient: s.objectSync.SyncClient(), Configuration: s.configuration, HeadNotifiable: s.headSync, Listener: listener, @@ -322,7 +322,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t deps := synctree.BuildDeps{ SpaceId: s.id, - ObjectSync: s.objectSync, + SyncClient: s.objectSync.SyncClient(), Configuration: s.configuration, HeadNotifiable: s.headSync, Listener: opts.Listener,