Merge pull request #32 from anyproto/sync-improvements

This commit is contained in:
Mikhail Rakhmanov 2023-06-20 13:24:47 +02:00 committed by GitHub
commit 1b47a54f87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 138 additions and 73 deletions

View File

@ -2,6 +2,10 @@ package synctree
import ( import (
"context" "context"
"math/rand"
"testing"
"time"
"github.com/anyproto/any-sync/commonspace/object/accountdata" "github.com/anyproto/any-sync/commonspace/object/accountdata"
"github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/acl/list"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
@ -10,9 +14,6 @@ import (
"github.com/anyproto/any-sync/util/slice" "github.com/anyproto/any-sync/util/slice"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"math/rand"
"testing"
"time"
) )
func TestEmptyClientGetsFullHistory(t *testing.T) { func TestEmptyClientGetsFullHistory(t *testing.T) {

View File

@ -205,18 +205,27 @@ func (s *syncTree) Delete() (err error) {
} }
func (s *syncTree) TryClose(objectTTL time.Duration) (bool, error) { func (s *syncTree) TryClose(objectTTL time.Duration) (bool, error) {
return true, s.Close() if !s.TryLock() {
return false, nil
}
log.Debug("closing sync tree", zap.String("id", s.Id()))
return true, s.close()
} }
func (s *syncTree) Close() (err error) { func (s *syncTree) Close() (err error) {
log.Debug("closing sync tree", zap.String("id", s.Id())) log.Debug("closing sync tree", zap.String("id", s.Id()))
s.Lock()
return s.close()
}
func (s *syncTree) close() (err error) {
defer s.Unlock()
defer func() { defer func() {
log.Debug("closed sync tree", zap.Error(err), zap.String("id", s.Id())) log.Debug("closed sync tree", zap.Error(err), zap.String("id", s.Id()))
}() }()
s.Lock()
defer s.Unlock()
if s.isClosed { if s.isClosed {
return ErrSyncTreeClosed err = ErrSyncTreeClosed
return
} }
s.onClose(s.Id()) s.onClose(s.Id())
s.isClosed = true s.isClosed = true

View File

@ -3,18 +3,21 @@ package synctree
import ( import (
"context" "context"
"errors" "errors"
"sync"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler"
"github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/spacesyncproto"
"github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/anyproto/any-sync/util/slice"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"sync"
) )
var ( var (
ErrMessageIsRequest = errors.New("message is request") ErrMessageIsRequest = errors.New("message is request")
ErrMessageIsNotRequest = errors.New("message is not request") ErrMessageIsNotRequest = errors.New("message is not request")
ErrMoreThanOneRequest = errors.New("more than one request for same peer")
) )
type syncTreeHandler struct { type syncTreeHandler struct {
@ -22,9 +25,11 @@ type syncTreeHandler struct {
syncClient SyncClient syncClient SyncClient
syncProtocol TreeSyncProtocol syncProtocol TreeSyncProtocol
syncStatus syncstatus.StatusUpdater syncStatus syncstatus.StatusUpdater
handlerLock sync.Mutex
spaceId string spaceId string
queue ReceiveQueue
handlerLock sync.Mutex
pendingRequests map[string]struct{}
heads []string
} }
const maxQueueSize = 5 const maxQueueSize = 5
@ -36,7 +41,7 @@ func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClien
syncClient: syncClient, syncClient: syncClient,
syncStatus: syncStatus, syncStatus: syncStatus,
spaceId: spaceId, spaceId: spaceId,
queue: newReceiveQueue(maxQueueSize), pendingRequests: make(map[string]struct{}),
} }
} }
@ -48,17 +53,35 @@ func (s *syncTreeHandler) HandleRequest(ctx context.Context, senderId string, re
} }
fullSyncRequest := unmarshalled.GetContent().GetFullSyncRequest() fullSyncRequest := unmarshalled.GetContent().GetFullSyncRequest()
if fullSyncRequest == nil { if fullSyncRequest == nil {
err = ErrMessageIsNotRequest return nil, ErrMessageIsNotRequest
return
} }
s.syncStatus.HeadsReceive(senderId, request.ObjectId, treechangeproto.GetHeads(unmarshalled)) // setting pending requests
s.handlerLock.Lock()
_, exists := s.pendingRequests[senderId]
if exists {
s.handlerLock.Unlock()
return nil, ErrMoreThanOneRequest
}
s.pendingRequests[senderId] = struct{}{}
s.handlerLock.Unlock()
response, err = s.handleRequest(ctx, senderId, fullSyncRequest)
// removing pending requests
s.handlerLock.Lock()
delete(s.pendingRequests, senderId)
s.handlerLock.Unlock()
return
}
func (s *syncTreeHandler) handleRequest(ctx context.Context, senderId string, fullSyncRequest *treechangeproto.TreeFullSyncRequest) (response *spacesyncproto.ObjectSyncMessage, err error) {
s.objTree.Lock() s.objTree.Lock()
defer s.objTree.Unlock() defer s.objTree.Unlock()
treeResp, err := s.syncProtocol.FullSyncRequest(ctx, senderId, fullSyncRequest) treeResp, err := s.syncProtocol.FullSyncRequest(ctx, senderId, fullSyncRequest)
if err != nil { if err != nil {
return return
} }
response, err = MarshallTreeMessage(treeResp, s.spaceId, request.ObjectId, "") response, err = MarshallTreeMessage(treeResp, s.spaceId, s.objTree.Id(), "")
return return
} }
@ -68,28 +91,41 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms
if err != nil { if err != nil {
return return
} }
s.syncStatus.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled)) heads := treechangeproto.GetHeads(unmarshalled)
s.syncStatus.HeadsReceive(senderId, msg.ObjectId, heads)
queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.RequestId) s.handlerLock.Lock()
if queueFull { // if the update has same heads then returning not to hang on a lock
if unmarshalled.GetContent().GetHeadUpdate() != nil && slice.UnsortedEquals(heads, s.heads) {
s.handlerLock.Unlock()
return return
} }
s.handlerLock.Unlock()
return s.handleMessage(ctx, senderId) return s.handleMessage(ctx, unmarshalled, senderId)
} }
func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) { func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeproto.TreeSyncMessage, senderId string) (err error) {
s.objTree.Lock() s.objTree.Lock()
defer s.objTree.Unlock() defer s.objTree.Unlock()
msg, _, err := s.queue.GetMessage(senderId) var (
if err != nil { copyHeads = make([]string, 0, len(s.objTree.Heads()))
return treeId = s.objTree.Id()
content = msg.GetContent()
)
// getting old heads
copyHeads = append(copyHeads, s.objTree.Heads()...)
defer func() {
// checking if something changed
if !slice.UnsortedEquals(copyHeads, s.objTree.Heads()) {
s.handlerLock.Lock()
defer s.handlerLock.Unlock()
s.heads = s.heads[:0]
for _, h := range s.objTree.Heads() {
s.heads = append(s.heads, h)
} }
}
}()
defer s.queue.ClearQueue(senderId)
treeId := s.objTree.Id()
content := msg.GetContent()
switch { switch {
case content.GetHeadUpdate() != nil: case content.GetHeadUpdate() != nil:
var syncReq *treechangeproto.TreeSyncMessage var syncReq *treechangeproto.TreeSyncMessage

View File

@ -2,15 +2,15 @@ package synctree
import ( import (
"context" "context"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/stretchr/testify/require"
"sync" "sync"
"testing" "testing"
"github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/commonspace/syncstatus"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
) )
type testObjTreeMock struct { type testObjTreeMock struct {
@ -52,7 +52,6 @@ type syncHandlerFixture struct {
ctrl *gomock.Controller ctrl *gomock.Controller
syncClientMock *mock_synctree.MockSyncClient syncClientMock *mock_synctree.MockSyncClient
objectTreeMock *testObjTreeMock objectTreeMock *testObjTreeMock
receiveQueueMock ReceiveQueue
syncProtocolMock *mock_synctree.MockTreeSyncProtocol syncProtocolMock *mock_synctree.MockTreeSyncProtocol
spaceId string spaceId string
senderId string senderId string
@ -67,20 +66,18 @@ func newSyncHandlerFixture(t *testing.T) *syncHandlerFixture {
syncClientMock := mock_synctree.NewMockSyncClient(ctrl) syncClientMock := mock_synctree.NewMockSyncClient(ctrl)
syncProtocolMock := mock_synctree.NewMockTreeSyncProtocol(ctrl) syncProtocolMock := mock_synctree.NewMockTreeSyncProtocol(ctrl)
spaceId := "spaceId" spaceId := "spaceId"
receiveQueue := newReceiveQueue(5)
syncHandler := &syncTreeHandler{ syncHandler := &syncTreeHandler{
objTree: objectTreeMock, objTree: objectTreeMock,
syncClient: syncClientMock, syncClient: syncClientMock,
syncProtocol: syncProtocolMock, syncProtocol: syncProtocolMock,
spaceId: spaceId, spaceId: spaceId,
queue: receiveQueue,
syncStatus: syncstatus.NewNoOpSyncStatus(), syncStatus: syncstatus.NewNoOpSyncStatus(),
pendingRequests: map[string]struct{}{},
} }
return &syncHandlerFixture{ return &syncHandlerFixture{
ctrl: ctrl, ctrl: ctrl,
objectTreeMock: objectTreeMock, objectTreeMock: objectTreeMock,
receiveQueueMock: receiveQueue,
syncProtocolMock: syncProtocolMock, syncProtocolMock: syncProtocolMock,
syncClientMock: syncClientMock, syncClientMock: syncClientMock,
syncHandler: syncHandler, syncHandler: syncHandler,
@ -97,38 +94,68 @@ func (fx *syncHandlerFixture) stop() {
func TestSyncTreeHandler_HandleMessage(t *testing.T) { func TestSyncTreeHandler_HandleMessage(t *testing.T) {
ctx := context.Background() ctx := context.Background()
t.Run("handle head update message", func(t *testing.T) { t.Run("handle head update message, heads not equal, request returned", func(t *testing.T) {
fx := newSyncHandlerFixture(t) fx := newSyncHandlerFixture(t)
defer fx.stop() defer fx.stop()
treeId := "treeId" treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{} chWithId := &treechangeproto.RawTreeChangeWithId{}
headUpdate := &treechangeproto.TreeHeadUpdate{} headUpdate := &treechangeproto.TreeHeadUpdate{
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
syncReq := &treechangeproto.TreeSyncMessage{} syncReq := &treechangeproto.TreeSyncMessage{}
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"})
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"})
fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(syncReq, nil) fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(syncReq, nil)
fx.syncClientMock.EXPECT().QueueRequest(fx.senderId, fx.treeId, syncReq).Return(nil) fx.syncClientMock.EXPECT().QueueRequest(fx.senderId, fx.treeId, syncReq).Return(nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []string{"h3"}, fx.syncHandler.heads)
}) })
t.Run("handle head update message, empty sync request", func(t *testing.T) { t.Run("handle head update message, heads equal", func(t *testing.T) {
fx := newSyncHandlerFixture(t) fx := newSyncHandlerFixture(t)
defer fx.stop() defer fx.stop()
treeId := "treeId" treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{} chWithId := &treechangeproto.RawTreeChangeWithId{}
headUpdate := &treechangeproto.TreeHeadUpdate{} headUpdate := &treechangeproto.TreeHeadUpdate{
Heads: []string{"h1"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.syncHandler.heads = []string{"h1"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
})
t.Run("handle head update message, empty sync request returned", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
headUpdate := &treechangeproto.TreeHeadUpdate{
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"})
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"})
fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(nil, nil) fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(nil, nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, []string{"h3"}, fx.syncHandler.heads)
}) })
t.Run("handle full sync request returns error", func(t *testing.T) { t.Run("handle full sync request returns error", func(t *testing.T) {
@ -136,11 +163,15 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
defer fx.stop() defer fx.stop()
treeId := "treeId" treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{} chWithId := &treechangeproto.RawTreeChangeWithId{}
fullRequest := &treechangeproto.TreeFullSyncRequest{} fullRequest := &treechangeproto.TreeFullSyncRequest{
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId) treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.objectTreeMock.EXPECT().Heads().Times(3).Return([]string{"h2"})
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
require.Equal(t, err, ErrMessageIsRequest) require.Equal(t, err, ErrMessageIsRequest)
@ -151,11 +182,16 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) {
defer fx.stop() defer fx.stop()
treeId := "treeId" treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{} chWithId := &treechangeproto.RawTreeChangeWithId{}
fullSyncResponse := &treechangeproto.TreeFullSyncResponse{} fullSyncResponse := &treechangeproto.TreeFullSyncResponse{
Heads: []string{"h3"},
}
treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
fx.syncHandler.heads = []string{"h2"}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"})
fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"})
fx.syncProtocolMock.EXPECT().FullSyncResponse(ctx, fx.senderId, gomock.Any()).Return(nil) fx.syncProtocolMock.EXPECT().FullSyncResponse(ctx, fx.senderId, gomock.Any()).Return(nil)
err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg)
@ -184,24 +220,6 @@ func TestSyncTreeHandler_HandleRequest(t *testing.T) {
require.NotNil(t, res) require.NotNil(t, res)
}) })
t.Run("handle request", func(t *testing.T) {
fx := newSyncHandlerFixture(t)
defer fx.stop()
treeId := "treeId"
chWithId := &treechangeproto.RawTreeChangeWithId{}
fullRequest := &treechangeproto.TreeFullSyncRequest{}
treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId)
objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "")
syncResp := &treechangeproto.TreeSyncMessage{}
fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId)
fx.syncProtocolMock.EXPECT().FullSyncRequest(ctx, fx.senderId, gomock.Any()).Return(syncResp, nil)
res, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg)
require.NoError(t, err)
require.NotNil(t, res)
})
t.Run("handle other message", func(t *testing.T) { t.Run("handle other message", func(t *testing.T) {
fx := newSyncHandlerFixture(t) fx := newSyncHandlerFixture(t)
defer fx.stop() defer fx.stop()

View File

@ -4,6 +4,9 @@ package objectsync
import ( import (
"context" "context"
"fmt" "fmt"
"sync/atomic"
"time"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/object/treemanager"
@ -13,8 +16,6 @@ import (
"github.com/anyproto/any-sync/util/multiqueue" "github.com/anyproto/any-sync/util/multiqueue"
"github.com/cheggaaa/mb/v3" "github.com/cheggaaa/mb/v3"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"sync/atomic"
"time"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/syncobjectgetter" "github.com/anyproto/any-sync/commonspace/object/syncobjectgetter"
@ -79,7 +80,7 @@ func (s *objectSync) Init(a *app.App) (err error) {
} }
s.spaceIsDeleted = sharedData.SpaceIsDeleted s.spaceIsDeleted = sharedData.SpaceIsDeleted
s.spaceId = sharedData.SpaceId s.spaceId = sharedData.SpaceId
s.handleQueue = multiqueue.New[HandleMessage](s.processHandleMessage, 100) s.handleQueue = multiqueue.New[HandleMessage](s.processHandleMessage, 30)
return nil return nil
} }