Move remote getter to separate entity

This commit is contained in:
mcrakhman 2023-02-05 11:31:14 +01:00
parent d3362ba85e
commit d3108b5e6c
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
2 changed files with 142 additions and 118 deletions

View File

@ -3,12 +3,10 @@ package synctree
import (
"context"
"errors"
"fmt"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
@ -16,10 +14,8 @@ import (
"github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/nodeconf"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"sync/atomic"
"time"
)
var (
@ -80,120 +76,8 @@ type BuildDeps struct {
}
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
getPeers := func(ctx context.Context) (peerIds []string, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err == nil {
peerIds = []string{peerId}
return
}
err = nil
log.WarnCtx(ctx, "peer not found in context, use responsible")
respPeers, err := deps.PeerGetter.GetResponsiblePeers(ctx)
if err != nil {
return
}
if len(respPeers) == 0 {
err = fmt.Errorf("no responsible peers")
return
}
for _, p := range respPeers {
peerIds = append(peerIds, p.Id())
}
return
}
getTreeRemote := func(peerId string) (msg *treechangeproto.TreeSyncMessage, err error) {
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "")
if err != nil {
return
}
resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
if err != nil {
return
}
msg = &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(resp.Payload, msg)
return
}
waitTree := func(wait bool) (msg *treechangeproto.TreeSyncMessage, err error) {
peerIdx := 0
Loop:
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id)
default:
break
}
availablePeers, err := getPeers(ctx)
if err != nil {
if !wait {
return nil, err
}
select {
// wait for peers to connect
case <-time.After(1 * time.Second):
continue Loop
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id)
}
}
peerIdx = peerIdx % len(availablePeers)
msg, err = getTreeRemote(availablePeers[peerIdx])
if err == nil || !wait {
return msg, err
}
peerIdx++
}
}
deps.TreeStorage, err = deps.SpaceStorage.TreeStorage(id)
if err == nil {
return buildSyncTree(ctx, false, deps)
}
if err != nil && err != treestorage.ErrUnknownTreeId {
return
}
status, err := deps.SpaceStorage.TreeDeletedStatus(id)
if err != nil {
return
}
if status != "" {
err = spacestorage.ErrTreeStorageAlreadyDeleted
return
}
resp, err := waitTree(deps.WaitTreeRemoteSync)
if err != nil {
return
}
if resp.GetContent().GetFullSyncResponse() == nil {
err = fmt.Errorf("expected to get full sync response, but got something else")
return
}
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := treestorage.TreeStorageCreatePayload{
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
}
// basically building tree with in-memory storage and validating that it was without errors
log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree")
err = objecttree.ValidateRawTree(payload, deps.AclList)
if err != nil {
return
}
// now we are sure that we can save it to the storage
deps.TreeStorage, err = deps.SpaceStorage.CreateTreeStorage(payload)
remoteGetter := treeRemoteGetter{treeId: id, deps: deps}
deps.TreeStorage, err = remoteGetter.getTree(ctx)
if err != nil {
return
}

View File

@ -0,0 +1,140 @@
package synctree
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/net/peer"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"time"
)
type treeRemoteGetter struct {
deps BuildDeps
treeId string
}
func newRemoteGetter(treeId string, deps BuildDeps) treeRemoteGetter {
return treeRemoteGetter{treeId: treeId, deps: deps}
}
func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err == nil {
peerIds = []string{peerId}
return
}
err = nil
log.WarnCtx(ctx, "peer not found in context, use responsible")
respPeers, err := t.deps.PeerGetter.GetResponsiblePeers(ctx)
if err != nil {
return
}
if len(respPeers) == 0 {
err = fmt.Errorf("no responsible peers")
return
}
for _, p := range respPeers {
peerIds = append(peerIds, p.Id())
}
return
}
func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) {
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
objMsg, err := marshallTreeMessage(newTreeRequest, t.deps.SpaceId, t.treeId, "")
if err != nil {
return
}
resp, err := t.deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
if err != nil {
return
}
msg = &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(resp.Payload, msg)
return
}
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) {
peerIdx := 0
Loop:
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
default:
break
}
availablePeers, err := t.getPeers(ctx)
if err != nil {
if !wait {
return nil, err
}
select {
// wait for peers to connect
case <-time.After(1 * time.Second):
continue Loop
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
}
}
peerIdx = peerIdx % len(availablePeers)
msg, err = t.treeRequest(ctx, availablePeers[peerIdx])
if err == nil || !wait {
return msg, err
}
peerIdx++
}
}
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) {
treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId)
if err == nil {
return
}
if err != nil && err != treestorage.ErrUnknownTreeId {
return
}
status, err := t.deps.SpaceStorage.TreeDeletedStatus(t.treeId)
if err != nil {
return
}
if status != "" {
err = spacestorage.ErrTreeStorageAlreadyDeleted
return
}
resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync)
if err != nil {
return
}
if resp.GetContent().GetFullSyncResponse() == nil {
err = fmt.Errorf("expected to get full sync response, but got something else")
return
}
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := treestorage.TreeStorageCreatePayload{
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
}
// basically building tree with in-memory storage and validating that it was without errors
log.With(zap.String("id", t.treeId)).DebugCtx(ctx, "validating tree")
err = objecttree.ValidateRawTree(payload, t.deps.AclList)
if err != nil {
return
}
// now we are sure that we can save it to the storage
return t.deps.SpaceStorage.CreateTreeStorage(payload)
}