diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 57253531..8b59c743 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -6,7 +6,6 @@ import ( "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/credentialprovider" - "github.com/anyproto/any-sync/commonspace/object/tree/synctree" "github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/settings/settingsstate" @@ -24,6 +23,7 @@ type DiffSyncer interface { RemoveObjects(ids []string) UpdateHeads(id string, heads []string) Init(deletionState settingsstate.ObjectDeletionState) + Close() error } func newDiffSyncer( @@ -39,7 +39,7 @@ func newDiffSyncer( return &diffSyncer{ diff: diff, spaceId: spaceId, - cache: cache, + treeManager: cache, storage: storage, peerManager: peerManager, clientFactory: clientFactory, @@ -53,18 +53,20 @@ type diffSyncer struct { spaceId string diff ldiff.Diff peerManager peermanager.PeerManager - cache treemanager.TreeManager + treeManager treemanager.TreeManager storage spacestorage.SpaceStorage clientFactory spacesyncproto.ClientFactory log logger.CtxLogger deletionState settingsstate.ObjectDeletionState credentialProvider credentialprovider.CredentialProvider syncStatus syncstatus.StatusUpdater + treeSyncer treemanager.TreeSyncer } func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) { d.deletionState = deletionState d.deletionState.AddObserver(d.RemoveObjects) + d.treeSyncer = d.treeManager.NewTreeSyncer(d.spaceId, d.treeManager) } func (d *diffSyncer) RemoveObjects(ids []string) { @@ -124,7 +126,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) if err != nil && err != spacesyncproto.ErrSpaceMissing { if err == spacesyncproto.ErrSpaceIsDeleted { d.log.Debug("got space deleted while syncing") - d.syncTrees(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}) + d.treeSyncer.SyncAll(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}, nil) } d.syncStatus.SetNodesOnline(p.Id(), false) return fmt.Errorf("diff error: %v", err) @@ -137,42 +139,24 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) totalLen := len(newIds) + len(changedIds) + len(removedIds) // not syncing ids which were removed through settings document - filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds) + missingIds := d.deletionState.Filter(newIds) + existingIds := append(d.deletionState.Filter(removedIds), d.deletionState.Filter(changedIds)...) - d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) - - d.syncTrees(ctx, p.Id(), filteredIds) + d.syncStatus.RemoveAllExcept(p.Id(), existingIds, stateCounter) + err = d.treeSyncer.SyncAll(ctx, p.Id(), existingIds, missingIds) + if err != nil { + return err + } d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds)), - zap.Int("already deleted ids", totalLen-len(filteredIds)), + zap.Int("already deleted ids", totalLen-len(existingIds)-len(missingIds)), zap.String("peerId", p.Id()), ) return } -func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) { - for _, tId := range trees { - log := d.log.With(zap.String("treeId", tId)) - tree, err := d.cache.GetTree(ctx, d.spaceId, tId) - if err != nil { - log.WarnCtx(ctx, "can't load tree", zap.Error(err)) - continue - } - syncTree, ok := tree.(synctree.SyncTree) - if !ok { - log.WarnCtx(ctx, "not a sync tree") - continue - } - if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { - log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err)) - } else { - log.DebugCtx(ctx, "success synctree.SyncWithPeer") - } - } -} - func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl spacesyncproto.DRPCSpaceSyncClient) (err error) { aclStorage, err := d.storage.AclStorage() if err != nil { @@ -235,3 +219,7 @@ func (d *diffSyncer) subscribe(ctx context.Context, peerId string) (err error) { Payload: payload, }) } + +func (d *diffSyncer) Close() error { + return d.treeSyncer.Close() +} diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 3f9f4c1f..fb1ad49c 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -119,6 +119,7 @@ func TestDiffSyncer_Sync(t *testing.T) { factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient { return clientMock }) + treeSyncerMock := mock_treemanager.NewMockTreeSyncer(ctrl) credentialProvider := mock_credentialprovider.NewMockCredentialProvider(ctrl) delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) spaceId := "spaceId" @@ -126,21 +127,21 @@ func TestDiffSyncer_Sync(t *testing.T) { l := logger.NewNamed(spaceId) diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), credentialProvider, l) delState.EXPECT().AddObserver(gomock.Any()) + cacheMock.EXPECT().NewTreeSyncer(spaceId, gomock.Any()).Return(treeSyncerMock) diffSyncer.Init(delState) t.Run("diff syncer sync", func(t *testing.T) { + mPeer := mockPeer{} peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). - Return([]peer.Peer{mockPeer{}}, nil) + Return([]peer.Peer{mPeer}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Return([]string{"new"}, []string{"changed"}, nil, nil) - delState.EXPECT().FilterJoin(gomock.Any()).Return([]string{"new", "changed"}) - for _, arg := range []string{"new", "changed"} { - cacheMock.EXPECT(). - GetTree(gomock.Any(), spaceId, arg). - Return(nil, nil) - } + delState.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1) + delState.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1) + delState.EXPECT().Filter(nil).Return(nil).Times(1) + treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil) require.NoError(t, diffSyncer.Sync(ctx)) }) @@ -227,16 +228,15 @@ func TestDiffSyncer_Sync(t *testing.T) { }) t.Run("diff syncer sync space is deleted error", func(t *testing.T) { + mPeer := mockPeer{} peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). - Return([]peer.Peer{mockPeer{}}, nil) + Return([]peer.Peer{mPeer}, nil) diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted) stMock.EXPECT().SpaceSettingsId().Return("settingsId") - cacheMock.EXPECT(). - GetTree(gomock.Any(), spaceId, "settingsId"). - Return(nil, nil) + treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil) require.NoError(t, diffSyncer.Sync(ctx)) }) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 21f8acb1..ddca3e50 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -136,7 +136,7 @@ func (d *headSync) RemoveObjects(ids []string) { func (d *headSync) Close() (err error) { d.periodicSync.Close() - return nil + return d.syncer.Close() } func (d *headSync) fillDiff(objectIds []string) { diff --git a/commonspace/headsync/headsync_test.go b/commonspace/headsync/headsync_test.go index c803edc4..ae2c419a 100644 --- a/commonspace/headsync/headsync_test.go +++ b/commonspace/headsync/headsync_test.go @@ -65,6 +65,7 @@ func TestDiffService(t *testing.T) { t.Run("close", func(t *testing.T) { pSyncMock.EXPECT().Close() + syncer.EXPECT().Close() service.Close() }) } diff --git a/commonspace/headsync/mock_headsync/mock_headsync.go b/commonspace/headsync/mock_headsync/mock_headsync.go index 2ad6ac81..7df2fe64 100644 --- a/commonspace/headsync/mock_headsync/mock_headsync.go +++ b/commonspace/headsync/mock_headsync/mock_headsync.go @@ -35,6 +35,20 @@ func (m *MockDiffSyncer) EXPECT() *MockDiffSyncerMockRecorder { return m.recorder } +// Close mocks base method. +func (m *MockDiffSyncer) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockDiffSyncerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDiffSyncer)(nil).Close)) +} + // Init mocks base method. func (m *MockDiffSyncer) Init(arg0 settingsstate.ObjectDeletionState) { m.ctrl.T.Helper() diff --git a/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go b/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go index 7e321468..ee285aac 100644 --- a/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go +++ b/commonspace/object/treemanager/mock_treemanager/mock_treemanager.go @@ -1,5 +1,5 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/anyproto/any-sync/commonspace/object/treemanager (interfaces: TreeManager) +// Source: github.com/anyproto/any-sync/commonspace/object/treemanager (interfaces: TreeManager,TreeSyncer) // Package mock_treemanager is a generated GoMock package. package mock_treemanager @@ -10,6 +10,7 @@ import ( app "github.com/anyproto/any-sync/app" objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" + treemanager "github.com/anyproto/any-sync/commonspace/object/treemanager" gomock "github.com/golang/mock/gomock" ) @@ -121,6 +122,20 @@ func (mr *MockTreeManagerMockRecorder) Name() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockTreeManager)(nil).Name)) } +// NewTreeSyncer mocks base method. +func (m *MockTreeManager) NewTreeSyncer(arg0 string, arg1 treemanager.TreeManager) treemanager.TreeSyncer { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "NewTreeSyncer", arg0, arg1) + ret0, _ := ret[0].(treemanager.TreeSyncer) + return ret0 +} + +// NewTreeSyncer indicates an expected call of NewTreeSyncer. +func (mr *MockTreeManagerMockRecorder) NewTreeSyncer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTreeSyncer", reflect.TypeOf((*MockTreeManager)(nil).NewTreeSyncer), arg0, arg1) +} + // Run mocks base method. func (m *MockTreeManager) Run(arg0 context.Context) error { m.ctrl.T.Helper() @@ -134,3 +149,66 @@ func (mr *MockTreeManagerMockRecorder) Run(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockTreeManager)(nil).Run), arg0) } + +// MockTreeSyncer is a mock of TreeSyncer interface. +type MockTreeSyncer struct { + ctrl *gomock.Controller + recorder *MockTreeSyncerMockRecorder +} + +// MockTreeSyncerMockRecorder is the mock recorder for MockTreeSyncer. +type MockTreeSyncerMockRecorder struct { + mock *MockTreeSyncer +} + +// NewMockTreeSyncer creates a new mock instance. +func NewMockTreeSyncer(ctrl *gomock.Controller) *MockTreeSyncer { + mock := &MockTreeSyncer{ctrl: ctrl} + mock.recorder = &MockTreeSyncerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTreeSyncer) EXPECT() *MockTreeSyncerMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockTreeSyncer) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockTreeSyncerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockTreeSyncer)(nil).Close)) +} + +// Init mocks base method. +func (m *MockTreeSyncer) Init() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Init") +} + +// Init indicates an expected call of Init. +func (mr *MockTreeSyncerMockRecorder) Init() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockTreeSyncer)(nil).Init)) +} + +// SyncAll mocks base method. +func (m *MockTreeSyncer) SyncAll(arg0 context.Context, arg1 string, arg2, arg3 []string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncAll", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncAll indicates an expected call of SyncAll. +func (mr *MockTreeSyncerMockRecorder) SyncAll(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncAll", reflect.TypeOf((*MockTreeSyncer)(nil).SyncAll), arg0, arg1, arg2, arg3) +} diff --git a/commonspace/object/treemanager/treemanager.go b/commonspace/object/treemanager/treemanager.go index 3fd9ab4c..eeaa50af 100644 --- a/commonspace/object/treemanager/treemanager.go +++ b/commonspace/object/treemanager/treemanager.go @@ -1,4 +1,4 @@ -//go:generate mockgen -destination mock_treemanager/mock_treemanager.go github.com/anyproto/any-sync/commonspace/object/treemanager TreeManager +//go:generate mockgen -destination mock_treemanager/mock_treemanager.go github.com/anyproto/any-sync/commonspace/object/treemanager TreeManager,TreeSyncer package treemanager import ( @@ -14,4 +14,11 @@ type TreeManager interface { GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error DeleteTree(ctx context.Context, spaceId, treeId string) error + NewTreeSyncer(spaceId string, treeManager TreeManager) TreeSyncer +} + +type TreeSyncer interface { + Init() + SyncAll(ctx context.Context, peerId string, existing, missing []string) error + Close() error } diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index db6a15e4..19098ace 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -53,9 +53,11 @@ func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageH func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { s.updateLastUsage() - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Minute) - defer cancel() + if _, ok := ctx.Deadline(); !ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Minute) + defer cancel() + } newCounter := s.counter.Add(1) msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter) log.InfoCtx(ctx, "mpool sendSync", zap.String("requestId", msg.RequestId)) diff --git a/commonspace/settings/settingsstate/deletionstate.go b/commonspace/settings/settingsstate/deletionstate.go index f4e1d7ee..f36f4fd0 100644 --- a/commonspace/settings/settingsstate/deletionstate.go +++ b/commonspace/settings/settingsstate/deletionstate.go @@ -16,7 +16,7 @@ type ObjectDeletionState interface { GetQueued() (ids []string) Delete(id string) (err error) Exists(id string) bool - FilterJoin(ids ...[]string) (filtered []string) + Filter(ids []string) (filtered []string) } type objectDeletionState struct { @@ -113,19 +113,14 @@ func (st *objectDeletionState) Exists(id string) bool { return st.exists(id) } -func (st *objectDeletionState) FilterJoin(ids ...[]string) (filtered []string) { +func (st *objectDeletionState) Filter(ids []string) (filtered []string) { st.RLock() defer st.RUnlock() - filter := func(ids []string) { - for _, id := range ids { - if !st.exists(id) { - filtered = append(filtered, id) - } + for _, id := range ids { + if !st.exists(id) { + filtered = append(filtered, id) } } - for _, arr := range ids { - filter(arr) - } return } diff --git a/commonspace/settings/settingsstate/deletionstate_test.go b/commonspace/settings/settingsstate/deletionstate_test.go index 878a81b0..ca2ea679 100644 --- a/commonspace/settings/settingsstate/deletionstate_test.go +++ b/commonspace/settings/settingsstate/deletionstate_test.go @@ -80,8 +80,8 @@ func TestDeletionState_FilterJoin(t *testing.T) { fx.delState.queued["id1"] = struct{}{} fx.delState.queued["id2"] = struct{}{} - filtered := fx.delState.FilterJoin([]string{"id1"}, []string{"id3", "id2"}, []string{"id4"}) - require.Equal(t, []string{"id3", "id4"}, filtered) + filtered := fx.delState.Filter([]string{"id3", "id2"}) + require.Equal(t, []string{"id3"}, filtered) } func TestDeletionState_AddObserver(t *testing.T) { diff --git a/commonspace/settings/settingsstate/mock_settingsstate/mock_settingsstate.go b/commonspace/settings/settingsstate/mock_settingsstate/mock_settingsstate.go index 41bc3b16..0bc9bf23 100644 --- a/commonspace/settings/settingsstate/mock_settingsstate/mock_settingsstate.go +++ b/commonspace/settings/settingsstate/mock_settingsstate/mock_settingsstate.go @@ -87,22 +87,18 @@ func (mr *MockObjectDeletionStateMockRecorder) Exists(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockObjectDeletionState)(nil).Exists), arg0) } -// FilterJoin mocks base method. -func (m *MockObjectDeletionState) FilterJoin(arg0 ...[]string) []string { +// Filter mocks base method. +func (m *MockObjectDeletionState) Filter(arg0 []string) []string { m.ctrl.T.Helper() - varargs := []interface{}{} - for _, a := range arg0 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "FilterJoin", varargs...) + ret := m.ctrl.Call(m, "Filter", arg0) ret0, _ := ret[0].([]string) return ret0 } -// FilterJoin indicates an expected call of FilterJoin. -func (mr *MockObjectDeletionStateMockRecorder) FilterJoin(arg0 ...interface{}) *gomock.Call { +// Filter indicates an expected call of Filter. +func (mr *MockObjectDeletionStateMockRecorder) Filter(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterJoin", reflect.TypeOf((*MockObjectDeletionState)(nil).FilterJoin), arg0...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockObjectDeletionState)(nil).Filter), arg0) } // GetQueued mocks base method. diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 10140622..000d571c 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -217,6 +217,20 @@ func (m *mockConfig) GetSpace() Config { // Mock TreeManager // +type noOpSyncer struct { +} + +func (n noOpSyncer) Init() { +} + +func (n noOpSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error { + return nil +} + +func (n noOpSyncer) Close() error { + return nil +} + type mockTreeManager struct { space Space cache ocache.OCache @@ -224,6 +238,10 @@ type mockTreeManager struct { markedIds []string } +func (t *mockTreeManager) NewTreeSyncer(spaceId string, treeManager treemanager.TreeManager) treemanager.TreeSyncer { + return noOpSyncer{} +} + func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { t.markedIds = append(t.markedIds, treeId) return nil diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go index f0f36001..0bff0765 100644 --- a/net/streampool/sendpool.go +++ b/net/streampool/sendpool.go @@ -6,33 +6,38 @@ import ( "go.uber.org/zap" ) -// newExecPool creates new execPool +// NewExecPool creates new ExecPool // workers - how many processes will execute tasks // maxSize - limit for queue size -func newExecPool(workers, maxSize int) *execPool { - ss := &execPool{ - batch: mb.New[func()](maxSize), - } - for i := 0; i < workers; i++ { - go ss.sendLoop() +func NewExecPool(workers, maxSize int) *ExecPool { + ss := &ExecPool{ + workers: workers, + batch: mb.New[func()](maxSize), } return ss } -// execPool needed for parallel execution of the incoming send tasks -type execPool struct { - batch *mb.MB[func()] +// ExecPool needed for parallel execution of the incoming send tasks +type ExecPool struct { + workers int + batch *mb.MB[func()] } -func (ss *execPool) Add(ctx context.Context, f ...func()) (err error) { +func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) { return ss.batch.Add(ctx, f...) } -func (ss *execPool) TryAdd(f ...func()) (err error) { +func (ss *ExecPool) TryAdd(f ...func()) (err error) { return ss.batch.TryAdd(f...) } -func (ss *execPool) sendLoop() { +func (ss *ExecPool) Run() { + for i := 0; i < ss.workers; i++ { + go ss.sendLoop() + } +} + +func (ss *ExecPool) sendLoop() { for { f, err := ss.batch.WaitOne(context.Background()) if err != nil { @@ -43,6 +48,6 @@ func (ss *execPool) sendLoop() { } } -func (ss *execPool) Close() (err error) { +func (ss *ExecPool) Close() (err error) { return ss.batch.Close() } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 7384b845..59ee6e4c 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -58,7 +58,7 @@ type streamPool struct { streamIdsByTag map[string][]uint32 streams map[uint32]*stream opening map[string]*openingProcess - dial *execPool + dial *ExecPool mu sync.Mutex writeQueueSize int lastStreamId uint32 diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index 30cc44e0..a5bb8f62 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -33,6 +33,7 @@ type service struct { } func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { + pl := NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize) sp := &streamPool{ handler: h, writeQueueSize: conf.SendQueueSize, @@ -40,8 +41,9 @@ func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), + dial: pl, } + pl.Run() if s.metric != nil { registerMetrics(s.metric.Registry(), sp, "") }