diff --git a/commonspace/space.go b/commonspace/space.go index cdb00e18..ecb81a80 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -21,6 +21,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/syncstatus" + "github.com/anytypeio/any-sync/metric" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/util/crypto" @@ -55,10 +56,21 @@ type SpaceCreatePayload struct { } type HandleMessage struct { - Id uint64 - Deadline time.Time - SenderId string - Message *spacesyncproto.ObjectSyncMessage + Id uint64 + ReceiveTime time.Time + StartHandlingTime time.Time + Deadline time.Time + SenderId string + Message *spacesyncproto.ObjectSyncMessage +} + +func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field { + return append(fields, + metric.SpaceId(m.Message.SpaceId), + metric.ObjectId(m.Message.ObjectId), + metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)), + metric.TotalDur(time.Since(m.ReceiveTime)), + ) } type SpaceDerivePayload struct { @@ -124,6 +136,7 @@ type space struct { configuration nodeconf.NodeConf settingsObject settings.SettingsObject peerManager peermanager.PeerManager + metric metric.Metric handleQueue multiqueue.MultiQueue[HandleMessage] @@ -372,6 +385,7 @@ func (s *space) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.R func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { threadId := hm.Message.ObjectId + hm.ReceiveTime = time.Now() if hm.Message.ReplyId != "" { threadId += hm.Message.ReplyId defer func() { @@ -388,12 +402,19 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) } func (s *space) handleMessage(msg HandleMessage) { + var err error + msg.StartHandlingTime = time.Now() ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId) ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId)) + defer func() { + s.metric.RequestLog(ctx, msg.LogFields(zap.Error(err))...) + }() + if !msg.Deadline.IsZero() { now := time.Now() if now.After(msg.Deadline) { log.InfoCtx(ctx, "skip message: deadline exceed") + err = context.DeadlineExceeded return } var cancel context.CancelFunc @@ -401,7 +422,7 @@ func (s *space) handleMessage(msg HandleMessage) { defer cancel() } - if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { + if err = s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil { if msg.Message.ObjectId != "" { // cleanup thread on error _ = s.handleQueue.CloseThread(msg.Message.ObjectId) diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 0d2168c7..ab938161 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -15,6 +15,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/syncstatus" + "github.com/anytypeio/any-sync/metric" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/net/rpc/rpcerr" @@ -51,6 +52,7 @@ type spaceService struct { credentialProvider credentialprovider.CredentialProvider treeGetter treegetter.TreeGetter pool pool.Pool + metric metric.Metric } func (s *spaceService) Init(a *app.App) (err error) { @@ -67,6 +69,7 @@ func (s *spaceService) Init(a *app.App) (err error) { s.credentialProvider = credentialprovider.NewNoOp() } s.pool = a.MustComponent(pool.CName).(pool.Pool) + s.metric, _ = a.Component(metric.CName).(metric.Metric) return nil } @@ -173,6 +176,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { treesUsed: &atomic.Int32{}, isClosed: spaceIsClosed, isDeleted: spaceIsDeleted, + metric: s.metric, } return sp, nil } diff --git a/commonspace/spacestorage/spacestorage_test.go b/commonspace/spacestorage/spacestorage_test.go index 8243291e..3e7b9959 100644 --- a/commonspace/spacestorage/spacestorage_test.go +++ b/commonspace/spacestorage/spacestorage_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" rand2 "golang.org/x/exp/rand" "strconv" + "strings" "testing" "time" ) diff --git a/metric/log.go b/metric/log.go new file mode 100644 index 00000000..12a18424 --- /dev/null +++ b/metric/log.go @@ -0,0 +1,39 @@ +package metric + +import ( + "go.uber.org/zap" + "golang.org/x/net/context" + "time" +) + +func Method(val string) zap.Field { + return zap.String("rpc", val) +} + +func QueueDur(val time.Duration) zap.Field { + return zap.Int64("queueMs", val.Milliseconds()) +} + +func TotalDur(val time.Duration) zap.Field { + return zap.Int64("totalMs", val.Milliseconds()) +} + +func SpaceId(val string) zap.Field { + return zap.String("spaceId", val) +} + +func ObjectId(val string) zap.Field { + return zap.String("objectId", val) +} + +func Identity(val string) zap.Field { + return zap.String("identity", val) +} + +func IP(val string) zap.Field { + return zap.String("ip", val) +} + +func (m *metric) RequestLog(ctx context.Context, fields ...zap.Field) { + m.rpcLog.InfoCtx(ctx, "", fields...) +} diff --git a/metric/metric.go b/metric/metric.go index d64e81da..32f29ae5 100644 --- a/metric/metric.go +++ b/metric/metric.go @@ -24,6 +24,7 @@ func New() Metric { type Metric interface { Registry() *prometheus.Registry WrapDRPCHandler(h drpc.Handler) drpc.Handler + RequestLog(ctx context.Context, fields ...zap.Field) app.ComponentRunnable }