From 36307440863fce0e0972251bd446dab2246c7ab4 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 1 Feb 2023 23:21:02 +0100 Subject: [PATCH 01/11] Helper methods for local streams etc --- net/dialer/dialer.go | 10 ++++++++++ net/pool/pool_test.go | 4 ++++ net/streampool/streampool.go | 13 +++++++++++++ 3 files changed, 27 insertions(+) diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index 24718286..2076f959 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -34,6 +34,7 @@ func New() Dialer { type Dialer interface { Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) UpdateAddrs(addrs map[string][]string) + SetPeerAddrs(peerId string, addrs []string) app.Component } @@ -62,6 +63,15 @@ func (d *dialer) UpdateAddrs(addrs map[string][]string) { d.mu.Unlock() } +func (d *dialer) SetPeerAddrs(peerId string, addrs []string) { + d.mu.Lock() + defer d.mu.Unlock() + if d.peerAddrs == nil { + return + } + d.peerAddrs[peerId] = addrs +} + func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) { d.mu.RLock() defer d.mu.RUnlock() diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index 94fde050..f913333c 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -160,6 +160,10 @@ func (d *dialerMock) UpdateAddrs(addrs map[string][]string) { return } +func (d *dialerMock) SetPeerAddrs(peerId string, addrs []string) { + return +} + func (d *dialerMock) Init(a *app.App) (err error) { return } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 8319e972..01527d14 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -40,6 +40,8 @@ type StreamPool interface { AddTagsCtx(ctx context.Context, tags ...string) error // RemoveTagsCtx removes tags from stream, stream will be extracted from ctx RemoveTagsCtx(ctx context.Context, tags ...string) error + // Streams gets all streams for specific tags + Streams(tags ...string) (streams []drpc.Stream) // Close closes all streams Close() error } @@ -73,6 +75,17 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st }() } +func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) { + s.mu.Lock() + defer s.mu.Unlock() + for _, tag := range tags { + for _, id := range s.streamIdsByTag[tag] { + streams = append(streams, s.streams[id].stream) + } + } + return +} + func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream { s.mu.Lock() defer s.mu.Unlock() From 9ba0620cab5c8fd6f00b1dddb6518be40c87a64a Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 3 Feb 2023 12:55:44 +0100 Subject: [PATCH 02/11] Fix peer getting in sync tree --- commonspace/object/tree/synctree/synctree.go | 32 +++++++++++++++++--- commonspace/space.go | 4 +++ commonspace/spaceservice.go | 1 + 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index e2e1c2b1..3a8f1744 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -59,6 +59,10 @@ var log = logger.NewNamed("commonspace.synctree") var buildObjectTree = objecttree.BuildObjectTree var createSyncClient = newSyncClient +type ResponsiblePeersGetter interface { + GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) +} + type BuildDeps struct { SpaceId string ObjectSync objectsync.ObjectSync @@ -70,15 +74,35 @@ type BuildDeps struct { TreeStorage treestorage.TreeStorage TreeUsage *atomic.Int32 SyncStatus syncstatus.StatusUpdater + PeerGetter ResponsiblePeersGetter WaitTreeRemoteSync bool } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { - getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { - peerId, err := peer.CtxPeerId(ctx) + getPeer := func(ctx context.Context) (peerId string, err error) { + peerId, err = peer.CtxPeerId(ctx) + if err == nil { + return + } + err = nil + log.WarnCtx(ctx, "peer not found in context, use responsible") + respPeers, err := deps.PeerGetter.GetResponsiblePeers(ctx) if err != nil { - log.WarnCtx(ctx, "peer not found in context, use first responsible") - peerId = deps.Configuration.NodeIds(deps.SpaceId)[0] + return + } + if len(respPeers) == 0 { + err = fmt.Errorf("no responsible peers") + return + } + // TODO: maybe we can check different peers here + peerId = respPeers[0].Id() + return + } + + getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { + peerId, err := getPeer(ctx) + if err != nil { + return } newTreeRequest := GetRequestFactory().CreateNewTreeRequest() diff --git a/commonspace/space.go b/commonspace/space.go index 378490d3..4cb02e9f 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -16,6 +16,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/objectsync" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage" @@ -122,6 +123,7 @@ type space struct { aclList *syncacl.SyncAcl configuration nodeconf.Configuration settingsObject settings.SettingsObject + peerManager peermanager.PeerManager handleQueue multiqueue.MultiQueue[HandleMessage] @@ -295,6 +297,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea SpaceStorage: s.storage, TreeUsage: &s.treesUsed, SyncStatus: s.syncStatus, + PeerGetter: s.peerManager, } return synctree.PutSyncTree(ctx, payload, deps) } @@ -326,6 +329,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t TreeUsage: &s.treesUsed, SyncStatus: s.syncStatus, WaitTreeRemoteSync: opts.WaitTreeRemoteSync, + PeerGetter: s.peerManager, } return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 80e48ff0..c26839b8 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -139,6 +139,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { cache: getter, account: s.account, configuration: lastConfiguration, + peerManager: peerManager, storage: st, } return sp, nil From 2da9f84907fe3e0bf352046962989a0ea8af93c1 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 3 Feb 2023 14:49:05 +0100 Subject: [PATCH 03/11] Switch peers when getting tree --- commonspace/object/tree/synctree/synctree.go | 31 ++++++++++++-------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 3a8f1744..a9dab5fb 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -79,9 +79,10 @@ type BuildDeps struct { } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { - getPeer := func(ctx context.Context) (peerId string, err error) { - peerId, err = peer.CtxPeerId(ctx) + getPeers := func(ctx context.Context) (peerIds []string, err error) { + peerId, err := peer.CtxPeerId(ctx) if err == nil { + peerIds = []string{peerId} return } err = nil @@ -94,17 +95,13 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t err = fmt.Errorf("no responsible peers") return } - // TODO: maybe we can check different peers here - peerId = respPeers[0].Id() + for _, p := range respPeers { + peerIds = append(peerIds, p.Id()) + } return } - getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { - peerId, err := getPeer(ctx) - if err != nil { - return - } - + getTreeRemote := func(peerId string) (msg *treechangeproto.TreeSyncMessage, err error) { newTreeRequest := GetRequestFactory().CreateNewTreeRequest() objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "") if err != nil { @@ -122,14 +119,22 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t } waitTree := func(wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { - if !wait { - return getTreeRemote() + availablePeers, err := getPeers(ctx) + if err != nil { + return } + + if !wait { + return getTreeRemote(availablePeers[0]) + } + peerIdx := 0 for { - msg, err = getTreeRemote() + peerIdx = peerIdx % len(availablePeers) + msg, err = getTreeRemote(availablePeers[peerIdx]) if err == nil { return } + peerIdx++ select { case <-ctx.Done(): err = fmt.Errorf("waiting for object %s interrupted, context closed", id) From fad981daee3ac54a9878d08660a1efbb43331573 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 3 Feb 2023 14:55:30 +0100 Subject: [PATCH 04/11] Update responsible peers --- commonspace/object/tree/synctree/synctree.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index a9dab5fb..0a6e3211 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -134,6 +134,11 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t if err == nil { return } + // updating peers in case new peers arrived while we were waiting + availablePeers, err = getPeers(ctx) + if err != nil { + return + } peerIdx++ select { case <-ctx.Done(): From a193906d94850681a1bf1900dd60022df359c6b1 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 3 Feb 2023 15:36:12 +0100 Subject: [PATCH 05/11] Add wait for available peers to appear --- commonspace/object/tree/synctree/synctree.go | 44 ++++++++++---------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 0a6e3211..286f23ec 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -19,6 +19,7 @@ import ( "github.com/gogo/protobuf/proto" "go.uber.org/zap" "sync/atomic" + "time" ) var ( @@ -119,34 +120,35 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t } waitTree := func(wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { - availablePeers, err := getPeers(ctx) - if err != nil { - return - } - - if !wait { - return getTreeRemote(availablePeers[0]) - } peerIdx := 0 + Loop: for { - peerIdx = peerIdx % len(availablePeers) - msg, err = getTreeRemote(availablePeers[peerIdx]) - if err == nil { - return - } - // updating peers in case new peers arrived while we were waiting - availablePeers, err = getPeers(ctx) - if err != nil { - return - } - peerIdx++ select { case <-ctx.Done(): - err = fmt.Errorf("waiting for object %s interrupted, context closed", id) - return + return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id) default: break } + availablePeers, err := getPeers(ctx) + if err != nil { + if !wait { + return nil, err + } + select { + // wait for peers to connect + case <-time.After(1 * time.Second): + continue Loop + case <-ctx.Done(): + return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id) + } + } + + peerIdx = peerIdx % len(availablePeers) + msg, err = getTreeRemote(availablePeers[peerIdx]) + if err == nil || !wait { + return msg, err + } + peerIdx++ } } From ea48365d2f45841eda5af608a3922e9ec3d72002 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 3 Feb 2023 20:12:02 +0100 Subject: [PATCH 06/11] Simplify ping tree --- commonspace/headsync/diffsyncer.go | 6 ++--- .../synctree/mock_synctree/mock_synctree.go | 22 ++++--------------- .../object/tree/synctree/syncclient.go | 13 ----------- commonspace/object/tree/synctree/synctree.go | 6 ++--- commonspace/objectsync/msgpool.go | 1 + 5 files changed, 11 insertions(+), 37 deletions(-) diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 9db4d3db..8168abe8 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -132,7 +132,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) - d.pingTreesInCache(ctx, filteredIds) + d.pingTreesInCache(ctx, p.Id(), filteredIds) d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), @@ -143,7 +143,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) return } -func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { +func (d *diffSyncer) pingTreesInCache(ctx context.Context, peerId string, trees []string) { for _, tId := range trees { tree, err := d.cache.GetTree(ctx, d.spaceId, tId) if err != nil { @@ -159,7 +159,7 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { // it may be already there (i.e. loaded) // and build func will not be called, thus we won't sync the tree // therefore we just do it manually - if err = syncTree.Ping(ctx); err != nil { + if err = syncTree.Ping(ctx, peerId); err != nil { d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId)) } else { d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId)) diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index 8218aa2e..f17ab9ac 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -54,20 +54,6 @@ func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1) } -// BroadcastAsyncOrSendResponsible mocks base method. -func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible. -func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1) -} - // CreateFullSyncRequest mocks base method. func (m *MockSyncClient) CreateFullSyncRequest(arg0 objecttree.ObjectTree, arg1, arg2 []string) (*treechangeproto.TreeSyncMessage, error) { m.ctrl.T.Helper() @@ -395,17 +381,17 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call { } // Ping mocks base method. -func (m *MockSyncTree) Ping(arg0 context.Context) error { +func (m *MockSyncTree) Ping(arg0 context.Context, arg1 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Ping", arg0) + ret := m.ctrl.Call(m, "Ping", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // Ping indicates an expected call of Ping. -func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call { +func (mr *MockSyncTreeMockRecorder) Ping(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0, arg1) } // RLock mocks base method. diff --git a/commonspace/object/tree/synctree/syncclient.go b/commonspace/object/tree/synctree/syncclient.go index e858f190..6ad98c7c 100644 --- a/commonspace/object/tree/synctree/syncclient.go +++ b/commonspace/object/tree/synctree/syncclient.go @@ -12,7 +12,6 @@ import ( type SyncClient interface { RequestFactory Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) - BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) } @@ -52,18 +51,6 @@ func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *tree return s.MessagePool.SendPeer(ctx, peerId, objMsg) } -func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { - objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "") - if err != nil { - return - } - - if s.configuration.IsResponsible(s.spaceId) { - return s.MessagePool.SendResponsible(ctx, objMsg) - } - return s.Broadcast(ctx, message) -} - func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { payload, err := message.Marshal() if err != nil { diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 286f23ec..1c3e4c48 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -39,7 +39,7 @@ type SyncTree interface { objecttree.ObjectTree synchandler.SyncHandler ListenerSetter - Ping(ctx context.Context) (err error) + Ping(ctx context.Context, peerId string) (err error) } // SyncTree sends head updates to sync service and also sends new changes to update listener @@ -344,11 +344,11 @@ func (s *syncTree) checkAlive() (err error) { return } -func (s *syncTree) Ping(ctx context.Context) (err error) { +func (s *syncTree) Ping(ctx context.Context, peerId string) (err error) { s.Lock() defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate) + return s.syncClient.SendWithReply(ctx, peerId, headUpdate, "") } func (s *syncTree) afterBuild() { diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 47ba14c4..4d4f11e1 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -90,6 +90,7 @@ func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.O s.updateLastUsage() return s.PeerManager.SendResponsible(ctx, msg) } + func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() return s.PeerManager.Broadcast(ctx, msg) From 27b1c948e9913b843f87d58cf5b9295c01547a97 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 3 Feb 2023 20:15:55 +0100 Subject: [PATCH 07/11] Remove SendResponsible --- commonspace/objectsync/msgpool.go | 5 ----- .../mock_peermanager/mock_peermanager.go | 14 -------------- commonspace/peermanager/peermanager.go | 2 -- 3 files changed, 21 deletions(-) diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 4d4f11e1..c47fc30d 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -86,11 +86,6 @@ func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyn return s.PeerManager.SendPeer(ctx, peerId, msg) } -func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { - s.updateLastUsage() - return s.PeerManager.SendResponsible(ctx, msg) -} - func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() return s.PeerManager.Broadcast(ctx, msg) diff --git a/commonspace/peermanager/mock_peermanager/mock_peermanager.go b/commonspace/peermanager/mock_peermanager/mock_peermanager.go index ae97fe7b..f7d0ce74 100644 --- a/commonspace/peermanager/mock_peermanager/mock_peermanager.go +++ b/commonspace/peermanager/mock_peermanager/mock_peermanager.go @@ -78,17 +78,3 @@ func (mr *MockPeerManagerMockRecorder) SendPeer(arg0, arg1, arg2 interface{}) *g mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPeer", reflect.TypeOf((*MockPeerManager)(nil).SendPeer), arg0, arg1, arg2) } - -// SendResponsible mocks base method. -func (m *MockPeerManager) SendResponsible(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendResponsible", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendResponsible indicates an expected call of SendResponsible. -func (mr *MockPeerManagerMockRecorder) SendResponsible(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendResponsible", reflect.TypeOf((*MockPeerManager)(nil).SendResponsible), arg0, arg1) -} diff --git a/commonspace/peermanager/peermanager.go b/commonspace/peermanager/peermanager.go index 373c3a51..676d000c 100644 --- a/commonspace/peermanager/peermanager.go +++ b/commonspace/peermanager/peermanager.go @@ -13,8 +13,6 @@ const CName = "common.commonspace.peermanager" type PeerManager interface { // SendPeer sends a message to a stream by peerId SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) - // SendResponsible sends a message to responsible peers streams - SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) // Broadcast sends a message to all subscribed peers Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) // GetResponsiblePeers dials or gets from cache responsible peers to unary operations From 9c696ad96a3cdc1c9ef7cf738ad758778bfc7a05 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 4 Feb 2023 17:38:27 +0100 Subject: [PATCH 08/11] Rename sync tree --- commonspace/headsync/diffsyncer.go | 10 +++---- .../synctree/mock_synctree/mock_synctree.go | 28 +++++++++---------- commonspace/object/tree/synctree/synctree.go | 4 +-- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 8168abe8..5ec3ce98 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -132,7 +132,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) - d.pingTreesInCache(ctx, p.Id(), filteredIds) + d.syncTrees(ctx, p.Id(), filteredIds) d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), @@ -143,7 +143,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) return } -func (d *diffSyncer) pingTreesInCache(ctx context.Context, peerId string, trees []string) { +func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) { for _, tId := range trees { tree, err := d.cache.GetTree(ctx, d.spaceId, tId) if err != nil { @@ -159,10 +159,10 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, peerId string, trees // it may be already there (i.e. loaded) // and build func will not be called, thus we won't sync the tree // therefore we just do it manually - if err = syncTree.Ping(ctx, peerId); err != nil { - d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId)) + if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { + d.log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err), zap.String("treeId", tId)) } else { - d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId)) + d.log.DebugCtx(ctx, "success synctree.SyncWithPeer", zap.String("treeId", tId)) } } } diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index f17ab9ac..992ae5c6 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -380,20 +380,6 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockSyncTree)(nil).Lock)) } -// Ping mocks base method. -func (m *MockSyncTree) Ping(arg0 context.Context, arg1 string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Ping", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// Ping indicates an expected call of Ping. -func (mr *MockSyncTreeMockRecorder) Ping(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0, arg1) -} - // RLock mocks base method. func (m *MockSyncTree) RLock() { m.ctrl.T.Helper() @@ -472,6 +458,20 @@ func (mr *MockSyncTreeMockRecorder) Storage() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockSyncTree)(nil).Storage)) } +// SyncWithPeer mocks base method. +func (m *MockSyncTree) SyncWithPeer(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncWithPeer", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncWithPeer indicates an expected call of SyncWithPeer. +func (mr *MockSyncTreeMockRecorder) SyncWithPeer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncWithPeer", reflect.TypeOf((*MockSyncTree)(nil).SyncWithPeer), arg0, arg1) +} + // Unlock mocks base method. func (m *MockSyncTree) Unlock() { m.ctrl.T.Helper() diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 1c3e4c48..45e4bed4 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -39,7 +39,7 @@ type SyncTree interface { objecttree.ObjectTree synchandler.SyncHandler ListenerSetter - Ping(ctx context.Context, peerId string) (err error) + SyncWithPeer(ctx context.Context, peerId string) (err error) } // SyncTree sends head updates to sync service and also sends new changes to update listener @@ -344,7 +344,7 @@ func (s *syncTree) checkAlive() (err error) { return } -func (s *syncTree) Ping(ctx context.Context, peerId string) (err error) { +func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error) { s.Lock() defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) From d3362ba85e19c4e5b42e8d8629f7f2121c82547e Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 4 Feb 2023 17:41:20 +0100 Subject: [PATCH 09/11] Update batcher to 3.0.1 --- go.mod | 4 ++-- go.sum | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index ea57e167..312fec98 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/anytypeio/go-chash v0.0.2 github.com/awalterschulze/gographviz v2.0.3+incompatible github.com/cespare/xxhash v1.1.0 - github.com/cheggaaa/mb/v3 v3.0.0 + github.com/cheggaaa/mb/v3 v3.0.1 github.com/goccy/go-graphviz v0.0.9 github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.6.0 @@ -28,7 +28,6 @@ require ( github.com/stretchr/testify v1.8.1 github.com/zeebo/blake3 v0.2.3 github.com/zeebo/errs v1.3.0 - go.uber.org/atomic v1.10.0 go.uber.org/zap v1.24.0 golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 golang.org/x/net v0.3.0 @@ -94,6 +93,7 @@ require ( github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect go.opentelemetry.io/otel v1.11.2 // indirect go.opentelemetry.io/otel/trace v1.11.2 // indirect + go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.9.0 // indirect golang.org/x/crypto v0.4.0 // indirect golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect diff --git a/go.sum b/go.sum index 7f61f8fe..1eb28b79 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cheggaaa/mb/v3 v3.0.0 h1:+FkV4fAefQfJSsfMtWC9cnSrVYKd3TXcerPTwRuWWfE= -github.com/cheggaaa/mb/v3 v3.0.0/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI= +github.com/cheggaaa/mb/v3 v3.0.1 h1:BuEOipGTqybXYi5KXVCpqhR1LWN2lrurq6UrH+VBhXc= +github.com/cheggaaa/mb/v3 v3.0.1/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= From d3108b5e6c8647eaa8f6ebf1b7c2ac8e7ed846a8 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sun, 5 Feb 2023 11:31:14 +0100 Subject: [PATCH 10/11] Move remote getter to separate entity --- commonspace/object/tree/synctree/synctree.go | 120 +-------------- .../object/tree/synctree/treeremotegetter.go | 140 ++++++++++++++++++ 2 files changed, 142 insertions(+), 118 deletions(-) create mode 100644 commonspace/object/tree/synctree/treeremotegetter.go diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 45e4bed4..c63dcc96 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -3,12 +3,10 @@ package synctree import ( "context" "errors" - "fmt" "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" - "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" @@ -16,10 +14,8 @@ import ( "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" - "github.com/gogo/protobuf/proto" "go.uber.org/zap" "sync/atomic" - "time" ) var ( @@ -80,120 +76,8 @@ type BuildDeps struct { } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { - getPeers := func(ctx context.Context) (peerIds []string, err error) { - peerId, err := peer.CtxPeerId(ctx) - if err == nil { - peerIds = []string{peerId} - return - } - err = nil - log.WarnCtx(ctx, "peer not found in context, use responsible") - respPeers, err := deps.PeerGetter.GetResponsiblePeers(ctx) - if err != nil { - return - } - if len(respPeers) == 0 { - err = fmt.Errorf("no responsible peers") - return - } - for _, p := range respPeers { - peerIds = append(peerIds, p.Id()) - } - return - } - - getTreeRemote := func(peerId string) (msg *treechangeproto.TreeSyncMessage, err error) { - newTreeRequest := GetRequestFactory().CreateNewTreeRequest() - objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "") - if err != nil { - return - } - - resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) - if err != nil { - return - } - - msg = &treechangeproto.TreeSyncMessage{} - err = proto.Unmarshal(resp.Payload, msg) - return - } - - waitTree := func(wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { - peerIdx := 0 - Loop: - for { - select { - case <-ctx.Done(): - return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id) - default: - break - } - availablePeers, err := getPeers(ctx) - if err != nil { - if !wait { - return nil, err - } - select { - // wait for peers to connect - case <-time.After(1 * time.Second): - continue Loop - case <-ctx.Done(): - return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id) - } - } - - peerIdx = peerIdx % len(availablePeers) - msg, err = getTreeRemote(availablePeers[peerIdx]) - if err == nil || !wait { - return msg, err - } - peerIdx++ - } - } - - deps.TreeStorage, err = deps.SpaceStorage.TreeStorage(id) - if err == nil { - return buildSyncTree(ctx, false, deps) - } - - if err != nil && err != treestorage.ErrUnknownTreeId { - return - } - - status, err := deps.SpaceStorage.TreeDeletedStatus(id) - if err != nil { - return - } - if status != "" { - err = spacestorage.ErrTreeStorageAlreadyDeleted - return - } - - resp, err := waitTree(deps.WaitTreeRemoteSync) - if err != nil { - return - } - if resp.GetContent().GetFullSyncResponse() == nil { - err = fmt.Errorf("expected to get full sync response, but got something else") - return - } - fullSyncResp := resp.GetContent().GetFullSyncResponse() - - payload := treestorage.TreeStorageCreatePayload{ - RootRawChange: resp.RootChange, - Changes: fullSyncResp.Changes, - Heads: fullSyncResp.Heads, - } - - // basically building tree with in-memory storage and validating that it was without errors - log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree") - err = objecttree.ValidateRawTree(payload, deps.AclList) - if err != nil { - return - } - // now we are sure that we can save it to the storage - deps.TreeStorage, err = deps.SpaceStorage.CreateTreeStorage(payload) + remoteGetter := treeRemoteGetter{treeId: id, deps: deps} + deps.TreeStorage, err = remoteGetter.getTree(ctx) if err != nil { return } diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go new file mode 100644 index 00000000..561d850c --- /dev/null +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -0,0 +1,140 @@ +package synctree + +import ( + "context" + "fmt" + "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" + "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" + "github.com/anytypeio/any-sync/commonspace/spacestorage" + "github.com/anytypeio/any-sync/net/peer" + "github.com/gogo/protobuf/proto" + "go.uber.org/zap" + "time" +) + +type treeRemoteGetter struct { + deps BuildDeps + treeId string +} + +func newRemoteGetter(treeId string, deps BuildDeps) treeRemoteGetter { + return treeRemoteGetter{treeId: treeId, deps: deps} +} + +func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err error) { + peerId, err := peer.CtxPeerId(ctx) + if err == nil { + peerIds = []string{peerId} + return + } + err = nil + log.WarnCtx(ctx, "peer not found in context, use responsible") + respPeers, err := t.deps.PeerGetter.GetResponsiblePeers(ctx) + if err != nil { + return + } + if len(respPeers) == 0 { + err = fmt.Errorf("no responsible peers") + return + } + for _, p := range respPeers { + peerIds = append(peerIds, p.Id()) + } + return +} + +func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) { + newTreeRequest := GetRequestFactory().CreateNewTreeRequest() + objMsg, err := marshallTreeMessage(newTreeRequest, t.deps.SpaceId, t.treeId, "") + if err != nil { + return + } + + resp, err := t.deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) + if err != nil { + return + } + + msg = &treechangeproto.TreeSyncMessage{} + err = proto.Unmarshal(resp.Payload, msg) + return +} + +func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { + peerIdx := 0 +Loop: + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId) + default: + break + } + availablePeers, err := t.getPeers(ctx) + if err != nil { + if !wait { + return nil, err + } + select { + // wait for peers to connect + case <-time.After(1 * time.Second): + continue Loop + case <-ctx.Done(): + return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId) + } + } + + peerIdx = peerIdx % len(availablePeers) + msg, err = t.treeRequest(ctx, availablePeers[peerIdx]) + if err == nil || !wait { + return msg, err + } + peerIdx++ + } +} + +func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) { + treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId) + if err == nil { + return + } + + if err != nil && err != treestorage.ErrUnknownTreeId { + return + } + + status, err := t.deps.SpaceStorage.TreeDeletedStatus(t.treeId) + if err != nil { + return + } + if status != "" { + err = spacestorage.ErrTreeStorageAlreadyDeleted + return + } + + resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync) + if err != nil { + return + } + if resp.GetContent().GetFullSyncResponse() == nil { + err = fmt.Errorf("expected to get full sync response, but got something else") + return + } + fullSyncResp := resp.GetContent().GetFullSyncResponse() + + payload := treestorage.TreeStorageCreatePayload{ + RootRawChange: resp.RootChange, + Changes: fullSyncResp.Changes, + Heads: fullSyncResp.Heads, + } + + // basically building tree with in-memory storage and validating that it was without errors + log.With(zap.String("id", t.treeId)).DebugCtx(ctx, "validating tree") + err = objecttree.ValidateRawTree(payload, t.deps.AclList) + if err != nil { + return + } + // now we are sure that we can save it to the storage + return t.deps.SpaceStorage.CreateTreeStorage(payload) +} From b2140f3c30de2a9183b8fe422048b0ed68044396 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Sun, 5 Feb 2023 17:29:55 +0300 Subject: [PATCH 11/11] fix win build --- commonspace/object/tree/objecttree/treegraph.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/commonspace/object/tree/objecttree/treegraph.go b/commonspace/object/tree/objecttree/treegraph.go index 9bb9611e..722ef975 100644 --- a/commonspace/object/tree/objecttree/treegraph.go +++ b/commonspace/object/tree/objecttree/treegraph.go @@ -1,6 +1,5 @@ -//go:build ((!linux && !darwin) || android || ios || nographviz) && !amd64 -// +build !linux,!darwin android ios nographviz -// +build !amd64 +//go:build (!linux && !darwin) || android || ios || nographviz || windows +// +build !linux,!darwin android ios nographviz windows package objecttree