Change retry logic and add tests

This commit is contained in:
mcrakhman 2023-06-13 15:21:11 +02:00
parent fb1df54941
commit 2aaa8f4a0c
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
6 changed files with 202 additions and 52 deletions

View File

@ -59,19 +59,19 @@ type ResponsiblePeersGetter interface {
} }
type BuildDeps struct { type BuildDeps struct {
SpaceId string SpaceId string
SyncClient SyncClient SyncClient SyncClient
Configuration nodeconf.NodeConf Configuration nodeconf.NodeConf
HeadNotifiable HeadNotifiable HeadNotifiable HeadNotifiable
Listener updatelistener.UpdateListener Listener updatelistener.UpdateListener
AclList list.AclList AclList list.AclList
SpaceStorage spacestorage.SpaceStorage SpaceStorage spacestorage.SpaceStorage
TreeStorage treestorage.TreeStorage TreeStorage treestorage.TreeStorage
OnClose func(id string) OnClose func(id string)
SyncStatus syncstatus.StatusUpdater SyncStatus syncstatus.StatusUpdater
PeerGetter ResponsiblePeersGetter PeerGetter ResponsiblePeersGetter
BuildObjectTree objecttree.BuildObjectTreeFunc BuildObjectTree objecttree.BuildObjectTreeFunc
WaitTreeRemoteSync bool RetryTimeout time.Duration
} }
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {

View File

@ -2,7 +2,10 @@ package synctree
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
@ -11,7 +14,13 @@ import (
"github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"go.uber.org/zap" "go.uber.org/zap"
"time" )
var (
newRequestTimeout = 1 * time.Second
ErrRetryTimeout = errors.New("failed to retry request")
ErrNoResponsiblePeers = errors.New("no responsible peers")
) )
type treeRemoteGetter struct { type treeRemoteGetter struct {
@ -57,36 +66,32 @@ func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *
return return
} }
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, retryTimeout time.Duration) (msg *treechangeproto.TreeSyncMessage, err error) {
peerIdx := 0 peerIdx := 0
Loop: reconnectCtx, cancel := context.WithTimeout(ctx, retryTimeout)
defer cancel()
for { for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
default:
break
}
availablePeers, err := t.getPeers(ctx) availablePeers, err := t.getPeers(ctx)
if err != nil { if err != nil {
if !wait { if retryTimeout == 0 {
return nil, err return nil, err
} }
select { goto Wait
// wait for peers to connect
case <-time.After(1 * time.Second):
continue Loop
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
}
} }
peerIdx = peerIdx % len(availablePeers) peerIdx = peerIdx % len(availablePeers)
msg, err = t.treeRequest(ctx, availablePeers[peerIdx]) msg, err = t.treeRequest(ctx, availablePeers[peerIdx])
if err == nil || !wait { if err == nil || retryTimeout == 0 {
return msg, err return msg, err
} }
peerIdx++ peerIdx++
Wait:
select {
case <-time.After(newRequestTimeout):
break
case <-reconnectCtx.Done():
return nil, ErrRetryTimeout
}
} }
} }
@ -109,7 +114,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
} }
isRemote = true isRemote = true
resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync) resp, err := t.treeRequestLoop(ctx, t.deps.RetryTimeout)
if err != nil { if err != nil {
return return
} }

View File

@ -0,0 +1,142 @@
package synctree
import (
"context"
"fmt"
"testing"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/peermanager/mock_peermanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/peer/mock_peer"
"github.com/gogo/protobuf/proto"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)
type treeRemoteGetterFixture struct {
ctrl *gomock.Controller
treeGetter treeRemoteGetter
syncClientMock *mock_synctree.MockSyncClient
peerGetterMock *mock_peermanager.MockPeerManager
}
func newTreeRemoteGetterFixture(t *testing.T) *treeRemoteGetterFixture {
ctrl := gomock.NewController(t)
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
peerGetterMock := mock_peermanager.NewMockPeerManager(ctrl)
treeGetter := treeRemoteGetter{
deps: BuildDeps{
SyncClient: syncClientMock,
PeerGetter: peerGetterMock,
},
treeId: "treeId",
}
return &treeRemoteGetterFixture{
ctrl: ctrl,
treeGetter: treeGetter,
syncClientMock: syncClientMock,
peerGetterMock: peerGetterMock,
}
}
func (fx *treeRemoteGetterFixture) stop() {
fx.ctrl.Finish()
}
func TestTreeRemoteGetter(t *testing.T) {
newRequestTimeout = 20 * time.Millisecond
retryTimeout := 2 * newRequestTimeout
ctx := context.Background()
peerId := "peerId"
treeRequest := &treechangeproto.TreeSyncMessage{}
treeResponse := &treechangeproto.TreeSyncMessage{
RootChange: &treechangeproto.RawTreeChangeWithId{Id: "id"},
}
marshalled, _ := proto.Marshal(treeResponse)
objectResponse := &spacesyncproto.ObjectSyncMessage{
Payload: marshalled,
}
t.Run("request works", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, 0)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("request peerId from context", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
ctx := peer.CtxWithPeerId(ctx, peerId)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, 0)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("request fails", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(nil, fmt.Errorf("failed"))
_, err := fx.treeGetter.treeRequestLoop(ctx, 0)
require.Error(t, err)
require.NotEqual(t, ErrRetryTimeout, err)
})
t.Run("retry request success", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(nil, fmt.Errorf("some"))
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("retry get peers success", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{}, nil)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("retry request fail", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
treeRequest := &treechangeproto.TreeSyncMessage{}
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some"))
_, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.Equal(t, ErrRetryTimeout, err)
})
}

View File

@ -4,6 +4,9 @@ package objecttreebuilder
import ( import (
"context" "context"
"errors" "errors"
"sync/atomic"
"time"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/headsync" "github.com/anyproto/any-sync/commonspace/headsync"
@ -22,13 +25,12 @@ import (
"github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap" "go.uber.org/zap"
"sync/atomic"
) )
type BuildTreeOpts struct { type BuildTreeOpts struct {
Listener updatelistener.UpdateListener Listener updatelistener.UpdateListener
WaitTreeRemoteSync bool RetryTimeout time.Duration
TreeBuilder objecttree.BuildObjectTreeFunc TreeBuilder objecttree.BuildObjectTreeFunc
} }
const CName = "common.commonspace.objecttreebuilder" const CName = "common.commonspace.objecttreebuilder"
@ -110,18 +112,18 @@ func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOp
treeBuilder = t.builder treeBuilder = t.builder
} }
deps := synctree.BuildDeps{ deps := synctree.BuildDeps{
SpaceId: t.spaceId, SpaceId: t.spaceId,
SyncClient: t.syncClient, SyncClient: t.syncClient,
Configuration: t.configuration, Configuration: t.configuration,
HeadNotifiable: t.headsNotifiable, HeadNotifiable: t.headsNotifiable,
Listener: opts.Listener, Listener: opts.Listener,
AclList: t.aclList, AclList: t.aclList,
SpaceStorage: t.spaceStorage, SpaceStorage: t.spaceStorage,
OnClose: t.onClose, OnClose: t.onClose,
SyncStatus: t.syncStatus, SyncStatus: t.syncStatus,
WaitTreeRemoteSync: opts.WaitTreeRemoteSync, RetryTimeout: opts.RetryTimeout,
PeerGetter: t.peerManager, PeerGetter: t.peerManager,
BuildObjectTree: treeBuilder, BuildObjectTree: treeBuilder,
} }
t.treesUsed.Add(1) t.treesUsed.Add(1)
t.log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load())) t.log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()))

View File

@ -2,6 +2,8 @@ package requestmanager
import ( import (
"context" "context"
"sync"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/objectsync" "github.com/anyproto/any-sync/commonspace/objectsync"
@ -11,7 +13,6 @@ import (
"github.com/anyproto/any-sync/net/streampool" "github.com/anyproto/any-sync/net/streampool"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/drpc" "storj.io/drpc"
"sync"
) )
const CName = "common.commonspace.requestmanager" const CName = "common.commonspace.requestmanager"

View File

@ -2,6 +2,8 @@ package settings
import ( import (
"context" "context"
"sync/atomic"
"github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/deletionstate" "github.com/anyproto/any-sync/commonspace/deletionstate"
@ -16,7 +18,6 @@ import (
"github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap" "go.uber.org/zap"
"sync/atomic"
) )
const CName = "common.commonspace.settings" const CName = "common.commonspace.settings"
@ -61,8 +62,7 @@ func (s *settings) Init(a *app.App) (err error) {
deps := Deps{ deps := Deps{
BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) { BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) {
res, err := s.treeBuilder.BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{ res, err := s.treeBuilder.BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{
Listener: listener, Listener: listener,
WaitTreeRemoteSync: false,
// space settings document should not have empty data // space settings document should not have empty data
TreeBuilder: objecttree.BuildObjectTree, TreeBuilder: objecttree.BuildObjectTree,
}) })