diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 21c97cca..e9eb2abe 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -6,7 +6,6 @@ import ( "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/credentialprovider" - "github.com/anyproto/any-sync/commonspace/object/tree/synctree" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/settings/settingsstate" @@ -24,6 +23,7 @@ type DiffSyncer interface { RemoveObjects(ids []string) UpdateHeads(id string, heads []string) Init(deletionState settingsstate.ObjectDeletionState) + Close() error } func newDiffSyncer( @@ -39,7 +39,7 @@ func newDiffSyncer( return &diffSyncer{ diff: diff, spaceId: spaceId, - cache: cache, + treeManager: cache, storage: storage, peerManager: peerManager, clientFactory: clientFactory, @@ -53,18 +53,20 @@ type diffSyncer struct { spaceId string diff ldiff.Diff peerManager peermanager.PeerManager - cache treemanager.TreeManager + treeManager treemanager.TreeManager storage spacestorage.SpaceStorage clientFactory spacesyncproto.ClientFactory log logger.CtxLogger deletionState settingsstate.ObjectDeletionState credentialProvider credentialprovider.CredentialProvider syncStatus syncstatus.StatusUpdater + treeSyncer treemanager.TreeSyncer } func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) { d.deletionState = deletionState d.deletionState.AddObserver(d.RemoveObjects) + d.treeSyncer = d.treeManager.NewTreeSyncer(d.spaceId) } func (d *diffSyncer) RemoveObjects(ids []string) { @@ -124,7 +126,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) if err != nil && err != spacesyncproto.ErrSpaceMissing { if err == spacesyncproto.ErrSpaceIsDeleted { d.log.Debug("got space deleted while syncing") - d.syncTrees(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}) + d.treeSyncer.SyncAll(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}, nil) } d.syncStatus.SetNodesOnline(p.Id(), false) return fmt.Errorf("diff error: %v", err) @@ -142,7 +144,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.syncStatus.RemoveAllExcept(p.Id(), existingIds, stateCounter) - err = d.cache.SyncTrees(ctx, existingIds, missingIds) + err = d.treeSyncer.SyncAll(ctx, p.Id(), existingIds, missingIds) if err != nil { return err } @@ -155,27 +157,6 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) return } -func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) { - for _, tId := range trees { - log := d.log.With(zap.String("treeId", tId)) - tree, err := d.cache.GetTree(ctx, d.spaceId, tId) - if err != nil { - log.WarnCtx(ctx, "can't load tree", zap.Error(err)) - continue - } - syncTree, ok := tree.(synctree.SyncTree) - if !ok { - log.WarnCtx(ctx, "not a sync tree") - continue - } - if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { - log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err)) - } else { - log.DebugCtx(ctx, "success synctree.SyncWithPeer") - } - } -} - func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl spacesyncproto.DRPCSpaceSyncClient) (err error) { aclStorage, err := d.storage.AclStorage() if err != nil { @@ -238,3 +219,7 @@ func (d *diffSyncer) subscribe(ctx context.Context, peerId string) (err error) { Payload: payload, }) } + +func (d *diffSyncer) Close() error { + return d.treeSyncer.Close() +} diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 0580fa7d..cafb7a5f 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -119,6 +119,7 @@ func TestDiffSyncer_Sync(t *testing.T) { factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient { return clientMock }) + treeSyncerMock := mock_treemanager.NewMockTreeSyncer(ctrl) credentialProvider := mock_credentialprovider.NewMockCredentialProvider(ctrl) delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) spaceId := "spaceId" @@ -126,19 +127,21 @@ func TestDiffSyncer_Sync(t *testing.T) { l := logger.NewNamed(spaceId) diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), credentialProvider, l) delState.EXPECT().AddObserver(gomock.Any()) + cacheMock.EXPECT().NewTreeSyncer(spaceId).Return(treeSyncerMock) diffSyncer.Init(delState) t.Run("diff syncer sync", func(t *testing.T) { + mPeer := mockPeer{} peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). - Return([]peer.Peer{mockPeer{}}, nil) + Return([]peer.Peer{mPeer}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Return([]string{"new"}, []string{"changed"}, nil, nil) delState.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1) delState.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1) delState.EXPECT().Filter(nil).Return(nil).Times(1) - cacheMock.EXPECT().SyncTrees(gomock.Any(), []string{"changed"}, []string{"new"}).Return(nil) + treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil) require.NoError(t, diffSyncer.Sync(ctx)) }) @@ -225,16 +228,15 @@ func TestDiffSyncer_Sync(t *testing.T) { }) t.Run("diff syncer sync space is deleted error", func(t *testing.T) { + mPeer := mockPeer{} peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). - Return([]peer.Peer{mockPeer{}}, nil) + Return([]peer.Peer{mPeer}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted) stMock.EXPECT().SpaceSettingsId().Return("settingsId") - cacheMock.EXPECT(). - GetTree(gomock.Any(), spaceId, "settingsId"). - Return(nil, nil) + treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil) require.NoError(t, diffSyncer.Sync(ctx)) }) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index cdb67e0b..ddca3e50 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -136,7 +136,7 @@ func (d *headSync) RemoveObjects(ids []string) { func (d *headSync) Close() (err error) { d.periodicSync.Close() - return + return d.syncer.Close() } func (d *headSync) fillDiff(objectIds []string) { diff --git a/commonspace/headsync/headsync_test.go b/commonspace/headsync/headsync_test.go index c803edc4..ae2c419a 100644 --- a/commonspace/headsync/headsync_test.go +++ b/commonspace/headsync/headsync_test.go @@ -65,6 +65,7 @@ func TestDiffService(t *testing.T) { t.Run("close", func(t *testing.T) { pSyncMock.EXPECT().Close() + syncer.EXPECT().Close() service.Close() }) } diff --git a/commonspace/headsync/mock_headsync/mock_headsync.go b/commonspace/headsync/mock_headsync/mock_headsync.go index 2ad6ac81..7df2fe64 100644 --- a/commonspace/headsync/mock_headsync/mock_headsync.go +++ b/commonspace/headsync/mock_headsync/mock_headsync.go @@ -35,6 +35,20 @@ func (m *MockDiffSyncer) EXPECT() *MockDiffSyncerMockRecorder { return m.recorder } +// Close mocks base method. +func (m *MockDiffSyncer) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockDiffSyncerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDiffSyncer)(nil).Close)) +} + // Init mocks base method. func (m *MockDiffSyncer) Init(arg0 settingsstate.ObjectDeletionState) { m.ctrl.T.Helper() diff --git a/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go b/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go index 1a1c1aab..e1035ae1 100644 --- a/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go +++ b/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/anyproto/any-sync/commonspace/object/treemanager (interfaces: TreeManager) +// Source: github.com/anyproto/any-sync/commonspace/object/treemanager (interfaces: TreeManager,TreeSyncer) // Package mock_treemanager is a generated GoMock package. package mock_treemanager @@ -10,6 +10,7 @@ import ( app "github.com/anyproto/any-sync/app" objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" + treemanager "github.com/anyproto/any-sync/commonspace/object/treemanager" gomock "github.com/golang/mock/gomock" ) @@ -121,6 +122,20 @@ func (mr *MockTreeManagerMockRecorder) Name() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockTreeManager)(nil).Name)) } +// NewTreeSyncer mocks base method. +func (m *MockTreeManager) NewTreeSyncer(arg0 string) treemanager.TreeSyncer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewTreeSyncer", arg0) + ret0, _ := ret[0].(treemanager.TreeSyncer) + return ret0 +} + +// NewTreeSyncer indicates an expected call of NewTreeSyncer. +func (mr *MockTreeManagerMockRecorder) NewTreeSyncer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTreeSyncer", reflect.TypeOf((*MockTreeManager)(nil).NewTreeSyncer), arg0) +} + // Run mocks base method. func (m *MockTreeManager) Run(arg0 context.Context) error { m.ctrl.T.Helper() @@ -135,16 +150,65 @@ func (mr *MockTreeManagerMockRecorder) Run(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockTreeManager)(nil).Run), arg0) } -// SyncTrees mocks base method. -func (m *MockTreeManager) SyncTrees(arg0 context.Context, arg1, arg2 []string) error { +// MockTreeSyncer is a mock of TreeSyncer interface. +type MockTreeSyncer struct { + ctrl *gomock.Controller + recorder *MockTreeSyncerMockRecorder +} + +// MockTreeSyncerMockRecorder is the mock recorder for MockTreeSyncer. +type MockTreeSyncerMockRecorder struct { + mock *MockTreeSyncer +} + +// NewMockTreeSyncer creates a new mock instance. +func NewMockTreeSyncer(ctrl *gomock.Controller) *MockTreeSyncer { + mock := &MockTreeSyncer{ctrl: ctrl} + mock.recorder = &MockTreeSyncerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTreeSyncer) EXPECT() *MockTreeSyncerMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockTreeSyncer) Close() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SyncTrees", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "Close") ret0, _ := ret[0].(error) return ret0 } -// SyncTrees indicates an expected call of SyncTrees. -func (mr *MockTreeManagerMockRecorder) SyncTrees(arg0, arg1, arg2 interface{}) *gomock.Call { +// Close indicates an expected call of Close. +func (mr *MockTreeSyncerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncTrees", reflect.TypeOf((*MockTreeManager)(nil).SyncTrees), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockTreeSyncer)(nil).Close)) +} + +// Init mocks base method. +func (m *MockTreeSyncer) Init() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Init") +} + +// Init indicates an expected call of Init. +func (mr *MockTreeSyncerMockRecorder) Init() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockTreeSyncer)(nil).Init)) +} + +// SyncAll mocks base method. +func (m *MockTreeSyncer) SyncAll(arg0 context.Context, arg1 string, arg2, arg3 []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncAll", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncAll indicates an expected call of SyncAll. +func (mr *MockTreeSyncerMockRecorder) SyncAll(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncAll", reflect.TypeOf((*MockTreeSyncer)(nil).SyncAll), arg0, arg1, arg2, arg3) } diff --git a/commonspace/object/treemanager/treemanager.go b/commonspace/object/treemanager/treemanager.go index 554e0ac7..d0a79e51 100644 --- a/commonspace/object/treemanager/treemanager.go +++ b/commonspace/object/treemanager/treemanager.go @@ -1,4 +1,4 @@ -//go:generate mockgen -destination mock_treemanager/mock_treemanager.go github.com/anyproto/any-sync/commonspace/object/treemanager TreeManager +//go:generate mockgen -destination mock_treemanager/mock_treemanager.go github.com/anyproto/any-sync/commonspace/object/treemanager TreeManager,TreeSyncer package treemanager import ( @@ -14,5 +14,11 @@ type TreeManager interface { GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error DeleteTree(ctx context.Context, spaceId, treeId string) error - SyncTrees(ctx context.Context, exiting, removed []string) error + NewTreeSyncer(spaceId string) TreeSyncer +} + +type TreeSyncer interface { + Init() + SyncAll(ctx context.Context, peerId string, existing, missing []string) error + Close() error } diff --git a/commonspace/object/treemanager/treesyncer.go b/commonspace/object/treemanager/treesyncer.go new file mode 100644 index 00000000..de47c439 --- /dev/null +++ b/commonspace/object/treemanager/treesyncer.go @@ -0,0 +1,157 @@ +package treemanager + +import ( + "context" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/object/tree/synctree" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/streampool" + "go.uber.org/zap" + "sync" + "time" +) + +type executor struct { + pool *streampool.ExecPool + objs map[string]struct{} + sync.Mutex +} + +func newExecutor(workers, size int) *executor { + return &executor{ + pool: streampool.NewExecPool(workers, size), + objs: map[string]struct{}{}, + } +} + +func (e *executor) tryAdd(id string, action func()) (err error) { + if _, exists := e.objs[id]; exists { + return nil + } + e.Lock() + defer e.Unlock() + e.objs[id] = struct{}{} + return e.pool.TryAdd(func() { + action() + e.Lock() + defer e.Unlock() + delete(e.objs, id) + }) +} + +func (e *executor) close() { + e.pool.Close() +} + +type treeSyncer struct { + sync.Mutex + log logger.CtxLogger + size int + requests int + spaceId string + timeout time.Duration + requestPools map[string]*executor + headPools map[string]*executor + treeManager TreeManager + isRunning bool +} + +func NewTreeSyncer(spaceId string, timeout time.Duration, concurrentReqs int, treeManager TreeManager, log logger.CtxLogger) TreeSyncer { + return &treeSyncer{ + log: log, + requests: concurrentReqs, + spaceId: spaceId, + timeout: timeout, + requestPools: map[string]*executor{}, + headPools: map[string]*executor{}, + treeManager: treeManager, + } +} + +func (t *treeSyncer) Init() { + t.Lock() + defer t.Unlock() + t.isRunning = true +} + +func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error { + t.Lock() + defer t.Unlock() + if !t.isRunning { + return nil + } + reqExec, exists := t.requestPools[peerId] + if !exists { + reqExec = newExecutor(t.requests, t.size) + t.requestPools[peerId] = reqExec + } + headExec, exists := t.headPools[peerId] + if !exists { + headExec = newExecutor(1, t.size) + t.requestPools[peerId] = headExec + } + for _, id := range existing { + err := headExec.tryAdd(id, func() { + t.updateTree(peerId, id) + }) + if err != nil { + t.log.Error("failed to add to head queue", zap.Error(err)) + } + } + for _, id := range missing { + err := reqExec.tryAdd(id, func() { + t.requestTree(peerId, id) + }) + if err != nil { + t.log.Error("failed to add to request queue", zap.Error(err)) + } + } + return nil +} + +func (t *treeSyncer) requestTree(peerId, id string) { + log := t.log.With(zap.String("treeId", id)) + ctx := peer.CtxWithPeerId(context.Background(), peerId) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + _, err := t.treeManager.GetTree(ctx, t.spaceId, id) + if err != nil { + log.WarnCtx(ctx, "can't load missing tree", zap.Error(err)) + } else { + log.DebugCtx(ctx, "loaded missing tree") + } +} + +func (t *treeSyncer) updateTree(peerId, id string) { + log := t.log.With(zap.String("treeId", id)) + ctx := peer.CtxWithPeerId(context.Background(), peerId) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + tr, err := t.treeManager.GetTree(ctx, t.spaceId, id) + if err != nil { + log.WarnCtx(ctx, "can't load existing tree", zap.Error(err)) + return + } + syncTree, ok := tr.(synctree.SyncTree) + if !ok { + log.WarnCtx(ctx, "not a sync tree") + } + if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { + log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err)) + } else { + log.DebugCtx(ctx, "success synctree.SyncWithPeer") + } +} + +func (t *treeSyncer) Close() error { + t.Lock() + defer t.Unlock() + t.isRunning = false + for _, pool := range t.headPools { + pool.close() + } + for _, pool := range t.requestPools { + pool.close() + } + return nil +} diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 04dda12c..8ff77e0c 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -224,8 +224,8 @@ type mockTreeManager struct { markedIds []string } -func (t *mockTreeManager) SyncTrees(ctx context.Context, exiting, removed []string) error { - return nil +func (t *mockTreeManager) NewTreeSyncer(spaceId string) treemanager.TreeSyncer { + return treemanager.NewTreeSyncer(spaceId, time.Second, 10, t, log) } func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go index f0f36001..6071778b 100644 --- a/net/streampool/sendpool.go +++ b/net/streampool/sendpool.go @@ -6,11 +6,11 @@ import ( "go.uber.org/zap" ) -// newExecPool creates new execPool +// NewExecPool creates new ExecPool // workers - how many processes will execute tasks // maxSize - limit for queue size -func newExecPool(workers, maxSize int) *execPool { - ss := &execPool{ +func NewExecPool(workers, maxSize int) *ExecPool { + ss := &ExecPool{ batch: mb.New[func()](maxSize), } for i := 0; i < workers; i++ { @@ -19,20 +19,20 @@ func newExecPool(workers, maxSize int) *execPool { return ss } -// execPool needed for parallel execution of the incoming send tasks -type execPool struct { +// ExecPool needed for parallel execution of the incoming send tasks +type ExecPool struct { batch *mb.MB[func()] } -func (ss *execPool) Add(ctx context.Context, f ...func()) (err error) { +func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) { return ss.batch.Add(ctx, f...) } -func (ss *execPool) TryAdd(f ...func()) (err error) { +func (ss *ExecPool) TryAdd(f ...func()) (err error) { return ss.batch.TryAdd(f...) } -func (ss *execPool) sendLoop() { +func (ss *ExecPool) sendLoop() { for { f, err := ss.batch.WaitOne(context.Background()) if err != nil { @@ -43,6 +43,6 @@ func (ss *execPool) sendLoop() { } } -func (ss *execPool) Close() (err error) { +func (ss *ExecPool) Close() (err error) { return ss.batch.Close() } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 7384b845..59ee6e4c 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -58,7 +58,7 @@ type streamPool struct { streamIdsByTag map[string][]uint32 streams map[uint32]*stream opening map[string]*openingProcess - dial *execPool + dial *ExecPool mu sync.Mutex writeQueueSize int lastStreamId uint32 diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index 30cc44e0..220d6177 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -40,7 +40,7 @@ func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), + dial: NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize), } if s.metric != nil { registerMetrics(s.metric.Registry(), sp, "")