Fix peer getting in sync tree

This commit is contained in:
mcrakhman 2023-02-03 12:55:44 +01:00 committed by Mikhail Iudin
parent 8097445e6e
commit bc08acd36c
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
3 changed files with 33 additions and 4 deletions

View File

@ -54,6 +54,10 @@ var log = logger.NewNamed("commonspace.synctree")
var buildObjectTree = objecttree.BuildObjectTree var buildObjectTree = objecttree.BuildObjectTree
var createSyncClient = newSyncClient var createSyncClient = newSyncClient
type ResponsiblePeersGetter interface {
GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error)
}
type BuildDeps struct { type BuildDeps struct {
SpaceId string SpaceId string
ObjectSync objectsync.ObjectSync ObjectSync objectsync.ObjectSync
@ -65,15 +69,35 @@ type BuildDeps struct {
TreeStorage treestorage.TreeStorage TreeStorage treestorage.TreeStorage
TreeUsage *atomic.Int32 TreeUsage *atomic.Int32
SyncStatus syncstatus.StatusUpdater SyncStatus syncstatus.StatusUpdater
PeerGetter ResponsiblePeersGetter
WaitTreeRemoteSync bool WaitTreeRemoteSync bool
} }
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { getPeer := func(ctx context.Context) (peerId string, err error) {
peerId, err := peer.CtxPeerId(ctx) peerId, err = peer.CtxPeerId(ctx)
if err == nil {
return
}
err = nil
log.WarnCtx(ctx, "peer not found in context, use responsible")
respPeers, err := deps.PeerGetter.GetResponsiblePeers(ctx)
if err != nil { if err != nil {
log.WarnCtx(ctx, "peer not found in context, use first responsible") return
peerId = deps.Configuration.NodeIds(deps.SpaceId)[0] }
if len(respPeers) == 0 {
err = fmt.Errorf("no responsible peers")
return
}
// TODO: maybe we can check different peers here
peerId = respPeers[0].Id()
return
}
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
peerId, err := getPeer(ctx)
if err != nil {
return
} }
newTreeRequest := GetRequestFactory().CreateNewTreeRequest() newTreeRequest := GetRequestFactory().CreateNewTreeRequest()

View File

@ -16,6 +16,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings"
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
"github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage"
@ -122,6 +123,7 @@ type space struct {
aclList *syncacl.SyncAcl aclList *syncacl.SyncAcl
configuration nodeconf.Configuration configuration nodeconf.Configuration
settingsObject settings.SettingsObject settingsObject settings.SettingsObject
peerManager peermanager.PeerManager
handleQueue multiqueue.MultiQueue[HandleMessage] handleQueue multiqueue.MultiQueue[HandleMessage]
@ -295,6 +297,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea
SpaceStorage: s.storage, SpaceStorage: s.storage,
TreeUsage: &s.treesUsed, TreeUsage: &s.treesUsed,
SyncStatus: s.syncStatus, SyncStatus: s.syncStatus,
PeerGetter: s.peerManager,
} }
return synctree.PutSyncTree(ctx, payload, deps) return synctree.PutSyncTree(ctx, payload, deps)
} }
@ -326,6 +329,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
TreeUsage: &s.treesUsed, TreeUsage: &s.treesUsed,
SyncStatus: s.syncStatus, SyncStatus: s.syncStatus,
WaitTreeRemoteSync: opts.WaitTreeRemoteSync, WaitTreeRemoteSync: opts.WaitTreeRemoteSync,
PeerGetter: s.peerManager,
} }
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
} }

View File

@ -139,6 +139,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
cache: getter, cache: getter,
account: s.account, account: s.account,
configuration: lastConfiguration, configuration: lastConfiguration,
peerManager: peerManager,
storage: st, storage: st,
} }
return sp, nil return sp, nil