From 8afb4b5c1feb83afeefe698d6c25ab7c4f15c540 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 29 Dec 2022 13:45:55 +0100 Subject: [PATCH] Update action queue --- common/commonspace/objectsync/actionqueue.go | 84 ++++++++------------ common/commonspace/objectsync/objectsync.go | 2 +- common/commonspace/objectsync/streampool.go | 2 +- 3 files changed, 34 insertions(+), 54 deletions(-) diff --git a/common/commonspace/objectsync/actionqueue.go b/common/commonspace/objectsync/actionqueue.go index 98409978..8c677478 100644 --- a/common/commonspace/objectsync/actionqueue.go +++ b/common/commonspace/objectsync/actionqueue.go @@ -15,79 +15,59 @@ type ActionQueue interface { } type actionQueue struct { - batcher *mb.MB[ActionFunc] - ctx context.Context - cancel context.CancelFunc - queueDone chan struct{} + batcher *mb.MB[ActionFunc] + maxReaders int + maxQueueLen int + readers chan struct{} } -func NewActionQueue() ActionQueue { +func NewActionQueue(maxReaders int, maxQueueLen int) ActionQueue { return &actionQueue{ - batcher: mb.New[ActionFunc](0), - ctx: nil, - cancel: nil, - queueDone: make(chan struct{}), + batcher: mb.New[ActionFunc](maxQueueLen), + maxReaders: maxReaders, + maxQueueLen: maxQueueLen, } } func (q *actionQueue) Send(action ActionFunc) (err error) { log.Debug("adding action to batcher") - return q.batcher.Add(q.ctx, action) + err = q.batcher.TryAdd(action) + if err == nil { + return + } + log.With(zap.Error(err)).Debug("queue returned error") + actions := q.batcher.GetAll() + actions = actions[len(actions)/2:] + return q.batcher.Add(context.Background(), actions...) } func (q *actionQueue) Run() { log.Debug("running the queue") - q.ctx, q.cancel = context.WithCancel(context.Background()) - go q.read() + q.readers = make(chan struct{}, q.maxReaders) + for i := 0; i < q.maxReaders; i++ { + go q.startReading() + } } -func (q *actionQueue) read() { - limiter := make(chan struct{}, maxStreamReaders) - for i := 0; i < maxStreamReaders; i++ { - limiter <- struct{}{} - } +func (q *actionQueue) startReading() { defer func() { - // wait until all operations are done - for i := 0; i < maxStreamReaders; i++ { - <-limiter - } - close(q.queueDone) + q.readers <- struct{}{} }() - doSendActions := func() { - actions, err := q.batcher.Wait(q.ctx) - log.Debug("reading from batcher") - if err != nil { - log.With(zap.Error(err)).Error("queue finished") - return - } - for _, msg := range actions { - select { - case <-q.ctx.Done(): - return - default: - } - - <-limiter - go func(action ActionFunc) { - err = action() - if err != nil { - log.With(zap.Error(err)).Debug("action errored out") - } - limiter <- struct{}{} - }(msg) - } - } for { - select { - case <-q.ctx.Done(): + action, err := q.batcher.WaitOne(context.Background()) + if err != nil { return - default: - doSendActions() + } + err = action() + if err != nil { + log.With(zap.Error(err)).Debug("action errored out") } } } func (q *actionQueue) Close() { - q.cancel() - <-q.queueDone + q.batcher.Close() + for i := 0; i < q.maxReaders; i++ { + <-q.readers + } } diff --git a/common/commonspace/objectsync/objectsync.go b/common/commonspace/objectsync/objectsync.go index 097706c3..f9d1fd86 100644 --- a/common/commonspace/objectsync/objectsync.go +++ b/common/commonspace/objectsync/objectsync.go @@ -86,7 +86,7 @@ func newObjectSync( checker: checker, syncCtx: syncCtx, cancelSync: cancel, - actionQueue: NewActionQueue(), + actionQueue: NewActionQueue(maxStreamReaders, 100), } } diff --git a/common/commonspace/objectsync/streampool.go b/common/commonspace/objectsync/streampool.go index 0208e253..46689ab8 100644 --- a/common/commonspace/objectsync/streampool.go +++ b/common/commonspace/objectsync/streampool.go @@ -248,7 +248,7 @@ func (s *streamPool) Close() (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { var ( log = log.With(zap.String("peerId", peerId)) - queue = NewActionQueue() + queue = NewActionQueue(maxStreamReaders, 100) ) queue.Run()