diff --git a/common/commonspace/objectsync/actionqueue.go b/common/commonspace/objectsync/actionqueue.go index bf76334f..740c5b9b 100644 --- a/common/commonspace/objectsync/actionqueue.go +++ b/common/commonspace/objectsync/actionqueue.go @@ -42,13 +42,13 @@ func (q *actionQueue) Run() { } func (q *actionQueue) read() { - limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) - for i := 0; i < maxSimultaneousOperationsPerStream; i++ { + limiter := make(chan struct{}, maxStreamReaders) + for i := 0; i < maxStreamReaders; i++ { limiter <- struct{}{} } defer func() { // wait until all operations are done - for i := 0; i < maxSimultaneousOperationsPerStream; i++ { + for i := 0; i < maxStreamReaders; i++ { <-limiter } close(q.queueDone) diff --git a/common/commonspace/objectsync/streampool.go b/common/commonspace/objectsync/streampool.go index 6d550f14..6f150a81 100644 --- a/common/commonspace/objectsync/streampool.go +++ b/common/commonspace/objectsync/streampool.go @@ -16,7 +16,7 @@ import ( var ErrEmptyPeer = errors.New("don't have such a peer") var ErrStreamClosed = errors.New("stream is already closed") -var maxSimultaneousOperationsPerStream = 10 +var maxStreamReaders = 10 var syncWaitPeriod = 2 * time.Second var ErrSyncTimeout = errors.New("too long wait on sync receive") @@ -246,7 +246,14 @@ func (s *streamPool) Close() (err error) { } func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { - log := log.With(zap.String("peerId", peerId)) + var ( + msg *spacesyncproto.ObjectSyncMessage + mx = sync.Mutex{} + maxWaitMsgs = 500 + queue = make([]*spacesyncproto.ObjectSyncMessage, 0, 10) + readers = 0 + log = log.With(zap.String("peerId", peerId)) + ) defer func() { log.Debug("stopped reading stream from peer") s.removePeer(peerId, stream) @@ -254,22 +261,45 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyn }() log.Debug("started reading stream from peer") - limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) - for i := 0; i < maxSimultaneousOperationsPerStream; i++ { - limiter <- struct{}{} - } + readQueue := func() { + defer func() { + mx.Lock() + readers-- + mx.Unlock() + }() - process := func(msg *spacesyncproto.ObjectSyncMessage) { - log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)) - log.Debug("getting message with reply id") - err = s.messageHandler(stream.Context(), peerId, msg) - if err != nil { - log.With(zap.Error(err)).Debug("message handling failed") + for { + select { + case <-stream.Context().Done(): + return + default: + } + + mx.Lock() + if len(queue) == 0 { + mx.Unlock() + return + } + newEl := queue[0] + queue = queue[1:] + mx.Unlock() + + log := log.With(zap.String("replyId", newEl.ReplyId), zap.String("object id", newEl.ObjectId)) + log.Debug("getting message with reply id") + err = s.messageHandler(stream.Context(), peerId, newEl) + if err != nil { + log.With(zap.Error(err)).Debug("message handling failed") + } } } for { - var msg *spacesyncproto.ObjectSyncMessage + select { + case <-stream.Context().Done(): + return + default: + } + msg, err = stream.Recv() s.lastUsage.Store(time.Now().Unix()) if err != nil { @@ -277,6 +307,7 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyn } if msg.ReplyId != "" { + // then we can send it directly to waiters without adding to queue or starting a reader s.waitersMx.Lock() waiter, exists := s.waiters[msg.ReplyId] if exists { @@ -290,16 +321,21 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyn log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") } - select { - case <-limiter: - case <-stream.Context().Done(): - return + mx.Lock() + queue = append(queue, msg) + if len(queue) > maxWaitMsgs { + queue = queue[len(queue)-maxWaitMsgs:] } + // if we already have max goroutines reading the queue in parallel + if readers >= maxStreamReaders { + mx.Unlock() + continue + } + readers++ + mx.Unlock() - go func(msg *spacesyncproto.ObjectSyncMessage) { - process(msg) - limiter <- struct{}{} - }(msg) + // starting another reader + go readQueue() } }