diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 6f6c8810..64b2b3fb 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -109,7 +109,7 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp } obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) if err != nil { - log.DebugCtx(ctx, "failed to get object") + log.WarnCtx(ctx, "failed to get object", zap.Error(err)) // TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync return s.unmarshallSendError(ctx, msg, err, senderId, msg.ObjectId) } diff --git a/net/streampool/metric.go b/net/streampool/metric.go new file mode 100644 index 00000000..0818306f --- /dev/null +++ b/net/streampool/metric.go @@ -0,0 +1,36 @@ +package streampool + +import "github.com/prometheus/client_golang/prometheus" + +func registerMetrics(ref *prometheus.Registry, sp *streamPool, name string) { + ref.MustRegister( + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: name, + Subsystem: "streampool", + Name: "stream_count", + Help: "count of opened streams", + }, func() float64 { + sp.mu.Lock() + defer sp.mu.Unlock() + return float64(len(sp.streams)) + }), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: name, + Subsystem: "streampool", + Name: "tag_count", + Help: "count of active tags", + }, func() float64 { + sp.mu.Lock() + defer sp.mu.Unlock() + return float64(len(sp.streamIdsByTag)) + }), + prometheus.NewGaugeFunc(prometheus.GaugeOpts{ + Namespace: name, + Subsystem: "streampool", + Name: "dial_queue", + Help: "dial queue size", + }, func() float64 { + return float64(sp.dial.batch.Len()) + }), + ) +} diff --git a/net/streampool/stream.go b/net/streampool/stream.go index f2e092c4..da14f74d 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -3,6 +3,7 @@ package streampool import ( "context" 
"github.com/anytypeio/any-sync/app/logger" + "github.com/cheggaaa/mb/v3" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" @@ -16,14 +17,12 @@ type stream struct { streamId uint32 closed atomic.Bool l logger.CtxLogger + queue *mb.MB[drpc.Message] tags []string } func (sr *stream) write(msg drpc.Message) (err error) { - if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { - sr.streamClose() - } - return err + return sr.queue.TryAdd(msg) } func (sr *stream) readLoop() error { @@ -46,8 +45,25 @@ func (sr *stream) readLoop() error { } } +func (sr *stream) writeLoop() error { + defer func() { + sr.streamClose() + }() + sr.l.Debug("stream write started") + for { + msg, err := sr.queue.WaitOne(sr.stream.Context()) + if err != nil { + return err + } + if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { + return err + } + } +} + func (sr *stream) streamClose() { if !sr.closed.Swap(true) { + _ = sr.queue.Close() _ = sr.stream.Close() sr.pool.removeStream(sr.streamId) } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 5295cea4..4155647e 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" + "github.com/cheggaaa/mb/v3" "go.uber.org/zap" "golang.org/x/exp/slices" "golang.org/x/net/context" @@ -52,9 +53,9 @@ type streamPool struct { streamIdsByTag map[string][]uint32 streams map[uint32]*stream opening map[string]*openingProcess - exec *execPool dial *execPool mu sync.Mutex + writeQueueSize int lastStreamId uint32 } @@ -103,6 +104,10 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, defer s.mu.Unlock() s.lastStreamId++ streamId := s.lastStreamId + queueSize := s.writeQueueSize + if queueSize <= 0 { + queueSize = 100 + } st := &stream{ peerId: peerId, peerCtx: ctx, @@ -110,6 +115,7 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, 
pool: s, streamId: streamId, l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)), + queue: mb.New[drpc.Message](queueSize), tags: tags, } s.streams[streamId] = st @@ -117,27 +123,25 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, for _, tag := range tags { s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) } + go func() { + if err := st.writeLoop(); err != nil { + st.l.Info("stream closed", zap.Error(err)) + } + }() return st, nil } func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) { - var sendOneFunc = func(sp peer.Peer) func() { - return func() { - if e := s.sendOne(ctx, sp, msg); e != nil { - log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", sp.Id())) - } else { - log.DebugCtx(ctx, "send success", zap.String("peerId", sp.Id())) - } - } - } return s.dial.TryAdd(func() { peers, dialErr := peerGetter(ctx) if dialErr != nil { log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr)) } for _, p := range peers { - if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil { - return + if e := s.sendOne(ctx, p, msg); e != nil { + log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id())) + } else { + log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id())) } } }) @@ -157,22 +161,14 @@ func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ... 
} s.mu.Unlock() - var sendStreamsFunc = func(streams []*stream) func() { - return func() { - for _, st := range streams { - if e := st.write(msg); e != nil { - st.l.Debug("sendById write error", zap.Error(e)) - } else { - st.l.DebugCtx(ctx, "sendById success") - return - } - } - } - } - for _, streams := range streamsByPeer { - if err = s.exec.Add(ctx, sendStreamsFunc(streams)); err != nil { - return + for _, st := range streams { + if e := st.write(msg); e != nil { + st.l.Debug("sendById write error", zap.Error(e)) + } else { + st.l.DebugCtx(ctx, "sendById success") + break // this peer is served; keep going so the remaining peers are sent to as well + } } } if len(streamsByPeer) == 0 { @@ -271,21 +267,12 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st } } s.mu.Unlock() - var sendStreamFunc = func(st *stream) func() { - return func() { - if e := st.write(msg); e != nil { - st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e)) - } else { - st.l.DebugCtx(ctx, "broadcast success") - } - } - } + for _, st := range streams { - if st == nil { - panic("nil stream") - } - if err = s.exec.Add(ctx, sendStreamFunc(st)); err != nil { - return err + if e := st.write(msg); e != nil { + st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e)) + } else { + st.l.DebugCtx(ctx, "broadcast success") } } return @@ -361,10 +348,7 @@ func (s *streamPool) removeStream(streamId uint32) { } func (s *streamPool) Close() (err error) { - if e := s.dial.Close(); e != nil { - log.Warn("dial queue close error", zap.Error(e)) - } - return s.exec.Close() + return s.dial.Close() } func removeStream(m map[string][]uint32, key string, streamId uint32) { diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index 57b2698d..7f1a7fe4 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -202,7 +202,6 @@ func newFixture(t *testing.T) *fixture { fx.tp = rpctest.NewTestPool().WithServer(ts) fx.th = &testHandler{} fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{ -
SendQueueWorkers: 4, SendQueueSize: 10, DialQueueWorkers: 1, DialQueueSize: 10, diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index 25070497..eca11221 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -3,6 +3,7 @@ package streampool import ( "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app/logger" + "github.com/anytypeio/any-sync/metric" ) const CName = "common.net.streampool" @@ -14,9 +15,7 @@ func New() Service { } type StreamConfig struct { - // SendQueueWorkers how many workers will write message to streams - SendQueueWorkers int - // SendQueueSize size of the queue for write + // SendQueueSize size of the write queue of each stream (values <= 0 fall back to 100) SendQueueSize int // DialQueueWorkers how many workers will dial to peers DialQueueWorkers int @@ -30,22 +29,27 @@ type Service interface { } type service struct { + metric metric.Metric } func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { sp := &streamPool{ handler: h, + writeQueueSize: conf.SendQueueSize, streamIdsByPeer: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize), dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), } + if s.metric != nil { + registerMetrics(s.metric.Registry(), sp, "") + } return sp } func (s *service) Init(a *app.App) (err error) { + s.metric, _ = a.Component(metric.CName).(metric.Metric) return nil }