Update action queue
This commit is contained in:
parent
a103c960a4
commit
b5dc56d72c
@ -15,79 +15,59 @@ type ActionQueue interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type actionQueue struct {
|
type actionQueue struct {
|
||||||
batcher *mb.MB[ActionFunc]
|
batcher *mb.MB[ActionFunc]
|
||||||
ctx context.Context
|
maxReaders int
|
||||||
cancel context.CancelFunc
|
maxQueueLen int
|
||||||
queueDone chan struct{}
|
readers chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewActionQueue() ActionQueue {
|
func NewActionQueue(maxReaders int, maxQueueLen int) ActionQueue {
|
||||||
return &actionQueue{
|
return &actionQueue{
|
||||||
batcher: mb.New[ActionFunc](0),
|
batcher: mb.New[ActionFunc](maxQueueLen),
|
||||||
ctx: nil,
|
maxReaders: maxReaders,
|
||||||
cancel: nil,
|
maxQueueLen: maxQueueLen,
|
||||||
queueDone: make(chan struct{}),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *actionQueue) Send(action ActionFunc) (err error) {
|
func (q *actionQueue) Send(action ActionFunc) (err error) {
|
||||||
log.Debug("adding action to batcher")
|
log.Debug("adding action to batcher")
|
||||||
return q.batcher.Add(q.ctx, action)
|
err = q.batcher.TryAdd(action)
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.With(zap.Error(err)).Debug("queue returned error")
|
||||||
|
actions := q.batcher.GetAll()
|
||||||
|
actions = actions[len(actions)/2:]
|
||||||
|
return q.batcher.Add(context.Background(), actions...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *actionQueue) Run() {
|
func (q *actionQueue) Run() {
|
||||||
log.Debug("running the queue")
|
log.Debug("running the queue")
|
||||||
q.ctx, q.cancel = context.WithCancel(context.Background())
|
q.readers = make(chan struct{}, q.maxReaders)
|
||||||
go q.read()
|
for i := 0; i < q.maxReaders; i++ {
|
||||||
|
go q.startReading()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *actionQueue) read() {
|
func (q *actionQueue) startReading() {
|
||||||
limiter := make(chan struct{}, maxStreamReaders)
|
|
||||||
for i := 0; i < maxStreamReaders; i++ {
|
|
||||||
limiter <- struct{}{}
|
|
||||||
}
|
|
||||||
defer func() {
|
defer func() {
|
||||||
// wait until all operations are done
|
q.readers <- struct{}{}
|
||||||
for i := 0; i < maxStreamReaders; i++ {
|
|
||||||
<-limiter
|
|
||||||
}
|
|
||||||
close(q.queueDone)
|
|
||||||
}()
|
}()
|
||||||
doSendActions := func() {
|
|
||||||
actions, err := q.batcher.Wait(q.ctx)
|
|
||||||
log.Debug("reading from batcher")
|
|
||||||
if err != nil {
|
|
||||||
log.With(zap.Error(err)).Error("queue finished")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
for _, msg := range actions {
|
|
||||||
select {
|
|
||||||
case <-q.ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
<-limiter
|
|
||||||
go func(action ActionFunc) {
|
|
||||||
err = action()
|
|
||||||
if err != nil {
|
|
||||||
log.With(zap.Error(err)).Debug("action errored out")
|
|
||||||
}
|
|
||||||
limiter <- struct{}{}
|
|
||||||
}(msg)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for {
|
for {
|
||||||
select {
|
action, err := q.batcher.WaitOne(context.Background())
|
||||||
case <-q.ctx.Done():
|
if err != nil {
|
||||||
return
|
return
|
||||||
default:
|
}
|
||||||
doSendActions()
|
err = action()
|
||||||
|
if err != nil {
|
||||||
|
log.With(zap.Error(err)).Debug("action errored out")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (q *actionQueue) Close() {
|
func (q *actionQueue) Close() {
|
||||||
q.cancel()
|
q.batcher.Close()
|
||||||
<-q.queueDone
|
for i := 0; i < q.maxReaders; i++ {
|
||||||
|
<-q.readers
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -86,7 +86,7 @@ func newObjectSync(
|
|||||||
checker: checker,
|
checker: checker,
|
||||||
syncCtx: syncCtx,
|
syncCtx: syncCtx,
|
||||||
cancelSync: cancel,
|
cancelSync: cancel,
|
||||||
actionQueue: NewActionQueue(),
|
actionQueue: NewActionQueue(maxStreamReaders, 100),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -248,7 +248,7 @@ func (s *streamPool) Close() (err error) {
|
|||||||
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
|
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||||
var (
|
var (
|
||||||
log = log.With(zap.String("peerId", peerId))
|
log = log.With(zap.String("peerId", peerId))
|
||||||
queue = NewActionQueue()
|
queue = NewActionQueue(maxStreamReaders, 100)
|
||||||
)
|
)
|
||||||
queue.Run()
|
queue.Run()
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user