Add queue test

mcrakhman 2022-12-29 15:16:30 +01:00 committed by Mikhail Iudin
parent 021342bfe9
commit 5f62fc7e88
4 changed files with 108 additions and 40 deletions

View File

@@ -21,6 +21,10 @@ type actionQueue struct {
 	readers chan struct{}
 }
 
+func NewDefaultActionQueue() ActionQueue {
+	return NewActionQueue(10, 200)
+}
+
 func NewActionQueue(maxReaders int, maxQueueLen int) ActionQueue {
 	return &actionQueue{
 		batcher: mb.New[ActionFunc](maxQueueLen),
@@ -37,7 +41,7 @@ func (q *actionQueue) Send(action ActionFunc) (err error) {
 	}
 	log.With(zap.Error(err)).Debug("queue returned error")
 	actions := q.batcher.GetAll()
-	actions = actions[len(actions)/2:]
+	actions = append(actions[len(actions)/2:], action)
 	return q.batcher.Add(context.Background(), actions...)
 }
@@ -66,6 +70,7 @@ func (q *actionQueue) startReading() {
 	}
 }
 
 func (q *actionQueue) Close() {
+	log.Debug("closing the queue")
 	q.batcher.Close()
 	for i := 0; i < q.maxReaders; i++ {
 		<-q.readers
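
For reference, a minimal usage sketch of the queue lifecycle as exercised by the new test below; this is a sketch only, assuming it sits in the same package and that ActionFunc is func() error, which is how Send is invoked in the test:

	// sketch: expected lifecycle of an ActionQueue (hypothetical example function)
	func exampleQueueUsage() {
		queue := NewDefaultActionQueue() // 10 readers, queue length 200
		queue.Run()                      // start the reader goroutines
		_ = queue.Send(func() error {    // on overflow, Send drops the older half
			return nil                   // of the queue and re-adds this action
		})
		queue.Close()                    // close the batcher, wait for all readers
	}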

View File

@@ -0,0 +1,54 @@
+package objectsync
+
+import (
+	"fmt"
+	"github.com/stretchr/testify/require"
+	"sync/atomic"
+	"testing"
+)
+
+func TestActionQueue_Send(t *testing.T) {
+	maxReaders := 41
+	maxLen := 93
+	queue := NewActionQueue(maxReaders, maxLen).(*actionQueue)
+	counter := atomic.Int32{}
+	expectedCounter := int32(maxReaders + (maxLen+1)/2 + 1)
+	blocker := make(chan struct{}, expectedCounter)
+	waiter := make(chan struct{}, expectedCounter)
+	increase := func() error {
+		counter.Add(1)
+		waiter <- struct{}{}
+		<-blocker
+		return nil
+	}
+	queue.Run()
+	// send maxReaders actions, so that all reader goroutines block on the `blocker` channel
+	for i := 0; i < maxReaders; i++ {
+		queue.Send(increase)
+	}
+	// wait until they have all made progress
+	for i := 0; i < maxReaders; i++ {
+		<-waiter
+	}
+	fmt.Println(counter.Load())
+	// check that the queue is empty
+	require.Equal(t, 0, queue.batcher.Len())
+	// overflow the queue while the readers are blocked
+	for i := 0; i < maxLen+1; i++ {
+		queue.Send(increase)
+	}
+	// check that the queue was halved after the overflow
+	require.Equal(t, (maxLen+1)/2+1, queue.batcher.Len())
+	// unblock the maxReaders blocked actions, then the remaining queued ones
+	for i := 0; i < int(expectedCounter); i++ {
+		blocker <- struct{}{}
+	}
+	// wait until the remaining actions have run
+	for i := 0; i < int(expectedCounter)-maxReaders; i++ {
+		<-waiter
+	}
+	queue.Close()
+	require.Equal(t, expectedCounter, counter.Load())
+}
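
To sanity-check expectedCounter with these constants: the 41 blocked readers already hold 41 actions; of the 94 (maxLen+1) overflow sends, 93 fill the queue and the 94th trips the halving in Send, which keeps actions[46:] (47 actions) and re-adds the action that failed, leaving 48 = (maxLen+1)/2 + 1 in the queue. In total 41 + 48 = 89 actions run, matching maxReaders + (maxLen+1)/2 + 1.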

View File

@@ -86,7 +86,7 @@ func newObjectSync(
 		checker:     checker,
 		syncCtx:     syncCtx,
 		cancelSync:  cancel,
-		actionQueue: NewActionQueue(maxStreamReaders, 100),
+		actionQueue: NewDefaultActionQueue(),
 	}
 }

View File

@@ -1,12 +1,12 @@
-package syncservice
+package objectsync
 
 import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/ocache"
 	"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
 	"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
+	"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
 	"go.uber.org/zap"
 	"sync"
 	"sync/atomic"
@@ -16,7 +16,7 @@ import (
 var ErrEmptyPeer = errors.New("don't have such a peer")
 var ErrStreamClosed = errors.New("stream is already closed")
-var maxSimultaneousOperationsPerStream = 10
+var maxStreamReaders = 10
 var syncWaitPeriod = 2 * time.Second
 
 var ErrSyncTimeout = errors.New("too long wait on sync receive")
@@ -24,8 +24,8 @@ var ErrSyncTimeout = errors.New("too long wait on sync receive")
 // StreamPool can be made generic to work with different streams
 type StreamPool interface {
 	ocache.ObjectLastUsage
-	AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
-	AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error)
+	AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error)
+	AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error)
 	SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
 	SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
@@ -43,7 +43,7 @@ type responseWaiter struct {
 type streamPool struct {
 	sync.Mutex
-	peerStreams    map[string]spacesyncproto.SpaceStream
+	peerStreams    map[string]spacesyncproto.ObjectSyncStream
 	messageHandler MessageHandler
 	wg             *sync.WaitGroup
 	waiters        map[string]responseWaiter
@@ -54,7 +54,7 @@ type streamPool struct {
 func newStreamPool(messageHandler MessageHandler) StreamPool {
 	s := &streamPool{
-		peerStreams:    make(map[string]spacesyncproto.SpaceStream),
+		peerStreams:    make(map[string]spacesyncproto.ObjectSyncStream),
 		messageHandler: messageHandler,
 		waiters:        make(map[string]responseWaiter),
 		wg:             &sync.WaitGroup{},
@@ -110,7 +110,7 @@ func (s *streamPool) SendSync(
 func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) {
 	s.lastUsage.Store(time.Now().Unix())
-	getStreams := func() (streams []spacesyncproto.SpaceStream) {
+	getStreams := func() (streams []spacesyncproto.ObjectSyncStream) {
 		for _, pId := range peers {
 			stream, err := s.getOrDeleteStream(pId)
 			if err != nil {
@@ -139,7 +139,7 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) {
 	return err
 }
 
-func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) {
+func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectSyncStream, err error) {
 	stream, exists := s.peerStreams[id]
 	if !exists {
 		err = ErrEmptyPeer
@@ -156,7 +156,7 @@ func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) {
 	return
 }
 
-func (s *streamPool) getAllStreams() (streams []spacesyncproto.SpaceStream) {
+func (s *streamPool) getAllStreams() (streams []spacesyncproto.ObjectSyncStream) {
 	s.Lock()
 	defer s.Unlock()
 Loop:
@@ -188,7 +188,7 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
 	return nil
 }
 
-func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) {
+func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) {
 	peerId, err := s.addStream(stream)
 	if err != nil {
 		return
@@ -197,7 +197,7 @@ func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) {
 	return
 }
 
-func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) {
+func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) {
 	peerId, err := s.addStream(stream)
 	if err != nil {
 		return
@@ -205,7 +205,7 @@ func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) {
 	return s.readPeerLoop(peerId, stream)
 }
 
-func (s *streamPool) addStream(stream spacesyncproto.SpaceStream) (peerId string, err error) {
+func (s *streamPool) addStream(stream spacesyncproto.ObjectSyncStream) (peerId string, err error) {
 	s.Lock()
 	peerId, err = peer.CtxPeerId(stream.Context())
 	if err != nil {
@@ -245,65 +245,74 @@ func (s *streamPool) Close() (err error) {
 	return nil
 }
 
-func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
-	log := log.With(zap.String("peerId", peerId))
+func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
+	var (
+		log   = log.With(zap.String("peerId", peerId))
+		queue = NewDefaultActionQueue()
+	)
+	queue.Run()
 	defer func() {
 		log.Debug("stopped reading stream from peer")
 		s.removePeer(peerId, stream)
+		queue.Close()
 		s.wg.Done()
 	}()
 	log.Debug("started reading stream from peer")
-	limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
-	for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
-		limiter <- struct{}{}
-	}
-	process := func(msg *spacesyncproto.ObjectSyncMessage) {
+	stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool {
+		s.waitersMx.Lock()
+		waiter, exists := s.waiters[msg.ReplyId]
+		if exists {
+			delete(s.waiters, msg.ReplyId)
+			s.waitersMx.Unlock()
+			waiter.ch <- msg
+			return true
+		}
+		s.waitersMx.Unlock()
+		return false
+	}
+	process := func(msg *spacesyncproto.ObjectSyncMessage) error {
 		log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId))
 		log.Debug("getting message with reply id")
 		err = s.messageHandler(stream.Context(), peerId, msg)
 		if err != nil {
 			log.With(zap.Error(err)).Debug("message handling failed")
 		}
+		return nil
 	}
 	for {
-		select {
-		case <-stream.Context().Done():
-			return
-		default:
-		}
 		var msg *spacesyncproto.ObjectSyncMessage
 		msg, err = stream.Recv()
 		s.lastUsage.Store(time.Now().Unix())
 		if err != nil {
+			stream.Close()
 			return
 		}
 		if msg.ReplyId != "" {
-			s.waitersMx.Lock()
-			waiter, exists := s.waiters[msg.ReplyId]
-			if exists {
-				delete(s.waiters, msg.ReplyId)
-				s.waitersMx.Unlock()
-				waiter.ch <- msg
+			// then we can send it directly to the waiter, without adding it to the queue or starting a reader
+			if stopWaiter(msg) {
 				continue
 			}
-			s.waitersMx.Unlock()
 			log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
 		}
-		select {
-		case <-limiter:
-		case <-stream.Context().Done():
-			return
-		}
-		go func(msg *spacesyncproto.ObjectSyncMessage) {
-			process(msg)
-			limiter <- struct{}{}
-		}(msg)
+		queue.Send(func() error {
+			return process(msg)
+		})
	}
 }
 
-func (s *streamPool) removePeer(peerId string, stream spacesyncproto.SpaceStream) (err error) {
+func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
 	s.Lock()
 	defer s.Unlock()
 	mapStream, ok := s.peerStreams[peerId]
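
The net effect in readPeerLoop: the per-stream semaphore (limiter) and ad-hoc goroutines are replaced by a bounded action queue, so a slow handler now sheds the oldest half of the backlog instead of blocking the read loop. A condensed sketch of the new loop shape (not the full method; stopWaiter and process as defined in the diff above, lastUsage bookkeeping omitted):

	for {
		msg, err := stream.Recv()
		if err != nil {
			stream.Close() // now closed explicitly on receive error
			return
		}
		if msg.ReplyId != "" && stopWaiter(msg) {
			continue // replies go straight to their waiters, bypassing the queue
		}
		queue.Send(func() error { return process(msg) })
	}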