From fa076ee83fc5a6a1e96479cd6ef79900526c1b42 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 27 Dec 2022 19:59:27 +0100 Subject: [PATCH] Fix waiters in streampool --- common/commonspace/syncservice/streampool.go | 64 ++++++++++---------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index b1bc0f09..8e5231c7 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -246,59 +246,61 @@ func (s *streamPool) Close() (err error) { } func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { - log.With(zap.String("replyId", peerId)).Debug("reading stream from peer") - defer s.wg.Done() + log := log.With(zap.String("peerId", peerId)) + defer func() { + log.Debug("stopped reading stream from peer") + s.removePeer(peerId, stream) + s.wg.Done() + }() + log.Debug("started reading stream from peer") + limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) for i := 0; i < maxSimultaneousOperationsPerStream; i++ { limiter <- struct{}{} } process := func(msg *spacesyncproto.ObjectSyncMessage) { - s.lastUsage.Store(time.Now().Unix()) - log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)). - Debug("getting message with reply id") - if msg.ReplyId == "" { - s.messageHandler(stream.Context(), peerId, msg) - return + log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)) + log.Debug("getting message with reply id") + err = s.messageHandler(stream.Context(), peerId, msg) + if err != nil { + log.With(zap.Error(err)).Debug("message handling failed") } - s.waitersMx.Lock() - waiter, exists := s.waiters[msg.ReplyId] - - if !exists { - log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id not exists") - s.waitersMx.Unlock() - s.messageHandler(stream.Context(), peerId, msg) - return - } - log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id exists") - - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - waiter.ch <- msg } -Loop: for { var msg *spacesyncproto.ObjectSyncMessage msg, err = stream.Recv() s.lastUsage.Store(time.Now().Unix()) if err != nil { - break + return } + + if msg.ReplyId != "" { + s.waitersMx.Lock() + waiter, exists := s.waiters[msg.ReplyId] + if exists { + delete(s.waiters, msg.ReplyId) + s.waitersMx.Unlock() + waiter.ch <- msg + continue + } + s.waitersMx.Unlock() + + log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") + } + select { case <-limiter: case <-stream.Context().Done(): - log.Debug("stream context done") - break Loop + return } - go func() { + + go func(msg *spacesyncproto.ObjectSyncMessage) { process(msg) limiter <- struct{}{} - }() + }(msg) } - log.With(zap.String("peerId", peerId)).Debug("stopped reading stream from peer") - s.removePeer(peerId, stream) - return } func (s *streamPool) removePeer(peerId string, stream spacesyncproto.SpaceStream) (err error) {