diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 0389b508..6d109110 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -2,6 +2,7 @@ package objectsync import ( "context" + "github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "go.uber.org/zap" @@ -20,6 +21,7 @@ type StreamManager interface { // MessagePool can be made generic to work with different streams type MessagePool interface { + ocache.ObjectLastUsage synchandler.SyncHandler StreamManager SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) @@ -38,6 +40,7 @@ type messagePool struct { waiters map[string]responseWaiter waitersMx sync.Mutex counter atomic.Uint64 + lastUsage atomic.Int64 } func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool { @@ -50,6 +53,7 @@ func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) } func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + s.updateLastUsage() var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, time.Second*10) defer cancel() @@ -81,7 +85,22 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn return } +func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + s.updateLastUsage() + return s.StreamManager.SendPeer(ctx, peerId, msg) +} + +func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { + s.updateLastUsage() + return s.StreamManager.SendResponsible(ctx, msg) +} +func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { + s.updateLastUsage() + return s.StreamManager.Broadcast(ctx, msg) +} + func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + s.updateLastUsage() if msg.ReplyId != "" { log.Info("mpool receive reply", zap.String("replyId", msg.ReplyId)) // we got reply, send it to waiter @@ -93,6 +112,14 @@ func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *s return s.messageHandler(ctx, senderId, msg) } +func (s *messagePool) LastUsage() time.Time { + return time.Unix(s.lastUsage.Load(), 0) +} + +func (s *messagePool) updateLastUsage() { + s.lastUsage.Store(time.Now().Unix()) +} + func (s *messagePool) stopWaiter(msg *spacesyncproto.ObjectSyncMessage) bool { s.waitersMx.Lock() waiter, exists := s.waiters[msg.ReplyId] diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 5a97cd1b..e8ad33d3 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -73,8 +73,7 @@ func (s *objectSync) Close() (err error) { } func (s *objectSync) LastUsage() time.Time { - // TODO: [che] - return time.Now() + return s.messagePool.LastUsage() } func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { diff --git a/net/peer/peer.go b/net/peer/peer.go index 6056b0b9..4613a800 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -54,6 +54,20 @@ func (p *peer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (dr return p.Conn.NewStream(ctx, rpc, enc) } +func (p *peer) Read(b []byte) (n int, err error) { + if n, err = p.sc.Read(b); err != nil { + p.UpdateLastUsage() + } + return +} + +func (p *peer) Write(b []byte) (n int, err error) { + if n, err = p.sc.Write(b); err != nil { + p.UpdateLastUsage() + } + return +} + func (p *peer) UpdateLastUsage() { atomic.StoreInt64(&p.lastUsage, time.Now().Unix()) } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index f1aa486b..3f51a9f6 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -279,11 +279,9 @@ func (s *streamPool) handleMessageLoop() { if err != nil { return } - go func() { - if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil { - log.Warn("handle message error", zap.Error(err)) - } - }() + if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil { + log.Warn("handle message error", zap.Error(err)) + } } }