From 474b81c8b60c295bde94cf5483b0e0690200dc7c Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 21 Apr 2023 17:00:35 +0200 Subject: [PATCH 01/10] move rpc prometheus config to metric service --- metric/drpc.go | 4 ++-- metric/metric.go | 33 +++++++++++++++++++++++++++++++++ net/rpc/server/drpcserver.go | 20 +------------------- 3 files changed, 36 insertions(+), 21 deletions(-) diff --git a/metric/drpc.go b/metric/drpc.go index 97c01f8a..b81ed863 100644 --- a/metric/drpc.go +++ b/metric/drpc.go @@ -6,12 +6,12 @@ import ( "time" ) -type PrometheusDRPC struct { +type prometheusDRPC struct { drpc.Handler SummaryVec *prometheus.SummaryVec } -func (ph *PrometheusDRPC) HandleRPC(stream drpc.Stream, rpc string) (err error) { +func (ph *prometheusDRPC) HandleRPC(stream drpc.Stream, rpc string) (err error) { st := time.Now() defer func() { ph.SummaryVec.WithLabelValues(rpc).Observe(time.Since(st).Seconds()) diff --git a/metric/metric.go b/metric/metric.go index f3ddfae6..d64e81da 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -3,32 +3,40 @@ package metric import ( "context" "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/app/logger" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.uber.org/zap" "net/http" + "storj.io/drpc" "time" ) const CName = "common.metric" +var log = logger.NewNamed(CName) + func New() Metric { return new(metric) } type Metric interface { Registry() *prometheus.Registry + WrapDRPCHandler(h drpc.Handler) drpc.Handler app.ComponentRunnable } type metric struct { registry *prometheus.Registry + rpcLog logger.CtxLogger config Config } func (m *metric) Init(a *app.App) (err error) { m.registry = prometheus.NewRegistry() m.config = a.MustComponent("config").(configSource).GetMetric() + m.rpcLog = logger.NewNamed("rpcLog") return nil } @@ -61,6 +69,31 @@ func (m *metric) Registry() *prometheus.Registry { return m.registry } +func (m *metric) WrapDRPCHandler(h drpc.Handler) drpc.Handler { + if m == nil { + return h + } + histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{ + Namespace: "drpc", + Subsystem: "server", + Name: "duration_seconds", + Objectives: map[float64]float64{ + 0.5: 0.5, + 0.85: 0.01, + 0.95: 0.0005, + 0.99: 0.0001, + }, + }, []string{"rpc"}) + if err := m.Registry().Register(histVec); err != nil { + log.Warn("can't register prometheus drpc metric", zap.Error(err)) + return h + } + return &prometheusDRPC{ + Handler: h, + SummaryVec: histVec, + } +} + func (m *metric) Close(ctx context.Context) (err error) { return } diff --git a/net/rpc/server/drpcserver.go b/net/rpc/server/drpcserver.go index 4d9d54ac..e97ea0d3 100644 --- a/net/rpc/server/drpcserver.go +++ b/net/rpc/server/drpcserver.go @@ -8,7 +8,6 @@ import ( anyNet "github.com/anytypeio/any-sync/net" "github.com/anytypeio/any-sync/net/secureservice" "github.com/libp2p/go-libp2p/core/sec" - "github.com/prometheus/client_golang/prometheus" "net" "storj.io/drpc" "time" @@ -46,29 +45,12 @@ func (s *drpcServer) Name() (name string) { } func (s *drpcServer) Run(ctx context.Context) (err error) { - histVec := prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "drpc", - Subsystem: "server", - Name: "duration_seconds", - Objectives: map[float64]float64{ - 0.5: 0.5, - 0.85: 0.01, - 0.95: 0.0005, - 0.99: 0.0001, - }, - }, []string{"rpc"}) - if err = s.metric.Registry().Register(histVec); err != nil { - return - } params := Params{ BufferSizeMb: s.config.Stream.MaxMsgSizeMb, TimeoutMillis: s.config.Stream.TimeoutMilliseconds, ListenAddrs: s.config.Server.ListenAddrs, Wrapper: func(handler drpc.Handler) drpc.Handler { - return &metric.PrometheusDRPC{ - Handler: handler, - SummaryVec: histVec, - } + return s.metric.WrapDRPCHandler(handler) }, Handshake: func(conn net.Conn) (cCtx context.Context, sc sec.SecureConn, err error) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) From 6cbff3a930e437421930a78f6de1424863faa020 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 24 Apr 2023 14:59:15 +0200 Subject: [PATCH 02/10] wip: drpc access log --- commonspace/space.go | 31 ++++++++++++--- commonspace/spaceservice.go | 4 ++ commonspace/spacestorage/spacestorage_test.go | 1 + metric/log.go | 39 +++++++++++++++++++ metric/metric.go | 1 + 5 files changed, 71 insertions(+), 5 deletions(-) create mode 100644 metric/log.go diff --git a/commonspace/space.go b/commonspace/space.go index cdb00e18..ecb81a80 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -21,6 +21,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/syncstatus" + "github.com/anytypeio/any-sync/metric" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/util/crypto" @@ -55,10 +56,21 @@ type SpaceCreatePayload struct { } type HandleMessage struct { - Id uint64 - Deadline time.Time - SenderId string - Message *spacesyncproto.ObjectSyncMessage + Id uint64 + ReceiveTime time.Time + StartHandlingTime time.Time + Deadline time.Time + SenderId string + Message *spacesyncproto.ObjectSyncMessage +} + +func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field { + return append(fields, + metric.SpaceId(m.Message.SpaceId), + metric.ObjectId(m.Message.ObjectId), + metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)), + metric.TotalDur(time.Since(m.ReceiveTime)), + ) } type SpaceDerivePayload struct { @@ -124,6 +136,7 @@ type space struct { configuration nodeconf.NodeConf settingsObject settings.SettingsObject peerManager peermanager.PeerManager + metric metric.Metric handleQueue multiqueue.MultiQueue[HandleMessage] @@ -372,6 +385,7 @@ func (s *space) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.R func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { threadId := hm.Message.ObjectId + hm.ReceiveTime = time.Now() if hm.Message.ReplyId != "" { threadId += hm.Message.ReplyId defer func() { @@ -388,12 +402,19 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) } func (s *space) handleMessage(msg HandleMessage) { + var err error + msg.StartHandlingTime = time.Now() ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) + defer func() { + s.metric.RequestLog(ctx, msg.LogFields(zap.Error(err))...) + }() + if !msg.Deadline.IsZero() { now := time.Now() if now.After(msg.Deadline) { log.InfoCtx(ctx, "skip message: deadline exceed") + err = context.DeadlineExceeded return } var cancel context.CancelFunc @@ -401,7 +422,7 @@ func (s *space) handleMessage(msg HandleMessage) { defer cancel() } - if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { + if err = s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { if msg.Message.ObjectId != "" { // cleanup thread on error _ = s.handleQueue.CloseThread(msg.Message.ObjectId) diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 0d2168c7..ab938161 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -15,6 +15,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/syncstatus" + "github.com/anytypeio/any-sync/metric" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/net/rpc/rpcerr" @@ -51,6 +52,7 @@ type spaceService struct { credentialProvider credentialprovider.CredentialProvider treeGetter treegetter.TreeGetter pool pool.Pool + metric metric.Metric } func (s *spaceService) Init(a *app.App) (err error) { @@ -67,6 +69,7 @@ func (s *spaceService) Init(a *app.App) (err error) { s.credentialProvider = credentialprovider.NewNoOp() } s.pool = a.MustComponent(pool.CName).(pool.Pool) + s.metric, _ = a.Component(metric.CName).(metric.Metric) return nil } @@ -173,6 +176,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { treesUsed: &atomic.Int32{}, isClosed: spaceIsClosed, isDeleted: spaceIsDeleted, + metric: s.metric, } return sp, nil } diff --git a/commonspace/spacestorage/spacestorage_test.go b/commonspace/spacestorage/spacestorage_test.go index 8243291e..3e7b9959 100644 --- a/commonspace/spacestorage/spacestorage_test.go +++ b/commonspace/spacestorage/spacestorage_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" rand2 "golang.org/x/exp/rand" "strconv" + "strings" "testing" "time" ) diff --git a/metric/log.go b/metric/log.go new file mode 100644 index 00000000..12a18424 --- /dev/null +++ b/metric/log.go @@ -0,0 +1,39 @@ +package metric + +import ( + "go.uber.org/zap" + "golang.org/x/net/context" + "time" +) + +func Method(val string) zap.Field { + return zap.String("rpc", val) +} + +func QueueDur(val time.Duration) zap.Field { + return zap.Int64("queueMs", val.Milliseconds()) +} + +func TotalDur(val time.Duration) zap.Field { + return zap.Int64("totalMs", val.Milliseconds()) +} + +func SpaceId(val string) zap.Field { + return zap.String("spaceId", val) +} + +func ObjectId(val string) zap.Field { + return zap.String("objectId", val) +} + +func Identity(val string) zap.Field { + return zap.String("identity", val) +} + +func IP(val string) zap.Field { + return zap.String("ip", val) +} + +func (m *metric) RequestLog(ctx context.Context, fields ...zap.Field) { + m.rpcLog.InfoCtx(ctx, "", fields...) +} diff --git a/metric/metric.go b/metric/metric.go index d64e81da..32f29ae5 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -24,6 +24,7 @@ func New() Metric { type Metric interface { Registry() *prometheus.Registry WrapDRPCHandler(h drpc.Handler) drpc.Handler + RequestLog(ctx context.Context, fields ...zap.Field) app.ComponentRunnable } From 4bc402f1f0836c3b7ff36d4bbf9083ff55a02de3 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 4 May 2023 17:38:34 +0200 Subject: [PATCH 03/10] stream ctx + peer addr --- commonspace/headsync/diffsyncer_test.go | 4 +++ metric/log_test.go | 1 + net/peer/context.go | 14 ++++++++++ net/peer/peer.go | 8 ++++++ net/pool/pool_test.go | 4 +++ net/rpc/rpctest/pool.go | 4 +++ net/rpc/server/baseserver.go | 6 ++++- net/streampool/stream.go | 3 ++- net/streampool/streampool.go | 34 ++++++++++++++++++------- net/streampool/streampool_test.go | 13 +++++----- 10 files changed, 74 insertions(+), 17 deletions(-) create mode 100644 metric/log_test.go diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index e6558b61..bdbcab58 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -51,6 +51,10 @@ func (p pushSpaceRequestMatcher) String() string { type mockPeer struct{} +func (m mockPeer) Addr() string { + return "" +} + func (m mockPeer) TryClose(objectTTL time.Duration) (res bool, err error) { return true, m.Close() } diff --git a/metric/log_test.go b/metric/log_test.go new file mode 100644 index 00000000..0bad30a0 --- /dev/null +++ b/metric/log_test.go @@ -0,0 +1 @@ +package metric diff --git a/net/peer/context.go b/net/peer/context.go index d759ad01..b753e7d2 100644 --- a/net/peer/context.go +++ b/net/peer/context.go @@ -13,6 +13,7 @@ type contextKey uint const ( contextKeyPeerId contextKey = iota contextKeyIdentity + contextKeyPeerAddr ) var ( @@ -36,6 +37,19 @@ func CtxWithPeerId(ctx context.Context, peerId string) context.Context { return context.WithValue(ctx, contextKeyPeerId, peerId) } +// CtxPeerAddr returns peer address +func CtxPeerAddr(ctx context.Context) string { + if p, ok := ctx.Value(contextKeyPeerAddr).(string); ok { + return p + } + return "" +} + +// CtxWithPeerAddr sets peer address to the context +func CtxWithPeerAddr(ctx context.Context, addr string) context.Context { + return context.WithValue(ctx, contextKeyPeerAddr, addr) +} + // CtxIdentity returns identity from context func CtxIdentity(ctx context.Context) ([]byte, error) { if identity, ok := ctx.Value(contextKeyIdentity).([]byte); ok { diff --git a/net/peer/peer.go b/net/peer/peer.go index 92137930..95ac2b12 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -26,6 +26,7 @@ type Peer interface { Id() string LastUsage() time.Time UpdateLastUsage() + Addr() string TryClose(objectTTL time.Duration) (res bool, err error) drpc.Conn } @@ -86,6 +87,13 @@ func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) { return true, p.Close() } +func (p *peer) Addr() string { + if p.sc != nil { + return p.sc.RemoteAddr().String() + } + return "" +} + func (p *peer) Close() (err error) { log.Debug("peer close", zap.String("peerId", p.id)) return p.Conn.Close() diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index ce3876e0..262a59f6 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -184,6 +184,10 @@ type testPeer struct { closed chan struct{} } +func (t *testPeer) Addr() string { + return "" +} + func (t *testPeer) Id() string { return t.id } diff --git a/net/rpc/rpctest/pool.go b/net/rpc/rpctest/pool.go index 630cbb6a..3a9935df 100644 --- a/net/rpc/rpctest/pool.go +++ b/net/rpc/rpctest/pool.go @@ -103,6 +103,10 @@ type testPeer struct { drpc.Conn } +func (t testPeer) Addr() string { + return "" +} + func (t testPeer) TryClose(objectTTL time.Duration) (res bool, err error) { return true, t.Close() } diff --git a/net/rpc/server/baseserver.go b/net/rpc/server/baseserver.go index 85d23bf3..91fca952 100644 --- a/net/rpc/server/baseserver.go +++ b/net/rpc/server/baseserver.go @@ -2,6 +2,7 @@ package server import ( "context" + "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/secureservice" "github.com/libp2p/go-libp2p/core/sec" "github.com/zeebo/errs" @@ -98,8 +99,11 @@ func (s *BaseDrpcServer) serveConn(conn net.Conn) { l.Info("handshake error", zap.Error(err)) return } + if sc, ok := conn.(sec.SecureConn); ok { + ctx = peer.CtxWithPeerId(ctx, sc.RemotePeer().String()) + } } - + ctx = peer.CtxWithPeerAddr(ctx, conn.RemoteAddr().String()) l.Debug("connection opened") if err := s.drpcServer.ServeOne(ctx, conn); err != nil { if errs.Is(err, context.Canceled) || errs.Is(err, io.EOF) { diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 065f322e..f2e092c4 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -10,6 +10,7 @@ import ( type stream struct { peerId string + peerCtx context.Context stream drpc.Stream pool *streamPool streamId uint32 @@ -36,7 +37,7 @@ func (sr *stream) readLoop() error { sr.l.Info("msg receive error", zap.Error(err)) return err } - ctx := streamCtx(context.Background(), sr.streamId, sr.peerId) + ctx := streamCtx(sr.peerCtx, sr.streamId, sr.peerId) ctx = logger.CtxWithFields(ctx, zap.String("peerId", sr.peerId)) if err := sr.pool.handler.HandleMessage(ctx, sr.peerId, msg); err != nil { sr.l.Info("msg handle error", zap.Error(err)) diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 84108371..5295cea4 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -27,9 +27,9 @@ type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error) // StreamPool keeps and read streams type StreamPool interface { // AddStream adds new outgoing stream into the pool - AddStream(peerId string, stream drpc.Stream, tags ...string) + AddStream(stream drpc.Stream, tags ...string) (err error) // ReadStream adds new incoming stream and synchronously read it - ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error) + ReadStream(stream drpc.Stream, tags ...string) (err error) // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error) // SendById sends a message to given peerIds. Works only if stream exists @@ -63,16 +63,23 @@ type openingProcess struct { err error } -func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error { - st := s.addStream(peerId, drpcStream, tags...) +func (s *streamPool) ReadStream(drpcStream drpc.Stream, tags ...string) error { + st, err := s.addStream(drpcStream, tags...) + if err != nil { + return err + } return st.readLoop() } -func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) { - st := s.addStream(peerId, drpcStream, tags...) +func (s *streamPool) AddStream(drpcStream drpc.Stream, tags ...string) error { + st, err := s.addStream(drpcStream, tags...) + if err != nil { + return err + } go func() { _ = st.readLoop() }() + return nil } func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) { @@ -86,13 +93,19 @@ func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) { return } -func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream { +func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream, error) { + ctx := drpcStream.Context() + peerId, err := peer.CtxPeerId(ctx) + if err != nil { + return nil, err + } s.mu.Lock() defer s.mu.Unlock() s.lastStreamId++ streamId := s.lastStreamId st := &stream{ peerId: peerId, + peerCtx: ctx, stream: drpcStream, pool: s, streamId: streamId, @@ -104,7 +117,7 @@ func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...st for _, tag := range tags { s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) } - return st + return st, nil } func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) { @@ -241,7 +254,10 @@ func (s *streamPool) openStream(ctx context.Context, p peer.Peer) *openingProces op.err = err return } - s.AddStream(p.Id(), st, tags...) + if err = s.AddStream(st, tags...); err != nil { + op.err = nil + return + } }() return op } diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index 2ab15d26..57b2698d 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -21,6 +21,7 @@ var ctx = context.Background() func newClientStream(t *testing.T, fx *fixture, peerId string) (st testservice.DRPCTest_TestStreamClient, p peer.Peer) { p, err := fx.tp.Dial(ctx, peerId) require.NoError(t, err) + ctx = peer.CtxWithPeerId(ctx, peerId) s, err := testservice.NewDRPCTestClient(p).TestStream(ctx) require.NoError(t, err) return s, p @@ -33,9 +34,9 @@ func TestStreamPool_AddStream(t *testing.T) { defer fx.Finish(t) s1, _ := newClientStream(t, fx, "p1") - fx.AddStream("p1", s1, "space1", "common") + require.NoError(t, fx.AddStream(s1, "space1", "common")) s2, _ := newClientStream(t, fx, "p2") - fx.AddStream("p2", s2, "space2", "common") + require.NoError(t, fx.AddStream(s2, "space2", "common")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1")) require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2")) @@ -64,7 +65,7 @@ func TestStreamPool_AddStream(t *testing.T) { s1, p1 := newClientStream(t, fx, "p1") defer s1.Close() - fx.AddStream("p1", s1, "space1", "common") + require.NoError(t, fx.AddStream(s1, "space1", "common")) require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, func(ctx context.Context) (peers []peer.Peer, err error) { return []peer.Peer{p1}, nil @@ -159,7 +160,7 @@ func TestStreamPool_SendById(t *testing.T) { s1, _ := newClientStream(t, fx, "p1") defer s1.Close() - fx.AddStream("p1", s1, "space1", "common") + require.NoError(t, fx.AddStream(s1, "space1", "common")) require.NoError(t, fx.SendById(ctx, &testservice.StreamMessage{ReqData: "test"}, "p1")) var msg *testservice.StreamMessage @@ -177,11 +178,11 @@ func TestStreamPool_Tags(t *testing.T) { s1, _ := newClientStream(t, fx, "p1") defer s1.Close() - fx.AddStream("p1", s1, "t1") + require.NoError(t, fx.AddStream(s1, "t1")) s2, _ := newClientStream(t, fx, "p2") defer s1.Close() - fx.AddStream("p2", s2, "t2") + require.NoError(t, fx.AddStream(s2, "t2")) err := fx.AddTagsCtx(streamCtx(ctx, 1, "p1"), "t3", "t3") require.NoError(t, err) From 9855eb4484a15f4d1994e40845af0e96053c2b74 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 4 May 2023 17:39:23 +0200 Subject: [PATCH 04/10] request log --- commonspace/space.go | 5 ++++- metric/log.go | 23 +++++++++++++++++------ metric/log_test.go | 11 +++++++++++ metric/metric.go | 2 +- 4 files changed, 33 insertions(+), 8 deletions(-) diff --git a/commonspace/space.go b/commonspace/space.go index 6d3f5854..57792090 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -62,6 +62,7 @@ type HandleMessage struct { Deadline time.Time SenderId string Message *spacesyncproto.ObjectSyncMessage + PeerCtx context.Context } func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field { @@ -409,7 +410,9 @@ func (s *space) handleMessage(msg HandleMessage) { ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) defer func() { - s.metric.RequestLog(ctx, msg.LogFields(zap.Error(err))...) + s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields( + zap.Error(err), + )...) }() if !msg.Deadline.IsZero() { diff --git a/metric/log.go b/metric/log.go index 12a18424..23e76591 100644 --- a/metric/log.go +++ b/metric/log.go @@ -1,6 +1,7 @@ package metric import ( + "github.com/anytypeio/any-sync/net/peer" "go.uber.org/zap" "golang.org/x/net/context" "time" @@ -11,11 +12,11 @@ func Method(val string) zap.Field { } func QueueDur(val time.Duration) zap.Field { - return zap.Int64("queueMs", val.Milliseconds()) + return zap.Float64("queueDur", val.Seconds()) } func TotalDur(val time.Duration) zap.Field { - return zap.Int64("totalMs", val.Milliseconds()) + return zap.Float64("totalDur", val.Seconds()) } func SpaceId(val string) zap.Field { @@ -26,14 +27,24 @@ func ObjectId(val string) zap.Field { return zap.String("objectId", val) } +func PeerId(val string) zap.Field { + return zap.String("peerId", val) +} + func Identity(val string) zap.Field { return zap.String("identity", val) } -func IP(val string) zap.Field { - return zap.String("ip", val) +func Addr(val string) zap.Field { + return zap.String("addr", val) } -func (m *metric) RequestLog(ctx context.Context, fields ...zap.Field) { - m.rpcLog.InfoCtx(ctx, "", fields...) +func (m *metric) RequestLog(ctx context.Context, rpc string, fields ...zap.Field) { + peerId, _ := peer.CtxPeerId(ctx) + ak, _ := peer.CtxPubKey(ctx) + var acc string + if ak != nil { + acc = ak.Account() + } + m.rpcLog.Info("", append(fields, Addr(peer.CtxPeerAddr(ctx)), PeerId(peerId), Identity(acc), Method(rpc))...) } diff --git a/metric/log_test.go b/metric/log_test.go index 0bad30a0..50835795 100644 --- a/metric/log_test.go +++ b/metric/log_test.go @@ -1 +1,12 @@ package metric + +import ( + "context" + "github.com/anytypeio/any-sync/app/logger" + "testing" +) + +func TestLog(t *testing.T) { + m := &metric{rpcLog: logger.NewNamed("rpcLog")} + m.RequestLog(context.Background(), "rpc") +} diff --git a/metric/metric.go b/metric/metric.go index 32f29ae5..d5319982 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -24,7 +24,7 @@ func New() Metric { type Metric interface { Registry() *prometheus.Registry WrapDRPCHandler(h drpc.Handler) drpc.Handler - RequestLog(ctx context.Context, fields ...zap.Field) + RequestLog(ctx context.Context, rpc string, fields ...zap.Field) app.ComponentRunnable } From 847b2d40fd899039d261a7237f6cf350654de472 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 4 May 2023 17:49:24 +0200 Subject: [PATCH 05/10] file fields --- metric/log.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/metric/log.go b/metric/log.go index 23e76591..65ab4790 100644 --- a/metric/log.go +++ b/metric/log.go @@ -39,6 +39,18 @@ func Addr(val string) zap.Field { return zap.String("addr", val) } +func FileId(fileId string) zap.Field { + return zap.String("fileId", fileId) +} + +func Cid(cid string) zap.Field { + return zap.String("cid", cid) +} + +func Size(size int) zap.Field { + return zap.Int("size", size) +} + func (m *metric) RequestLog(ctx context.Context, rpc string, fields ...zap.Field) { peerId, _ := peer.CtxPeerId(ctx) ak, _ := peer.CtxPubKey(ctx) From debbf448db60a314e95d27b40ef974f454207b40 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 4 May 2023 19:04:41 +0200 Subject: [PATCH 06/10] fix import --- metric/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metric/log.go b/metric/log.go index 65ab4790..db9f56c8 100644 --- a/metric/log.go +++ b/metric/log.go @@ -1,9 +1,9 @@ package metric import ( + "context" "github.com/anytypeio/any-sync/net/peer" "go.uber.org/zap" - "golang.org/x/net/context" "time" ) From a12f3fb0dd5c942eda0fafb09a06fec3ddd06327 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 4 May 2023 19:30:46 +0200 Subject: [PATCH 07/10] allow nil metric --- metric/log.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/metric/log.go b/metric/log.go index db9f56c8..bf656aa4 100644 --- a/metric/log.go +++ b/metric/log.go @@ -52,6 +52,9 @@ func Size(size int) zap.Field { } func (m *metric) RequestLog(ctx context.Context, rpc string, fields ...zap.Field) { + if m == nil { + return + } peerId, _ := peer.CtxPeerId(ctx) ak, _ := peer.CtxPubKey(ctx) var acc string From fc4bdb6000ae45c3848906ecfab89801bc636faf Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 4 May 2023 19:35:07 +0200 Subject: [PATCH 08/10] check that metric is using --- commonspace/space.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/commonspace/space.go b/commonspace/space.go index 57792090..51435a10 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -410,6 +410,9 @@ func (s *space) handleMessage(msg HandleMessage) { ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) defer func() { + if s.metric == nil { + return + } s.metric.RequestLog(msg.PeerCtx, "space.streamOp", msg.LogFields( zap.Error(err), )...) From 36cb769379d9656a5498ab6422eca66385c117c1 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 5 May 2023 16:28:31 +0200 Subject: [PATCH 09/10] remove peer addr from log --- metric/log.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metric/log.go b/metric/log.go index bf656aa4..9cb8edbd 100644 --- a/metric/log.go +++ b/metric/log.go @@ -61,5 +61,5 @@ func (m *metric) RequestLog(ctx context.Context, rpc string, fields ...zap.Field if ak != nil { acc = ak.Account() } - m.rpcLog.Info("", append(fields, Addr(peer.CtxPeerAddr(ctx)), PeerId(peerId), Identity(acc), Method(rpc))...) + m.rpcLog.Info("", append(fields, PeerId(peerId), Identity(acc), Method(rpc))...) } From 0ca41dbd47ded0750e98cce58486850d2d03a257 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 5 May 2023 16:30:22 +0200 Subject: [PATCH 10/10] remove peer addr from log --- metric/log.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/metric/log.go b/metric/log.go index 9cb8edbd..7cc34e66 100644 --- a/metric/log.go +++ b/metric/log.go @@ -35,10 +35,6 @@ func Identity(val string) zap.Field { return zap.String("identity", val) } -func Addr(val string) zap.Field { - return zap.String("addr", val) -} - func FileId(fileId string) zap.Field { return zap.String("fileId", fileId) }