diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 0a715352..2364029f 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -7,6 +7,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache" + "go.uber.org/zap" "sync" "sync/atomic" "time" @@ -97,7 +98,7 @@ func (s *streamPool) SendSync( delete(s.waiters, msg.ReplyId) s.waitersMx.Unlock() - log.With("replyId", msg.ReplyId).Error("time elapsed when waiting") + log.With(zap.String("replyId", msg.ReplyId)).Error("time elapsed when waiting") err = ErrSyncTimeout case reply = <-waiter.ch: if !delay.Stop() { @@ -124,8 +125,8 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn streams := getStreams() s.Unlock() - log.With("objectId", message.ObjectId). - Debugf("sending message to %d peers", len(streams)) + log.With(zap.String("objectId", message.ObjectId), zap.Int("peers", len(streams))). + Debug("sending message to peers") for _, s := range streams { err = s.Send(message) } @@ -172,8 +173,8 @@ Loop: func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) { streams := s.getAllStreams() - log.With("objectId", message.ObjectId). - Debugf("broadcasting message to %d peers", len(streams)) + log.With(zap.String("objectId", message.ObjectId), zap.Int("peers", len(streams))). + Debug("broadcasting message to peers") for _, stream := range streams { if err = stream.Send(message); err != nil { // TODO: add logging @@ -224,7 +225,7 @@ func (s *streamPool) Close() (err error) { } func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { - log.With("peerId", peerId).Debug("reading stream from peer") + log.With(zap.String("replyId", peerId)).Debug("reading stream from peer") defer s.wg.Done() limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) for i := 0; i < maxSimultaneousOperationsPerStream; i++ { @@ -237,17 +238,17 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre s.messageHandler(stream.Context(), peerId, msg) return } - log.With("replyId", msg.ReplyId).Debug("getting message with reply id") + log.With(zap.String("replyId", msg.ReplyId)).Debug("getting message with reply id") s.waitersMx.Lock() waiter, exists := s.waiters[msg.ReplyId] if !exists { - log.With("replyId", msg.ReplyId).Debug("reply id not exists") + log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id not exists") s.waitersMx.Unlock() s.messageHandler(stream.Context(), peerId, msg) return } - log.With("replyId", msg.ReplyId).Debug("reply id exists") + log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id exists") delete(s.waiters, msg.ReplyId) s.waitersMx.Unlock() @@ -273,7 +274,7 @@ Loop: limiter <- struct{}{} }() } - log.With("peerId", peerId).Debug("stopped reading stream from peer") + log.With(zap.String("peerId", peerId)).Debug("stopped reading stream from peer") s.removePeer(peerId) return } diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 986fa5cc..3a68a109 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -13,7 +13,7 @@ import ( "time" ) -var log = logger.NewNamed("syncservice").Sugar() +var log = logger.NewNamed("commonspace.syncservice") type SyncService interface { ocache.ObjectLastUsage @@ -47,7 +47,7 @@ func NewSyncService( return syncService.HandleMessage(ctx, senderId, message) }) clientFactory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceClient) - syncLog := log.Desugar().With(zap.String("id", spaceId)) + syncLog := log.With(zap.String("id", spaceId)) syncCtx, cancel := context.WithCancel(context.Background()) checker := NewStreamChecker( spaceId,