From a103c960a479226c894fedb552cd2c2ff99cb606 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 28 Dec 2022 17:52:10 +0100 Subject: [PATCH] Some streampool fixes --- common/commonspace/objectsync/actionqueue.go | 6 ++ common/commonspace/objectsync/streampool.go | 91 +++++++------------- 2 files changed, 38 insertions(+), 59 deletions(-) diff --git a/common/commonspace/objectsync/actionqueue.go b/common/commonspace/objectsync/actionqueue.go index 740c5b9b..98409978 100644 --- a/common/commonspace/objectsync/actionqueue.go +++ b/common/commonspace/objectsync/actionqueue.go @@ -61,6 +61,12 @@ func (q *actionQueue) read() { return } for _, msg := range actions { + select { + case <-q.ctx.Done(): + return + default: + } + <-limiter go func(action ActionFunc) { err = action() diff --git a/common/commonspace/objectsync/streampool.go b/common/commonspace/objectsync/streampool.go index 6f150a81..0208e253 100644 --- a/common/commonspace/objectsync/streampool.go +++ b/common/commonspace/objectsync/streampool.go @@ -247,50 +247,41 @@ func (s *streamPool) Close() (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { var ( - msg *spacesyncproto.ObjectSyncMessage - mx = sync.Mutex{} - maxWaitMsgs = 500 - queue = make([]*spacesyncproto.ObjectSyncMessage, 0, 10) - readers = 0 - log = log.With(zap.String("peerId", peerId)) + log = log.With(zap.String("peerId", peerId)) + queue = NewActionQueue() ) + queue.Run() + defer func() { log.Debug("stopped reading stream from peer") s.removePeer(peerId, stream) + queue.Close() s.wg.Done() }() + log.Debug("started reading stream from peer") - readQueue := func() { - defer func() { - mx.Lock() - readers-- - mx.Unlock() - }() - - for { - select { - case <-stream.Context().Done(): - return - default: - } - - mx.Lock() - if len(queue) == 0 { - mx.Unlock() - return - } - newEl := queue[0] - queue = queue[1:] - mx.Unlock() - - log := log.With(zap.String("replyId", newEl.ReplyId), zap.String("object id", newEl.ObjectId)) - log.Debug("getting message with reply id") - err = s.messageHandler(stream.Context(), peerId, newEl) - if err != nil { - log.With(zap.Error(err)).Debug("message handling failed") - } + stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool { + s.waitersMx.Lock() + waiter, exists := s.waiters[msg.ReplyId] + if exists { + delete(s.waiters, msg.ReplyId) + s.waitersMx.Unlock() + waiter.ch <- msg + return true } + s.waitersMx.Unlock() + return false + } + + process := func(msg *spacesyncproto.ObjectSyncMessage) error { + log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)) + log.Debug("getting message with reply id") + err = s.messageHandler(stream.Context(), peerId, msg) + if err != nil { + log.With(zap.Error(err)).Debug("message handling failed") + } + return nil } for { @@ -299,43 +290,25 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyn return default: } - + var msg *spacesyncproto.ObjectSyncMessage msg, err = stream.Recv() s.lastUsage.Store(time.Now().Unix()) if err != nil { + stream.Close() return } if msg.ReplyId != "" { // then we can send it directly to waiters without adding to queue or starting a reader - s.waitersMx.Lock() - waiter, exists := s.waiters[msg.ReplyId] - if exists { - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - waiter.ch <- msg + if stopWaiter(msg) { continue } - s.waitersMx.Unlock() - log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") } - mx.Lock() - queue = append(queue, msg) - if len(queue) > maxWaitMsgs { - queue = queue[len(queue)-maxWaitMsgs:] - } - // if we already have max goroutines reading the queue in parallel - if readers >= maxStreamReaders { - mx.Unlock() - continue - } - readers++ - mx.Unlock() - - // starting another reader - go readQueue() + queue.Send(func() error { + return process(msg) + }) } }