diff --git a/common/commonspace/objectsync/actionqueue.go b/common/commonspace/objectsync/actionqueue.go index 8c677478..65d5e3af 100644 --- a/common/commonspace/objectsync/actionqueue.go +++ b/common/commonspace/objectsync/actionqueue.go @@ -21,6 +21,10 @@ type actionQueue struct { readers chan struct{} } +func NewDefaultActionQueue() ActionQueue { + return NewActionQueue(10, 200) +} + func NewActionQueue(maxReaders int, maxQueueLen int) ActionQueue { return &actionQueue{ batcher: mb.New[ActionFunc](maxQueueLen), @@ -37,7 +41,7 @@ func (q *actionQueue) Send(action ActionFunc) (err error) { } log.With(zap.Error(err)).Debug("queue returned error") actions := q.batcher.GetAll() - actions = actions[len(actions)/2:] + actions = append(actions[len(actions)/2:], action) return q.batcher.Add(context.Background(), actions...) } @@ -66,6 +70,7 @@ func (q *actionQueue) startReading() { } func (q *actionQueue) Close() { + log.Debug("closing the queue") q.batcher.Close() for i := 0; i < q.maxReaders; i++ { <-q.readers diff --git a/common/commonspace/objectsync/actionqueue_test.go b/common/commonspace/objectsync/actionqueue_test.go new file mode 100644 index 00000000..eef4a952 --- /dev/null +++ b/common/commonspace/objectsync/actionqueue_test.go @@ -0,0 +1,54 @@ +package objectsync + +import ( + "fmt" + "github.com/stretchr/testify/require" + "sync/atomic" + "testing" +) + +func TestActionQueue_Send(t *testing.T) { + maxReaders := 41 + maxLen := 93 + + queue := NewActionQueue(maxReaders, maxLen).(*actionQueue) + counter := atomic.Int32{} + expectedCounter := int32(maxReaders + (maxLen+1)/2 + 1) + blocker := make(chan struct{}, expectedCounter) + waiter := make(chan struct{}, expectedCounter) + increase := func() error { + counter.Add(1) + waiter <- struct{}{} + <-blocker + return nil + } + + queue.Run() + // sending maxReaders messages, so the goroutines will block on `blocker` channel + for i := 0; i < maxReaders; i++ { + queue.Send(increase) + } + // waiting until they all make progress + for i := 0; i < maxReaders; i++ { + <-waiter + } + fmt.Println(counter.Load()) + // check that queue is empty + require.Equal(t, queue.batcher.Len(), 0) + // making queue to overflow while readers are blocked + for i := 0; i < maxLen+1; i++ { + queue.Send(increase) + } + // check that queue was halved after overflow + require.Equal(t, (maxLen+1)/2+1, queue.batcher.Len()) + // unblocking maxReaders waiting + then we should also unblock the new readers to do a bit more readings + for i := 0; i < int(expectedCounter); i++ { + blocker <- struct{}{} + } + // waiting for all readers to finish adding + for i := 0; i < int(expectedCounter)-maxReaders; i++ { + <-waiter + } + queue.Close() + require.Equal(t, expectedCounter, counter.Load()) +} diff --git a/common/commonspace/objectsync/objectsync.go b/common/commonspace/objectsync/objectsync.go index f9d1fd86..b3c482a6 100644 --- a/common/commonspace/objectsync/objectsync.go +++ b/common/commonspace/objectsync/objectsync.go @@ -86,7 +86,7 @@ func newObjectSync( checker: checker, syncCtx: syncCtx, cancelSync: cancel, - actionQueue: NewActionQueue(maxStreamReaders, 100), + actionQueue: NewDefaultActionQueue(), } } diff --git a/common/commonspace/objectsync/streampool.go b/common/commonspace/objectsync/streampool.go index 46689ab8..891c5f5f 100644 --- a/common/commonspace/objectsync/streampool.go +++ b/common/commonspace/objectsync/streampool.go @@ -248,7 +248,7 @@ func (s *streamPool) Close() (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { var ( log = log.With(zap.String("peerId", peerId)) - queue = NewActionQueue(maxStreamReaders, 100) + queue = NewDefaultActionQueue() ) queue.Run()