Merge pull request #86 from anytypeio/GO-802-metrics

GO-802 rpc metrics
This commit is contained in:
Sergey Cherepanov 2023-05-05 19:35:18 +02:00 committed by GitHub
commit 7394b4bd67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 219 additions and 43 deletions

View File

@ -51,6 +51,10 @@ func (p pushSpaceRequestMatcher) String() string {
type mockPeer struct{} type mockPeer struct{}
func (m mockPeer) Addr() string {
return ""
}
func (m mockPeer) TryClose(objectTTL time.Duration) (res bool, err error) { func (m mockPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
return true, m.Close() return true, m.Close()
} }

View File

@ -20,6 +20,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/metric"
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/nodeconf"
"github.com/anytypeio/any-sync/util/crypto" "github.com/anytypeio/any-sync/util/crypto"
@ -56,9 +57,21 @@ type SpaceCreatePayload struct {
type HandleMessage struct { type HandleMessage struct {
Id uint64 Id uint64
ReceiveTime time.Time
StartHandlingTime time.Time
Deadline time.Time Deadline time.Time
SenderId string SenderId string
Message *spacesyncproto.ObjectSyncMessage Message *spacesyncproto.ObjectSyncMessage
PeerCtx context.Context
}
func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field {
return append(fields,
metric.SpaceId(m.Message.SpaceId),
metric.ObjectId(m.Message.ObjectId),
metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)),
metric.TotalDur(time.Since(m.ReceiveTime)),
)
} }
type SpaceDerivePayload struct { type SpaceDerivePayload struct {
@ -124,6 +137,7 @@ type space struct {
settingsObject settings.SettingsObject settingsObject settings.SettingsObject
peerManager peermanager.PeerManager peerManager peermanager.PeerManager
treeBuilder objecttree.BuildObjectTreeFunc treeBuilder objecttree.BuildObjectTreeFunc
metric metric.Metric
handleQueue multiqueue.MultiQueue[HandleMessage] handleQueue multiqueue.MultiQueue[HandleMessage]
@ -362,6 +376,7 @@ func (s *space) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.R
func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
threadId := hm.Message.ObjectId threadId := hm.Message.ObjectId
hm.ReceiveTime = time.Now()
if hm.Message.ReplyId != "" { if hm.Message.ReplyId != "" {
threadId += hm.Message.ReplyId threadId += hm.Message.ReplyId
defer func() { defer func() {
@ -378,12 +393,24 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error)
} }
func (s *space) handleMessage(msg HandleMessage) { func (s *space) handleMessage(msg HandleMessage) {
var err error
msg.StartHandlingTime = time.Now()
ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
defer func() {
if s.metric == nil {
return
}
s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields(
zap.Error(err),
)...)
}()
if !msg.Deadline.IsZero() { if !msg.Deadline.IsZero() {
now := time.Now() now := time.Now()
if now.After(msg.Deadline) { if now.After(msg.Deadline) {
log.InfoCtx(ctx, "skip message: deadline exceed") log.InfoCtx(ctx, "skip message: deadline exceed")
err = context.DeadlineExceeded
return return
} }
var cancel context.CancelFunc var cancel context.CancelFunc
@ -391,7 +418,7 @@ func (s *space) handleMessage(msg HandleMessage) {
defer cancel() defer cancel()
} }
if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { if err = s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil {
if msg.Message.ObjectId != "" { if msg.Message.ObjectId != "" {
// cleanup thread on error // cleanup thread on error
_ = s.handleQueue.CloseThread(msg.Message.ObjectId) _ = s.handleQueue.CloseThread(msg.Message.ObjectId)

View File

@ -16,6 +16,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/metric"
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/net/pool"
"github.com/anytypeio/any-sync/net/rpc/rpcerr" "github.com/anytypeio/any-sync/net/rpc/rpcerr"
@ -52,6 +53,7 @@ type spaceService struct {
credentialProvider credentialprovider.CredentialProvider credentialProvider credentialprovider.CredentialProvider
treeManager treemanager.TreeManager treeManager treemanager.TreeManager
pool pool.Pool pool pool.Pool
metric metric.Metric
} }
func (s *spaceService) Init(a *app.App) (err error) { func (s *spaceService) Init(a *app.App) (err error) {
@ -68,6 +70,7 @@ func (s *spaceService) Init(a *app.App) (err error) {
s.credentialProvider = credentialprovider.NewNoOp() s.credentialProvider = credentialprovider.NewNoOp()
} }
s.pool = a.MustComponent(pool.CName).(pool.Pool) s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.metric, _ = a.Component(metric.CName).(metric.Metric)
return nil return nil
} }
@ -181,6 +184,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
treeBuilder: builder, treeBuilder: builder,
isClosed: spaceIsClosed, isClosed: spaceIsClosed,
isDeleted: spaceIsDeleted, isDeleted: spaceIsDeleted,
metric: s.metric,
} }
return sp, nil return sp, nil
} }

View File

@ -6,12 +6,12 @@ import (
"time" "time"
) )
type PrometheusDRPC struct { type prometheusDRPC struct {
drpc.Handler drpc.Handler
SummaryVec *prometheus.SummaryVec SummaryVec *prometheus.SummaryVec
} }
func (ph *PrometheusDRPC) HandleRPC(stream drpc.Stream, rpc string) (err error) { func (ph *prometheusDRPC) HandleRPC(stream drpc.Stream, rpc string) (err error) {
st := time.Now() st := time.Now()
defer func() { defer func() {
ph.SummaryVec.WithLabelValues(rpc).Observe(time.Since(st).Seconds()) ph.SummaryVec.WithLabelValues(rpc).Observe(time.Since(st).Seconds())

61
metric/log.go Normal file
View File

@ -0,0 +1,61 @@
package metric
import (
"context"
"github.com/anytypeio/any-sync/net/peer"
"go.uber.org/zap"
"time"
)
func Method(val string) zap.Field {
return zap.String("rpc", val)
}
func QueueDur(val time.Duration) zap.Field {
return zap.Float64("queueDur", val.Seconds())
}
func TotalDur(val time.Duration) zap.Field {
return zap.Float64("totalDur", val.Seconds())
}
func SpaceId(val string) zap.Field {
return zap.String("spaceId", val)
}
func ObjectId(val string) zap.Field {
return zap.String("objectId", val)
}
func PeerId(val string) zap.Field {
return zap.String("peerId", val)
}
func Identity(val string) zap.Field {
return zap.String("identity", val)
}
func FileId(fileId string) zap.Field {
return zap.String("fileId", fileId)
}
func Cid(cid string) zap.Field {
return zap.String("cid", cid)
}
func Size(size int) zap.Field {
return zap.Int("size", size)
}
func (m *metric) RequestLog(ctx context.Context, rpc string, fields ...zap.Field) {
if m == nil {
return
}
peerId, _ := peer.CtxPeerId(ctx)
ak, _ := peer.CtxPubKey(ctx)
var acc string
if ak != nil {
acc = ak.Account()
}
m.rpcLog.Info("", append(fields, PeerId(peerId), Identity(acc), Method(rpc))...)
}

12
metric/log_test.go Normal file
View File

@ -0,0 +1,12 @@
package metric
import (
"context"
"github.com/anytypeio/any-sync/app/logger"
"testing"
)
func TestLog(t *testing.T) {
m := &metric{rpcLog: logger.NewNamed("rpcLog")}
m.RequestLog(context.Background(), "rpc")
}

View File

@ -3,32 +3,41 @@ package metric
import ( import (
"context" "context"
"github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/logger"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"net/http" "net/http"
"storj.io/drpc"
"time" "time"
) )
const CName = "common.metric" const CName = "common.metric"
var log = logger.NewNamed(CName)
func New() Metric { func New() Metric {
return new(metric) return new(metric)
} }
type Metric interface { type Metric interface {
Registry() *prometheus.Registry Registry() *prometheus.Registry
WrapDRPCHandler(h drpc.Handler) drpc.Handler
RequestLog(ctx context.Context, rpc string, fields ...zap.Field)
app.ComponentRunnable app.ComponentRunnable
} }
type metric struct { type metric struct {
registry *prometheus.Registry registry *prometheus.Registry
rpcLog logger.CtxLogger
config Config config Config
} }
func (m *metric) Init(a *app.App) (err error) { func (m *metric) Init(a *app.App) (err error) {
m.registry = prometheus.NewRegistry() m.registry = prometheus.NewRegistry()
m.config = a.MustComponent("config").(configSource).GetMetric() m.config = a.MustComponent("config").(configSource).GetMetric()
m.rpcLog = logger.NewNamed("rpcLog")
return nil return nil
} }
@ -61,6 +70,31 @@ func (m *metric) Registry() *prometheus.Registry {
return m.registry return m.registry
} }
func (m *metric) WrapDRPCHandler(h drpc.Handler) drpc.Handler {
if m == nil {
return h
}
histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "drpc",
Subsystem: "server",
Name: "duration_seconds",
Objectives: map[float64]float64{
0.5: 0.5,
0.85: 0.01,
0.95: 0.0005,
0.99: 0.0001,
},
}, []string{"rpc"})
if err := m.Registry().Register(histVec); err != nil {
log.Warn("can't register prometheus drpc metric", zap.Error(err))
return h
}
return &prometheusDRPC{
Handler: h,
SummaryVec: histVec,
}
}
func (m *metric) Close(ctx context.Context) (err error) { func (m *metric) Close(ctx context.Context) (err error) {
return return
} }

View File

@ -13,6 +13,7 @@ type contextKey uint
const ( const (
contextKeyPeerId contextKey = iota contextKeyPeerId contextKey = iota
contextKeyIdentity contextKeyIdentity
contextKeyPeerAddr
) )
var ( var (
@ -36,6 +37,19 @@ func CtxWithPeerId(ctx context.Context, peerId string) context.Context {
return context.WithValue(ctx, contextKeyPeerId, peerId) return context.WithValue(ctx, contextKeyPeerId, peerId)
} }
// CtxPeerAddr returns peer address
func CtxPeerAddr(ctx context.Context) string {
if p, ok := ctx.Value(contextKeyPeerAddr).(string); ok {
return p
}
return ""
}
// CtxWithPeerAddr sets peer address to the context
func CtxWithPeerAddr(ctx context.Context, addr string) context.Context {
return context.WithValue(ctx, contextKeyPeerAddr, addr)
}
// CtxIdentity returns identity from context // CtxIdentity returns identity from context
func CtxIdentity(ctx context.Context) ([]byte, error) { func CtxIdentity(ctx context.Context) ([]byte, error) {
if identity, ok := ctx.Value(contextKeyIdentity).([]byte); ok { if identity, ok := ctx.Value(contextKeyIdentity).([]byte); ok {

View File

@ -26,6 +26,7 @@ type Peer interface {
Id() string Id() string
LastUsage() time.Time LastUsage() time.Time
UpdateLastUsage() UpdateLastUsage()
Addr() string
TryClose(objectTTL time.Duration) (res bool, err error) TryClose(objectTTL time.Duration) (res bool, err error)
drpc.Conn drpc.Conn
} }
@ -86,6 +87,13 @@ func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
return true, p.Close() return true, p.Close()
} }
func (p *peer) Addr() string {
if p.sc != nil {
return p.sc.RemoteAddr().String()
}
return ""
}
func (p *peer) Close() (err error) { func (p *peer) Close() (err error) {
log.Debug("peer close", zap.String("peerId", p.id)) log.Debug("peer close", zap.String("peerId", p.id))
return p.Conn.Close() return p.Conn.Close()

View File

@ -184,6 +184,10 @@ type testPeer struct {
closed chan struct{} closed chan struct{}
} }
func (t *testPeer) Addr() string {
return ""
}
func (t *testPeer) Id() string { func (t *testPeer) Id() string {
return t.id return t.id
} }

View File

@ -103,6 +103,10 @@ type testPeer struct {
drpc.Conn drpc.Conn
} }
func (t testPeer) Addr() string {
return ""
}
func (t testPeer) TryClose(objectTTL time.Duration) (res bool, err error) { func (t testPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
return true, t.Close() return true, t.Close()
} }

View File

@ -2,6 +2,7 @@ package server
import ( import (
"context" "context"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/secureservice" "github.com/anytypeio/any-sync/net/secureservice"
"github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/sec"
"github.com/zeebo/errs" "github.com/zeebo/errs"
@ -98,8 +99,11 @@ func (s *BaseDrpcServer) serveConn(conn net.Conn) {
l.Info("handshake error", zap.Error(err)) l.Info("handshake error", zap.Error(err))
return return
} }
if sc, ok := conn.(sec.SecureConn); ok {
ctx = peer.CtxWithPeerId(ctx, sc.RemotePeer().String())
} }
}
ctx = peer.CtxWithPeerAddr(ctx, conn.RemoteAddr().String())
l.Debug("connection opened") l.Debug("connection opened")
if err := s.drpcServer.ServeOne(ctx, conn); err != nil { if err := s.drpcServer.ServeOne(ctx, conn); err != nil {
if errs.Is(err, context.Canceled) || errs.Is(err, io.EOF) { if errs.Is(err, context.Canceled) || errs.Is(err, io.EOF) {

View File

@ -8,7 +8,6 @@ import (
anyNet "github.com/anytypeio/any-sync/net" anyNet "github.com/anytypeio/any-sync/net"
"github.com/anytypeio/any-sync/net/secureservice" "github.com/anytypeio/any-sync/net/secureservice"
"github.com/libp2p/go-libp2p/core/sec" "github.com/libp2p/go-libp2p/core/sec"
"github.com/prometheus/client_golang/prometheus"
"net" "net"
"storj.io/drpc" "storj.io/drpc"
"time" "time"
@ -46,29 +45,12 @@ func (s *drpcServer) Name() (name string) {
} }
func (s *drpcServer) Run(ctx context.Context) (err error) { func (s *drpcServer) Run(ctx context.Context) (err error) {
histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "drpc",
Subsystem: "server",
Name: "duration_seconds",
Objectives: map[float64]float64{
0.5: 0.5,
0.85: 0.01,
0.95: 0.0005,
0.99: 0.0001,
},
}, []string{"rpc"})
if err = s.metric.Registry().Register(histVec); err != nil {
return
}
params := Params{ params := Params{
BufferSizeMb: s.config.Stream.MaxMsgSizeMb, BufferSizeMb: s.config.Stream.MaxMsgSizeMb,
TimeoutMillis: s.config.Stream.TimeoutMilliseconds, TimeoutMillis: s.config.Stream.TimeoutMilliseconds,
ListenAddrs: s.config.Server.ListenAddrs, ListenAddrs: s.config.Server.ListenAddrs,
Wrapper: func(handler drpc.Handler) drpc.Handler { Wrapper: func(handler drpc.Handler) drpc.Handler {
return &metric.PrometheusDRPC{ return s.metric.WrapDRPCHandler(handler)
Handler: handler,
SummaryVec: histVec,
}
}, },
Handshake: func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) { Handshake: func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)

View File

@ -10,6 +10,7 @@ import (
type stream struct { type stream struct {
peerId string peerId string
peerCtx context.Context
stream drpc.Stream stream drpc.Stream
pool *streamPool pool *streamPool
streamId uint32 streamId uint32
@ -36,7 +37,7 @@ func (sr *stream) readLoop() error {
sr.l.Info("msg receive error", zap.Error(err)) sr.l.Info("msg receive error", zap.Error(err))
return err return err
} }
ctx := streamCtx(context.Background(), sr.streamId, sr.peerId) ctx := streamCtx(sr.peerCtx, sr.streamId, sr.peerId)
ctx = logger.CtxWithFields(ctx, zap.String("peerId", sr.peerId)) ctx = logger.CtxWithFields(ctx, zap.String("peerId", sr.peerId))
if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil { if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil {
sr.l.Info("msg handle error", zap.Error(err)) sr.l.Info("msg handle error", zap.Error(err))

View File

@ -27,9 +27,9 @@ type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)
// StreamPool keeps and read streams // StreamPool keeps and read streams
type StreamPool interface { type StreamPool interface {
// AddStream adds new outgoing stream into the pool // AddStream adds new outgoing stream into the pool
AddStream(peerId string, stream drpc.Stream, tags ...string) AddStream(stream drpc.Stream, tags ...string) (err error)
// ReadStream adds new incoming stream and synchronously read it // ReadStream adds new incoming stream and synchronously read it
ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error) ReadStream(stream drpc.Stream, tags ...string) (err error)
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error) Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
// SendById sends a message to given peerIds. Works only if stream exists // SendById sends a message to given peerIds. Works only if stream exists
@ -63,16 +63,23 @@ type openingProcess struct {
err error err error
} }
func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error { func (s *streamPool) ReadStream(drpcStream drpc.Stream, tags ...string) error {
st := s.addStream(peerId, drpcStream, tags...) st, err := s.addStream(drpcStream, tags...)
if err != nil {
return err
}
return st.readLoop() return st.readLoop()
} }
func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) { func (s *streamPool) AddStream(drpcStream drpc.Stream, tags ...string) error {
st := s.addStream(peerId, drpcStream, tags...) st, err := s.addStream(drpcStream, tags...)
if err != nil {
return err
}
go func() { go func() {
_ = st.readLoop() _ = st.readLoop()
}() }()
return nil
} }
func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) { func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) {
@ -86,13 +93,19 @@ func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) {
return return
} }
func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream { func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, error) {
ctx := drpcStream.Context()
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return nil, err
}
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.lastStreamId++ s.lastStreamId++
streamId := s.lastStreamId streamId := s.lastStreamId
st := &stream{ st := &stream{
peerId: peerId, peerId: peerId,
peerCtx: ctx,
stream: drpcStream, stream: drpcStream,
pool: s, pool: s,
streamId: streamId, streamId: streamId,
@ -104,7 +117,7 @@ func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...st
for _, tag := range tags { for _, tag := range tags {
s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId)
} }
return st return st, nil
} }
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) { func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) {
@ -241,7 +254,10 @@ func (s *streamPool) openStream(ctx context.Context, p peer.Peer) *openingProces
op.err = err op.err = err
return return
} }
s.AddStream(p.Id(), st, tags...) if err = s.AddStream(st, tags...); err != nil {
op.err = nil
return
}
}() }()
return op return op
} }

View File

@ -21,6 +21,7 @@ var ctx = context.Background()
func newClientStream(t *testing.T, fx *fixture, peerId string) (st testservice.DRPCTest_TestStreamClient, p peer.Peer) { func newClientStream(t *testing.T, fx *fixture, peerId string) (st testservice.DRPCTest_TestStreamClient, p peer.Peer) {
p, err := fx.tp.Dial(ctx, peerId) p, err := fx.tp.Dial(ctx, peerId)
require.NoError(t, err) require.NoError(t, err)
ctx = peer.CtxWithPeerId(ctx, peerId)
s, err := testservice.NewDRPCTestClient(p).TestStream(ctx) s, err := testservice.NewDRPCTestClient(p).TestStream(ctx)
require.NoError(t, err) require.NoError(t, err)
return s, p return s, p
@ -33,9 +34,9 @@ func TestStreamPool_AddStream(t *testing.T) {
defer fx.Finish(t) defer fx.Finish(t)
s1, _ := newClientStream(t, fx, "p1") s1, _ := newClientStream(t, fx, "p1")
fx.AddStream("p1", s1, "space1", "common") require.NoError(t, fx.AddStream(s1, "space1", "common"))
s2, _ := newClientStream(t, fx, "p2") s2, _ := newClientStream(t, fx, "p2")
fx.AddStream("p2", s2, "space2", "common") require.NoError(t, fx.AddStream(s2, "space2", "common"))
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1"))
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2"))
@ -64,7 +65,7 @@ func TestStreamPool_AddStream(t *testing.T) {
s1, p1 := newClientStream(t, fx, "p1") s1, p1 := newClientStream(t, fx, "p1")
defer s1.Close() defer s1.Close()
fx.AddStream("p1", s1, "space1", "common") require.NoError(t, fx.AddStream(s1, "space1", "common"))
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, func(ctx context.Context) (peers []peer.Peer, err error) { require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, func(ctx context.Context) (peers []peer.Peer, err error) {
return []peer.Peer{p1}, nil return []peer.Peer{p1}, nil
@ -159,7 +160,7 @@ func TestStreamPool_SendById(t *testing.T) {
s1, _ := newClientStream(t, fx, "p1") s1, _ := newClientStream(t, fx, "p1")
defer s1.Close() defer s1.Close()
fx.AddStream("p1", s1, "space1", "common") require.NoError(t, fx.AddStream(s1, "space1", "common"))
require.NoError(t, fx.SendById(ctx, &testservice.StreamMessage{ReqData: "test"}, "p1")) require.NoError(t, fx.SendById(ctx, &testservice.StreamMessage{ReqData: "test"}, "p1"))
var msg *testservice.StreamMessage var msg *testservice.StreamMessage
@ -177,11 +178,11 @@ func TestStreamPool_Tags(t *testing.T) {
s1, _ := newClientStream(t, fx, "p1") s1, _ := newClientStream(t, fx, "p1")
defer s1.Close() defer s1.Close()
fx.AddStream("p1", s1, "t1") require.NoError(t, fx.AddStream(s1, "t1"))
s2, _ := newClientStream(t, fx, "p2") s2, _ := newClientStream(t, fx, "p2")
defer s1.Close() defer s1.Close()
fx.AddStream("p2", s2, "t2") require.NoError(t, fx.AddStream(s2, "t2"))
err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3", "t3") err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3", "t3")
require.NoError(t, err) require.NoError(t, err)