diff --git a/commonspace/object/treemanager/treesyncer.go b/commonspace/object/treemanager/treesyncer.go deleted file mode 100644 index de47c439..00000000 --- a/commonspace/object/treemanager/treesyncer.go +++ /dev/null @@ -1,157 +0,0 @@ -package treemanager - -import ( - "context" - "github.com/anyproto/any-sync/app/logger" - "github.com/anyproto/any-sync/commonspace/object/tree/synctree" - "github.com/anyproto/any-sync/net/peer" - "github.com/anyproto/any-sync/net/streampool" - "go.uber.org/zap" - "sync" - "time" -) - -type executor struct { - pool *streampool.ExecPool - objs map[string]struct{} - sync.Mutex -} - -func newExecutor(workers, size int) *executor { - return &executor{ - pool: streampool.NewExecPool(workers, size), - objs: map[string]struct{}{}, - } -} - -func (e *executor) tryAdd(id string, action func()) (err error) { - if _, exists := e.objs[id]; exists { - return nil - } - e.Lock() - defer e.Unlock() - e.objs[id] = struct{}{} - return e.pool.TryAdd(func() { - action() - e.Lock() - defer e.Unlock() - delete(e.objs, id) - }) -} - -func (e *executor) close() { - e.pool.Close() -} - -type treeSyncer struct { - sync.Mutex - log logger.CtxLogger - size int - requests int - spaceId string - timeout time.Duration - requestPools map[string]*executor - headPools map[string]*executor - treeManager TreeManager - isRunning bool -} - -func NewTreeSyncer(spaceId string, timeout time.Duration, concurrentReqs int, treeManager TreeManager, log logger.CtxLogger) TreeSyncer { - return &treeSyncer{ - log: log, - requests: concurrentReqs, - spaceId: spaceId, - timeout: timeout, - requestPools: map[string]*executor{}, - headPools: map[string]*executor{}, - treeManager: treeManager, - } -} - -func (t *treeSyncer) Init() { - t.Lock() - defer t.Unlock() - t.isRunning = true -} - -func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error { - t.Lock() - defer t.Unlock() - if !t.isRunning { - return nil - } - reqExec, exists := t.requestPools[peerId] - if !exists { - reqExec = newExecutor(t.requests, t.size) - t.requestPools[peerId] = reqExec - } - headExec, exists := t.headPools[peerId] - if !exists { - headExec = newExecutor(1, t.size) - t.requestPools[peerId] = headExec - } - for _, id := range existing { - err := headExec.tryAdd(id, func() { - t.updateTree(peerId, id) - }) - if err != nil { - t.log.Error("failed to add to head queue", zap.Error(err)) - } - } - for _, id := range missing { - err := reqExec.tryAdd(id, func() { - t.requestTree(peerId, id) - }) - if err != nil { - t.log.Error("failed to add to request queue", zap.Error(err)) - } - } - return nil -} - -func (t *treeSyncer) requestTree(peerId, id string) { - log := t.log.With(zap.String("treeId", id)) - ctx := peer.CtxWithPeerId(context.Background(), peerId) - ctx, cancel := context.WithTimeout(ctx, t.timeout) - defer cancel() - _, err := t.treeManager.GetTree(ctx, t.spaceId, id) - if err != nil { - log.WarnCtx(ctx, "can't load missing tree", zap.Error(err)) - } else { - log.DebugCtx(ctx, "loaded missing tree") - } -} - -func (t *treeSyncer) updateTree(peerId, id string) { - log := t.log.With(zap.String("treeId", id)) - ctx := peer.CtxWithPeerId(context.Background(), peerId) - ctx, cancel := context.WithTimeout(ctx, t.timeout) - defer cancel() - tr, err := t.treeManager.GetTree(ctx, t.spaceId, id) - if err != nil { - log.WarnCtx(ctx, "can't load existing tree", zap.Error(err)) - return - } - syncTree, ok := tr.(synctree.SyncTree) - if !ok { - log.WarnCtx(ctx, "not a sync tree") - } - if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { - log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err)) - } else { - log.DebugCtx(ctx, "success synctree.SyncWithPeer") - } -} - -func (t *treeSyncer) Close() error { - t.Lock() - defer t.Unlock() - t.isRunning = false - for _, pool := range t.headPools { - pool.close() - } - for _, pool := range t.requestPools { - pool.close() - } - return nil -} diff --git a/commonspace/spaceutils_test.go b/commonspace/spaceutils_test.go index 8ff77e0c..0fc0977f 100644 --- a/commonspace/spaceutils_test.go +++ b/commonspace/spaceutils_test.go @@ -217,6 +217,20 @@ func (m *mockConfig) GetSpace() Config { // Mock TreeManager // +type noOpSyncer struct { +} + +func (n noOpSyncer) Init() { +} + +func (n noOpSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error { + return nil +} + +func (n noOpSyncer) Close() error { + return nil +} + type mockTreeManager struct { space Space cache ocache.OCache @@ -225,7 +239,7 @@ type mockTreeManager struct { } func (t *mockTreeManager) NewTreeSyncer(spaceId string) treemanager.TreeSyncer { - return treemanager.NewTreeSyncer(spaceId, time.Second, 10, t, log) + return noOpSyncer{} } func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go index 6071778b..0bff0765 100644 --- a/net/streampool/sendpool.go +++ b/net/streampool/sendpool.go @@ -11,17 +11,16 @@ import ( // maxSize - limit for queue size func NewExecPool(workers, maxSize int) *ExecPool { ss := &ExecPool{ - batch: mb.New[func()](maxSize), - } - for i := 0; i < workers; i++ { - go ss.sendLoop() + workers: workers, + batch: mb.New[func()](maxSize), } return ss } // ExecPool needed for parallel execution of the incoming send tasks type ExecPool struct { - batch *mb.MB[func()] + workers int + batch *mb.MB[func()] } func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) { @@ -32,6 +31,12 @@ func (ss *ExecPool) TryAdd(f ...func()) (err error) { return ss.batch.TryAdd(f...) } +func (ss *ExecPool) Run() { + for i := 0; i < ss.workers; i++ { + go ss.sendLoop() + } +} + func (ss *ExecPool) sendLoop() { for { f, err := ss.batch.WaitOne(context.Background()) diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index 220d6177..a5bb8f62 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -33,6 +33,7 @@ type service struct { } func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { + pl := NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize) sp := &streamPool{ handler: h, writeQueueSize: conf.SendQueueSize, @@ -40,8 +41,9 @@ func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - dial: NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize), + dial: pl, } + pl.Run() if s.metric != nil { registerMetrics(s.metric.Registry(), sp, "") }