diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 9db4d3db..8168abe8 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -132,7 +132,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) - d.pingTreesInCache(ctx, filteredIds) + d.pingTreesInCache(ctx, p.Id(), filteredIds) d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), @@ -143,7 +143,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) return } -func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { +func (d *diffSyncer) pingTreesInCache(ctx context.Context, peerId string, trees []string) { for _, tId := range trees { tree, err := d.cache.GetTree(ctx, d.spaceId, tId) if err != nil { @@ -159,7 +159,7 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { // it may be already there (i.e. loaded) // and build func will not be called, thus we won't sync the tree // therefore we just do it manually - if err = syncTree.Ping(ctx); err != nil { + if err = syncTree.Ping(ctx, peerId); err != nil { d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId)) } else { d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId)) diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index 8218aa2e..f17ab9ac 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -54,20 +54,6 @@ func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1) } -// BroadcastAsyncOrSendResponsible mocks base method. -func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible. -func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1) -} - // CreateFullSyncRequest mocks base method. func (m *MockSyncClient) CreateFullSyncRequest(arg0 objecttree.ObjectTree, arg1, arg2 []string) (*treechangeproto.TreeSyncMessage, error) { m.ctrl.T.Helper() @@ -395,17 +381,17 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call { } // Ping mocks base method. -func (m *MockSyncTree) Ping(arg0 context.Context) error { +func (m *MockSyncTree) Ping(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Ping", arg0) + ret := m.ctrl.Call(m, "Ping", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Ping indicates an expected call of Ping. -func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call { +func (mr *MockSyncTreeMockRecorder) Ping(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0, arg1) } // RLock mocks base method. diff --git a/commonspace/object/tree/synctree/syncclient.go b/commonspace/object/tree/synctree/syncclient.go index e858f190..6ad98c7c 100644 --- a/commonspace/object/tree/synctree/syncclient.go +++ b/commonspace/object/tree/synctree/syncclient.go @@ -12,7 +12,6 @@ import ( type SyncClient interface { RequestFactory Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) - BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) } @@ -52,18 +51,6 @@ func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *tree return s.MessagePool.SendPeer(ctx, peerId, objMsg) } -func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { - objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "") - if err != nil { - return - } - - if s.configuration.IsResponsible(s.spaceId) { - return s.MessagePool.SendResponsible(ctx, objMsg) - } - return s.Broadcast(ctx, message) -} - func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { payload, err := message.Marshal() if err != nil { diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 21dc8add..1c3e4c48 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -31,10 +31,15 @@ type HeadNotifiable interface { UpdateHeads(id string, heads []string) } +type ListenerSetter interface { + SetListener(listener updatelistener.UpdateListener) +} + type SyncTree interface { objecttree.ObjectTree synchandler.SyncHandler - Ping(ctx context.Context) (err error) + ListenerSetter + Ping(ctx context.Context, peerId string) (err error) } // SyncTree sends head updates to sync service and also sends new changes to update listener @@ -238,6 +243,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy return } +func (s *syncTree) SetListener(listener updatelistener.UpdateListener) { + // this should be called under lock + s.listener = listener +} + func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) { if err = s.checkAlive(); err != nil { return @@ -334,11 +344,11 @@ func (s *syncTree) checkAlive() (err error) { return } -func (s *syncTree) Ping(ctx context.Context) (err error) { +func (s *syncTree) Ping(ctx context.Context, peerId string) (err error) { s.Lock() defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate) + return s.syncClient.SendWithReply(ctx, peerId, headUpdate, "") } func (s *syncTree) afterBuild() { diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 47ba14c4..4d4f11e1 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -90,6 +90,7 @@ func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.O s.updateLastUsage() return s.PeerManager.SendResponsible(ctx, msg) } + func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() return s.PeerManager.Broadcast(ctx, msg)