Simplify ping tree

This commit is contained in:
mcrakhman 2023-02-03 20:12:02 +01:00 committed by Mikhail Iudin
parent 7541375e43
commit 5db8fd8f0b
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
5 changed files with 21 additions and 37 deletions

View File

@ -132,7 +132,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
d.pingTreesInCache(ctx, filteredIds) d.pingTreesInCache(ctx, p.Id(), filteredIds)
d.log.Info("sync done:", zap.Int("newIds", len(newIds)), d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
zap.Int("changedIds", len(changedIds)), zap.Int("changedIds", len(changedIds)),
@ -143,7 +143,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
return return
} }
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { func (d *diffSyncer) pingTreesInCache(ctx context.Context, peerId string, trees []string) {
for _, tId := range trees { for _, tId := range trees {
tree, err := d.cache.GetTree(ctx, d.spaceId, tId) tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
if err != nil { if err != nil {
@ -159,7 +159,7 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
// it may be already there (i.e. loaded) // it may be already there (i.e. loaded)
// and build func will not be called, thus we won't sync the tree // and build func will not be called, thus we won't sync the tree
// therefore we just do it manually // therefore we just do it manually
if err = syncTree.Ping(ctx); err != nil { if err = syncTree.Ping(ctx, peerId); err != nil {
d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId)) d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId))
} else { } else {
d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId)) d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId))

View File

@ -54,20 +54,6 @@ func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1)
} }
// BroadcastAsyncOrSendResponsible mocks base method.
func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible.
func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1)
}
// CreateFullSyncRequest mocks base method. // CreateFullSyncRequest mocks base method.
func (m *MockSyncClient) CreateFullSyncRequest(arg0 objecttree.ObjectTree, arg1, arg2 []string) (*treechangeproto.TreeSyncMessage, error) { func (m *MockSyncClient) CreateFullSyncRequest(arg0 objecttree.ObjectTree, arg1, arg2 []string) (*treechangeproto.TreeSyncMessage, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -395,17 +381,17 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call {
} }
// Ping mocks base method. // Ping mocks base method.
func (m *MockSyncTree) Ping(arg0 context.Context) error { func (m *MockSyncTree) Ping(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Ping", arg0) ret := m.ctrl.Call(m, "Ping", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// Ping indicates an expected call of Ping. // Ping indicates an expected call of Ping.
func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call { func (mr *MockSyncTreeMockRecorder) Ping(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0, arg1)
} }
// RLock mocks base method. // RLock mocks base method.

View File

@ -12,7 +12,6 @@ import (
type SyncClient interface { type SyncClient interface {
RequestFactory RequestFactory
Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
} }
@ -52,18 +51,6 @@ func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *tree
return s.MessagePool.SendPeer(ctx, peerId, objMsg) return s.MessagePool.SendPeer(ctx, peerId, objMsg)
} }
func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "")
if err != nil {
return
}
if s.configuration.IsResponsible(s.spaceId) {
return s.MessagePool.SendResponsible(ctx, objMsg)
}
return s.Broadcast(ctx, message)
}
func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
payload, err := message.Marshal() payload, err := message.Marshal()
if err != nil { if err != nil {

View File

@ -31,10 +31,15 @@ type HeadNotifiable interface {
UpdateHeads(id string, heads []string) UpdateHeads(id string, heads []string)
} }
type ListenerSetter interface {
SetListener(listener updatelistener.UpdateListener)
}
type SyncTree interface { type SyncTree interface {
objecttree.ObjectTree objecttree.ObjectTree
synchandler.SyncHandler synchandler.SyncHandler
Ping(ctx context.Context) (err error) ListenerSetter
Ping(ctx context.Context, peerId string) (err error)
} }
// SyncTree sends head updates to sync service and also sends new changes to update listener // SyncTree sends head updates to sync service and also sends new changes to update listener
@ -238,6 +243,11 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
return return
} }
func (s *syncTree) SetListener(listener updatelistener.UpdateListener) {
// this should be called under lock
s.listener = listener
}
func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) { func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) {
if err = s.checkAlive(); err != nil { if err = s.checkAlive(); err != nil {
return return
@ -334,11 +344,11 @@ func (s *syncTree) checkAlive() (err error) {
return return
} }
func (s *syncTree) Ping(ctx context.Context) (err error) { func (s *syncTree) Ping(ctx context.Context, peerId string) (err error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
headUpdate := s.syncClient.CreateHeadUpdate(s, nil) headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate) return s.syncClient.SendWithReply(ctx, peerId, headUpdate, "")
} }
func (s *syncTree) afterBuild() { func (s *syncTree) afterBuild() {

View File

@ -90,6 +90,7 @@ func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.O
s.updateLastUsage() s.updateLastUsage()
return s.PeerManager.SendResponsible(ctx, msg) return s.PeerManager.SendResponsible(ctx, msg)
} }
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage() s.updateLastUsage()
return s.PeerManager.Broadcast(ctx, msg) return s.PeerManager.Broadcast(ctx, msg)