diff --git a/common/commonspace/objectsync/actionqueue.go b/common/commonspace/objectsync/actionqueue.go index 8c677478..65d5e3af 100644 --- a/common/commonspace/objectsync/actionqueue.go +++ b/common/commonspace/objectsync/actionqueue.go @@ -21,6 +21,10 @@ type actionQueue struct { readers chan struct{} } +func NewDefaultActionQueue() ActionQueue { + return NewActionQueue(10, 200) +} + func NewActionQueue(maxReaders int, maxQueueLen int) ActionQueue { return &actionQueue{ batcher: mb.New[ActionFunc](maxQueueLen), @@ -37,7 +41,7 @@ func (q *actionQueue) Send(action ActionFunc) (err error) { } log.With(zap.Error(err)).Debug("queue returned error") actions := q.batcher.GetAll() - actions = actions[len(actions)/2:] + actions = append(actions[len(actions)/2:], action) return q.batcher.Add(context.Background(), actions...) } @@ -66,6 +70,7 @@ func (q *actionQueue) startReading() { } func (q *actionQueue) Close() { + log.Debug("closing the queue") q.batcher.Close() for i := 0; i < q.maxReaders; i++ { <-q.readers diff --git a/common/commonspace/objectsync/actionqueue_test.go b/common/commonspace/objectsync/actionqueue_test.go new file mode 100644 index 00000000..eef4a952 --- /dev/null +++ b/common/commonspace/objectsync/actionqueue_test.go @@ -0,0 +1,54 @@ +package objectsync + +import ( + "fmt" + "github.com/stretchr/testify/require" + "sync/atomic" + "testing" +) + +func TestActionQueue_Send(t *testing.T) { + maxReaders := 41 + maxLen := 93 + + queue := NewActionQueue(maxReaders, maxLen).(*actionQueue) + counter := atomic.Int32{} + expectedCounter := int32(maxReaders + (maxLen+1)/2 + 1) + blocker := make(chan struct{}, expectedCounter) + waiter := make(chan struct{}, expectedCounter) + increase := func() error { + counter.Add(1) + waiter <- struct{}{} + <-blocker + return nil + } + + queue.Run() + // sending maxReaders messages, so the goroutines will block on `blocker` channel + for i := 0; i < maxReaders; i++ { + queue.Send(increase) + } + // waiting until they all make progress + for i := 0; i < maxReaders; i++ { + <-waiter + } + fmt.Println(counter.Load()) + // check that queue is empty + require.Equal(t, queue.batcher.Len(), 0) + // making queue to overflow while readers are blocked + for i := 0; i < maxLen+1; i++ { + queue.Send(increase) + } + // check that queue was halved after overflow + require.Equal(t, (maxLen+1)/2+1, queue.batcher.Len()) + // unblocking maxReaders waiting + then we should also unblock the new readers to do a bit more readings + for i := 0; i < int(expectedCounter); i++ { + blocker <- struct{}{} + } + // waiting for all readers to finish adding + for i := 0; i < int(expectedCounter)-maxReaders; i++ { + <-waiter + } + queue.Close() + require.Equal(t, expectedCounter, counter.Load()) +} diff --git a/common/commonspace/objectsync/objectsync.go b/common/commonspace/objectsync/objectsync.go index f9d1fd86..b3c482a6 100644 --- a/common/commonspace/objectsync/objectsync.go +++ b/common/commonspace/objectsync/objectsync.go @@ -86,7 +86,7 @@ func newObjectSync( checker: checker, syncCtx: syncCtx, cancelSync: cancel, - actionQueue: NewActionQueue(maxStreamReaders, 100), + actionQueue: NewDefaultActionQueue(), } } diff --git a/common/commonspace/objectsync/streampool.go b/common/commonspace/objectsync/streampool.go index 8e5231c7..891c5f5f 100644 --- a/common/commonspace/objectsync/streampool.go +++ b/common/commonspace/objectsync/streampool.go @@ -1,12 +1,12 @@ -package syncservice +package objectsync import ( "context" "errors" "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/ocache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" - "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" "go.uber.org/zap" "sync" "sync/atomic" @@ -16,7 +16,7 @@ import ( var ErrEmptyPeer = errors.New("don't have such a peer") var ErrStreamClosed = errors.New("stream is already closed") -var maxSimultaneousOperationsPerStream = 10 +var maxStreamReaders = 10 var syncWaitPeriod = 2 * time.Second var ErrSyncTimeout = errors.New("too long wait on sync receive") @@ -24,8 +24,8 @@ var ErrSyncTimeout = errors.New("too long wait on sync receive") // StreamPool can be made generic to work with different streams type StreamPool interface { ocache.ObjectLastUsage - AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) - AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) + AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) + AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) @@ -43,7 +43,7 @@ type responseWaiter struct { type streamPool struct { sync.Mutex - peerStreams map[string]spacesyncproto.SpaceStream + peerStreams map[string]spacesyncproto.ObjectSyncStream messageHandler MessageHandler wg *sync.WaitGroup waiters map[string]responseWaiter @@ -54,7 +54,7 @@ type streamPool struct { func newStreamPool(messageHandler MessageHandler) StreamPool { s := &streamPool{ - peerStreams: make(map[string]spacesyncproto.SpaceStream), + peerStreams: make(map[string]spacesyncproto.ObjectSyncStream), messageHandler: messageHandler, waiters: make(map[string]responseWaiter), wg: &sync.WaitGroup{}, @@ -110,7 +110,7 @@ func (s *streamPool) SendSync( func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) { s.lastUsage.Store(time.Now().Unix()) - getStreams := func() (streams []spacesyncproto.SpaceStream) { + getStreams := func() (streams []spacesyncproto.ObjectSyncStream) { for _, pId := range peers { stream, err := s.getOrDeleteStream(pId) if err != nil { @@ -139,7 +139,7 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn return err } -func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceStream, err error) { +func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectSyncStream, err error) { stream, exists := s.peerStreams[id] if !exists { err = ErrEmptyPeer @@ -156,7 +156,7 @@ func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.SpaceSt return } -func (s *streamPool) getAllStreams() (streams []spacesyncproto.SpaceStream) { +func (s *streamPool) getAllStreams() (streams []spacesyncproto.ObjectSyncStream) { s.Lock() defer s.Unlock() Loop: @@ -188,7 +188,7 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) ( return nil } -func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (err error) { +func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) { peerId, err := s.addStream(stream) if err != nil { return @@ -197,7 +197,7 @@ func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) (e return } -func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) { +func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) { peerId, err := s.addStream(stream) if err != nil { return @@ -205,7 +205,7 @@ func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (er return s.readPeerLoop(peerId, stream) } -func (s *streamPool) addStream(stream spacesyncproto.SpaceStream) (peerId string, err error) { +func (s *streamPool) addStream(stream spacesyncproto.ObjectSyncStream) (peerId string, err error) { s.Lock() peerId, err = peer.CtxPeerId(stream.Context()) if err != nil { @@ -245,65 +245,74 @@ func (s *streamPool) Close() (err error) { return nil } -func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { - log := log.With(zap.String("peerId", peerId)) +func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { + var ( + log = log.With(zap.String("peerId", peerId)) + queue = NewDefaultActionQueue() + ) + queue.Run() + defer func() { log.Debug("stopped reading stream from peer") s.removePeer(peerId, stream) + queue.Close() s.wg.Done() }() + log.Debug("started reading stream from peer") - limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) - for i := 0; i < maxSimultaneousOperationsPerStream; i++ { - limiter <- struct{}{} + stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool { + s.waitersMx.Lock() + waiter, exists := s.waiters[msg.ReplyId] + if exists { + delete(s.waiters, msg.ReplyId) + s.waitersMx.Unlock() + waiter.ch <- msg + return true + } + s.waitersMx.Unlock() + return false } - process := func(msg *spacesyncproto.ObjectSyncMessage) { + process := func(msg *spacesyncproto.ObjectSyncMessage) error { log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)) log.Debug("getting message with reply id") err = s.messageHandler(stream.Context(), peerId, msg) if err != nil { log.With(zap.Error(err)).Debug("message handling failed") } + return nil } for { + select { + case <-stream.Context().Done(): + return + default: + } var msg *spacesyncproto.ObjectSyncMessage msg, err = stream.Recv() s.lastUsage.Store(time.Now().Unix()) if err != nil { + stream.Close() return } if msg.ReplyId != "" { - s.waitersMx.Lock() - waiter, exists := s.waiters[msg.ReplyId] - if exists { - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - waiter.ch <- msg + // then we can send it directly to waiters without adding to queue or starting a reader + if stopWaiter(msg) { continue } - s.waitersMx.Unlock() - log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") } - select { - case <-limiter: - case <-stream.Context().Done(): - return - } - - go func(msg *spacesyncproto.ObjectSyncMessage) { - process(msg) - limiter <- struct{}{} - }(msg) + queue.Send(func() error { + return process(msg) + }) } } -func (s *streamPool) removePeer(peerId string, stream spacesyncproto.SpaceStream) (err error) { +func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { s.Lock() defer s.Unlock() mapStream, ok := s.peerStreams[peerId]