79 lines
1.5 KiB
Go
79 lines
1.5 KiB
Go
package objectsync
|
|
|
|
import (
|
|
"context"
|
|
"github.com/cheggaaa/mb/v3"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
type ActionFunc func() error
|
|
|
|
type ActionQueue interface {
|
|
Send(action ActionFunc) (err error)
|
|
Run()
|
|
Close()
|
|
}
|
|
|
|
type actionQueue struct {
|
|
batcher *mb.MB[ActionFunc]
|
|
maxReaders int
|
|
maxQueueLen int
|
|
readers chan struct{}
|
|
}
|
|
|
|
func NewDefaultActionQueue() ActionQueue {
|
|
return NewActionQueue(10, 200)
|
|
}
|
|
|
|
func NewActionQueue(maxReaders int, maxQueueLen int) ActionQueue {
|
|
return &actionQueue{
|
|
batcher: mb.New[ActionFunc](maxQueueLen),
|
|
maxReaders: maxReaders,
|
|
maxQueueLen: maxQueueLen,
|
|
}
|
|
}
|
|
|
|
func (q *actionQueue) Send(action ActionFunc) (err error) {
|
|
log.Debug("adding action to batcher")
|
|
err = q.batcher.TryAdd(action)
|
|
if err == nil {
|
|
return
|
|
}
|
|
log.With(zap.Error(err)).Debug("queue returned error")
|
|
actions := q.batcher.GetAll()
|
|
actions = append(actions[len(actions)/2:], action)
|
|
return q.batcher.Add(context.Background(), actions...)
|
|
}
|
|
|
|
func (q *actionQueue) Run() {
|
|
log.Debug("running the queue")
|
|
q.readers = make(chan struct{}, q.maxReaders)
|
|
for i := 0; i < q.maxReaders; i++ {
|
|
go q.startReading()
|
|
}
|
|
}
|
|
|
|
func (q *actionQueue) startReading() {
|
|
defer func() {
|
|
q.readers <- struct{}{}
|
|
}()
|
|
for {
|
|
action, err := q.batcher.WaitOne(context.Background())
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = action()
|
|
if err != nil {
|
|
log.With(zap.Error(err)).Debug("action errored out")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (q *actionQueue) Close() {
|
|
log.Debug("closing the queue")
|
|
q.batcher.Close()
|
|
for i := 0; i < q.maxReaders; i++ {
|
|
<-q.readers
|
|
}
|
|
}
|