streampool: send queue per peer + metrics

This commit is contained in:
Sergey Cherepanov 2023-05-15 15:50:27 +02:00
parent 56c1d65e88
commit 98e99686e4
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
6 changed files with 94 additions and 55 deletions

View File

@ -109,7 +109,7 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
} }
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
if err != nil { if err != nil {
log.DebugCtx(ctx, "failed to get object") log.WarnCtx(ctx, "failed to get object", zap.Error(err))
// TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync // TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync
return s.unmarshallSendError(ctx, msg, err, senderId, msg.ObjectId) return s.unmarshallSendError(ctx, msg, err, senderId, msg.ObjectId)
} }

36
net/streampool/metric.go Normal file
View File

@ -0,0 +1,36 @@
package streampool
import "github.com/prometheus/client_golang/prometheus"
func registerMetrics(ref *prometheus.Registry, sp *streamPool, name string) {
ref.MustRegister(
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: name,
Subsystem: "streampool",
Name: "stream_count",
Help: "count of opened streams",
}, func() float64 {
sp.mu.Lock()
defer sp.mu.Unlock()
return float64(len(sp.streams))
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: name,
Subsystem: "streampool",
Name: "tag_count",
Help: "count of active tags",
}, func() float64 {
sp.mu.Lock()
defer sp.mu.Unlock()
return float64(len(sp.streamIdsByTag))
}),
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
Namespace: name,
Subsystem: "streampool",
Name: "dial_queue",
Help: "dial queue size",
}, func() float64 {
return float64(sp.dial.batch.Len())
}),
)
}

View File

@ -3,6 +3,7 @@ package streampool
import ( import (
"context" "context"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/drpc" "storj.io/drpc"
"sync/atomic" "sync/atomic"
@ -16,14 +17,12 @@ type stream struct {
streamId uint32 streamId uint32
closed atomic.Bool closed atomic.Bool
l logger.CtxLogger l logger.CtxLogger
queue *mb.MB[drpc.Message]
tags []string tags []string
} }
func (sr *stream) write(msg drpc.Message) (err error) { func (sr *stream) write(msg drpc.Message) (err error) {
if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { return sr.queue.TryAdd(msg)
sr.streamClose()
}
return err
} }
func (sr *stream) readLoop() error { func (sr *stream) readLoop() error {
@ -46,8 +45,25 @@ func (sr *stream) readLoop() error {
} }
} }
func (sr *stream) writeLoop() error {
defer func() {
sr.streamClose()
}()
sr.l.Debug("stream write started")
for {
msg, err := sr.queue.WaitOne(sr.stream.Context())
if err != nil {
return err
}
if err = sr.stream.MsgSend(msg, EncodingProto); err != nil {
return err
}
}
}
func (sr *stream) streamClose() { func (sr *stream) streamClose() {
if !sr.closed.Swap(true) { if !sr.closed.Swap(true) {
_ = sr.queue.Close()
_ = sr.stream.Close() _ = sr.stream.Close()
sr.pool.removeStream(sr.streamId) sr.pool.removeStream(sr.streamId)
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/net/pool"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -52,9 +53,9 @@ type streamPool struct {
streamIdsByTag map[string][]uint32 streamIdsByTag map[string][]uint32
streams map[uint32]*stream streams map[uint32]*stream
opening map[string]*openingProcess opening map[string]*openingProcess
exec *execPool
dial *execPool dial *execPool
mu sync.Mutex mu sync.Mutex
writeQueueSize int
lastStreamId uint32 lastStreamId uint32
} }
@ -103,6 +104,10 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
defer s.mu.Unlock() defer s.mu.Unlock()
s.lastStreamId++ s.lastStreamId++
streamId := s.lastStreamId streamId := s.lastStreamId
queueSize := s.writeQueueSize
if queueSize <= 0 {
queueSize = 100
}
st := &stream{ st := &stream{
peerId: peerId, peerId: peerId,
peerCtx: ctx, peerCtx: ctx,
@ -110,6 +115,7 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
pool: s, pool: s,
streamId: streamId, streamId: streamId,
l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)), l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)),
queue: mb.New[drpc.Message](queueSize),
tags: tags, tags: tags,
} }
s.streams[streamId] = st s.streams[streamId] = st
@ -117,27 +123,25 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
for _, tag := range tags { for _, tag := range tags {
s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId)
} }
go func() {
if err := st.writeLoop(); err != nil {
st.l.Info("stream closed", zap.Error(err))
}
}()
return st, nil return st, nil
} }
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) { func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) {
var sendOneFunc = func(sp peer.Peer) func() {
return func() {
if e := s.sendOne(ctx, sp, msg); e != nil {
log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", sp.Id()))
} else {
log.DebugCtx(ctx, "send success", zap.String("peerId", sp.Id()))
}
}
}
return s.dial.TryAdd(func() { return s.dial.TryAdd(func() {
peers, dialErr := peerGetter(ctx) peers, dialErr := peerGetter(ctx)
if dialErr != nil { if dialErr != nil {
log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr)) log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr))
} }
for _, p := range peers { for _, p := range peers {
if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil { if e := s.sendOne(ctx, p, msg); e != nil {
return log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id()))
} else {
log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id()))
} }
} }
}) })
@ -157,22 +161,14 @@ func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...
} }
s.mu.Unlock() s.mu.Unlock()
var sendStreamsFunc = func(streams []*stream) func() {
return func() {
for _, st := range streams {
if e := st.write(msg); e != nil {
st.l.Debug("sendById write error", zap.Error(e))
} else {
st.l.DebugCtx(ctx, "sendById success")
return
}
}
}
}
for _, streams := range streamsByPeer { for _, streams := range streamsByPeer {
if err = s.exec.Add(ctx, sendStreamsFunc(streams)); err != nil { for _, st := range streams {
return if e := st.write(msg); e != nil {
st.l.Debug("sendById write error", zap.Error(e))
} else {
st.l.DebugCtx(ctx, "sendById success")
return
}
} }
} }
if len(streamsByPeer) == 0 { if len(streamsByPeer) == 0 {
@ -271,21 +267,12 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
} }
} }
s.mu.Unlock() s.mu.Unlock()
var sendStreamFunc = func(st *stream) func() {
return func() {
if e := st.write(msg); e != nil {
st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e))
} else {
st.l.DebugCtx(ctx, "broadcast success")
}
}
}
for _, st := range streams { for _, st := range streams {
if st == nil { if e := st.write(msg); e != nil {
panic("nil stream") st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e))
} } else {
if err = s.exec.Add(ctx, sendStreamFunc(st)); err != nil { st.l.DebugCtx(ctx, "broadcast success")
return err
} }
} }
return return
@ -361,10 +348,7 @@ func (s *streamPool) removeStream(streamId uint32) {
} }
func (s *streamPool) Close() (err error) { func (s *streamPool) Close() (err error) {
if e := s.dial.Close(); e != nil { return s.dial.Close()
log.Warn("dial queue close error", zap.Error(e))
}
return s.exec.Close()
} }
func removeStream(m map[string][]uint32, key string, streamId uint32) { func removeStream(m map[string][]uint32, key string, streamId uint32) {

View File

@ -202,7 +202,6 @@ func newFixture(t *testing.T) *fixture {
fx.tp = rpctest.NewTestPool().WithServer(ts) fx.tp = rpctest.NewTestPool().WithServer(ts)
fx.th = &testHandler{} fx.th = &testHandler{}
fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{ fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{
SendQueueWorkers: 4,
SendQueueSize: 10, SendQueueSize: 10,
DialQueueWorkers: 1, DialQueueWorkers: 1,
DialQueueSize: 10, DialQueueSize: 10,

View File

@ -3,6 +3,7 @@ package streampool
import ( import (
"github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/metric"
) )
const CName = "common.net.streampool" const CName = "common.net.streampool"
@ -14,9 +15,7 @@ func New() Service {
} }
type StreamConfig struct { type StreamConfig struct {
// SendQueueWorkers how many workers will write message to streams // SendQueueSize size of the queue for write per peer
SendQueueWorkers int
// SendQueueSize size of the queue for write
SendQueueSize int SendQueueSize int
// DialQueueWorkers how many workers will dial to peers // DialQueueWorkers how many workers will dial to peers
DialQueueWorkers int DialQueueWorkers int
@ -30,22 +29,27 @@ type Service interface {
} }
type service struct { type service struct {
metric metric.Metric
} }
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
sp := &streamPool{ sp := &streamPool{
handler: h, handler: h,
writeQueueSize: conf.SendQueueSize,
streamIdsByPeer: map[string][]uint32{}, streamIdsByPeer: map[string][]uint32{},
streamIdsByTag: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{},
streams: map[uint32]*stream{}, streams: map[uint32]*stream{},
opening: map[string]*openingProcess{}, opening: map[string]*openingProcess{},
exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize),
dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize),
} }
if s.metric != nil {
registerMetrics(s.metric.Registry(), sp, "")
}
return sp return sp
} }
func (s *service) Init(a *app.App) (err error) { func (s *service) Init(a *app.App) (err error) {
s.metric, _ = a.Component(metric.CName).(metric.Metric)
return nil return nil
} }