From bc08acd36c63155d8dfbc292c6f5e09be932bc4b Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 3 Feb 2023 12:55:44 +0100 Subject: [PATCH] Fix peer getting in sync tree --- commonspace/object/tree/synctree/synctree.go | 32 +++++++++++++++++--- commonspace/space.go | 4 +++ commonspace/spaceservice.go | 1 + 3 files changed, 33 insertions(+), 4 deletions(-) diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index b73d91f0..0ac3e2a8 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -54,6 +54,10 @@ var log = logger.NewNamed("commonspace.synctree") var buildObjectTree = objecttree.BuildObjectTree var createSyncClient = newSyncClient +type ResponsiblePeersGetter interface { + GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) +} + type BuildDeps struct { SpaceId string ObjectSync objectsync.ObjectSync @@ -65,15 +69,35 @@ type BuildDeps struct { TreeStorage treestorage.TreeStorage TreeUsage *atomic.Int32 SyncStatus syncstatus.StatusUpdater + PeerGetter ResponsiblePeersGetter WaitTreeRemoteSync bool } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { - getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { - peerId, err := peer.CtxPeerId(ctx) + getPeer := func(ctx context.Context) (peerId string, err error) { + peerId, err = peer.CtxPeerId(ctx) + if err == nil { + return + } + err = nil + log.WarnCtx(ctx, "peer not found in context, use responsible") + respPeers, err := deps.PeerGetter.GetResponsiblePeers(ctx) if err != nil { - log.WarnCtx(ctx, "peer not found in context, use first responsible") - peerId = deps.Configuration.NodeIds(deps.SpaceId)[0] + return + } + if len(respPeers) == 0 { + err = fmt.Errorf("no responsible peers") + return + } + // TODO: maybe we can check different peers here + peerId = respPeers[0].Id() + return + } + + getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { + peerId, err := getPeer(ctx) + if err != nil { + return } newTreeRequest := GetRequestFactory().CreateNewTreeRequest() diff --git a/commonspace/space.go b/commonspace/space.go index 378490d3..4cb02e9f 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -16,6 +16,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/objectsync" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage" @@ -122,6 +123,7 @@ type space struct { aclList *syncacl.SyncAcl configuration nodeconf.Configuration settingsObject settings.SettingsObject + peerManager peermanager.PeerManager handleQueue multiqueue.MultiQueue[HandleMessage] @@ -295,6 +297,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea SpaceStorage: s.storage, TreeUsage: &s.treesUsed, SyncStatus: s.syncStatus, + PeerGetter: s.peerManager, } return synctree.PutSyncTree(ctx, payload, deps) } @@ -326,6 +329,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t TreeUsage: &s.treesUsed, SyncStatus: s.syncStatus, WaitTreeRemoteSync: opts.WaitTreeRemoteSync, + PeerGetter: s.peerManager, } return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 80e48ff0..c26839b8 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -139,6 +139,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { cache: getter, account: s.account, configuration: lastConfiguration, + peerManager: peerManager, storage: st, } return sp, nil