From 8e7df9eae5b1dfc07af2530e97b5eae27468f6be Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 20 Jun 2023 12:02:05 +0200 Subject: [PATCH] Sync updates --- .../tree/synctree/protocolintegration_test.go | 7 +- commonspace/object/tree/synctree/synctree.go | 17 +++- .../object/tree/synctree/synctreehandler.go | 90 +++++++++++++------ .../tree/synctree/synctreehandler_test.go | 90 +++++++++++-------- commonspace/objectsync/objectsync.go | 7 +- 5 files changed, 138 insertions(+), 73 deletions(-) diff --git a/commonspace/object/tree/synctree/protocolintegration_test.go b/commonspace/object/tree/synctree/protocolintegration_test.go index 944d6684..4b880c24 100644 --- a/commonspace/object/tree/synctree/protocolintegration_test.go +++ b/commonspace/object/tree/synctree/protocolintegration_test.go @@ -2,6 +2,10 @@ package synctree import ( "context" + "math/rand" + "testing" + "time" + "github.com/anyproto/any-sync/commonspace/object/accountdata" "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" @@ -10,9 +14,6 @@ import ( "github.com/anyproto/any-sync/util/slice" "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/require" - "math/rand" - "testing" - "time" ) func TestEmptyClientGetsFullHistory(t *testing.T) { diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 52c33ed5..0d4712df 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -205,18 +205,27 @@ func (s *syncTree) Delete() (err error) { } func (s *syncTree) TryClose(objectTTL time.Duration) (bool, error) { - return true, s.Close() + if !s.TryLock() { + return false, nil + } + log.Debug("closing sync tree", zap.String("id", s.Id())) + return true, s.close() } func (s *syncTree) Close() (err error) { log.Debug("closing sync tree", zap.String("id", s.Id())) + s.Lock() + return s.close() +} + +func (s *syncTree) close() (err error) { + defer s.Unlock() defer func() { log.Debug("closed sync tree", zap.Error(err), zap.String("id", s.Id())) }() - s.Lock() - defer s.Unlock() if s.isClosed { - return ErrSyncTreeClosed + err = ErrSyncTreeClosed + return } s.onClose(s.Id()) s.isClosed = true diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index 154330f3..8f133c66 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -3,18 +3,21 @@ package synctree import ( "context" "errors" + "sync" + "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/objectsync/synchandler" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/commonspace/syncstatus" + "github.com/anyproto/any-sync/util/slice" "github.com/gogo/protobuf/proto" - "sync" ) var ( ErrMessageIsRequest = errors.New("message is request") ErrMessageIsNotRequest = errors.New("message is not request") + ErrMoreThanOneRequest = errors.New("more than one request for same peer") ) type syncTreeHandler struct { @@ -22,21 +25,23 @@ type syncTreeHandler struct { syncClient SyncClient syncProtocol TreeSyncProtocol syncStatus syncstatus.StatusUpdater - handlerLock sync.Mutex spaceId string - queue ReceiveQueue + + handlerLock sync.Mutex + pendingRequests map[string]struct{} + heads []string } const maxQueueSize = 5 func newSyncTreeHandler(spaceId string, objTree objecttree.ObjectTree, syncClient SyncClient, syncStatus syncstatus.StatusUpdater) synchandler.SyncHandler { return &syncTreeHandler{ - objTree: objTree, - syncProtocol: newTreeSyncProtocol(spaceId, objTree, syncClient), - syncClient: syncClient, - syncStatus: syncStatus, - spaceId: spaceId, - queue: newReceiveQueue(maxQueueSize), + objTree: objTree, + syncProtocol: newTreeSyncProtocol(spaceId, objTree, syncClient), + syncClient: syncClient, + syncStatus: syncStatus, + spaceId: spaceId, + pendingRequests: make(map[string]struct{}), } } @@ -48,17 +53,35 @@ func (s *syncTreeHandler) HandleRequest(ctx context.Context, senderId string, re } fullSyncRequest := unmarshalled.GetContent().GetFullSyncRequest() if fullSyncRequest == nil { - err = ErrMessageIsNotRequest - return + return nil, ErrMessageIsNotRequest } - s.syncStatus.HeadsReceive(senderId, request.ObjectId, treechangeproto.GetHeads(unmarshalled)) + // setting pending requests + s.handlerLock.Lock() + _, exists := s.pendingRequests[senderId] + if exists { + s.handlerLock.Unlock() + return nil, ErrMoreThanOneRequest + } + s.pendingRequests[senderId] = struct{}{} + s.handlerLock.Unlock() + + response, err = s.handleRequest(ctx, senderId, fullSyncRequest) + + // removing pending requests + s.handlerLock.Lock() + delete(s.pendingRequests, senderId) + s.handlerLock.Unlock() + return +} + +func (s *syncTreeHandler) handleRequest(ctx context.Context, senderId string, fullSyncRequest *treechangeproto.TreeFullSyncRequest) (response *spacesyncproto.ObjectSyncMessage, err error) { s.objTree.Lock() defer s.objTree.Unlock() treeResp, err := s.syncProtocol.FullSyncRequest(ctx, senderId, fullSyncRequest) if err != nil { return } - response, err = MarshallTreeMessage(treeResp, s.spaceId, request.ObjectId, "") + response, err = MarshallTreeMessage(treeResp, s.spaceId, s.objTree.Id(), "") return } @@ -68,28 +91,41 @@ func (s *syncTreeHandler) HandleMessage(ctx context.Context, senderId string, ms if err != nil { return } - s.syncStatus.HeadsReceive(senderId, msg.ObjectId, treechangeproto.GetHeads(unmarshalled)) - - queueFull := s.queue.AddMessage(senderId, unmarshalled, msg.RequestId) - if queueFull { + heads := treechangeproto.GetHeads(unmarshalled) + s.syncStatus.HeadsReceive(senderId, msg.ObjectId, heads) + s.handlerLock.Lock() + // if the update has same heads then returning not to hang on a lock + if unmarshalled.GetContent().GetHeadUpdate() != nil && slice.UnsortedEquals(heads, s.heads) { + s.handlerLock.Unlock() return } - - return s.handleMessage(ctx, senderId) + s.handlerLock.Unlock() + return s.handleMessage(ctx, unmarshalled, senderId) } -func (s *syncTreeHandler) handleMessage(ctx context.Context, senderId string) (err error) { +func (s *syncTreeHandler) handleMessage(ctx context.Context, msg *treechangeproto.TreeSyncMessage, senderId string) (err error) { s.objTree.Lock() defer s.objTree.Unlock() - msg, _, err := s.queue.GetMessage(senderId) - if err != nil { - return - } + var ( + copyHeads = make([]string, 0, len(s.objTree.Heads())) + treeId = s.objTree.Id() + content = msg.GetContent() + ) - defer s.queue.ClearQueue(senderId) + // getting old heads + copyHeads = append(copyHeads, s.objTree.Heads()...) + defer func() { + // checking if something changed + if !slice.UnsortedEquals(copyHeads, s.objTree.Heads()) { + s.handlerLock.Lock() + defer s.handlerLock.Unlock() + s.heads = s.heads[:0] + for _, h := range s.objTree.Heads() { + s.heads = append(s.heads, h) + } + } + }() - treeId := s.objTree.Id() - content := msg.GetContent() switch { case content.GetHeadUpdate() != nil: var syncReq *treechangeproto.TreeSyncMessage diff --git a/commonspace/object/tree/synctree/synctreehandler_test.go b/commonspace/object/tree/synctree/synctreehandler_test.go index f03f5ff1..84f3b237 100644 --- a/commonspace/object/tree/synctree/synctreehandler_test.go +++ b/commonspace/object/tree/synctree/synctreehandler_test.go @@ -2,15 +2,15 @@ package synctree import ( "context" - "github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree" - "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" - "github.com/stretchr/testify/require" "sync" "testing" "github.com/anyproto/any-sync/commonspace/object/tree/objecttree/mock_objecttree" + "github.com/anyproto/any-sync/commonspace/object/tree/synctree/mock_synctree" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" ) type testObjTreeMock struct { @@ -52,7 +52,6 @@ type syncHandlerFixture struct { ctrl *gomock.Controller syncClientMock *mock_synctree.MockSyncClient objectTreeMock *testObjTreeMock - receiveQueueMock ReceiveQueue syncProtocolMock *mock_synctree.MockTreeSyncProtocol spaceId string senderId string @@ -67,20 +66,18 @@ func newSyncHandlerFixture(t *testing.T) *syncHandlerFixture { syncClientMock := mock_synctree.NewMockSyncClient(ctrl) syncProtocolMock := mock_synctree.NewMockTreeSyncProtocol(ctrl) spaceId := "spaceId" - receiveQueue := newReceiveQueue(5) syncHandler := &syncTreeHandler{ - objTree: objectTreeMock, - syncClient: syncClientMock, - syncProtocol: syncProtocolMock, - spaceId: spaceId, - queue: receiveQueue, - syncStatus: syncstatus.NewNoOpSyncStatus(), + objTree: objectTreeMock, + syncClient: syncClientMock, + syncProtocol: syncProtocolMock, + spaceId: spaceId, + syncStatus: syncstatus.NewNoOpSyncStatus(), + pendingRequests: map[string]struct{}{}, } return &syncHandlerFixture{ ctrl: ctrl, objectTreeMock: objectTreeMock, - receiveQueueMock: receiveQueue, syncProtocolMock: syncProtocolMock, syncClientMock: syncClientMock, syncHandler: syncHandler, @@ -97,38 +94,68 @@ func (fx *syncHandlerFixture) stop() { func TestSyncTreeHandler_HandleMessage(t *testing.T) { ctx := context.Background() - t.Run("handle head update message", func(t *testing.T) { + t.Run("handle head update message, heads not equal, request returned", func(t *testing.T) { fx := newSyncHandlerFixture(t) defer fx.stop() treeId := "treeId" chWithId := &treechangeproto.RawTreeChangeWithId{} - headUpdate := &treechangeproto.TreeHeadUpdate{} + headUpdate := &treechangeproto.TreeHeadUpdate{ + Heads: []string{"h3"}, + } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") syncReq := &treechangeproto.TreeSyncMessage{} + fx.syncHandler.heads = []string{"h2"} fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) + fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"}) + fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"}) fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(syncReq, nil) fx.syncClientMock.EXPECT().QueueRequest(fx.senderId, fx.treeId, syncReq).Return(nil) err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) require.NoError(t, err) + require.Equal(t, []string{"h3"}, fx.syncHandler.heads) }) - t.Run("handle head update message, empty sync request", func(t *testing.T) { + t.Run("handle head update message, heads equal", func(t *testing.T) { fx := newSyncHandlerFixture(t) defer fx.stop() treeId := "treeId" chWithId := &treechangeproto.RawTreeChangeWithId{} - headUpdate := &treechangeproto.TreeHeadUpdate{} + headUpdate := &treechangeproto.TreeHeadUpdate{ + Heads: []string{"h1"}, + } treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + fx.syncHandler.heads = []string{"h1"} fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) + + err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) + require.NoError(t, err) + }) + + t.Run("handle head update message, empty sync request returned", func(t *testing.T) { + fx := newSyncHandlerFixture(t) + defer fx.stop() + treeId := "treeId" + chWithId := &treechangeproto.RawTreeChangeWithId{} + headUpdate := &treechangeproto.TreeHeadUpdate{ + Heads: []string{"h3"}, + } + treeMsg := treechangeproto.WrapHeadUpdate(headUpdate, chWithId) + objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + + fx.syncHandler.heads = []string{"h2"} + fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) + fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"}) + fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"}) fx.syncProtocolMock.EXPECT().HeadUpdate(ctx, fx.senderId, gomock.Any()).Return(nil, nil) err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) require.NoError(t, err) + require.Equal(t, []string{"h3"}, fx.syncHandler.heads) }) t.Run("handle full sync request returns error", func(t *testing.T) { @@ -136,11 +163,15 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) { defer fx.stop() treeId := "treeId" chWithId := &treechangeproto.RawTreeChangeWithId{} - fullRequest := &treechangeproto.TreeFullSyncRequest{} + fullRequest := &treechangeproto.TreeFullSyncRequest{ + Heads: []string{"h3"}, + } treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId) objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + fx.syncHandler.heads = []string{"h2"} fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) + fx.objectTreeMock.EXPECT().Heads().Times(3).Return([]string{"h2"}) err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) require.Equal(t, err, ErrMessageIsRequest) @@ -151,11 +182,16 @@ func TestSyncTreeHandler_HandleMessage(t *testing.T) { defer fx.stop() treeId := "treeId" chWithId := &treechangeproto.RawTreeChangeWithId{} - fullSyncResponse := &treechangeproto.TreeFullSyncResponse{} + fullSyncResponse := &treechangeproto.TreeFullSyncResponse{ + Heads: []string{"h3"}, + } treeMsg := treechangeproto.WrapFullResponse(fullSyncResponse, chWithId) objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") + fx.syncHandler.heads = []string{"h2"} fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) + fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h2"}) + fx.objectTreeMock.EXPECT().Heads().Times(2).Return([]string{"h3"}) fx.syncProtocolMock.EXPECT().FullSyncResponse(ctx, fx.senderId, gomock.Any()).Return(nil) err := fx.syncHandler.HandleMessage(ctx, fx.senderId, objectMsg) @@ -184,24 +220,6 @@ func TestSyncTreeHandler_HandleRequest(t *testing.T) { require.NotNil(t, res) }) - t.Run("handle request", func(t *testing.T) { - fx := newSyncHandlerFixture(t) - defer fx.stop() - treeId := "treeId" - chWithId := &treechangeproto.RawTreeChangeWithId{} - fullRequest := &treechangeproto.TreeFullSyncRequest{} - treeMsg := treechangeproto.WrapFullRequest(fullRequest, chWithId) - objectMsg, _ := MarshallTreeMessage(treeMsg, "spaceId", treeId, "") - - syncResp := &treechangeproto.TreeSyncMessage{} - fx.objectTreeMock.EXPECT().Id().AnyTimes().Return(fx.treeId) - fx.syncProtocolMock.EXPECT().FullSyncRequest(ctx, fx.senderId, gomock.Any()).Return(syncResp, nil) - - res, err := fx.syncHandler.HandleRequest(ctx, fx.senderId, objectMsg) - require.NoError(t, err) - require.NotNil(t, res) - }) - t.Run("handle other message", func(t *testing.T) { fx := newSyncHandlerFixture(t) defer fx.stop() diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 0ec7f344..f8125aa8 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -4,6 +4,9 @@ package objectsync import ( "context" "fmt" + "sync/atomic" + "time" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/treemanager" @@ -13,8 +16,6 @@ import ( "github.com/anyproto/any-sync/util/multiqueue" "github.com/cheggaaa/mb/v3" "github.com/gogo/protobuf/proto" - "sync/atomic" - "time" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/object/syncobjectgetter" @@ -79,7 +80,7 @@ func (s *objectSync) Init(a *app.App) (err error) { } s.spaceIsDeleted = sharedData.SpaceIsDeleted s.spaceId = sharedData.SpaceId - s.handleQueue = multiqueue.New[HandleMessage](s.processHandleMessage, 100) + s.handleQueue = multiqueue.New[HandleMessage](s.processHandleMessage, 30) return nil }