Merge branch 'main' of github.com:anyproto/any-sync into yamux

This commit is contained in:
Sergey Cherepanov 2023-05-26 20:42:06 +02:00
commit 12a7dfe05a
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
15 changed files with 191 additions and 85 deletions

View File

@ -6,7 +6,6 @@ import (
"github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/ldiff"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/credentialprovider" "github.com/anyproto/any-sync/commonspace/credentialprovider"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/commonspace/object/treemanager" "github.com/anyproto/any-sync/commonspace/object/treemanager"
"github.com/anyproto/any-sync/commonspace/peermanager" "github.com/anyproto/any-sync/commonspace/peermanager"
"github.com/anyproto/any-sync/commonspace/settings/settingsstate" "github.com/anyproto/any-sync/commonspace/settings/settingsstate"
@ -24,6 +23,7 @@ type DiffSyncer interface {
RemoveObjects(ids []string) RemoveObjects(ids []string)
UpdateHeads(id string, heads []string) UpdateHeads(id string, heads []string)
Init(deletionState settingsstate.ObjectDeletionState) Init(deletionState settingsstate.ObjectDeletionState)
Close() error
} }
func newDiffSyncer( func newDiffSyncer(
@ -39,7 +39,7 @@ func newDiffSyncer(
return &diffSyncer{ return &diffSyncer{
diff: diff, diff: diff,
spaceId: spaceId, spaceId: spaceId,
cache: cache, treeManager: cache,
storage: storage, storage: storage,
peerManager: peerManager, peerManager: peerManager,
clientFactory: clientFactory, clientFactory: clientFactory,
@ -53,18 +53,20 @@ type diffSyncer struct {
spaceId string spaceId string
diff ldiff.Diff diff ldiff.Diff
peerManager peermanager.PeerManager peerManager peermanager.PeerManager
cache treemanager.TreeManager treeManager treemanager.TreeManager
storage spacestorage.SpaceStorage storage spacestorage.SpaceStorage
clientFactory spacesyncproto.ClientFactory clientFactory spacesyncproto.ClientFactory
log logger.CtxLogger log logger.CtxLogger
deletionState settingsstate.ObjectDeletionState deletionState settingsstate.ObjectDeletionState
credentialProvider credentialprovider.CredentialProvider credentialProvider credentialprovider.CredentialProvider
syncStatus syncstatus.StatusUpdater syncStatus syncstatus.StatusUpdater
treeSyncer treemanager.TreeSyncer
} }
func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) { func (d *diffSyncer) Init(deletionState settingsstate.ObjectDeletionState) {
d.deletionState = deletionState d.deletionState = deletionState
d.deletionState.AddObserver(d.RemoveObjects) d.deletionState.AddObserver(d.RemoveObjects)
d.treeSyncer = d.treeManager.NewTreeSyncer(d.spaceId, d.treeManager)
} }
func (d *diffSyncer) RemoveObjects(ids []string) { func (d *diffSyncer) RemoveObjects(ids []string) {
@ -124,7 +126,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
if err != nil && err != spacesyncproto.ErrSpaceMissing { if err != nil && err != spacesyncproto.ErrSpaceMissing {
if err == spacesyncproto.ErrSpaceIsDeleted { if err == spacesyncproto.ErrSpaceIsDeleted {
d.log.Debug("got space deleted while syncing") d.log.Debug("got space deleted while syncing")
d.syncTrees(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}) d.treeSyncer.SyncAll(ctx, p.Id(), []string{d.storage.SpaceSettingsId()}, nil)
} }
d.syncStatus.SetNodesOnline(p.Id(), false) d.syncStatus.SetNodesOnline(p.Id(), false)
return fmt.Errorf("diff error: %v", err) return fmt.Errorf("diff error: %v", err)
@ -137,42 +139,24 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
totalLen := len(newIds) + len(changedIds) + len(removedIds) totalLen := len(newIds) + len(changedIds) + len(removedIds)
// not syncing ids which were removed through settings document // not syncing ids which were removed through settings document
filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds) missingIds := d.deletionState.Filter(newIds)
existingIds := append(d.deletionState.Filter(removedIds), d.deletionState.Filter(changedIds)...)
d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) d.syncStatus.RemoveAllExcept(p.Id(), existingIds, stateCounter)
d.syncTrees(ctx, p.Id(), filteredIds)
err = d.treeSyncer.SyncAll(ctx, p.Id(), existingIds, missingIds)
if err != nil {
return err
}
d.log.Info("sync done:", zap.Int("newIds", len(newIds)), d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
zap.Int("changedIds", len(changedIds)), zap.Int("changedIds", len(changedIds)),
zap.Int("removedIds", len(removedIds)), zap.Int("removedIds", len(removedIds)),
zap.Int("already deleted ids", totalLen-len(filteredIds)), zap.Int("already deleted ids", totalLen-len(existingIds)-len(missingIds)),
zap.String("peerId", p.Id()), zap.String("peerId", p.Id()),
) )
return return
} }
func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) {
for _, tId := range trees {
log := d.log.With(zap.String("treeId", tId))
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
if err != nil {
log.WarnCtx(ctx, "can't load tree", zap.Error(err))
continue
}
syncTree, ok := tree.(synctree.SyncTree)
if !ok {
log.WarnCtx(ctx, "not a sync tree")
continue
}
if err = syncTree.SyncWithPeer(ctx, peerId); err != nil {
log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err))
} else {
log.DebugCtx(ctx, "success synctree.SyncWithPeer")
}
}
}
func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl spacesyncproto.DRPCSpaceSyncClient) (err error) { func (d *diffSyncer) sendPushSpaceRequest(ctx context.Context, peerId string, cl spacesyncproto.DRPCSpaceSyncClient) (err error) {
aclStorage, err := d.storage.AclStorage() aclStorage, err := d.storage.AclStorage()
if err != nil { if err != nil {
@ -235,3 +219,7 @@ func (d *diffSyncer) subscribe(ctx context.Context, peerId string) (err error) {
Payload: payload, Payload: payload,
}) })
} }
func (d *diffSyncer) Close() error {
return d.treeSyncer.Close()
}

View File

@ -119,6 +119,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient { factory := spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient {
return clientMock return clientMock
}) })
treeSyncerMock := mock_treemanager.NewMockTreeSyncer(ctrl)
credentialProvider := mock_credentialprovider.NewMockCredentialProvider(ctrl) credentialProvider := mock_credentialprovider.NewMockCredentialProvider(ctrl)
delState := mock_settingsstate.NewMockObjectDeletionState(ctrl) delState := mock_settingsstate.NewMockObjectDeletionState(ctrl)
spaceId := "spaceId" spaceId := "spaceId"
@ -126,21 +127,21 @@ func TestDiffSyncer_Sync(t *testing.T) {
l := logger.NewNamed(spaceId) l := logger.NewNamed(spaceId)
diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), credentialProvider, l) diffSyncer := newDiffSyncer(spaceId, diffMock, peerManagerMock, cacheMock, stMock, factory, syncstatus.NewNoOpSyncStatus(), credentialProvider, l)
delState.EXPECT().AddObserver(gomock.Any()) delState.EXPECT().AddObserver(gomock.Any())
cacheMock.EXPECT().NewTreeSyncer(spaceId, gomock.Any()).Return(treeSyncerMock)
diffSyncer.Init(delState) diffSyncer.Init(delState)
t.Run("diff syncer sync", func(t *testing.T) { t.Run("diff syncer sync", func(t *testing.T) {
mPeer := mockPeer{}
peerManagerMock.EXPECT(). peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()). GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mockPeer{}}, nil) Return([]peer.Peer{mPeer}, nil)
diffMock.EXPECT(). diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
Return([]string{"new"}, []string{"changed"}, nil, nil) Return([]string{"new"}, []string{"changed"}, nil, nil)
delState.EXPECT().FilterJoin(gomock.Any()).Return([]string{"new", "changed"}) delState.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1)
for _, arg := range []string{"new", "changed"} { delState.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1)
cacheMock.EXPECT(). delState.EXPECT().Filter(nil).Return(nil).Times(1)
GetTree(gomock.Any(), spaceId, arg). treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"changed"}, []string{"new"}).Return(nil)
Return(nil, nil)
}
require.NoError(t, diffSyncer.Sync(ctx)) require.NoError(t, diffSyncer.Sync(ctx))
}) })
@ -227,16 +228,15 @@ func TestDiffSyncer_Sync(t *testing.T) {
}) })
t.Run("diff syncer sync space is deleted error", func(t *testing.T) { t.Run("diff syncer sync space is deleted error", func(t *testing.T) {
mPeer := mockPeer{}
peerManagerMock.EXPECT(). peerManagerMock.EXPECT().
GetResponsiblePeers(gomock.Any()). GetResponsiblePeers(gomock.Any()).
Return([]peer.Peer{mockPeer{}}, nil) Return([]peer.Peer{mPeer}, nil)
diffMock.EXPECT(). diffMock.EXPECT().
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted) Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted)
stMock.EXPECT().SpaceSettingsId().Return("settingsId") stMock.EXPECT().SpaceSettingsId().Return("settingsId")
cacheMock.EXPECT(). treeSyncerMock.EXPECT().SyncAll(gomock.Any(), mPeer.Id(), []string{"settingsId"}, nil).Return(nil)
GetTree(gomock.Any(), spaceId, "settingsId").
Return(nil, nil)
require.NoError(t, diffSyncer.Sync(ctx)) require.NoError(t, diffSyncer.Sync(ctx))
}) })

View File

@ -136,7 +136,7 @@ func (d *headSync) RemoveObjects(ids []string) {
func (d *headSync) Close() (err error) { func (d *headSync) Close() (err error) {
d.periodicSync.Close() d.periodicSync.Close()
return nil return d.syncer.Close()
} }
func (d *headSync) fillDiff(objectIds []string) { func (d *headSync) fillDiff(objectIds []string) {

View File

@ -65,6 +65,7 @@ func TestDiffService(t *testing.T) {
t.Run("close", func(t *testing.T) { t.Run("close", func(t *testing.T) {
pSyncMock.EXPECT().Close() pSyncMock.EXPECT().Close()
syncer.EXPECT().Close()
service.Close() service.Close()
}) })
} }

View File

@ -35,6 +35,20 @@ func (m *MockDiffSyncer) EXPECT() *MockDiffSyncerMockRecorder {
return m.recorder return m.recorder
} }
// Close mocks base method.
func (m *MockDiffSyncer) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockDiffSyncerMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockDiffSyncer)(nil).Close))
}
// Init mocks base method. // Init mocks base method.
func (m *MockDiffSyncer) Init(arg0 settingsstate.ObjectDeletionState) { func (m *MockDiffSyncer) Init(arg0 settingsstate.ObjectDeletionState) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT. // Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anyproto/any-sync/commonspace/object/treemanager (interfaces: TreeManager) // Source: github.com/anyproto/any-sync/commonspace/object/treemanager (interfaces: TreeManager,TreeSyncer)
// Package mock_treemanager is a generated GoMock package. // Package mock_treemanager is a generated GoMock package.
package mock_treemanager package mock_treemanager
@ -10,6 +10,7 @@ import (
app "github.com/anyproto/any-sync/app" app "github.com/anyproto/any-sync/app"
objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree" objecttree "github.com/anyproto/any-sync/commonspace/object/tree/objecttree"
treemanager "github.com/anyproto/any-sync/commonspace/object/treemanager"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
) )
@ -121,6 +122,20 @@ func (mr *MockTreeManagerMockRecorder) Name() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockTreeManager)(nil).Name)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockTreeManager)(nil).Name))
} }
// NewTreeSyncer mocks base method.
func (m *MockTreeManager) NewTreeSyncer(arg0 string, arg1 treemanager.TreeManager) treemanager.TreeSyncer {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "NewTreeSyncer", arg0, arg1)
ret0, _ := ret[0].(treemanager.TreeSyncer)
return ret0
}
// NewTreeSyncer indicates an expected call of NewTreeSyncer.
func (mr *MockTreeManagerMockRecorder) NewTreeSyncer(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewTreeSyncer", reflect.TypeOf((*MockTreeManager)(nil).NewTreeSyncer), arg0, arg1)
}
// Run mocks base method. // Run mocks base method.
func (m *MockTreeManager) Run(arg0 context.Context) error { func (m *MockTreeManager) Run(arg0 context.Context) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
@ -134,3 +149,66 @@ func (mr *MockTreeManagerMockRecorder) Run(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockTreeManager)(nil).Run), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockTreeManager)(nil).Run), arg0)
} }
// MockTreeSyncer is a mock of TreeSyncer interface.
type MockTreeSyncer struct {
ctrl *gomock.Controller
recorder *MockTreeSyncerMockRecorder
}
// MockTreeSyncerMockRecorder is the mock recorder for MockTreeSyncer.
type MockTreeSyncerMockRecorder struct {
mock *MockTreeSyncer
}
// NewMockTreeSyncer creates a new mock instance.
func NewMockTreeSyncer(ctrl *gomock.Controller) *MockTreeSyncer {
mock := &MockTreeSyncer{ctrl: ctrl}
mock.recorder = &MockTreeSyncerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockTreeSyncer) EXPECT() *MockTreeSyncerMockRecorder {
return m.recorder
}
// Close mocks base method.
func (m *MockTreeSyncer) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockTreeSyncerMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockTreeSyncer)(nil).Close))
}
// Init mocks base method.
func (m *MockTreeSyncer) Init() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Init")
}
// Init indicates an expected call of Init.
func (mr *MockTreeSyncerMockRecorder) Init() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockTreeSyncer)(nil).Init))
}
// SyncAll mocks base method.
func (m *MockTreeSyncer) SyncAll(arg0 context.Context, arg1 string, arg2, arg3 []string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SyncAll", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(error)
return ret0
}
// SyncAll indicates an expected call of SyncAll.
func (mr *MockTreeSyncerMockRecorder) SyncAll(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncAll", reflect.TypeOf((*MockTreeSyncer)(nil).SyncAll), arg0, arg1, arg2, arg3)
}

View File

@ -1,4 +1,4 @@
//go:generate mockgen -destination mock_treemanager/mock_treemanager.go github.com/anyproto/any-sync/commonspace/object/treemanager TreeManager //go:generate mockgen -destination mock_treemanager/mock_treemanager.go github.com/anyproto/any-sync/commonspace/object/treemanager TreeManager,TreeSyncer
package treemanager package treemanager
import ( import (
@ -14,4 +14,11 @@ type TreeManager interface {
GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error) GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error)
MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error
DeleteTree(ctx context.Context, spaceId, treeId string) error DeleteTree(ctx context.Context, spaceId, treeId string) error
NewTreeSyncer(spaceId string, treeManager TreeManager) TreeSyncer
}
type TreeSyncer interface {
Init()
SyncAll(ctx context.Context, peerId string, existing, missing []string) error
Close() error
} }

View File

@ -53,9 +53,11 @@ func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageH
func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
s.updateLastUsage() s.updateLastUsage()
var cancel context.CancelFunc if _, ok := ctx.Deadline(); !ok {
ctx, cancel = context.WithTimeout(ctx, time.Minute) var cancel context.CancelFunc
defer cancel() ctx, cancel = context.WithTimeout(ctx, time.Minute)
defer cancel()
}
newCounter := s.counter.Add(1) newCounter := s.counter.Add(1)
msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter) msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter)
log.InfoCtx(ctx, "mpool sendSync", zap.String("requestId", msg.RequestId)) log.InfoCtx(ctx, "mpool sendSync", zap.String("requestId", msg.RequestId))

View File

@ -16,7 +16,7 @@ type ObjectDeletionState interface {
GetQueued() (ids []string) GetQueued() (ids []string)
Delete(id string) (err error) Delete(id string) (err error)
Exists(id string) bool Exists(id string) bool
FilterJoin(ids ...[]string) (filtered []string) Filter(ids []string) (filtered []string)
} }
type objectDeletionState struct { type objectDeletionState struct {
@ -113,19 +113,14 @@ func (st *objectDeletionState) Exists(id string) bool {
return st.exists(id) return st.exists(id)
} }
func (st *objectDeletionState) FilterJoin(ids ...[]string) (filtered []string) { func (st *objectDeletionState) Filter(ids []string) (filtered []string) {
st.RLock() st.RLock()
defer st.RUnlock() defer st.RUnlock()
filter := func(ids []string) { for _, id := range ids {
for _, id := range ids { if !st.exists(id) {
if !st.exists(id) { filtered = append(filtered, id)
filtered = append(filtered, id)
}
} }
} }
for _, arr := range ids {
filter(arr)
}
return return
} }

View File

@ -80,8 +80,8 @@ func TestDeletionState_FilterJoin(t *testing.T) {
fx.delState.queued["id1"] = struct{}{} fx.delState.queued["id1"] = struct{}{}
fx.delState.queued["id2"] = struct{}{} fx.delState.queued["id2"] = struct{}{}
filtered := fx.delState.FilterJoin([]string{"id1"}, []string{"id3", "id2"}, []string{"id4"}) filtered := fx.delState.Filter([]string{"id3", "id2"})
require.Equal(t, []string{"id3", "id4"}, filtered) require.Equal(t, []string{"id3"}, filtered)
} }
func TestDeletionState_AddObserver(t *testing.T) { func TestDeletionState_AddObserver(t *testing.T) {

View File

@ -87,22 +87,18 @@ func (mr *MockObjectDeletionStateMockRecorder) Exists(arg0 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockObjectDeletionState)(nil).Exists), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockObjectDeletionState)(nil).Exists), arg0)
} }
// FilterJoin mocks base method. // Filter mocks base method.
func (m *MockObjectDeletionState) FilterJoin(arg0 ...[]string) []string { func (m *MockObjectDeletionState) Filter(arg0 []string) []string {
m.ctrl.T.Helper() m.ctrl.T.Helper()
varargs := []interface{}{} ret := m.ctrl.Call(m, "Filter", arg0)
for _, a := range arg0 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "FilterJoin", varargs...)
ret0, _ := ret[0].([]string) ret0, _ := ret[0].([]string)
return ret0 return ret0
} }
// FilterJoin indicates an expected call of FilterJoin. // Filter indicates an expected call of Filter.
func (mr *MockObjectDeletionStateMockRecorder) FilterJoin(arg0 ...interface{}) *gomock.Call { func (mr *MockObjectDeletionStateMockRecorder) Filter(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterJoin", reflect.TypeOf((*MockObjectDeletionState)(nil).FilterJoin), arg0...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockObjectDeletionState)(nil).Filter), arg0)
} }
// GetQueued mocks base method. // GetQueued mocks base method.

View File

@ -217,6 +217,20 @@ func (m *mockConfig) GetSpace() Config {
// Mock TreeManager // Mock TreeManager
// //
type noOpSyncer struct {
}
func (n noOpSyncer) Init() {
}
func (n noOpSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error {
return nil
}
func (n noOpSyncer) Close() error {
return nil
}
type mockTreeManager struct { type mockTreeManager struct {
space Space space Space
cache ocache.OCache cache ocache.OCache
@ -224,6 +238,10 @@ type mockTreeManager struct {
markedIds []string markedIds []string
} }
func (t *mockTreeManager) NewTreeSyncer(spaceId string, treeManager treemanager.TreeManager) treemanager.TreeSyncer {
return noOpSyncer{}
}
func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error {
t.markedIds = append(t.markedIds, treeId) t.markedIds = append(t.markedIds, treeId)
return nil return nil

View File

@ -6,33 +6,38 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// newExecPool creates new execPool // NewExecPool creates new ExecPool
// workers - how many processes will execute tasks // workers - how many processes will execute tasks
// maxSize - limit for queue size // maxSize - limit for queue size
func newExecPool(workers, maxSize int) *execPool { func NewExecPool(workers, maxSize int) *ExecPool {
ss := &execPool{ ss := &ExecPool{
batch: mb.New[func()](maxSize), workers: workers,
} batch: mb.New[func()](maxSize),
for i := 0; i < workers; i++ {
go ss.sendLoop()
} }
return ss return ss
} }
// execPool needed for parallel execution of the incoming send tasks // ExecPool needed for parallel execution of the incoming send tasks
type execPool struct { type ExecPool struct {
batch *mb.MB[func()] workers int
batch *mb.MB[func()]
} }
func (ss *execPool) Add(ctx context.Context, f ...func()) (err error) { func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) {
return ss.batch.Add(ctx, f...) return ss.batch.Add(ctx, f...)
} }
func (ss *execPool) TryAdd(f ...func()) (err error) { func (ss *ExecPool) TryAdd(f ...func()) (err error) {
return ss.batch.TryAdd(f...) return ss.batch.TryAdd(f...)
} }
func (ss *execPool) sendLoop() { func (ss *ExecPool) Run() {
for i := 0; i < ss.workers; i++ {
go ss.sendLoop()
}
}
func (ss *ExecPool) sendLoop() {
for { for {
f, err := ss.batch.WaitOne(context.Background()) f, err := ss.batch.WaitOne(context.Background())
if err != nil { if err != nil {
@ -43,6 +48,6 @@ func (ss *execPool) sendLoop() {
} }
} }
func (ss *execPool) Close() (err error) { func (ss *ExecPool) Close() (err error) {
return ss.batch.Close() return ss.batch.Close()
} }

View File

@ -58,7 +58,7 @@ type streamPool struct {
streamIdsByTag map[string][]uint32 streamIdsByTag map[string][]uint32
streams map[uint32]*stream streams map[uint32]*stream
opening map[string]*openingProcess opening map[string]*openingProcess
dial *execPool dial *ExecPool
mu sync.Mutex mu sync.Mutex
writeQueueSize int writeQueueSize int
lastStreamId uint32 lastStreamId uint32

View File

@ -33,6 +33,7 @@ type service struct {
} }
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
pl := NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize)
sp := &streamPool{ sp := &streamPool{
handler: h, handler: h,
writeQueueSize: conf.SendQueueSize, writeQueueSize: conf.SendQueueSize,
@ -40,8 +41,9 @@ func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
streamIdsByTag: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{},
streams: map[uint32]*stream{}, streams: map[uint32]*stream{},
opening: map[string]*openingProcess{}, opening: map[string]*openingProcess{},
dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), dial: pl,
} }
pl.Run()
if s.metric != nil { if s.metric != nil {
registerMetrics(s.metric.Registry(), sp, "") registerMetrics(s.metric.Registry(), sp, "")
} }