Merge pull request #21 from anyproto/add-treegetter-get-timeout

This commit is contained in:
Mikhail Rakhmanov 2023-06-13 15:51:32 +02:00 committed by GitHub
commit 759c48c6b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 205 additions and 58 deletions

View File

@ -59,19 +59,19 @@ type ResponsiblePeersGetter interface {
}
type BuildDeps struct {
SpaceId string
SyncClient SyncClient
Configuration nodeconf.NodeConf
HeadNotifiable HeadNotifiable
Listener updatelistener.UpdateListener
AclList list.AclList
SpaceStorage spacestorage.SpaceStorage
TreeStorage treestorage.TreeStorage
OnClose func(id string)
SyncStatus syncstatus.StatusUpdater
PeerGetter ResponsiblePeersGetter
BuildObjectTree objecttree.BuildObjectTreeFunc
WaitTreeRemoteSync bool
SpaceId string
SyncClient SyncClient
Configuration nodeconf.NodeConf
HeadNotifiable HeadNotifiable
Listener updatelistener.UpdateListener
AclList list.AclList
SpaceStorage spacestorage.SpaceStorage
TreeStorage treestorage.TreeStorage
OnClose func(id string)
SyncStatus syncstatus.StatusUpdater
PeerGetter ResponsiblePeersGetter
BuildObjectTree objecttree.BuildObjectTreeFunc
RetryTimeout time.Duration
}
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {

View File

@ -2,7 +2,9 @@ package synctree
import (
"context"
"fmt"
"errors"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
@ -11,7 +13,13 @@ import (
"github.com/anyproto/any-sync/net/rpc/rpcerr"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"time"
)
var (
newRequestTimeout = 1 * time.Second
ErrRetryTimeout = errors.New("failed to retry request")
ErrNoResponsiblePeers = errors.New("no responsible peers")
)
type treeRemoteGetter struct {
@ -36,7 +44,7 @@ func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err e
return
}
if len(respPeers) == 0 {
err = fmt.Errorf("no responsible peers")
err = ErrNoResponsiblePeers
return
}
for _, p := range respPeers {
@ -57,36 +65,30 @@ func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *
return
}
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) {
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, retryTimeout time.Duration) (msg *treechangeproto.TreeSyncMessage, err error) {
peerIdx := 0
Loop:
retryCtx, cancel := context.WithTimeout(ctx, retryTimeout)
defer cancel()
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
default:
break
}
availablePeers, err := t.getPeers(ctx)
if err != nil {
if !wait {
if retryTimeout == 0 {
return nil, err
}
select {
// wait for peers to connect
case <-time.After(1 * time.Second):
continue Loop
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
} else {
peerIdx = peerIdx % len(availablePeers)
msg, err = t.treeRequest(ctx, availablePeers[peerIdx])
if err == nil || retryTimeout == 0 {
return msg, err
}
peerIdx++
}
peerIdx = peerIdx % len(availablePeers)
msg, err = t.treeRequest(ctx, availablePeers[peerIdx])
if err == nil || !wait {
return msg, err
select {
case <-time.After(newRequestTimeout):
break
case <-retryCtx.Done():
return nil, ErrRetryTimeout
}
peerIdx++
}
}
@ -109,7 +111,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
}
isRemote = true
resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync)
resp, err := t.treeRequestLoop(ctx, t.deps.RetryTimeout)
if err != nil {
return
}

View File

@ -0,0 +1,142 @@
package synctree
import (
"context"
"fmt"
"testing"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/peermanager/mock_peermanager"
"github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/peer/mock_peer"
"github.com/gogo/protobuf/proto"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
)
type treeRemoteGetterFixture struct {
ctrl *gomock.Controller
treeGetter treeRemoteGetter
syncClientMock *mock_synctree.MockSyncClient
peerGetterMock *mock_peermanager.MockPeerManager
}
func newTreeRemoteGetterFixture(t *testing.T) *treeRemoteGetterFixture {
ctrl := gomock.NewController(t)
syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
peerGetterMock := mock_peermanager.NewMockPeerManager(ctrl)
treeGetter := treeRemoteGetter{
deps: BuildDeps{
SyncClient: syncClientMock,
PeerGetter: peerGetterMock,
},
treeId: "treeId",
}
return &treeRemoteGetterFixture{
ctrl: ctrl,
treeGetter: treeGetter,
syncClientMock: syncClientMock,
peerGetterMock: peerGetterMock,
}
}
func (fx *treeRemoteGetterFixture) stop() {
fx.ctrl.Finish()
}
func TestTreeRemoteGetter(t *testing.T) {
newRequestTimeout = 20 * time.Millisecond
retryTimeout := 2 * newRequestTimeout
ctx := context.Background()
peerId := "peerId"
treeRequest := &treechangeproto.TreeSyncMessage{}
treeResponse := &treechangeproto.TreeSyncMessage{
RootChange: &treechangeproto.RawTreeChangeWithId{Id: "id"},
}
marshalled, _ := proto.Marshal(treeResponse)
objectResponse := &spacesyncproto.ObjectSyncMessage{
Payload: marshalled,
}
t.Run("request works", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, 0)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("request peerId from context", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
ctx := peer.CtxWithPeerId(ctx, peerId)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, 0)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("request fails", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Return(nil, fmt.Errorf("failed"))
_, err := fx.treeGetter.treeRequestLoop(ctx, 0)
require.Error(t, err)
require.NotEqual(t, ErrRetryTimeout, err)
})
t.Run("retry request success", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(nil, fmt.Errorf("some"))
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("retry get peers success", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{}, nil)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).Times(1).Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).Times(1).Return(objectResponse, nil)
resp, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.NoError(t, err)
require.Equal(t, "id", resp.RootChange.Id)
})
t.Run("retry request fail", func(t *testing.T) {
fx := newTreeRemoteGetterFixture(t)
defer fx.stop()
treeRequest := &treechangeproto.TreeSyncMessage{}
mockPeer := mock_peer.NewMockPeer(fx.ctrl)
mockPeer.EXPECT().Id().AnyTimes().Return(peerId)
fx.peerGetterMock.EXPECT().GetResponsiblePeers(ctx).AnyTimes().Return([]peer.Peer{mockPeer}, nil)
fx.syncClientMock.EXPECT().CreateNewTreeRequest().AnyTimes().Return(treeRequest)
fx.syncClientMock.EXPECT().SendRequest(ctx, peerId, fx.treeGetter.treeId, treeRequest).AnyTimes().Return(nil, fmt.Errorf("some"))
_, err := fx.treeGetter.treeRequestLoop(ctx, retryTimeout)
require.Equal(t, ErrRetryTimeout, err)
})
}

View File

@ -4,6 +4,9 @@ package objecttreebuilder
import (
"context"
"errors"
"sync/atomic"
"time"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/headsync"
@ -22,13 +25,12 @@ import (
"github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap"
"sync/atomic"
)
type BuildTreeOpts struct {
Listener updatelistener.UpdateListener
WaitTreeRemoteSync bool
TreeBuilder objecttree.BuildObjectTreeFunc
Listener updatelistener.UpdateListener
RetryTimeout time.Duration
TreeBuilder objecttree.BuildObjectTreeFunc
}
const CName = "common.commonspace.objecttreebuilder"
@ -110,18 +112,18 @@ func (t *treeBuilder) BuildTree(ctx context.Context, id string, opts BuildTreeOp
treeBuilder = t.builder
}
deps := synctree.BuildDeps{
SpaceId: t.spaceId,
SyncClient: t.syncClient,
Configuration: t.configuration,
HeadNotifiable: t.headsNotifiable,
Listener: opts.Listener,
AclList: t.aclList,
SpaceStorage: t.spaceStorage,
OnClose: t.onClose,
SyncStatus: t.syncStatus,
WaitTreeRemoteSync: opts.WaitTreeRemoteSync,
PeerGetter: t.peerManager,
BuildObjectTree: treeBuilder,
SpaceId: t.spaceId,
SyncClient: t.syncClient,
Configuration: t.configuration,
HeadNotifiable: t.headsNotifiable,
Listener: opts.Listener,
AclList: t.aclList,
SpaceStorage: t.spaceStorage,
OnClose: t.onClose,
SyncStatus: t.syncStatus,
RetryTimeout: opts.RetryTimeout,
PeerGetter: t.peerManager,
BuildObjectTree: treeBuilder,
}
t.treesUsed.Add(1)
t.log.Debug("incrementing counter", zap.String("id", id), zap.Int32("trees", t.treesUsed.Load()))

View File

@ -2,6 +2,8 @@ package requestmanager
import (
"context"
"sync"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/objectsync"
@ -11,7 +13,6 @@ import (
"github.com/anyproto/any-sync/net/streampool"
"go.uber.org/zap"
"storj.io/drpc"
"sync"
)
const CName = "common.commonspace.requestmanager"

View File

@ -2,6 +2,8 @@ package settings
import (
"context"
"sync/atomic"
"github.com/anyproto/any-sync/accountservice"
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/deletionstate"
@ -16,7 +18,6 @@ import (
"github.com/anyproto/any-sync/commonspace/spacestorage"
"github.com/anyproto/any-sync/nodeconf"
"go.uber.org/zap"
"sync/atomic"
)
const CName = "common.commonspace.settings"
@ -61,8 +62,7 @@ func (s *settings) Init(a *app.App) (err error) {
deps := Deps{
BuildFunc: func(ctx context.Context, id string, listener updatelistener.UpdateListener) (t synctree.SyncTree, err error) {
res, err := s.treeBuilder.BuildTree(ctx, id, objecttreebuilder.BuildTreeOpts{
Listener: listener,
WaitTreeRemoteSync: false,
Listener: listener,
// space settings document should not have empty data
TreeBuilder: objecttree.BuildObjectTree,
})