Update action queue

This commit is contained in:
mcrakhman 2022-12-29 13:45:55 +01:00 committed by Mikhail Iudin
parent bf146e6676
commit 8afb4b5c1f
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
3 changed files with 34 additions and 54 deletions

View File

@ -15,79 +15,59 @@ type ActionQueue interface {
}
type actionQueue struct {
batcher *mb.MB[ActionFunc]
ctx context.Context
cancel context.CancelFunc
queueDone chan struct{}
batcher *mb.MB[ActionFunc]
maxReaders int
maxQueueLen int
readers chan struct{}
}
func NewActionQueue() ActionQueue {
func NewActionQueue(maxReaders int, maxQueueLen int) ActionQueue {
return &actionQueue{
batcher: mb.New[ActionFunc](0),
ctx: nil,
cancel: nil,
queueDone: make(chan struct{}),
batcher: mb.New[ActionFunc](maxQueueLen),
maxReaders: maxReaders,
maxQueueLen: maxQueueLen,
}
}
func (q *actionQueue) Send(action ActionFunc) (err error) {
log.Debug("adding action to batcher")
return q.batcher.Add(q.ctx, action)
err = q.batcher.TryAdd(action)
if err == nil {
return
}
log.With(zap.Error(err)).Debug("queue returned error")
actions := q.batcher.GetAll()
actions = actions[len(actions)/2:]
return q.batcher.Add(context.Background(), actions...)
}
func (q *actionQueue) Run() {
log.Debug("running the queue")
q.ctx, q.cancel = context.WithCancel(context.Background())
go q.read()
q.readers = make(chan struct{}, q.maxReaders)
for i := 0; i < q.maxReaders; i++ {
go q.startReading()
}
}
func (q *actionQueue) read() {
limiter := make(chan struct{}, maxStreamReaders)
for i := 0; i < maxStreamReaders; i++ {
limiter <- struct{}{}
}
func (q *actionQueue) startReading() {
defer func() {
// wait until all operations are done
for i := 0; i < maxStreamReaders; i++ {
<-limiter
}
close(q.queueDone)
q.readers <- struct{}{}
}()
doSendActions := func() {
actions, err := q.batcher.Wait(q.ctx)
log.Debug("reading from batcher")
if err != nil {
log.With(zap.Error(err)).Error("queue finished")
return
}
for _, msg := range actions {
select {
case <-q.ctx.Done():
return
default:
}
<-limiter
go func(action ActionFunc) {
err = action()
if err != nil {
log.With(zap.Error(err)).Debug("action errored out")
}
limiter <- struct{}{}
}(msg)
}
}
for {
select {
case <-q.ctx.Done():
action, err := q.batcher.WaitOne(context.Background())
if err != nil {
return
default:
doSendActions()
}
err = action()
if err != nil {
log.With(zap.Error(err)).Debug("action errored out")
}
}
}
func (q *actionQueue) Close() {
q.cancel()
<-q.queueDone
q.batcher.Close()
for i := 0; i < q.maxReaders; i++ {
<-q.readers
}
}

View File

@ -86,7 +86,7 @@ func newObjectSync(
checker: checker,
syncCtx: syncCtx,
cancelSync: cancel,
actionQueue: NewActionQueue(),
actionQueue: NewActionQueue(maxStreamReaders, 100),
}
}

View File

@ -248,7 +248,7 @@ func (s *streamPool) Close() (err error) {
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
var (
log = log.With(zap.String("peerId", peerId))
queue = NewActionQueue()
queue = NewActionQueue(maxStreamReaders, 100)
)
queue.Run()