From 2ab43e2b690b8ecc7032d069e5a38d35378666f0 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 24 May 2023 14:54:10 +0200 Subject: [PATCH 1/6] Space headsync delayed start --- commonspace/headsync/headsync.go | 12 ++++++++++-- commonspace/headsync/headsync_test.go | 3 +++ commonspace/space.go | 5 +++++ 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 21f8acb1..4d7d0395 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -29,6 +29,7 @@ type TreeHeads struct { type HeadSync interface { Init(objectIds []string, deletionState settingsstate.ObjectDeletionState) + Run() UpdateHeads(id string, heads []string) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) @@ -48,6 +49,7 @@ type headSync struct { syncer DiffSyncer configuration nodeconf.NodeConf spaceIsDeleted *atomic.Bool + isRunning bool syncPeriod int } @@ -93,7 +95,11 @@ func NewHeadSync( func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDeletionState) { d.fillDiff(objectIds) d.syncer.Init(deletionState) +} + +func (d *headSync) Run() { d.periodicSync.Run() + d.isRunning = true } func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { @@ -135,8 +141,10 @@ func (d *headSync) RemoveObjects(ids []string) { } func (d *headSync) Close() (err error) { - d.periodicSync.Close() - return nil + if d.isRunning { + d.periodicSync.Close() + } + return } func (d *headSync) fillDiff(objectIds []string) { diff --git a/commonspace/headsync/headsync_test.go b/commonspace/headsync/headsync_test.go index c803edc4..3f83cdab 100644 --- a/commonspace/headsync/headsync_test.go +++ b/commonspace/headsync/headsync_test.go @@ -51,6 +51,7 @@ func TestDiffService(t *testing.T) { storageMock.EXPECT().WriteSpaceHash(hash) pSyncMock.EXPECT().Run() service.Init([]string{initId}, delState) + service.Run() }) t.Run("update heads", func(t *testing.T) { @@ -64,7 +65,9 @@ func TestDiffService(t *testing.T) { }) t.Run("close", func(t *testing.T) { + pSyncMock.EXPECT().Run() pSyncMock.EXPECT().Close() + service.Run() service.Close() }) } diff --git a/commonspace/space.go b/commonspace/space.go index 9a22f453..332a0c0e 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -96,6 +96,7 @@ func NewSpaceId(id string, repKey uint64) string { type Space interface { Id() string Init(ctx context.Context) error + StartHeadSync() StoredIds() []string DebugAllHeads() []headsync.TreeHeads @@ -231,6 +232,10 @@ func (s *space) Init(ctx context.Context) (err error) { return nil } +func (s *space) StartHeadSync() { + s.headSync.Run() +} + func (s *space) ObjectSync() objectsync.ObjectSync { return s.objectSync } From c485baaadff6499f7e98c7b363c1f809083d2b22 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 24 May 2023 15:05:21 +0200 Subject: [PATCH 2/6] Simplify interface --- commonspace/space.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/commonspace/space.go b/commonspace/space.go index 332a0c0e..9a22f453 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -96,7 +96,6 @@ func NewSpaceId(id string, repKey uint64) string { type Space interface { Id() string Init(ctx context.Context) error - StartHeadSync() StoredIds() []string DebugAllHeads() []headsync.TreeHeads @@ -232,10 +231,6 @@ func (s *space) Init(ctx context.Context) (err error) { return nil } -func (s *space) StartHeadSync() { - s.headSync.Run() -} - func (s *space) ObjectSync() objectsync.ObjectSync { return s.objectSync } From a5e2bea04c2b1dd59eb06b8965d5288fc9e96a7c Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 24 May 2023 21:47:59 +0200 Subject: [PATCH 3/6] Update message pool to include optional context deadline --- commonspace/objectsync/msgpool.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index db6a15e4..19098ace 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -53,9 +53,11 @@ func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageH func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { s.updateLastUsage() - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Minute) - defer cancel() + if _, ok := ctx.Deadline(); !ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() + } newCounter := s.counter.Add(1) msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter) log.InfoCtx(ctx, "mpool sendSync", zap.String("requestId", msg.RequestId)) From 5b553f1a8d248272be90abb3e6bea851cfb5dfeb Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 25 May 2023 20:23:19 +0200 Subject: [PATCH 4/6] Change headsync logic --- commonspace/headsync/diffsyncer.go | 13 ++++++++----- commonspace/headsync/diffsyncer_test.go | 10 ++++------ commonspace/headsync/headsync.go | 10 +--------- commonspace/headsync/headsync_test.go | 3 --- .../mock_treemanager/mock_treemanager.go | 14 ++++++++++++++ commonspace/object/treemanager/treemanager.go | 1 + .../settings/settingsstate/deletionstate.go | 15 +++++---------- .../settings/settingsstate/deletionstate_test.go | 4 ++-- .../mock_settingsstate/mock_settingsstate.go | 16 ++++++---------- commonspace/spaceutils_test.go | 4 ++++ 10 files changed, 45 insertions(+), 45 deletions(-) diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 57253531..21c97cca 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -137,16 +137,19 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) totalLen := len(newIds) + len(changedIds) + len(removedIds) // not syncing ids which were removed through settings document - filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds) + missingIds := d.deletionState.Filter(newIds) + existingIds := append(d.deletionState.Filter(removedIds), d.deletionState.Filter(changedIds)...) - d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) - - d.syncTrees(ctx, p.Id(), filteredIds) + d.syncStatus.RemoveAllExcept(p.Id(), existingIds, stateCounter) + err = d.cache.SyncTrees(ctx, existingIds, missingIds) + if err != nil { + return err + } d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds)), - zap.Int("already deleted ids", totalLen-len(filteredIds)), + zap.Int("already deleted ids", totalLen-len(existingIds)-len(missingIds)), zap.String("peerId", p.Id()), ) return diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 3f9f4c1f..0580fa7d 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -135,12 +135,10 @@ func TestDiffSyncer_Sync(t *testing.T) { diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Return([]string{"new"}, []string{"changed"}, nil, nil) - delState.EXPECT().FilterJoin(gomock.Any()).Return([]string{"new", "changed"}) - for _, arg := range []string{"new", "changed"} { - cacheMock.EXPECT(). - GetTree(gomock.Any(), spaceId, arg). - Return(nil, nil) - } + delState.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1) + delState.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1) + delState.EXPECT().Filter(nil).Return(nil).Times(1) + cacheMock.EXPECT().SyncTrees(gomock.Any(), []string{"changed"}, []string{"new"}).Return(nil) require.NoError(t, diffSyncer.Sync(ctx)) }) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 4d7d0395..cdb67e0b 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -29,7 +29,6 @@ type TreeHeads struct { type HeadSync interface { Init(objectIds []string, deletionState settingsstate.ObjectDeletionState) - Run() UpdateHeads(id string, heads []string) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) @@ -49,7 +48,6 @@ type headSync struct { syncer DiffSyncer configuration nodeconf.NodeConf spaceIsDeleted *atomic.Bool - isRunning bool syncPeriod int } @@ -95,11 +93,7 @@ func NewHeadSync( func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDeletionState) { d.fillDiff(objectIds) d.syncer.Init(deletionState) -} - -func (d *headSync) Run() { d.periodicSync.Run() - d.isRunning = true } func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { @@ -141,9 +135,7 @@ func (d *headSync) RemoveObjects(ids []string) { } func (d *headSync) Close() (err error) { - if d.isRunning { - d.periodicSync.Close() - } + d.periodicSync.Close() return } diff --git a/commonspace/headsync/headsync_test.go b/commonspace/headsync/headsync_test.go index 3f83cdab..c803edc4 100644 --- a/commonspace/headsync/headsync_test.go +++ b/commonspace/headsync/headsync_test.go @@ -51,7 +51,6 @@ func TestDiffService(t *testing.T) { storageMock.EXPECT().WriteSpaceHash(hash) pSyncMock.EXPECT().Run() service.Init([]string{initId}, delState) - service.Run() }) t.Run("update heads", func(t *testing.T) { @@ -65,9 +64,7 @@ func TestDiffService(t *testing.T) { }) t.Run("close", func(t *testing.T) { - pSyncMock.EXPECT().Run() pSyncMock.EXPECT().Close() - service.Run() service.Close() }) } diff --git a/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go b/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go index 7e321468..1a1c1aab 100644 --- a/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go +++ b/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go @@ -134,3 +134,17 @@ func (mr *MockTreeManagerMockRecorder) Run(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockTreeManager)(nil).Run), arg0) } + +// SyncTrees mocks base method. +func (m *MockTreeManager) SyncTrees(arg0 context.Context, arg1, arg2 []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncTrees", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncTrees indicates an expected call of SyncTrees. +func (mr *MockTreeManagerMockRecorder) SyncTrees(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncTrees", reflect.TypeOf((*MockTreeManager)(nil).SyncTrees), arg0, arg1, arg2) +} diff --git a/commonspace/object/treemanager/treemanager.go b/commonspace/object/treemanager/treemanager.go index 3fd9ab4c..554e0ac7 100644 --- a/commonspace/object/treemanager/treemanager.go +++ b/commonspace/object/treemanager/treemanager.go @@ -14,4 +14,5 @@ type TreeManager interface { GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error DeleteTree(ctx context.Context, spaceId, treeId string) error + SyncTrees(ctx context.Context, exiting, removed []string) error } diff --git a/commonspace/settings/settingsstate/deletionstate.go b/commonspace/settings/settingsstate/deletionstate.go index f4e1d7ee..f36f4fd0 100644 --- a/commonspace/settings/settingsstate/deletionstate.go +++ b/commonspace/settings/settingsstate/deletionstate.go @@ -16,7 +16,7 @@ type ObjectDeletionState interface { GetQueued() (ids []string) Delete(id string) (err error) Exists(id string) bool - FilterJoin(ids ...[]string) (filtered []string) + Filter(ids []string) (filtered []string) } type objectDeletionState struct { @@ -113,19 +113,14 @@ func (st *objectDeletionState) Exists(id string) bool { return st.exists(id) } -func (st *objectDeletionState) FilterJoin(ids ...[]string) (filtered []string) { +func (st *objectDeletionState) Filter(ids []string) (filtered []string) { st.RLock() defer st.RUnlock() - filter := func(ids []string) { - for _, id := range ids { - if !st.exists(id) { - filtered = append(filtered, id) - } + for _, id := range ids { + if !st.exists(id) { + filtered = append(filtered, id) } } - for _, arr := range ids { - filter(arr) - } return } diff --git a/commonspace/settings/settingsstate/deletionstate_test.go b/commonspace/settings/settingsstate/deletionstate_test.go index 878a81b0..ca2ea679 100644 --- a/commonspace/settings/settingsstate/deletionstate_test.go +++ b/commonspace/settings/settingsstate/deletionstate_test.go @@ -80,8 +80,8 @@ func TestDeletionState_FilterJoin(t *testing.T) { fx.delState.queued["id1"] = struct{}{} fx.delState.queued["id2"] = struct{}{} - filtered := fx.delState.FilterJoin([]string{"id1"}, []string{"id3", "id2"}, []string{"id4"}) - require.Equal(t, []string{"id3", "id4"}, filtered) + filtered := fx.delState.Filter([]string{"id3", "id2"}) + require.Equal(t, []string{"id3"}, filtered) } func TestDeletionState_AddObserver(t *testing.T) { diff --git a/commonspace/settings/settingsstate/mock_settingsstate/mock_settingsstate.go b/commonspace/settings/settingsstate/mock_settingsstate/mock_settingsstate.go index 41bc3b16..0bc9bf23 100644 --- a/commonspace/settings/settingsstate/mock_settingsstate/mock_settingsstate.go +++ b/commonspace/settings/settingsstate/mock_settingsstate/mock_settingsstate.go @@ -87,22 +87,18 @@ func (mr *MockObjectDeletionStateMockRecorder) Exists(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockObjectDeletionState)(nil).Exists), arg0) } -// FilterJoin mocks base method. -func (m *MockObjectDeletionState) FilterJoin(arg0 ...[]string) []string { +// Filter mocks base method. +func (m *MockObjectDeletionState) Filter(arg0 []string) []string { m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range arg0 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "FilterJoin", varargs...) + ret := m.ctrl.Call(m, "Filter", arg0) ret0, _ := ret[0].([]string) return ret0 } -// FilterJoin indicates an expected call of FilterJoin. -func (mr *MockObjectDeletionStateMockRecorder) FilterJoin(arg0 ...interface{}) *gomock.Call { +// Filter indicates an expected call of Filter. +func (mr *MockObjectDeletionStateMockRecorder) Filter(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterJoin", reflect.TypeOf((*MockObjectDeletionState)(nil).FilterJoin), arg0...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockObjectDeletionState)(nil).Filter), arg0) } // GetQueued mocks base method. diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 10140622..04dda12c 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -224,6 +224,10 @@ type mockTreeManager struct { markedIds []string } +func (t *mockTreeManager) SyncTrees(ctx context.Context, exiting, removed []string) error { + return nil +} + func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { t.markedIds = append(t.markedIds, treeId) return nil From f0a3edd798ef0c41bee3a7fb9ed2d3f5f4ce20f4 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 26 May 2023 09:49:09 +0200 Subject: [PATCH 5/6] Add tree syncer --- commonspace/headsync/diffsyncer.go | 37 ++--- commonspace/headsync/diffsyncer_test.go | 14 +- commonspace/headsync/headsync.go | 2 +- commonspace/headsync/headsync_test.go | 1 + .../headsync/mock_headsync/mock_headsync.go | 14 ++ .../mock_treemanager/mock_treemanager.go | 78 ++++++++- commonspace/object/treemanager/treemanager.go | 10 +- commonspace/object/treemanager/treesyncer.go | 157 ++++++++++++++++++ commonspace/spaceutils_test.go | 4 +- net/streampool/sendpool.go | 18 +- net/streampool/streampool.go | 2 +- net/streampool/streampoolservice.go | 2 +- 12 files changed, 284 insertions(+), 55 deletions(-) create mode 100644 commonspace/object/treemanager/treesyncer.go diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 21c97cca..e9eb2abe 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -6,7 +6,6 @@ import ( "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/credentialprovider" - "github.com/anyproto/any-sync/commonspace/object/tree/synctree" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/settings/settingsstate" @@ -24,6 +23,7 @@ type DiffSyncer interface { RemoveObjects(ids []string) UpdateHeads(id string, heads []string) Init(deletionState settingsstate.ObjectDeletionState) + Close() error } func newDiffSyncer( @@ -39,7 +39,7 @@ func newDiffSyncer( return &diffSyncer{ diff: diff, spaceId: spaceId, - cache: cache, + treeManager: cache, storage: storage, peerManager: peerManager, clientFactory: clientFactory, @@ -53,18 +53,20 @@ type diffSyncer struct { spaceId string diff ldiff.Diff peerManager peermanager.PeerManager - cache treemanager.TreeManager + treeManager treemanager.TreeManager storage spacestorage.SpaceStorage clientFactory spacesyncproto.ClientFactory log logger.CtxLogger deletionState settingsstate.ObjectDeletionState credentialProvider credentialprovider.CredentialProvider syncStatus syncstatus.StatusUpdater + treeSyncer treemanager.TreeSyncer } func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) { d.deletionState = deletionState d.deletionState.AddObserver(d.RemoveObjects) + d.treeSyncer = d.treeManager.NewTreeSyncer(d.spaceId) } func (d *diffSyncer) RemoveObjects(ids []string) { @@ -124,7 +126,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) if err != nil && err != spacesyncproto.ErrSpaceMissing { if err == spacesyncproto.ErrSpaceIsDeleted { d.log.Debug("got space deleted while syncing") - d.syncTrees(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}) + d.treeSyncer.SyncAll(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}, nil) } d.syncStatus.SetNodesOnline(p.Id(), false) return fmt.Errorf("diff error: %v", err) @@ -142,7 +144,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.syncStatus.RemoveAllExcept(p.Id(), existingIds, stateCounter) - err = d.cache.SyncTrees(ctx, existingIds, missingIds) + err = d.treeSyncer.SyncAll(ctx, p.Id(), existingIds, missingIds) if err != nil { return err } @@ -155,27 +157,6 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) return } -func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) { - for _, tId := range trees { - log := d.log.With(zap.String("treeId", tId)) - tree, err := d.cache.GetTree(ctx, d.spaceId, tId) - if err != nil { - log.WarnCtx(ctx, "can't load tree", zap.Error(err)) - continue - } - syncTree, ok := tree.(synctree.SyncTree) - if !ok { - log.WarnCtx(ctx, "not a sync tree") - continue - } - if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { - log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err)) - } else { - log.DebugCtx(ctx, "success synctree.SyncWithPeer") - } - } -} - func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl spacesyncproto.DRPCSpaceSyncClient) (err error) { aclStorage, err := d.storage.AclStorage() if err != nil { @@ -238,3 +219,7 @@ func (d *diffSyncer) subscribe(ctx context.Context, peerId string) (err error) { Payload: payload, }) } + +func (d *diffSyncer) Close() error { + return d.treeSyncer.Close() +} diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 0580fa7d..cafb7a5f 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -119,6 +119,7 @@ func TestDiffSyncer_Sync(t *testing.T) { factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient { return clientMock }) + treeSyncerMock := mock_treemanager.NewMockTreeSyncer(ctrl) credentialProvider := mock_credentialprovider.NewMockCredentialProvider(ctrl) delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) spaceId := "spaceId" @@ -126,19 +127,21 @@ func TestDiffSyncer_Sync(t *testing.T) { l := logger.NewNamed(spaceId) diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), credentialProvider, l) delState.EXPECT().AddObserver(gomock.Any()) + cacheMock.EXPECT().NewTreeSyncer(spaceId).Return(treeSyncerMock) diffSyncer.Init(delState) t.Run("diff syncer sync", func(t *testing.T) { + mPeer := mockPeer{} peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). - Return([]peer.Peer{mockPeer{}}, nil) + Return([]peer.Peer{mPeer}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Return([]string{"new"}, []string{"changed"}, nil, nil) delState.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1) delState.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1) delState.EXPECT().Filter(nil).Return(nil).Times(1) - cacheMock.EXPECT().SyncTrees(gomock.Any(), []string{"changed"}, []string{"new"}).Return(nil) + treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil) require.NoError(t, diffSyncer.Sync(ctx)) }) @@ -225,16 +228,15 @@ func TestDiffSyncer_Sync(t *testing.T) { }) t.Run("diff syncer sync space is deleted error", func(t *testing.T) { + mPeer := mockPeer{} peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). - Return([]peer.Peer{mockPeer{}}, nil) + Return([]peer.Peer{mPeer}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted) stMock.EXPECT().SpaceSettingsId().Return("settingsId") - cacheMock.EXPECT(). - GetTree(gomock.Any(), spaceId, "settingsId"). - Return(nil, nil) + treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil) require.NoError(t, diffSyncer.Sync(ctx)) }) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index cdb67e0b..ddca3e50 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -136,7 +136,7 @@ func (d *headSync) RemoveObjects(ids []string) { func (d *headSync) Close() (err error) { d.periodicSync.Close() - return + return d.syncer.Close() } func (d *headSync) fillDiff(objectIds []string) { diff --git a/commonspace/headsync/headsync_test.go b/commonspace/headsync/headsync_test.go index c803edc4..ae2c419a 100644 --- a/commonspace/headsync/headsync_test.go +++ b/commonspace/headsync/headsync_test.go @@ -65,6 +65,7 @@ func TestDiffService(t *testing.T) { t.Run("close", func(t *testing.T) { pSyncMock.EXPECT().Close() + syncer.EXPECT().Close() service.Close() }) } diff --git a/commonspace/headsync/mock_headsync/mock_headsync.go b/commonspace/headsync/mock_headsync/mock_headsync.go index 2ad6ac81..7df2fe64 100644 --- a/commonspace/headsync/mock_headsync/mock_headsync.go +++ b/commonspace/headsync/mock_headsync/mock_headsync.go @@ -35,6 +35,20 @@ func (m *MockDiffSyncer) EXPECT() *MockDiffSyncerMockRecorder { return m.recorder } +// Close mocks base method. +func (m *MockDiffSyncer) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockDiffSyncerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDiffSyncer)(nil).Close)) +} + // Init mocks base method. func (m *MockDiffSyncer) Init(arg0 settingsstate.ObjectDeletionState) { m.ctrl.T.Helper() diff --git a/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go b/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go index 1a1c1aab..e1035ae1 100644 --- a/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go +++ b/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/anyproto/any-sync/commonspace/object/treemanager (interfaces: TreeManager) +// Source: github.com/anyproto/any-sync/commonspace/object/treemanager (interfaces: TreeManager,TreeSyncer) // Package mock_treemanager is a generated GoMock package. package mock_treemanager @@ -10,6 +10,7 @@ import ( app "github.com/anyproto/any-sync/app" objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" + treemanager "github.com/anyproto/any-sync/commonspace/object/treemanager" gomock "github.com/golang/mock/gomock" ) @@ -121,6 +122,20 @@ func (mr *MockTreeManagerMockRecorder) Name() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockTreeManager)(nil).Name)) } +// NewTreeSyncer mocks base method. +func (m *MockTreeManager) NewTreeSyncer(arg0 string) treemanager.TreeSyncer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewTreeSyncer", arg0) + ret0, _ := ret[0].(treemanager.TreeSyncer) + return ret0 +} + +// NewTreeSyncer indicates an expected call of NewTreeSyncer. +func (mr *MockTreeManagerMockRecorder) NewTreeSyncer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTreeSyncer", reflect.TypeOf((*MockTreeManager)(nil).NewTreeSyncer), arg0) +} + // Run mocks base method. func (m *MockTreeManager) Run(arg0 context.Context) error { m.ctrl.T.Helper() @@ -135,16 +150,65 @@ func (mr *MockTreeManagerMockRecorder) Run(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockTreeManager)(nil).Run), arg0) } -// SyncTrees mocks base method. -func (m *MockTreeManager) SyncTrees(arg0 context.Context, arg1, arg2 []string) error { +// MockTreeSyncer is a mock of TreeSyncer interface. +type MockTreeSyncer struct { + ctrl *gomock.Controller + recorder *MockTreeSyncerMockRecorder +} + +// MockTreeSyncerMockRecorder is the mock recorder for MockTreeSyncer. +type MockTreeSyncerMockRecorder struct { + mock *MockTreeSyncer +} + +// NewMockTreeSyncer creates a new mock instance. +func NewMockTreeSyncer(ctrl *gomock.Controller) *MockTreeSyncer { + mock := &MockTreeSyncer{ctrl: ctrl} + mock.recorder = &MockTreeSyncerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTreeSyncer) EXPECT() *MockTreeSyncerMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockTreeSyncer) Close() error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SyncTrees", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "Close") ret0, _ := ret[0].(error) return ret0 } -// SyncTrees indicates an expected call of SyncTrees. -func (mr *MockTreeManagerMockRecorder) SyncTrees(arg0, arg1, arg2 interface{}) *gomock.Call { +// Close indicates an expected call of Close. +func (mr *MockTreeSyncerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncTrees", reflect.TypeOf((*MockTreeManager)(nil).SyncTrees), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockTreeSyncer)(nil).Close)) +} + +// Init mocks base method. +func (m *MockTreeSyncer) Init() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Init") +} + +// Init indicates an expected call of Init. +func (mr *MockTreeSyncerMockRecorder) Init() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockTreeSyncer)(nil).Init)) +} + +// SyncAll mocks base method. +func (m *MockTreeSyncer) SyncAll(arg0 context.Context, arg1 string, arg2, arg3 []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncAll", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncAll indicates an expected call of SyncAll. +func (mr *MockTreeSyncerMockRecorder) SyncAll(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncAll", reflect.TypeOf((*MockTreeSyncer)(nil).SyncAll), arg0, arg1, arg2, arg3) } diff --git a/commonspace/object/treemanager/treemanager.go b/commonspace/object/treemanager/treemanager.go index 554e0ac7..d0a79e51 100644 --- a/commonspace/object/treemanager/treemanager.go +++ b/commonspace/object/treemanager/treemanager.go @@ -1,4 +1,4 @@ -//go:generate mockgen -destination mock_treemanager/mock_treemanager.go github.com/anyproto/any-sync/commonspace/object/treemanager TreeManager +//go:generate mockgen -destination mock_treemanager/mock_treemanager.go github.com/anyproto/any-sync/commonspace/object/treemanager TreeManager,TreeSyncer package treemanager import ( @@ -14,5 +14,11 @@ type TreeManager interface { GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error DeleteTree(ctx context.Context, spaceId, treeId string) error - SyncTrees(ctx context.Context, exiting, removed []string) error + NewTreeSyncer(spaceId string) TreeSyncer +} + +type TreeSyncer interface { + Init() + SyncAll(ctx context.Context, peerId string, existing, missing []string) error + Close() error } diff --git a/commonspace/object/treemanager/treesyncer.go b/commonspace/object/treemanager/treesyncer.go new file mode 100644 index 00000000..de47c439 --- /dev/null +++ b/commonspace/object/treemanager/treesyncer.go @@ -0,0 +1,157 @@ +package treemanager + +import ( + "context" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/commonspace/object/tree/synctree" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/streampool" + "go.uber.org/zap" + "sync" + "time" +) + +type executor struct { + pool *streampool.ExecPool + objs map[string]struct{} + sync.Mutex +} + +func newExecutor(workers, size int) *executor { + return &executor{ + pool: streampool.NewExecPool(workers, size), + objs: map[string]struct{}{}, + } +} + +func (e *executor) tryAdd(id string, action func()) (err error) { + if _, exists := e.objs[id]; exists { + return nil + } + e.Lock() + defer e.Unlock() + e.objs[id] = struct{}{} + return e.pool.TryAdd(func() { + action() + e.Lock() + defer e.Unlock() + delete(e.objs, id) + }) +} + +func (e *executor) close() { + e.pool.Close() +} + +type treeSyncer struct { + sync.Mutex + log logger.CtxLogger + size int + requests int + spaceId string + timeout time.Duration + requestPools map[string]*executor + headPools map[string]*executor + treeManager TreeManager + isRunning bool +} + +func NewTreeSyncer(spaceId string, timeout time.Duration, concurrentReqs int, treeManager TreeManager, log logger.CtxLogger) TreeSyncer { + return &treeSyncer{ + log: log, + requests: concurrentReqs, + spaceId: spaceId, + timeout: timeout, + requestPools: map[string]*executor{}, + headPools: map[string]*executor{}, + treeManager: treeManager, + } +} + +func (t *treeSyncer) Init() { + t.Lock() + defer t.Unlock() + t.isRunning = true +} + +func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error { + t.Lock() + defer t.Unlock() + if !t.isRunning { + return nil + } + reqExec, exists := t.requestPools[peerId] + if !exists { + reqExec = newExecutor(t.requests, t.size) + t.requestPools[peerId] = reqExec + } + headExec, exists := t.headPools[peerId] + if !exists { + headExec = newExecutor(1, t.size) + t.requestPools[peerId] = headExec + } + for _, id := range existing { + err := headExec.tryAdd(id, func() { + t.updateTree(peerId, id) + }) + if err != nil { + t.log.Error("failed to add to head queue", zap.Error(err)) + } + } + for _, id := range missing { + err := reqExec.tryAdd(id, func() { + t.requestTree(peerId, id) + }) + if err != nil { + t.log.Error("failed to add to request queue", zap.Error(err)) + } + } + return nil +} + +func (t *treeSyncer) requestTree(peerId, id string) { + log := t.log.With(zap.String("treeId", id)) + ctx := peer.CtxWithPeerId(context.Background(), peerId) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + _, err := t.treeManager.GetTree(ctx, t.spaceId, id) + if err != nil { + log.WarnCtx(ctx, "can't load missing tree", zap.Error(err)) + } else { + log.DebugCtx(ctx, "loaded missing tree") + } +} + +func (t *treeSyncer) updateTree(peerId, id string) { + log := t.log.With(zap.String("treeId", id)) + ctx := peer.CtxWithPeerId(context.Background(), peerId) + ctx, cancel := context.WithTimeout(ctx, t.timeout) + defer cancel() + tr, err := t.treeManager.GetTree(ctx, t.spaceId, id) + if err != nil { + log.WarnCtx(ctx, "can't load existing tree", zap.Error(err)) + return + } + syncTree, ok := tr.(synctree.SyncTree) + if !ok { + log.WarnCtx(ctx, "not a sync tree") + } + if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { + log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err)) + } else { + log.DebugCtx(ctx, "success synctree.SyncWithPeer") + } +} + +func (t *treeSyncer) Close() error { + t.Lock() + defer t.Unlock() + t.isRunning = false + for _, pool := range t.headPools { + pool.close() + } + for _, pool := range t.requestPools { + pool.close() + } + return nil +} diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 04dda12c..8ff77e0c 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -224,8 +224,8 @@ type mockTreeManager struct { markedIds []string } -func (t *mockTreeManager) SyncTrees(ctx context.Context, exiting, removed []string) error { - return nil +func (t *mockTreeManager) NewTreeSyncer(spaceId string) treemanager.TreeSyncer { + return treemanager.NewTreeSyncer(spaceId, time.Second, 10, t, log) } func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go index f0f36001..6071778b 100644 --- a/net/streampool/sendpool.go +++ b/net/streampool/sendpool.go @@ -6,11 +6,11 @@ import ( "go.uber.org/zap" ) -// newExecPool creates new execPool +// NewExecPool creates new ExecPool // workers - how many processes will execute tasks // maxSize - limit for queue size -func newExecPool(workers, maxSize int) *execPool { - ss := &execPool{ +func NewExecPool(workers, maxSize int) *ExecPool { + ss := &ExecPool{ batch: mb.New[func()](maxSize), } for i := 0; i < workers; i++ { @@ -19,20 +19,20 @@ func newExecPool(workers, maxSize int) *execPool { return ss } -// execPool needed for parallel execution of the incoming send tasks -type execPool struct { +// ExecPool needed for parallel execution of the incoming send tasks +type ExecPool struct { batch *mb.MB[func()] } -func (ss *execPool) Add(ctx context.Context, f ...func()) (err error) { +func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) { return ss.batch.Add(ctx, f...) } -func (ss *execPool) TryAdd(f ...func()) (err error) { +func (ss *ExecPool) TryAdd(f ...func()) (err error) { return ss.batch.TryAdd(f...) } -func (ss *execPool) sendLoop() { +func (ss *ExecPool) sendLoop() { for { f, err := ss.batch.WaitOne(context.Background()) if err != nil { @@ -43,6 +43,6 @@ func (ss *execPool) sendLoop() { } } -func (ss *execPool) Close() (err error) { +func (ss *ExecPool) Close() (err error) { return ss.batch.Close() } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 7384b845..59ee6e4c 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -58,7 +58,7 @@ type streamPool struct { streamIdsByTag map[string][]uint32 streams map[uint32]*stream opening map[string]*openingProcess - dial *execPool + dial *ExecPool mu sync.Mutex writeQueueSize int lastStreamId uint32 diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index 30cc44e0..220d6177 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -40,7 +40,7 @@ func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), + dial: NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize), } if s.metric != nil { registerMetrics(s.metric.Registry(), sp, "") From 556f03ed7fa40c14bc95b614a8bbca3c5336acda Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 26 May 2023 11:29:21 +0200 Subject: [PATCH 6/6] Expose more methods in exec pool --- commonspace/object/treemanager/treesyncer.go | 157 ------------------- commonspace/spaceutils_test.go | 16 +- net/streampool/sendpool.go | 15 +- net/streampool/streampoolservice.go | 4 +- 4 files changed, 28 insertions(+), 164 deletions(-) delete mode 100644 commonspace/object/treemanager/treesyncer.go diff --git a/commonspace/object/treemanager/treesyncer.go b/commonspace/object/treemanager/treesyncer.go deleted file mode 100644 index de47c439..00000000 --- a/commonspace/object/treemanager/treesyncer.go +++ /dev/null @@ -1,157 +0,0 @@ -package treemanager - -import ( - "context" - "github.com/anyproto/any-sync/app/logger" - "github.com/anyproto/any-sync/commonspace/object/tree/synctree" - "github.com/anyproto/any-sync/net/peer" - "github.com/anyproto/any-sync/net/streampool" - "go.uber.org/zap" - "sync" - "time" -) - -type executor struct { - pool *streampool.ExecPool - objs map[string]struct{} - sync.Mutex -} - -func newExecutor(workers, size int) *executor { - return &executor{ - pool: streampool.NewExecPool(workers, size), - objs: map[string]struct{}{}, - } -} - -func (e *executor) tryAdd(id string, action func()) (err error) { - if _, exists := e.objs[id]; exists { - return nil - } - e.Lock() - defer e.Unlock() - e.objs[id] = struct{}{} - return e.pool.TryAdd(func() { - action() - e.Lock() - defer e.Unlock() - delete(e.objs, id) - }) -} - -func (e *executor) close() { - e.pool.Close() -} - -type treeSyncer struct { - sync.Mutex - log logger.CtxLogger - size int - requests int - spaceId string - timeout time.Duration - requestPools map[string]*executor - headPools map[string]*executor - treeManager TreeManager - isRunning bool -} - -func NewTreeSyncer(spaceId string, timeout time.Duration, concurrentReqs int, treeManager TreeManager, log logger.CtxLogger) TreeSyncer { - return &treeSyncer{ - log: log, - requests: concurrentReqs, - spaceId: spaceId, - timeout: timeout, - requestPools: map[string]*executor{}, - headPools: map[string]*executor{}, - treeManager: treeManager, - } -} - -func (t *treeSyncer) Init() { - t.Lock() - defer t.Unlock() - t.isRunning = true -} - -func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error { - t.Lock() - defer t.Unlock() - if !t.isRunning { - return nil - } - reqExec, exists := t.requestPools[peerId] - if !exists { - reqExec = newExecutor(t.requests, t.size) - t.requestPools[peerId] = reqExec - } - headExec, exists := t.headPools[peerId] - if !exists { - headExec = newExecutor(1, t.size) - t.requestPools[peerId] = headExec - } - for _, id := range existing { - err := headExec.tryAdd(id, func() { - t.updateTree(peerId, id) - }) - if err != nil { - t.log.Error("failed to add to head queue", zap.Error(err)) - } - } - for _, id := range missing { - err := reqExec.tryAdd(id, func() { - t.requestTree(peerId, id) - }) - if err != nil { - t.log.Error("failed to add to request queue", zap.Error(err)) - } - } - return nil -} - -func (t *treeSyncer) requestTree(peerId, id string) { - log := t.log.With(zap.String("treeId", id)) - ctx := peer.CtxWithPeerId(context.Background(), peerId) - ctx, cancel := context.WithTimeout(ctx, t.timeout) - defer cancel() - _, err := t.treeManager.GetTree(ctx, t.spaceId, id) - if err != nil { - log.WarnCtx(ctx, "can't load missing tree", zap.Error(err)) - } else { - log.DebugCtx(ctx, "loaded missing tree") - } -} - -func (t *treeSyncer) updateTree(peerId, id string) { - log := t.log.With(zap.String("treeId", id)) - ctx := peer.CtxWithPeerId(context.Background(), peerId) - ctx, cancel := context.WithTimeout(ctx, t.timeout) - defer cancel() - tr, err := t.treeManager.GetTree(ctx, t.spaceId, id) - if err != nil { - log.WarnCtx(ctx, "can't load existing tree", zap.Error(err)) - return - } - syncTree, ok := tr.(synctree.SyncTree) - if !ok { - log.WarnCtx(ctx, "not a sync tree") - } - if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { - log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err)) - } else { - log.DebugCtx(ctx, "success synctree.SyncWithPeer") - } -} - -func (t *treeSyncer) Close() error { - t.Lock() - defer t.Unlock() - t.isRunning = false - for _, pool := range t.headPools { - pool.close() - } - for _, pool := range t.requestPools { - pool.close() - } - return nil -} diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 8ff77e0c..0fc0977f 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -217,6 +217,20 @@ func (m *mockConfig) GetSpace() Config { // Mock TreeManager // +type noOpSyncer struct { +} + +func (n noOpSyncer) Init() { +} + +func (n noOpSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error { + return nil +} + +func (n noOpSyncer) Close() error { + return nil +} + type mockTreeManager struct { space Space cache ocache.OCache @@ -225,7 +239,7 @@ type mockTreeManager struct { } func (t *mockTreeManager) NewTreeSyncer(spaceId string) treemanager.TreeSyncer { - return treemanager.NewTreeSyncer(spaceId, time.Second, 10, t, log) + return noOpSyncer{} } func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go index 6071778b..0bff0765 100644 --- a/net/streampool/sendpool.go +++ b/net/streampool/sendpool.go @@ -11,17 +11,16 @@ import ( // maxSize - limit for queue size func NewExecPool(workers, maxSize int) *ExecPool { ss := &ExecPool{ - batch: mb.New[func()](maxSize), - } - for i := 0; i < workers; i++ { - go ss.sendLoop() + workers: workers, + batch: mb.New[func()](maxSize), } return ss } // ExecPool needed for parallel execution of the incoming send tasks type ExecPool struct { - batch *mb.MB[func()] + workers int + batch *mb.MB[func()] } func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) { @@ -32,6 +31,12 @@ func (ss *ExecPool) TryAdd(f ...func()) (err error) { return ss.batch.TryAdd(f...) } +func (ss *ExecPool) Run() { + for i := 0; i < ss.workers; i++ { + go ss.sendLoop() + } +} + func (ss *ExecPool) sendLoop() { for { f, err := ss.batch.WaitOne(context.Background()) diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index 220d6177..a5bb8f62 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -33,6 +33,7 @@ type service struct { } func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { + pl := NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize) sp := &streamPool{ handler: h, writeQueueSize: conf.SendQueueSize, @@ -40,8 +41,9 @@ func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - dial: NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize), + dial: pl, } + pl.Run() if s.metric != nil { registerMetrics(s.metric.Registry(), sp, "") }