wip: drpc access log
This commit is contained in:
parent
474b81c8b6
commit
6cbff3a930
@ -21,6 +21,7 @@ import (
|
||||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/syncstatus"
|
||||
"github.com/anytypeio/any-sync/metric"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/nodeconf"
|
||||
"github.com/anytypeio/any-sync/util/crypto"
|
||||
@ -55,10 +56,21 @@ type SpaceCreatePayload struct {
|
||||
}
|
||||
|
||||
type HandleMessage struct {
|
||||
Id uint64
|
||||
Deadline time.Time
|
||||
SenderId string
|
||||
Message *spacesyncproto.ObjectSyncMessage
|
||||
Id uint64
|
||||
ReceiveTime time.Time
|
||||
StartHandlingTime time.Time
|
||||
Deadline time.Time
|
||||
SenderId string
|
||||
Message *spacesyncproto.ObjectSyncMessage
|
||||
}
|
||||
|
||||
func (m HandleMessage) LogFields(fields ...zap.Field) []zap.Field {
|
||||
return append(fields,
|
||||
metric.SpaceId(m.Message.SpaceId),
|
||||
metric.ObjectId(m.Message.ObjectId),
|
||||
metric.QueueDur(m.StartHandlingTime.Sub(m.ReceiveTime)),
|
||||
metric.TotalDur(time.Since(m.ReceiveTime)),
|
||||
)
|
||||
}
|
||||
|
||||
type SpaceDerivePayload struct {
|
||||
@ -124,6 +136,7 @@ type space struct {
|
||||
configuration nodeconf.NodeConf
|
||||
settingsObject settings.SettingsObject
|
||||
peerManager peermanager.PeerManager
|
||||
metric metric.Metric
|
||||
|
||||
handleQueue multiqueue.MultiQueue[HandleMessage]
|
||||
|
||||
@ -372,6 +385,7 @@ func (s *space) DeleteSpace(ctx context.Context, deleteChange *treechangeproto.R
|
||||
|
||||
func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
|
||||
threadId := hm.Message.ObjectId
|
||||
hm.ReceiveTime = time.Now()
|
||||
if hm.Message.ReplyId != "" {
|
||||
threadId += hm.Message.ReplyId
|
||||
defer func() {
|
||||
@ -388,12 +402,19 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error)
|
||||
}
|
||||
|
||||
func (s *space) handleMessage(msg HandleMessage) {
|
||||
var err error
|
||||
msg.StartHandlingTime = time.Now()
|
||||
ctx := peer.CtxWithPeerId(context.Background(), msg.SenderId)
|
||||
ctx = logger.CtxWithFields(ctx, zap.Uint64("msgId", msg.Id), zap.String("senderId", msg.SenderId))
|
||||
defer func() {
|
||||
s.metric.RequestLog(ctx, msg.LogFields(zap.Error(err))...)
|
||||
}()
|
||||
|
||||
if !msg.Deadline.IsZero() {
|
||||
now := time.Now()
|
||||
if now.After(msg.Deadline) {
|
||||
log.InfoCtx(ctx, "skip message: deadline exceed")
|
||||
err = context.DeadlineExceeded
|
||||
return
|
||||
}
|
||||
var cancel context.CancelFunc
|
||||
@ -401,7 +422,7 @@ func (s *space) handleMessage(msg HandleMessage) {
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
if err := s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil {
|
||||
if err = s.objectSync.HandleMessage(ctx, msg.SenderId, msg.Message); err != nil {
|
||||
if msg.Message.ObjectId != "" {
|
||||
// cleanup thread on error
|
||||
_ = s.handleQueue.CloseThread(msg.Message.ObjectId)
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/syncstatus"
|
||||
"github.com/anytypeio/any-sync/metric"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/pool"
|
||||
"github.com/anytypeio/any-sync/net/rpc/rpcerr"
|
||||
@ -51,6 +52,7 @@ type spaceService struct {
|
||||
credentialProvider credentialprovider.CredentialProvider
|
||||
treeGetter treegetter.TreeGetter
|
||||
pool pool.Pool
|
||||
metric metric.Metric
|
||||
}
|
||||
|
||||
func (s *spaceService) Init(a *app.App) (err error) {
|
||||
@ -67,6 +69,7 @@ func (s *spaceService) Init(a *app.App) (err error) {
|
||||
s.credentialProvider = credentialprovider.NewNoOp()
|
||||
}
|
||||
s.pool = a.MustComponent(pool.CName).(pool.Pool)
|
||||
s.metric, _ = a.Component(metric.CName).(metric.Metric)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -173,6 +176,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
||||
treesUsed: &atomic.Int32{},
|
||||
isClosed: spaceIsClosed,
|
||||
isDeleted: spaceIsDeleted,
|
||||
metric: s.metric,
|
||||
}
|
||||
return sp, nil
|
||||
}
|
||||
|
||||
@ -14,6 +14,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
rand2 "golang.org/x/exp/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
39
metric/log.go
Normal file
39
metric/log.go
Normal file
@ -0,0 +1,39 @@
|
||||
package metric
|
||||
|
||||
import (
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/net/context"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Method(val string) zap.Field {
|
||||
return zap.String("rpc", val)
|
||||
}
|
||||
|
||||
func QueueDur(val time.Duration) zap.Field {
|
||||
return zap.Int64("queueMs", val.Milliseconds())
|
||||
}
|
||||
|
||||
func TotalDur(val time.Duration) zap.Field {
|
||||
return zap.Int64("totalMs", val.Milliseconds())
|
||||
}
|
||||
|
||||
func SpaceId(val string) zap.Field {
|
||||
return zap.String("spaceId", val)
|
||||
}
|
||||
|
||||
func ObjectId(val string) zap.Field {
|
||||
return zap.String("objectId", val)
|
||||
}
|
||||
|
||||
func Identity(val string) zap.Field {
|
||||
return zap.String("identity", val)
|
||||
}
|
||||
|
||||
func IP(val string) zap.Field {
|
||||
return zap.String("ip", val)
|
||||
}
|
||||
|
||||
func (m *metric) RequestLog(ctx context.Context, fields ...zap.Field) {
|
||||
m.rpcLog.InfoCtx(ctx, "", fields...)
|
||||
}
|
||||
@ -24,6 +24,7 @@ func New() Metric {
|
||||
type Metric interface {
|
||||
Registry() *prometheus.Registry
|
||||
WrapDRPCHandler(h drpc.Handler) drpc.Handler
|
||||
RequestLog(ctx context.Context, fields ...zap.Field)
|
||||
app.ComponentRunnable
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user