From 2f899fe95c903e731fbde8d59808caa6b3d07cd4 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sun, 5 Feb 2023 11:31:14 +0100 Subject: [PATCH] Move remote getter to separate entity --- commonspace/object/tree/synctree/synctree.go | 120 +-------------- .../object/tree/synctree/treeremotegetter.go | 140 ++++++++++++++++++ 2 files changed, 142 insertions(+), 118 deletions(-) create mode 100644 commonspace/object/tree/synctree/treeremotegetter.go diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 45e4bed4..c63dcc96 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -3,12 +3,10 @@ package synctree import ( "context" "errors" - "fmt" "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" - "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" @@ -16,10 +14,8 @@ import ( "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" - "github.com/gogo/protobuf/proto" "go.uber.org/zap" "sync/atomic" - "time" ) var ( @@ -80,120 +76,8 @@ type BuildDeps struct { } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { - getPeers := func(ctx context.Context) (peerIds []string, err error) { - peerId, err := peer.CtxPeerId(ctx) - if err == nil { - peerIds = []string{peerId} - return - } - err = nil - log.WarnCtx(ctx, "peer not found in context, use responsible") - respPeers, err := deps.PeerGetter.GetResponsiblePeers(ctx) - if err != nil { - return - } - if len(respPeers) == 0 { - err = fmt.Errorf("no responsible peers") - return - } - for _, p := range respPeers { - peerIds = append(peerIds, p.Id()) - } - return - } - - getTreeRemote := func(peerId string) (msg *treechangeproto.TreeSyncMessage, err error) { - newTreeRequest := GetRequestFactory().CreateNewTreeRequest() - objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "") - if err != nil { - return - } - - resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) - if err != nil { - return - } - - msg = &treechangeproto.TreeSyncMessage{} - err = proto.Unmarshal(resp.Payload, msg) - return - } - - waitTree := func(wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { - peerIdx := 0 - Loop: - for { - select { - case <-ctx.Done(): - return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id) - default: - break - } - availablePeers, err := getPeers(ctx) - if err != nil { - if !wait { - return nil, err - } - select { - // wait for peers to connect - case <-time.After(1 * time.Second): - continue Loop - case <-ctx.Done(): - return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id) - } - } - - peerIdx = peerIdx % len(availablePeers) - msg, err = getTreeRemote(availablePeers[peerIdx]) - if err == nil || !wait { - return msg, err - } - peerIdx++ - } - } - - deps.TreeStorage, err = deps.SpaceStorage.TreeStorage(id) - if err == nil { - return buildSyncTree(ctx, false, deps) - } - - if err != nil && err != treestorage.ErrUnknownTreeId { - return - } - - status, err := deps.SpaceStorage.TreeDeletedStatus(id) - if err != nil { - return - } - if status != "" { - err = spacestorage.ErrTreeStorageAlreadyDeleted - return - } - - resp, err := waitTree(deps.WaitTreeRemoteSync) - if err != nil { - return - } - if resp.GetContent().GetFullSyncResponse() == nil { - err = fmt.Errorf("expected to get full sync response, but got something else") - return - } - fullSyncResp := resp.GetContent().GetFullSyncResponse() - - payload := treestorage.TreeStorageCreatePayload{ - RootRawChange: resp.RootChange, - Changes: fullSyncResp.Changes, - Heads: fullSyncResp.Heads, - } - - // basically building tree with in-memory storage and validating that it was without errors - log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree") - err = objecttree.ValidateRawTree(payload, deps.AclList) - if err != nil { - return - } - // now we are sure that we can save it to the storage - deps.TreeStorage, err = deps.SpaceStorage.CreateTreeStorage(payload) + remoteGetter := treeRemoteGetter{treeId: id, deps: deps} + deps.TreeStorage, err = remoteGetter.getTree(ctx) if err != nil { return } diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go new file mode 100644 index 00000000..561d850c --- /dev/null +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -0,0 +1,140 @@ +package synctree + +import ( + "context" + "fmt" + "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" + "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" + "github.com/anytypeio/any-sync/commonspace/spacestorage" + "github.com/anytypeio/any-sync/net/peer" + "github.com/gogo/protobuf/proto" + "go.uber.org/zap" + "time" +) + +type treeRemoteGetter struct { + deps BuildDeps + treeId string +} + +func newRemoteGetter(treeId string, deps BuildDeps) treeRemoteGetter { + return treeRemoteGetter{treeId: treeId, deps: deps} +} + +func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err error) { + peerId, err := peer.CtxPeerId(ctx) + if err == nil { + peerIds = []string{peerId} + return + } + err = nil + log.WarnCtx(ctx, "peer not found in context, use responsible") + respPeers, err := t.deps.PeerGetter.GetResponsiblePeers(ctx) + if err != nil { + return + } + if len(respPeers) == 0 { + err = fmt.Errorf("no responsible peers") + return + } + for _, p := range respPeers { + peerIds = append(peerIds, p.Id()) + } + return +} + +func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) { + newTreeRequest := GetRequestFactory().CreateNewTreeRequest() + objMsg, err := marshallTreeMessage(newTreeRequest, t.deps.SpaceId, t.treeId, "") + if err != nil { + return + } + + resp, err := t.deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) + if err != nil { + return + } + + msg = &treechangeproto.TreeSyncMessage{} + err = proto.Unmarshal(resp.Payload, msg) + return +} + +func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { + peerIdx := 0 +Loop: + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId) + default: + break + } + availablePeers, err := t.getPeers(ctx) + if err != nil { + if !wait { + return nil, err + } + select { + // wait for peers to connect + case <-time.After(1 * time.Second): + continue Loop + case <-ctx.Done(): + return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId) + } + } + + peerIdx = peerIdx % len(availablePeers) + msg, err = t.treeRequest(ctx, availablePeers[peerIdx]) + if err == nil || !wait { + return msg, err + } + peerIdx++ + } +} + +func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) { + treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId) + if err == nil { + return + } + + if err != nil && err != treestorage.ErrUnknownTreeId { + return + } + + status, err := t.deps.SpaceStorage.TreeDeletedStatus(t.treeId) + if err != nil { + return + } + if status != "" { + err = spacestorage.ErrTreeStorageAlreadyDeleted + return + } + + resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync) + if err != nil { + return + } + if resp.GetContent().GetFullSyncResponse() == nil { + err = fmt.Errorf("expected to get full sync response, but got something else") + return + } + fullSyncResp := resp.GetContent().GetFullSyncResponse() + + payload := treestorage.TreeStorageCreatePayload{ + RootRawChange: resp.RootChange, + Changes: fullSyncResp.Changes, + Heads: fullSyncResp.Heads, + } + + // basically building tree with in-memory storage and validating that it was without errors + log.With(zap.String("id", t.treeId)).DebugCtx(ctx, "validating tree") + err = objecttree.ValidateRawTree(payload, t.deps.AclList) + if err != nil { + return + } + // now we are sure that we can save it to the storage + return t.deps.SpaceStorage.CreateTreeStorage(payload) +}