From 3d4ac4d1309fcaf09bc07bab2116dfde1df143d3 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 15 May 2023 19:52:19 +0200 Subject: [PATCH] use multiqueue for stream + MessageQueueId interface --- net/streampool/stream.go | 27 ++++++++++++--------------- net/streampool/streampool.go | 13 ++++++------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/net/streampool/stream.go b/net/streampool/stream.go index da14f74d..29d9e199 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -3,7 +3,7 @@ package streampool import ( "context" "github.com/anytypeio/any-sync/app/logger" - "github.com/cheggaaa/mb/v3" + "github.com/anytypeio/any-sync/util/multiqueue" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" @@ -17,12 +17,16 @@ type stream struct { streamId uint32 closed atomic.Bool l logger.CtxLogger - queue *mb.MB[drpc.Message] + queue multiqueue.MultiQueue[drpc.Message] tags []string } func (sr *stream) write(msg drpc.Message) (err error) { - return sr.queue.TryAdd(msg) + var queueId string + if qId, ok := msg.(MessageQueueId); ok { + queueId = qId.MessageQueueId() + } + return sr.queue.Add(sr.stream.Context(), queueId, msg) } func (sr *stream) readLoop() error { @@ -45,20 +49,13 @@ func (sr *stream) readLoop() error { } } -func (sr *stream) writeLoop() error { - defer func() { +func (sr *stream) writeToStream(msg drpc.Message) { + if err := sr.stream.MsgSend(msg, EncodingProto); err != nil { + sr.l.Warn("msg send error", zap.Error(err)) sr.streamClose() - }() - sr.l.Debug("stream write started") - for { - msg, err := sr.queue.WaitOne(sr.stream.Context()) - if err != nil { - return err - } - if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { - return err - } + return } + return } func (sr *stream) streamClose() { diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 4155647e..1230ed88 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" - "github.com/cheggaaa/mb/v3" + "github.com/anytypeio/any-sync/util/multiqueue" "go.uber.org/zap" "golang.org/x/exp/slices" "golang.org/x/net/context" @@ -25,6 +25,10 @@ type StreamHandler interface { // PeerGetter should dial or return cached peers type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error) +type MessageQueueId interface { + MessageQueueId() string +} + // StreamPool keeps and read streams type StreamPool interface { // AddStream adds new outgoing stream into the pool @@ -115,19 +119,14 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, pool: s, streamId: streamId, l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)), - queue: mb.New[drpc.Message](queueSize), tags: tags, } + st.queue = multiqueue.New[drpc.Message](st.writeToStream, s.writeQueueSize) s.streams[streamId] = st s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId) for _, tag := range tags { s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) } - go func() { - if err := st.writeLoop(); err != nil { - st.l.Info("stream closed", zap.Error(err)) - } - }() return st, nil }