Move remote getter to separate entity
This commit is contained in:
parent
2185f18d70
commit
2f899fe95c
@ -3,12 +3,10 @@ package synctree
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
|
||||
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
||||
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
||||
@ -16,10 +14,8 @@ import (
|
||||
"github.com/anytypeio/any-sync/commonspace/syncstatus"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/nodeconf"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -80,120 +76,8 @@ type BuildDeps struct {
|
||||
}
|
||||
|
||||
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
|
||||
getPeers := func(ctx context.Context) (peerIds []string, err error) {
|
||||
peerId, err := peer.CtxPeerId(ctx)
|
||||
if err == nil {
|
||||
peerIds = []string{peerId}
|
||||
return
|
||||
}
|
||||
err = nil
|
||||
log.WarnCtx(ctx, "peer not found in context, use responsible")
|
||||
respPeers, err := deps.PeerGetter.GetResponsiblePeers(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(respPeers) == 0 {
|
||||
err = fmt.Errorf("no responsible peers")
|
||||
return
|
||||
}
|
||||
for _, p := range respPeers {
|
||||
peerIds = append(peerIds, p.Id())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
getTreeRemote := func(peerId string) (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
|
||||
objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
msg = &treechangeproto.TreeSyncMessage{}
|
||||
err = proto.Unmarshal(resp.Payload, msg)
|
||||
return
|
||||
}
|
||||
|
||||
waitTree := func(wait bool) (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||
peerIdx := 0
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id)
|
||||
default:
|
||||
break
|
||||
}
|
||||
availablePeers, err := getPeers(ctx)
|
||||
if err != nil {
|
||||
if !wait {
|
||||
return nil, err
|
||||
}
|
||||
select {
|
||||
// wait for peers to connect
|
||||
case <-time.After(1 * time.Second):
|
||||
continue Loop
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", id)
|
||||
}
|
||||
}
|
||||
|
||||
peerIdx = peerIdx % len(availablePeers)
|
||||
msg, err = getTreeRemote(availablePeers[peerIdx])
|
||||
if err == nil || !wait {
|
||||
return msg, err
|
||||
}
|
||||
peerIdx++
|
||||
}
|
||||
}
|
||||
|
||||
deps.TreeStorage, err = deps.SpaceStorage.TreeStorage(id)
|
||||
if err == nil {
|
||||
return buildSyncTree(ctx, false, deps)
|
||||
}
|
||||
|
||||
if err != nil && err != treestorage.ErrUnknownTreeId {
|
||||
return
|
||||
}
|
||||
|
||||
status, err := deps.SpaceStorage.TreeDeletedStatus(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if status != "" {
|
||||
err = spacestorage.ErrTreeStorageAlreadyDeleted
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := waitTree(deps.WaitTreeRemoteSync)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if resp.GetContent().GetFullSyncResponse() == nil {
|
||||
err = fmt.Errorf("expected to get full sync response, but got something else")
|
||||
return
|
||||
}
|
||||
fullSyncResp := resp.GetContent().GetFullSyncResponse()
|
||||
|
||||
payload := treestorage.TreeStorageCreatePayload{
|
||||
RootRawChange: resp.RootChange,
|
||||
Changes: fullSyncResp.Changes,
|
||||
Heads: fullSyncResp.Heads,
|
||||
}
|
||||
|
||||
// basically building tree with in-memory storage and validating that it was without errors
|
||||
log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree")
|
||||
err = objecttree.ValidateRawTree(payload, deps.AclList)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// now we are sure that we can save it to the storage
|
||||
deps.TreeStorage, err = deps.SpaceStorage.CreateTreeStorage(payload)
|
||||
remoteGetter := treeRemoteGetter{treeId: id, deps: deps}
|
||||
deps.TreeStorage, err = remoteGetter.getTree(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
140
commonspace/object/tree/synctree/treeremotegetter.go
Normal file
140
commonspace/object/tree/synctree/treeremotegetter.go
Normal file
@ -0,0 +1,140 @@
|
||||
package synctree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"go.uber.org/zap"
|
||||
"time"
|
||||
)
|
||||
|
||||
type treeRemoteGetter struct {
|
||||
deps BuildDeps
|
||||
treeId string
|
||||
}
|
||||
|
||||
func newRemoteGetter(treeId string, deps BuildDeps) treeRemoteGetter {
|
||||
return treeRemoteGetter{treeId: treeId, deps: deps}
|
||||
}
|
||||
|
||||
func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err error) {
|
||||
peerId, err := peer.CtxPeerId(ctx)
|
||||
if err == nil {
|
||||
peerIds = []string{peerId}
|
||||
return
|
||||
}
|
||||
err = nil
|
||||
log.WarnCtx(ctx, "peer not found in context, use responsible")
|
||||
respPeers, err := t.deps.PeerGetter.GetResponsiblePeers(ctx)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if len(respPeers) == 0 {
|
||||
err = fmt.Errorf("no responsible peers")
|
||||
return
|
||||
}
|
||||
for _, p := range respPeers {
|
||||
peerIds = append(peerIds, p.Id())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
|
||||
objMsg, err := marshallTreeMessage(newTreeRequest, t.deps.SpaceId, t.treeId, "")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := t.deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
msg = &treechangeproto.TreeSyncMessage{}
|
||||
err = proto.Unmarshal(resp.Payload, msg)
|
||||
return
|
||||
}
|
||||
|
||||
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||
peerIdx := 0
|
||||
Loop:
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
|
||||
default:
|
||||
break
|
||||
}
|
||||
availablePeers, err := t.getPeers(ctx)
|
||||
if err != nil {
|
||||
if !wait {
|
||||
return nil, err
|
||||
}
|
||||
select {
|
||||
// wait for peers to connect
|
||||
case <-time.After(1 * time.Second):
|
||||
continue Loop
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
|
||||
}
|
||||
}
|
||||
|
||||
peerIdx = peerIdx % len(availablePeers)
|
||||
msg, err = t.treeRequest(ctx, availablePeers[peerIdx])
|
||||
if err == nil || !wait {
|
||||
return msg, err
|
||||
}
|
||||
peerIdx++
|
||||
}
|
||||
}
|
||||
|
||||
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) {
|
||||
treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId)
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err != nil && err != treestorage.ErrUnknownTreeId {
|
||||
return
|
||||
}
|
||||
|
||||
status, err := t.deps.SpaceStorage.TreeDeletedStatus(t.treeId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if status != "" {
|
||||
err = spacestorage.ErrTreeStorageAlreadyDeleted
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if resp.GetContent().GetFullSyncResponse() == nil {
|
||||
err = fmt.Errorf("expected to get full sync response, but got something else")
|
||||
return
|
||||
}
|
||||
fullSyncResp := resp.GetContent().GetFullSyncResponse()
|
||||
|
||||
payload := treestorage.TreeStorageCreatePayload{
|
||||
RootRawChange: resp.RootChange,
|
||||
Changes: fullSyncResp.Changes,
|
||||
Heads: fullSyncResp.Heads,
|
||||
}
|
||||
|
||||
// basically building tree with in-memory storage and validating that it was without errors
|
||||
log.With(zap.String("id", t.treeId)).DebugCtx(ctx, "validating tree")
|
||||
err = objecttree.ValidateRawTree(payload, t.deps.AclList)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
// now we are sure that we can save it to the storage
|
||||
return t.deps.SpaceStorage.CreateTreeStorage(payload)
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user