From 2aaa8f4a0c2fa905baf56c50422458c343d8e08f Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 13 Jun 2023 15:21:11 +0200 Subject: [PATCH] Change retry logic and add tests --- commonspace/object/tree/synctree/synctree.go | 26 ++-- .../object/tree/synctree/treeremotegetter.go | 43 +++--- .../tree/synctree/treeremotegetter_test.go | 142 ++++++++++++++++++ commonspace/objecttreebuilder/treebuilder.go | 34 +++-- commonspace/requestmanager/requestmanager.go | 3 +- commonspace/settings/settings.go | 6 +- 6 files changed, 202 insertions(+), 52 deletions(-) create mode 100644 commonspace/object/tree/synctree/treeremotegetter_test.go diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index cd6ff7a2..a21121bf 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -59,19 +59,19 @@ type ResponsiblePeersGetter interface { } type BuildDeps struct { - SpaceId string - SyncClient SyncClient - Configuration nodeconf.NodeConf - HeadNotifiable HeadNotifiable - Listener updatelistener.UpdateListener - AclList list.AclList - SpaceStorage spacestorage.SpaceStorage - TreeStorage treestorage.TreeStorage - OnClose func(id string) - SyncStatus syncstatus.StatusUpdater - PeerGetter ResponsiblePeersGetter - BuildObjectTree objecttree.BuildObjectTreeFunc - WaitTreeRemoteSync bool + SpaceId string + SyncClient SyncClient + Configuration nodeconf.NodeConf + HeadNotifiable HeadNotifiable + Listener updatelistener.UpdateListener + AclList list.AclList + SpaceStorage spacestorage.SpaceStorage + TreeStorage treestorage.TreeStorage + OnClose func(id string) + SyncStatus syncstatus.StatusUpdater + PeerGetter ResponsiblePeersGetter + BuildObjectTree objecttree.BuildObjectTreeFunc + RetryTimeout time.Duration } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go index bee53231..843a37b4 100644 --- a/commonspace/object/tree/synctree/treeremotegetter.go +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -2,7 +2,10 @@ package synctree import ( "context" + "errors" "fmt" + "time" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" @@ -11,7 +14,13 @@ import ( "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/gogo/protobuf/proto" "go.uber.org/zap" - "time" +) + +var ( + newRequestTimeout = 1 * time.Second + + ErrRetryTimeout = errors.New("failed to retry request") + ErrNoResponsiblePeers = errors.New("no responsible peers") ) type treeRemoteGetter struct { @@ -57,36 +66,32 @@ func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg * return } -func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { +func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, retryTimeout time.Duration) (msg *treechangeproto.TreeSyncMessage, err error) { peerIdx := 0 -Loop: + reconnectCtx, cancel := context.WithTimeout(ctx, retryTimeout) + defer cancel() for { - select { - case <-ctx.Done(): - return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId) - default: - break - } availablePeers, err := t.getPeers(ctx) if err != nil { - if !wait { + if retryTimeout == 0 { return nil, err } - select { - // wait for peers to connect - case <-time.After(1 * time.Second): - continue Loop - case <-ctx.Done(): - return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId) - } + goto Wait } peerIdx = peerIdx % len(availablePeers) msg, err = t.treeRequest(ctx, availablePeers[peerIdx]) - if err == nil || !wait { + if err == nil || retryTimeout == 0 { return msg, err } peerIdx++ + Wait: + select { + case <-time.After(newRequestTimeout): + break + case <-reconnectCtx.Done(): + return nil, ErrRetryTimeout + } } } @@ -109,7 +114,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage. } isRemote = true - resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync) + resp, err := t.treeRequestLoop(ctx, t.deps.RetryTimeout) if err != nil { return } diff --git a/commonspace/object/tree/synctree/treeremotegetter_test.go b/commonspace/object/tree/synctree/treeremotegetter_test.go new file mode 100644 index 00000000..1594f491 --- /dev/null +++ b/commonspace/object/tree/synctree/treeremotegetter_test.go @@ -0,0 +1,142 @@ +package synctree + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anyproto/any-sync/commonspace/peermanager/mock_peermanager" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/peer/mock_peer" + "github.com/gogo/protobuf/proto" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" +) + +type treeRemoteGetterFixture struct { + ctrl *gomock.Controller + + treeGetter treeRemoteGetter + syncClientMock *mock_synctree.MockSyncClient + peerGetterMock *mock_peermanager.MockPeerManager +} + +func newTreeRemoteGetterFixture(t *testing.T) *treeRemoteGetterFixture { + ctrl := gomock.NewController(t) + syncClientMock := mock_synctree.NewMockSyncClient(ctrl) + peerGetterMock := mock_peermanager.NewMockPeerManager(ctrl) + treeGetter := treeRemoteGetter{ + deps: BuildDeps{ + SyncClient: syncClientMock, + PeerGetter: peerGetterMock, + }, + treeId: "treeId", + } + return &treeRemoteGetterFixture{ + ctrl: ctrl, + treeGetter: treeGetter, + syncClientMock: syncClientMock, + peerGetterMock: peerGetterMock, + } +} + +func (fx *treeRemoteGetterFixture) stop() { + fx.ctrl.Finish() +} + +func TestTreeRemoteGetter(t *testing.T) { + newRequestTimeout = 20 * time.Millisecond + retryTimeout := 2 * newRequestTimeout + ctx := context.Background() + peerId := "peerId" + treeRequest := &treechangeproto.TreeSyncMessage{} + treeResponse := &treechangeproto.TreeSyncMessage{ + RootChange: &treechangeproto.RawTreeChangeWithId{Id: "id"}, + } + marshalled, _ := proto.Marshal(treeResponse) + objectResponse := &spacesyncproto.ObjectSyncMessage{ + Payload: marshalled, + } + + t.Run("request works", func(t *testing.T) { + fx := newTreeRemoteGetterFixture(t) + defer fx.stop() + mockPeer := mock_peer.NewMockPeer(fx.ctrl) + mockPeer.EXPECT().Id().AnyTimes().Return(peerId) + fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil) + fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest) + fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil) + resp, err := fx.treeGetter.treeRequestLoop(ctx, 0) + require.NoError(t, err) + require.Equal(t, "id", resp.RootChange.Id) + }) + + t.Run("request peerId from context", func(t *testing.T) { + fx := newTreeRemoteGetterFixture(t) + defer fx.stop() + ctx := peer.CtxWithPeerId(ctx, peerId) + fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest) + fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil) + resp, err := fx.treeGetter.treeRequestLoop(ctx, 0) + require.NoError(t, err) + require.Equal(t, "id", resp.RootChange.Id) + }) + + t.Run("request fails", func(t *testing.T) { + fx := newTreeRemoteGetterFixture(t) + defer fx.stop() + mockPeer := mock_peer.NewMockPeer(fx.ctrl) + mockPeer.EXPECT().Id().AnyTimes().Return(peerId) + fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil) + fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest) + fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(nil, fmt.Errorf("failed")) + _, err := fx.treeGetter.treeRequestLoop(ctx, 0) + require.Error(t, err) + require.NotEqual(t, ErrRetryTimeout, err) + }) + + t.Run("retry request success", func(t *testing.T) { + fx := newTreeRemoteGetterFixture(t) + defer fx.stop() + mockPeer := mock_peer.NewMockPeer(fx.ctrl) + mockPeer.EXPECT().Id().AnyTimes().Return(peerId) + fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil) + fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest) + fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(nil, fmt.Errorf("some")) + fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil) + resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout) + require.NoError(t, err) + require.Equal(t, "id", resp.RootChange.Id) + }) + + t.Run("retry get peers success", func(t *testing.T) { + fx := newTreeRemoteGetterFixture(t) + defer fx.stop() + mockPeer := mock_peer.NewMockPeer(fx.ctrl) + mockPeer.EXPECT().Id().AnyTimes().Return(peerId) + fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{}, nil) + fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{mockPeer}, nil) + fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest) + fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil) + resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout) + require.NoError(t, err) + require.Equal(t, "id", resp.RootChange.Id) + }) + + t.Run("retry request fail", func(t *testing.T) { + fx := newTreeRemoteGetterFixture(t) + defer fx.stop() + treeRequest := &treechangeproto.TreeSyncMessage{} + mockPeer := mock_peer.NewMockPeer(fx.ctrl) + mockPeer.EXPECT().Id().AnyTimes().Return(peerId) + fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil) + fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest) + fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some")) + _, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout) + require.Equal(t, ErrRetryTimeout, err) + }) +} diff --git a/commonspace/objecttreebuilder/treebuilder.go b/commonspace/objecttreebuilder/treebuilder.go index 058efe16..9b7f9ab0 100644 --- a/commonspace/objecttreebuilder/treebuilder.go +++ b/commonspace/objecttreebuilder/treebuilder.go @@ -4,6 +4,9 @@ package objecttreebuilder import ( "context" "errors" + "sync/atomic" + "time" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/headsync" @@ -22,13 +25,12 @@ import ( "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/nodeconf" "go.uber.org/zap" - "sync/atomic" ) type BuildTreeOpts struct { - Listener updatelistener.UpdateListener - WaitTreeRemoteSync bool - TreeBuilder objecttree.BuildObjectTreeFunc + Listener updatelistener.UpdateListener + RetryTimeout time.Duration + TreeBuilder objecttree.BuildObjectTreeFunc } const CName = "common.commonspace.objecttreebuilder" @@ -110,18 +112,18 @@ func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOp treeBuilder = t.builder } deps := synctree.BuildDeps{ - SpaceId: t.spaceId, - SyncClient: t.syncClient, - Configuration: t.configuration, - HeadNotifiable: t.headsNotifiable, - Listener: opts.Listener, - AclList: t.aclList, - SpaceStorage: t.spaceStorage, - OnClose: t.onClose, - SyncStatus: t.syncStatus, - WaitTreeRemoteSync: opts.WaitTreeRemoteSync, - PeerGetter: t.peerManager, - BuildObjectTree: treeBuilder, + SpaceId: t.spaceId, + SyncClient: t.syncClient, + Configuration: t.configuration, + HeadNotifiable: t.headsNotifiable, + Listener: opts.Listener, + AclList: t.aclList, + SpaceStorage: t.spaceStorage, + OnClose: t.onClose, + SyncStatus: t.syncStatus, + RetryTimeout: opts.RetryTimeout, + PeerGetter: t.peerManager, + BuildObjectTree: treeBuilder, } t.treesUsed.Add(1) t.log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load())) diff --git a/commonspace/requestmanager/requestmanager.go b/commonspace/requestmanager/requestmanager.go index d72ea394..ea7a2b05 100644 --- a/commonspace/requestmanager/requestmanager.go +++ b/commonspace/requestmanager/requestmanager.go @@ -2,6 +2,8 @@ package requestmanager import ( "context" + "sync" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/objectsync" @@ -11,7 +13,6 @@ import ( "github.com/anyproto/any-sync/net/streampool" "go.uber.org/zap" "storj.io/drpc" - "sync" ) const CName = "common.commonspace.requestmanager" diff --git a/commonspace/settings/settings.go b/commonspace/settings/settings.go index eab41709..c993c715 100644 --- a/commonspace/settings/settings.go +++ b/commonspace/settings/settings.go @@ -2,6 +2,8 @@ package settings import ( "context" + "sync/atomic" + "github.com/anyproto/any-sync/accountservice" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/deletionstate" @@ -16,7 +18,6 @@ import ( "github.com/anyproto/any-sync/commonspace/spacestorage" "github.com/anyproto/any-sync/nodeconf" "go.uber.org/zap" - "sync/atomic" ) const CName = "common.commonspace.settings" @@ -61,8 +62,7 @@ func (s *settings) Init(a *app.App) (err error) { deps := Deps{ BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) { res, err := s.treeBuilder.BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{ - Listener: listener, - WaitTreeRemoteSync: false, + Listener: listener, // space settings document should not have empty data TreeBuilder: objecttree.BuildObjectTree, })