Expose more methods in exec pool

This commit is contained in:
mcrakhman 2023-05-26 11:29:21 +02:00
parent f0a3edd798
commit 556f03ed7f
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
4 changed files with 28 additions and 164 deletions

View File

@ -1,157 +0,0 @@
package treemanager
import (
"context"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/commonspace/object/tree/synctree"
"github.com/anyproto/any-sync/net/peer"
"github.com/anyproto/any-sync/net/streampool"
"go.uber.org/zap"
"sync"
"time"
)
type executor struct {
pool *streampool.ExecPool
objs map[string]struct{}
sync.Mutex
}
func newExecutor(workers, size int) *executor {
return &executor{
pool: streampool.NewExecPool(workers, size),
objs: map[string]struct{}{},
}
}
func (e *executor) tryAdd(id string, action func()) (err error) {
if _, exists := e.objs[id]; exists {
return nil
}
e.Lock()
defer e.Unlock()
e.objs[id] = struct{}{}
return e.pool.TryAdd(func() {
action()
e.Lock()
defer e.Unlock()
delete(e.objs, id)
})
}
func (e *executor) close() {
e.pool.Close()
}
type treeSyncer struct {
sync.Mutex
log logger.CtxLogger
size int
requests int
spaceId string
timeout time.Duration
requestPools map[string]*executor
headPools map[string]*executor
treeManager TreeManager
isRunning bool
}
func NewTreeSyncer(spaceId string, timeout time.Duration, concurrentReqs int, treeManager TreeManager, log logger.CtxLogger) TreeSyncer {
return &treeSyncer{
log: log,
requests: concurrentReqs,
spaceId: spaceId,
timeout: timeout,
requestPools: map[string]*executor{},
headPools: map[string]*executor{},
treeManager: treeManager,
}
}
func (t *treeSyncer) Init() {
t.Lock()
defer t.Unlock()
t.isRunning = true
}
func (t *treeSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error {
t.Lock()
defer t.Unlock()
if !t.isRunning {
return nil
}
reqExec, exists := t.requestPools[peerId]
if !exists {
reqExec = newExecutor(t.requests, t.size)
t.requestPools[peerId] = reqExec
}
headExec, exists := t.headPools[peerId]
if !exists {
headExec = newExecutor(1, t.size)
t.requestPools[peerId] = headExec
}
for _, id := range existing {
err := headExec.tryAdd(id, func() {
t.updateTree(peerId, id)
})
if err != nil {
t.log.Error("failed to add to head queue", zap.Error(err))
}
}
for _, id := range missing {
err := reqExec.tryAdd(id, func() {
t.requestTree(peerId, id)
})
if err != nil {
t.log.Error("failed to add to request queue", zap.Error(err))
}
}
return nil
}
func (t *treeSyncer) requestTree(peerId, id string) {
log := t.log.With(zap.String("treeId", id))
ctx := peer.CtxWithPeerId(context.Background(), peerId)
ctx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
_, err := t.treeManager.GetTree(ctx, t.spaceId, id)
if err != nil {
log.WarnCtx(ctx, "can't load missing tree", zap.Error(err))
} else {
log.DebugCtx(ctx, "loaded missing tree")
}
}
func (t *treeSyncer) updateTree(peerId, id string) {
log := t.log.With(zap.String("treeId", id))
ctx := peer.CtxWithPeerId(context.Background(), peerId)
ctx, cancel := context.WithTimeout(ctx, t.timeout)
defer cancel()
tr, err := t.treeManager.GetTree(ctx, t.spaceId, id)
if err != nil {
log.WarnCtx(ctx, "can't load existing tree", zap.Error(err))
return
}
syncTree, ok := tr.(synctree.SyncTree)
if !ok {
log.WarnCtx(ctx, "not a sync tree")
}
if err = syncTree.SyncWithPeer(ctx, peerId); err != nil {
log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err))
} else {
log.DebugCtx(ctx, "success synctree.SyncWithPeer")
}
}
func (t *treeSyncer) Close() error {
t.Lock()
defer t.Unlock()
t.isRunning = false
for _, pool := range t.headPools {
pool.close()
}
for _, pool := range t.requestPools {
pool.close()
}
return nil
}

View File

@ -217,6 +217,20 @@ func (m *mockConfig) GetSpace() Config {
// Mock TreeManager // Mock TreeManager
// //
type noOpSyncer struct {
}
func (n noOpSyncer) Init() {
}
func (n noOpSyncer) SyncAll(ctx context.Context, peerId string, existing, missing []string) error {
return nil
}
func (n noOpSyncer) Close() error {
return nil
}
type mockTreeManager struct { type mockTreeManager struct {
space Space space Space
cache ocache.OCache cache ocache.OCache
@ -225,7 +239,7 @@ type mockTreeManager struct {
} }
func (t *mockTreeManager) NewTreeSyncer(spaceId string) treemanager.TreeSyncer { func (t *mockTreeManager) NewTreeSyncer(spaceId string) treemanager.TreeSyncer {
return treemanager.NewTreeSyncer(spaceId, time.Second, 10, t, log) return noOpSyncer{}
} }
func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error { func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error {

View File

@ -11,17 +11,16 @@ import (
// maxSize - limit for queue size // maxSize - limit for queue size
func NewExecPool(workers, maxSize int) *ExecPool { func NewExecPool(workers, maxSize int) *ExecPool {
ss := &ExecPool{ ss := &ExecPool{
batch: mb.New[func()](maxSize), workers: workers,
} batch: mb.New[func()](maxSize),
for i := 0; i < workers; i++ {
go ss.sendLoop()
} }
return ss return ss
} }
// ExecPool needed for parallel execution of the incoming send tasks // ExecPool needed for parallel execution of the incoming send tasks
type ExecPool struct { type ExecPool struct {
batch *mb.MB[func()] workers int
batch *mb.MB[func()]
} }
func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) { func (ss *ExecPool) Add(ctx context.Context, f ...func()) (err error) {
@ -32,6 +31,12 @@ func (ss *ExecPool) TryAdd(f ...func()) (err error) {
return ss.batch.TryAdd(f...) return ss.batch.TryAdd(f...)
} }
func (ss *ExecPool) Run() {
for i := 0; i < ss.workers; i++ {
go ss.sendLoop()
}
}
func (ss *ExecPool) sendLoop() { func (ss *ExecPool) sendLoop() {
for { for {
f, err := ss.batch.WaitOne(context.Background()) f, err := ss.batch.WaitOne(context.Background())

View File

@ -33,6 +33,7 @@ type service struct {
} }
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
pl := NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize)
sp := &streamPool{ sp := &streamPool{
handler: h, handler: h,
writeQueueSize: conf.SendQueueSize, writeQueueSize: conf.SendQueueSize,
@ -40,8 +41,9 @@ func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
streamIdsByTag: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{},
streams: map[uint32]*stream{}, streams: map[uint32]*stream{},
opening: map[string]*openingProcess{}, opening: map[string]*openingProcess{},
dial: NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize), dial: pl,
} }
pl.Run()
if s.metric != nil { if s.metric != nil {
registerMetrics(s.metric.Registry(), sp, "") registerMetrics(s.metric.Registry(), sp, "")
} }