From 98e99686e472b920fb8c44ddb4569c5501d88c93 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 15 May 2023 15:50:27 +0200 Subject: [PATCH 1/4] streampool: send queue per peer + metrics --- commonspace/objectsync/objectsync.go | 2 +- net/streampool/metric.go | 36 ++++++++++++++ net/streampool/stream.go | 24 +++++++-- net/streampool/streampool.go | 74 +++++++++++----------------- net/streampool/streampool_test.go | 1 - net/streampool/streampoolservice.go | 12 +++-- 6 files changed, 94 insertions(+), 55 deletions(-) create mode 100644 net/streampool/metric.go diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index ee26c519..f6f26630 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -109,7 +109,7 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp } obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) if err != nil { - log.DebugCtx(ctx, "failed to get object") + log.WarnCtx(ctx, "failed to get object", zap.Error(err)) // TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync return s.unmarshallSendError(ctx, msg, err, senderId, msg.ObjectId) } diff --git a/net/streampool/metric.go b/net/streampool/metric.go new file mode 100644 index 00000000..0818306f --- /dev/null +++ b/net/streampool/metric.go @@ -0,0 +1,36 @@ +package streampool + +import "github.com/prometheus/client_golang/prometheus" + +func registerMetrics(ref *prometheus.Registry, sp *streamPool, name string) { + ref.MustRegister( + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: name, + Subsystem: "streampool", + Name: "stream_count", + Help: "count of opened streams", + }, func() float64 { + sp.mu.Lock() + defer sp.mu.Unlock() + return float64(len(sp.streams)) + }), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: name, + Subsystem: "streampool", + Name: "tag_count", + Help: "count of active tags", + }, func() float64 { + sp.mu.Lock() + defer sp.mu.Unlock() + return float64(len(sp.streamIdsByTag)) + }), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: name, + Subsystem: "streampool", + Name: "dial_queue", + Help: "dial queue size", + }, func() float64 { + return float64(sp.dial.batch.Len()) + }), + ) +} diff --git a/net/streampool/stream.go b/net/streampool/stream.go index f2e092c4..da14f74d 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -3,6 +3,7 @@ package streampool import ( "context" "github.com/anytypeio/any-sync/app/logger" + "github.com/cheggaaa/mb/v3" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" @@ -16,14 +17,12 @@ type stream struct { streamId uint32 closed atomic.Bool l logger.CtxLogger + queue *mb.MB[drpc.Message] tags []string } func (sr *stream) write(msg drpc.Message) (err error) { - if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { - sr.streamClose() - } - return err + return sr.queue.TryAdd(msg) } func (sr *stream) readLoop() error { @@ -46,8 +45,25 @@ func (sr *stream) readLoop() error { } } +func (sr *stream) writeLoop() error { + defer func() { + sr.streamClose() + }() + sr.l.Debug("stream write started") + for { + msg, err := sr.queue.WaitOne(sr.stream.Context()) + if err != nil { + return err + } + if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { + return err + } + } +} + func (sr *stream) streamClose() { if !sr.closed.Swap(true) { + _ = sr.queue.Close() _ = sr.stream.Close() sr.pool.removeStream(sr.streamId) } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 5295cea4..4155647e 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" + "github.com/cheggaaa/mb/v3" "go.uber.org/zap" "golang.org/x/exp/slices" "golang.org/x/net/context" @@ -52,9 +53,9 @@ type streamPool struct { streamIdsByTag map[string][]uint32 streams map[uint32]*stream opening map[string]*openingProcess - exec *execPool dial *execPool mu sync.Mutex + writeQueueSize int lastStreamId uint32 } @@ -103,6 +104,10 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, defer s.mu.Unlock() s.lastStreamId++ streamId := s.lastStreamId + queueSize := s.writeQueueSize + if queueSize <= 0 { + queueSize = 100 + } st := &stream{ peerId: peerId, peerCtx: ctx, @@ -110,6 +115,7 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, pool: s, streamId: streamId, l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)), + queue: mb.New[drpc.Message](queueSize), tags: tags, } s.streams[streamId] = st @@ -117,27 +123,25 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, for _, tag := range tags { s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) } + go func() { + if err := st.writeLoop(); err != nil { + st.l.Info("stream closed", zap.Error(err)) + } + }() return st, nil } func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) { - var sendOneFunc = func(sp peer.Peer) func() { - return func() { - if e := s.sendOne(ctx, sp, msg); e != nil { - log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", sp.Id())) - } else { - log.DebugCtx(ctx, "send success", zap.String("peerId", sp.Id())) - } - } - } return s.dial.TryAdd(func() { peers, dialErr := peerGetter(ctx) if dialErr != nil { log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr)) } for _, p := range peers { - if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil { - return + if e := s.sendOne(ctx, p, msg); e != nil { + log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id())) + } else { + log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id())) } } }) @@ -157,22 +161,14 @@ func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ... } s.mu.Unlock() - var sendStreamsFunc = func(streams []*stream) func() { - return func() { - for _, st := range streams { - if e := st.write(msg); e != nil { - st.l.Debug("sendById write error", zap.Error(e)) - } else { - st.l.DebugCtx(ctx, "sendById success") - return - } - } - } - } - for _, streams := range streamsByPeer { - if err = s.exec.Add(ctx, sendStreamsFunc(streams)); err != nil { - return + for _, st := range streams { + if e := st.write(msg); e != nil { + st.l.Debug("sendById write error", zap.Error(e)) + } else { + st.l.DebugCtx(ctx, "sendById success") + return + } } } if len(streamsByPeer) == 0 { @@ -271,21 +267,12 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st } } s.mu.Unlock() - var sendStreamFunc = func(st *stream) func() { - return func() { - if e := st.write(msg); e != nil { - st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e)) - } else { - st.l.DebugCtx(ctx, "broadcast success") - } - } - } + for _, st := range streams { - if st == nil { - panic("nil stream") - } - if err = s.exec.Add(ctx, sendStreamFunc(st)); err != nil { - return err + if e := st.write(msg); e != nil { + st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e)) + } else { + st.l.DebugCtx(ctx, "broadcast success") } } return @@ -361,10 +348,7 @@ func (s *streamPool) removeStream(streamId uint32) { } func (s *streamPool) Close() (err error) { - if e := s.dial.Close(); e != nil { - log.Warn("dial queue close error", zap.Error(e)) - } - return s.exec.Close() + return s.dial.Close() } func removeStream(m map[string][]uint32, key string, streamId uint32) { diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index 57b2698d..7f1a7fe4 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -202,7 +202,6 @@ func newFixture(t *testing.T) *fixture { fx.tp = rpctest.NewTestPool().WithServer(ts) fx.th = &testHandler{} fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{ - SendQueueWorkers: 4, SendQueueSize: 10, DialQueueWorkers: 1, DialQueueSize: 10, diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index 25070497..eca11221 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -3,6 +3,7 @@ package streampool import ( "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app/logger" + "github.com/anytypeio/any-sync/metric" ) const CName = "common.net.streampool" @@ -14,9 +15,7 @@ func New() Service { } type StreamConfig struct { - // SendQueueWorkers how many workers will write message to streams - SendQueueWorkers int - // SendQueueSize size of the queue for write + // SendQueueSize size of the queue for write per peer SendQueueSize int // DialQueueWorkers how many workers will dial to peers DialQueueWorkers int @@ -30,22 +29,27 @@ type Service interface { } type service struct { + metric metric.Metric } func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { sp := &streamPool{ handler: h, + writeQueueSize: conf.SendQueueSize, streamIdsByPeer: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize), dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), } + if s.metric != nil { + registerMetrics(s.metric.Registry(), sp, "") + } return sp } func (s *service) Init(a *app.App) (err error) { + s.metric, _ = a.Component(metric.CName).(metric.Metric) return nil } From bc24da876280807c7a13544df89b2470b05cd1a5 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 15 May 2023 19:52:19 +0200 Subject: [PATCH 2/4] use multiqueue for stream + MessageQueueId interface --- net/streampool/stream.go | 27 ++++++++++++--------------- net/streampool/streampool.go | 13 ++++++------- 2 files changed, 18 insertions(+), 22 deletions(-) diff --git a/net/streampool/stream.go b/net/streampool/stream.go index da14f74d..29d9e199 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -3,7 +3,7 @@ package streampool import ( "context" "github.com/anytypeio/any-sync/app/logger" - "github.com/cheggaaa/mb/v3" + "github.com/anytypeio/any-sync/util/multiqueue" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" @@ -17,12 +17,16 @@ type stream struct { streamId uint32 closed atomic.Bool l logger.CtxLogger - queue *mb.MB[drpc.Message] + queue multiqueue.MultiQueue[drpc.Message] tags []string } func (sr *stream) write(msg drpc.Message) (err error) { - return sr.queue.TryAdd(msg) + var queueId string + if qId, ok := msg.(MessageQueueId); ok { + queueId = qId.MessageQueueId() + } + return sr.queue.Add(sr.stream.Context(), queueId, msg) } func (sr *stream) readLoop() error { @@ -45,20 +49,13 @@ func (sr *stream) readLoop() error { } } -func (sr *stream) writeLoop() error { - defer func() { +func (sr *stream) writeToStream(msg drpc.Message) { + if err := sr.stream.MsgSend(msg, EncodingProto); err != nil { + sr.l.Warn("msg send error", zap.Error(err)) sr.streamClose() - }() - sr.l.Debug("stream write started") - for { - msg, err := sr.queue.WaitOne(sr.stream.Context()) - if err != nil { - return err - } - if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { - return err - } + return } + return } func (sr *stream) streamClose() { diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 4155647e..1230ed88 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" - "github.com/cheggaaa/mb/v3" + "github.com/anytypeio/any-sync/util/multiqueue" "go.uber.org/zap" "golang.org/x/exp/slices" "golang.org/x/net/context" @@ -25,6 +25,10 @@ type StreamHandler interface { // PeerGetter should dial or return cached peers type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error) +type MessageQueueId interface { + MessageQueueId() string +} + // StreamPool keeps and read streams type StreamPool interface { // AddStream adds new outgoing stream into the pool @@ -115,19 +119,14 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, pool: s, streamId: streamId, l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)), - queue: mb.New[drpc.Message](queueSize), tags: tags, } + st.queue = multiqueue.New[drpc.Message](st.writeToStream, s.writeQueueSize) s.streams[streamId] = st s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId) for _, tag := range tags { s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) } - go func() { - if err := st.writeLoop(); err != nil { - st.l.Info("stream closed", zap.Error(err)) - } - }() return st, nil } From c410101d5fd3a62d45ba940fdcf463ca33d91bac Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 15 May 2023 20:01:52 +0200 Subject: [PATCH 3/4] QueueId wrapper + test --- net/streampool/stream.go | 1 + net/streampool/streampool.go | 19 +++++++++++++++++++ net/streampool/streampool_test.go | 2 +- 3 files changed, 21 insertions(+), 1 deletion(-) diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 29d9e199..141d433c 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -25,6 +25,7 @@ func (sr *stream) write(msg drpc.Message) (err error) { var queueId string if qId, ok := msg.(MessageQueueId); ok { queueId = qId.MessageQueueId() + msg = qId.DrpcMessage() } return sr.queue.Add(sr.stream.Context(), queueId, msg) } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 1230ed88..442a5097 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -27,6 +27,7 @@ type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error) type MessageQueueId interface { MessageQueueId() string + DrpcMessage() drpc.Message } // StreamPool keeps and read streams @@ -363,3 +364,21 @@ func removeStream(m map[string][]uint32, key string, streamId uint32) { m[key] = streamIds } } + +// WithQueueId wraps the message and adds queueId +func WithQueueId(msg drpc.Message, queueId string) drpc.Message { + return &messageWithQueueId{queueId: queueId, Message: msg} +} + +type messageWithQueueId struct { + drpc.Message + queueId string +} + +func (m messageWithQueueId) MessageQueueId() string { + return m.queueId +} + +func (m messageWithQueueId) DrpcMessage() drpc.Message { + return m.Message +} diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index 7f1a7fe4..8facd481 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -39,7 +39,7 @@ func TestStreamPool_AddStream(t *testing.T) { require.NoError(t, fx.AddStream(s2, "space2", "common")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1")) - require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2")) + require.NoError(t, fx.Broadcast(ctx, WithQueueId(&testservice.StreamMessage{ReqData: "space2"}, "q2"), "space2")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "common"}, "common")) var serverResults []string From 94e211f13444a6ffdd1df9baee0129068c322374 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 15 May 2023 20:30:32 +0200 Subject: [PATCH 4/4] increase sendSync timeout --- commonspace/objectsync/msgpool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 533efc7e..acce59f7 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -54,7 +54,7 @@ func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageH func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { s.updateLastUsage() var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Second*10) + ctx, cancel = context.WithTimeout(ctx, time.Minute) defer cancel() newCounter := s.counter.Add(1) msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter)