Merge pull request #96 from anytypeio/streampool-send-queue-per-peer
streampool: send queue per peer + metrics
This commit is contained in:
commit
13bc7472a4
@ -54,7 +54,7 @@ func newMessagePool(peerManager peermanager.PeerManager, messageHandler MessageH
|
|||||||
func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
|
func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
|
||||||
s.updateLastUsage()
|
s.updateLastUsage()
|
||||||
var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
ctx, cancel = context.WithTimeout(ctx, time.Second*10)
|
ctx, cancel = context.WithTimeout(ctx, time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
newCounter := s.counter.Add(1)
|
newCounter := s.counter.Add(1)
|
||||||
msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter)
|
msg.RequestId = genReplyKey(peerId, msg.ObjectId, newCounter)
|
||||||
|
|||||||
@ -109,7 +109,7 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
|||||||
}
|
}
|
||||||
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.DebugCtx(ctx, "failed to get object")
|
log.WarnCtx(ctx, "failed to get object", zap.Error(err))
|
||||||
// TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync
|
// TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync
|
||||||
return s.unmarshallSendError(ctx, msg, err, senderId, msg.ObjectId)
|
return s.unmarshallSendError(ctx, msg, err, senderId, msg.ObjectId)
|
||||||
}
|
}
|
||||||
|
|||||||
36
net/streampool/metric.go
Normal file
36
net/streampool/metric.go
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
package streampool
|
||||||
|
|
||||||
|
import "github.com/prometheus/client_golang/prometheus"
|
||||||
|
|
||||||
|
func registerMetrics(ref *prometheus.Registry, sp *streamPool, name string) {
|
||||||
|
ref.MustRegister(
|
||||||
|
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
|
Namespace: name,
|
||||||
|
Subsystem: "streampool",
|
||||||
|
Name: "stream_count",
|
||||||
|
Help: "count of opened streams",
|
||||||
|
}, func() float64 {
|
||||||
|
sp.mu.Lock()
|
||||||
|
defer sp.mu.Unlock()
|
||||||
|
return float64(len(sp.streams))
|
||||||
|
}),
|
||||||
|
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
|
Namespace: name,
|
||||||
|
Subsystem: "streampool",
|
||||||
|
Name: "tag_count",
|
||||||
|
Help: "count of active tags",
|
||||||
|
}, func() float64 {
|
||||||
|
sp.mu.Lock()
|
||||||
|
defer sp.mu.Unlock()
|
||||||
|
return float64(len(sp.streamIdsByTag))
|
||||||
|
}),
|
||||||
|
prometheus.NewGaugeFunc(prometheus.GaugeOpts{
|
||||||
|
Namespace: name,
|
||||||
|
Subsystem: "streampool",
|
||||||
|
Name: "dial_queue",
|
||||||
|
Help: "dial queue size",
|
||||||
|
}, func() float64 {
|
||||||
|
return float64(sp.dial.batch.Len())
|
||||||
|
}),
|
||||||
|
)
|
||||||
|
}
|
||||||
@ -3,6 +3,7 @@ package streampool
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/anytypeio/any-sync/app/logger"
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
|
"github.com/anytypeio/any-sync/util/multiqueue"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"storj.io/drpc"
|
"storj.io/drpc"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
@ -16,14 +17,17 @@ type stream struct {
|
|||||||
streamId uint32
|
streamId uint32
|
||||||
closed atomic.Bool
|
closed atomic.Bool
|
||||||
l logger.CtxLogger
|
l logger.CtxLogger
|
||||||
|
queue multiqueue.MultiQueue[drpc.Message]
|
||||||
tags []string
|
tags []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sr *stream) write(msg drpc.Message) (err error) {
|
func (sr *stream) write(msg drpc.Message) (err error) {
|
||||||
if err = sr.stream.MsgSend(msg, EncodingProto); err != nil {
|
var queueId string
|
||||||
sr.streamClose()
|
if qId, ok := msg.(MessageQueueId); ok {
|
||||||
|
queueId = qId.MessageQueueId()
|
||||||
|
msg = qId.DrpcMessage()
|
||||||
}
|
}
|
||||||
return err
|
return sr.queue.Add(sr.stream.Context(), queueId, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sr *stream) readLoop() error {
|
func (sr *stream) readLoop() error {
|
||||||
@ -46,8 +50,18 @@ func (sr *stream) readLoop() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sr *stream) writeToStream(msg drpc.Message) {
|
||||||
|
if err := sr.stream.MsgSend(msg, EncodingProto); err != nil {
|
||||||
|
sr.l.Warn("msg send error", zap.Error(err))
|
||||||
|
sr.streamClose()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (sr *stream) streamClose() {
|
func (sr *stream) streamClose() {
|
||||||
if !sr.closed.Swap(true) {
|
if !sr.closed.Swap(true) {
|
||||||
|
_ = sr.queue.Close()
|
||||||
_ = sr.stream.Close()
|
_ = sr.stream.Close()
|
||||||
sr.pool.removeStream(sr.streamId)
|
sr.pool.removeStream(sr.streamId)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"github.com/anytypeio/any-sync/net/peer"
|
"github.com/anytypeio/any-sync/net/peer"
|
||||||
"github.com/anytypeio/any-sync/net/pool"
|
"github.com/anytypeio/any-sync/net/pool"
|
||||||
|
"github.com/anytypeio/any-sync/util/multiqueue"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
@ -24,6 +25,11 @@ type StreamHandler interface {
|
|||||||
// PeerGetter should dial or return cached peers
|
// PeerGetter should dial or return cached peers
|
||||||
type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)
|
type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)
|
||||||
|
|
||||||
|
type MessageQueueId interface {
|
||||||
|
MessageQueueId() string
|
||||||
|
DrpcMessage() drpc.Message
|
||||||
|
}
|
||||||
|
|
||||||
// StreamPool keeps and read streams
|
// StreamPool keeps and read streams
|
||||||
type StreamPool interface {
|
type StreamPool interface {
|
||||||
// AddStream adds new outgoing stream into the pool
|
// AddStream adds new outgoing stream into the pool
|
||||||
@ -52,9 +58,9 @@ type streamPool struct {
|
|||||||
streamIdsByTag map[string][]uint32
|
streamIdsByTag map[string][]uint32
|
||||||
streams map[uint32]*stream
|
streams map[uint32]*stream
|
||||||
opening map[string]*openingProcess
|
opening map[string]*openingProcess
|
||||||
exec *execPool
|
|
||||||
dial *execPool
|
dial *execPool
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
writeQueueSize int
|
||||||
lastStreamId uint32
|
lastStreamId uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,6 +109,10 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
|
|||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
s.lastStreamId++
|
s.lastStreamId++
|
||||||
streamId := s.lastStreamId
|
streamId := s.lastStreamId
|
||||||
|
queueSize := s.writeQueueSize
|
||||||
|
if queueSize <= 0 {
|
||||||
|
queueSize = 100
|
||||||
|
}
|
||||||
st := &stream{
|
st := &stream{
|
||||||
peerId: peerId,
|
peerId: peerId,
|
||||||
peerCtx: ctx,
|
peerCtx: ctx,
|
||||||
@ -112,6 +122,7 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
|
|||||||
l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)),
|
l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)),
|
||||||
tags: tags,
|
tags: tags,
|
||||||
}
|
}
|
||||||
|
st.queue = multiqueue.New[drpc.Message](st.writeToStream, s.writeQueueSize)
|
||||||
s.streams[streamId] = st
|
s.streams[streamId] = st
|
||||||
s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId)
|
s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId)
|
||||||
for _, tag := range tags {
|
for _, tag := range tags {
|
||||||
@ -121,23 +132,16 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) {
|
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) {
|
||||||
var sendOneFunc = func(sp peer.Peer) func() {
|
|
||||||
return func() {
|
|
||||||
if e := s.sendOne(ctx, sp, msg); e != nil {
|
|
||||||
log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", sp.Id()))
|
|
||||||
} else {
|
|
||||||
log.DebugCtx(ctx, "send success", zap.String("peerId", sp.Id()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return s.dial.TryAdd(func() {
|
return s.dial.TryAdd(func() {
|
||||||
peers, dialErr := peerGetter(ctx)
|
peers, dialErr := peerGetter(ctx)
|
||||||
if dialErr != nil {
|
if dialErr != nil {
|
||||||
log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr))
|
log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr))
|
||||||
}
|
}
|
||||||
for _, p := range peers {
|
for _, p := range peers {
|
||||||
if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil {
|
if e := s.sendOne(ctx, p, msg); e != nil {
|
||||||
return
|
log.InfoCtx(ctx, "send peer error", zap.Error(e), zap.String("peerId", p.Id()))
|
||||||
|
} else {
|
||||||
|
log.DebugCtx(ctx, "send success", zap.String("peerId", p.Id()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
@ -157,22 +161,14 @@ func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...
|
|||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
|
|
||||||
var sendStreamsFunc = func(streams []*stream) func() {
|
|
||||||
return func() {
|
|
||||||
for _, st := range streams {
|
|
||||||
if e := st.write(msg); e != nil {
|
|
||||||
st.l.Debug("sendById write error", zap.Error(e))
|
|
||||||
} else {
|
|
||||||
st.l.DebugCtx(ctx, "sendById success")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, streams := range streamsByPeer {
|
for _, streams := range streamsByPeer {
|
||||||
if err = s.exec.Add(ctx, sendStreamsFunc(streams)); err != nil {
|
for _, st := range streams {
|
||||||
return
|
if e := st.write(msg); e != nil {
|
||||||
|
st.l.Debug("sendById write error", zap.Error(e))
|
||||||
|
} else {
|
||||||
|
st.l.DebugCtx(ctx, "sendById success")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(streamsByPeer) == 0 {
|
if len(streamsByPeer) == 0 {
|
||||||
@ -271,21 +267,12 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
var sendStreamFunc = func(st *stream) func() {
|
|
||||||
return func() {
|
|
||||||
if e := st.write(msg); e != nil {
|
|
||||||
st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e))
|
|
||||||
} else {
|
|
||||||
st.l.DebugCtx(ctx, "broadcast success")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, st := range streams {
|
for _, st := range streams {
|
||||||
if st == nil {
|
if e := st.write(msg); e != nil {
|
||||||
panic("nil stream")
|
st.l.InfoCtx(ctx, "broadcast write error", zap.Error(e))
|
||||||
}
|
} else {
|
||||||
if err = s.exec.Add(ctx, sendStreamFunc(st)); err != nil {
|
st.l.DebugCtx(ctx, "broadcast success")
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
@ -361,10 +348,7 @@ func (s *streamPool) removeStream(streamId uint32) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPool) Close() (err error) {
|
func (s *streamPool) Close() (err error) {
|
||||||
if e := s.dial.Close(); e != nil {
|
return s.dial.Close()
|
||||||
log.Warn("dial queue close error", zap.Error(e))
|
|
||||||
}
|
|
||||||
return s.exec.Close()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func removeStream(m map[string][]uint32, key string, streamId uint32) {
|
func removeStream(m map[string][]uint32, key string, streamId uint32) {
|
||||||
@ -380,3 +364,21 @@ func removeStream(m map[string][]uint32, key string, streamId uint32) {
|
|||||||
m[key] = streamIds
|
m[key] = streamIds
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithQueueId wraps the message and adds queueId
|
||||||
|
func WithQueueId(msg drpc.Message, queueId string) drpc.Message {
|
||||||
|
return &messageWithQueueId{queueId: queueId, Message: msg}
|
||||||
|
}
|
||||||
|
|
||||||
|
type messageWithQueueId struct {
|
||||||
|
drpc.Message
|
||||||
|
queueId string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m messageWithQueueId) MessageQueueId() string {
|
||||||
|
return m.queueId
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m messageWithQueueId) DrpcMessage() drpc.Message {
|
||||||
|
return m.Message
|
||||||
|
}
|
||||||
|
|||||||
@ -39,7 +39,7 @@ func TestStreamPool_AddStream(t *testing.T) {
|
|||||||
require.NoError(t, fx.AddStream(s2, "space2", "common"))
|
require.NoError(t, fx.AddStream(s2, "space2", "common"))
|
||||||
|
|
||||||
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1"))
|
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1"))
|
||||||
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2"))
|
require.NoError(t, fx.Broadcast(ctx, WithQueueId(&testservice.StreamMessage{ReqData: "space2"}, "q2"), "space2"))
|
||||||
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "common"}, "common"))
|
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "common"}, "common"))
|
||||||
|
|
||||||
var serverResults []string
|
var serverResults []string
|
||||||
@ -202,7 +202,6 @@ func newFixture(t *testing.T) *fixture {
|
|||||||
fx.tp = rpctest.NewTestPool().WithServer(ts)
|
fx.tp = rpctest.NewTestPool().WithServer(ts)
|
||||||
fx.th = &testHandler{}
|
fx.th = &testHandler{}
|
||||||
fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{
|
fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{
|
||||||
SendQueueWorkers: 4,
|
|
||||||
SendQueueSize: 10,
|
SendQueueSize: 10,
|
||||||
DialQueueWorkers: 1,
|
DialQueueWorkers: 1,
|
||||||
DialQueueSize: 10,
|
DialQueueSize: 10,
|
||||||
|
|||||||
@ -3,6 +3,7 @@ package streampool
|
|||||||
import (
|
import (
|
||||||
"github.com/anytypeio/any-sync/app"
|
"github.com/anytypeio/any-sync/app"
|
||||||
"github.com/anytypeio/any-sync/app/logger"
|
"github.com/anytypeio/any-sync/app/logger"
|
||||||
|
"github.com/anytypeio/any-sync/metric"
|
||||||
)
|
)
|
||||||
|
|
||||||
const CName = "common.net.streampool"
|
const CName = "common.net.streampool"
|
||||||
@ -14,9 +15,7 @@ func New() Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type StreamConfig struct {
|
type StreamConfig struct {
|
||||||
// SendQueueWorkers how many workers will write message to streams
|
// SendQueueSize size of the queue for write per peer
|
||||||
SendQueueWorkers int
|
|
||||||
// SendQueueSize size of the queue for write
|
|
||||||
SendQueueSize int
|
SendQueueSize int
|
||||||
// DialQueueWorkers how many workers will dial to peers
|
// DialQueueWorkers how many workers will dial to peers
|
||||||
DialQueueWorkers int
|
DialQueueWorkers int
|
||||||
@ -30,22 +29,27 @@ type Service interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type service struct {
|
type service struct {
|
||||||
|
metric metric.Metric
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
|
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
|
||||||
sp := &streamPool{
|
sp := &streamPool{
|
||||||
handler: h,
|
handler: h,
|
||||||
|
writeQueueSize: conf.SendQueueSize,
|
||||||
streamIdsByPeer: map[string][]uint32{},
|
streamIdsByPeer: map[string][]uint32{},
|
||||||
streamIdsByTag: map[string][]uint32{},
|
streamIdsByTag: map[string][]uint32{},
|
||||||
streams: map[uint32]*stream{},
|
streams: map[uint32]*stream{},
|
||||||
opening: map[string]*openingProcess{},
|
opening: map[string]*openingProcess{},
|
||||||
exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize),
|
|
||||||
dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize),
|
dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize),
|
||||||
}
|
}
|
||||||
|
if s.metric != nil {
|
||||||
|
registerMetrics(s.metric.Registry(), sp, "")
|
||||||
|
}
|
||||||
return sp
|
return sp
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *service) Init(a *app.App) (err error) {
|
func (s *service) Init(a *app.App) (err error) {
|
||||||
|
s.metric, _ = a.Component(metric.CName).(metric.Metric)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user