Add queue test

This commit is contained in:
mcrakhman 2022-12-29 15:16:30 +01:00
parent 8eafaa266d
commit a855f02f63
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
4 changed files with 62 additions and 3 deletions

View File

@ -21,6 +21,10 @@ type actionQueue struct {
readers chan struct{}
}
func NewDefaultActionQueue() ActionQueue {
return NewActionQueue(10, 200)
}
func NewActionQueue(maxReaders int, maxQueueLen int) ActionQueue {
return &actionQueue{
batcher: mb.New[ActionFunc](maxQueueLen),
@ -37,7 +41,7 @@ func (q *actionQueue) Send(action ActionFunc) (err error) {
}
log.With(zap.Error(err)).Debug("queue returned error")
actions := q.batcher.GetAll()
actions = actions[len(actions)/2:]
actions = append(actions[len(actions)/2:], action)
return q.batcher.Add(context.Background(), actions...)
}
@ -66,6 +70,7 @@ func (q *actionQueue) startReading() {
}
func (q *actionQueue) Close() {
log.Debug("closing the queue")
q.batcher.Close()
for i := 0; i < q.maxReaders; i++ {
<-q.readers

View File

@ -0,0 +1,54 @@
package objectsync
import (
"fmt"
"github.com/stretchr/testify/require"
"sync/atomic"
"testing"
)
func TestActionQueue_Send(t *testing.T) {
maxReaders := 41
maxLen := 93
queue := NewActionQueue(maxReaders, maxLen).(*actionQueue)
counter := atomic.Int32{}
expectedCounter := int32(maxReaders + (maxLen+1)/2 + 1)
blocker := make(chan struct{}, expectedCounter)
waiter := make(chan struct{}, expectedCounter)
increase := func() error {
counter.Add(1)
waiter <- struct{}{}
<-blocker
return nil
}
queue.Run()
// sending maxReaders messages, so the goroutines will block on `blocker` channel
for i := 0; i < maxReaders; i++ {
queue.Send(increase)
}
// waiting until they all make progress
for i := 0; i < maxReaders; i++ {
<-waiter
}
fmt.Println(counter.Load())
// check that queue is empty
require.Equal(t, queue.batcher.Len(), 0)
// making queue to overflow while readers are blocked
for i := 0; i < maxLen+1; i++ {
queue.Send(increase)
}
// check that queue was halved after overflow
require.Equal(t, (maxLen+1)/2+1, queue.batcher.Len())
// unblocking maxReaders waiting + then we should also unblock the new readers to do a bit more readings
for i := 0; i < int(expectedCounter); i++ {
blocker <- struct{}{}
}
// waiting for all readers to finish adding
for i := 0; i < int(expectedCounter)-maxReaders; i++ {
<-waiter
}
queue.Close()
require.Equal(t, expectedCounter, counter.Load())
}

View File

@ -86,7 +86,7 @@ func newObjectSync(
checker: checker,
syncCtx: syncCtx,
cancelSync: cancel,
actionQueue: NewActionQueue(maxStreamReaders, 100),
actionQueue: NewDefaultActionQueue(),
}
}

View File

@ -248,7 +248,7 @@ func (s *streamPool) Close() (err error) {
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
var (
log = log.With(zap.String("peerId", peerId))
queue = NewActionQueue(maxStreamReaders, 100)
queue = NewDefaultActionQueue()
)
queue.Run()