peer and space last usage

This commit is contained in:
Sergey Cherepanov 2023-01-20 19:32:38 +03:00 committed by Mikhail Iudin
parent 5fb6ee5a7b
commit 2d2f24b3e3
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
4 changed files with 45 additions and 7 deletions

View File

@ -2,6 +2,7 @@ package objectsync
import ( import (
"context" "context"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"go.uber.org/zap" "go.uber.org/zap"
@ -20,6 +21,7 @@ type StreamManager interface {
// MessagePool can be made generic to work with different streams // MessagePool can be made generic to work with different streams
type MessagePool interface { type MessagePool interface {
ocache.ObjectLastUsage
synchandler.SyncHandler synchandler.SyncHandler
StreamManager StreamManager
SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
@ -38,6 +40,7 @@ type messagePool struct {
waiters map[string]responseWaiter waiters map[string]responseWaiter
waitersMx sync.Mutex waitersMx sync.Mutex
counter atomic.Uint64 counter atomic.Uint64
lastUsage atomic.Int64
} }
func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool { func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool {
@ -50,6 +53,7 @@ func newMessagePool(streamManager StreamManager, messageHandler MessageHandler)
} }
func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
s.updateLastUsage()
var cancel context.CancelFunc var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*10) ctx, cancel = context.WithTimeout(ctx, time.Second*10)
defer cancel() defer cancel()
@ -81,7 +85,22 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn
return return
} }
func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage()
return s.StreamManager.SendPeer(ctx, peerId, msg)
}
func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage()
return s.StreamManager.SendResponsible(ctx, msg)
}
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage()
return s.StreamManager.Broadcast(ctx, msg)
}
func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage()
if msg.ReplyId != "" { if msg.ReplyId != "" {
log.Info("mpool receive reply", zap.String("replyId", msg.ReplyId)) log.Info("mpool receive reply", zap.String("replyId", msg.ReplyId))
// we got reply, send it to waiter // we got reply, send it to waiter
@ -93,6 +112,14 @@ func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *s
return s.messageHandler(ctx, senderId, msg) return s.messageHandler(ctx, senderId, msg)
} }
func (s *messagePool) LastUsage() time.Time {
return time.Unix(s.lastUsage.Load(), 0)
}
func (s *messagePool) updateLastUsage() {
s.lastUsage.Store(time.Now().Unix())
}
func (s *messagePool) stopWaiter(msg *spacesyncproto.ObjectSyncMessage) bool { func (s *messagePool) stopWaiter(msg *spacesyncproto.ObjectSyncMessage) bool {
s.waitersMx.Lock() s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId] waiter, exists := s.waiters[msg.ReplyId]

View File

@ -73,8 +73,7 @@ func (s *objectSync) Close() (err error) {
} }
func (s *objectSync) LastUsage() time.Time { func (s *objectSync) LastUsage() time.Time {
// TODO: [che] return s.messagePool.LastUsage()
return time.Now()
} }
func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {

View File

@ -54,6 +54,20 @@ func (p *peer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (dr
return p.Conn.NewStream(ctx, rpc, enc) return p.Conn.NewStream(ctx, rpc, enc)
} }
func (p *peer) Read(b []byte) (n int, err error) {
if n, err = p.sc.Read(b); err != nil {
p.UpdateLastUsage()
}
return
}
func (p *peer) Write(b []byte) (n int, err error) {
if n, err = p.sc.Write(b); err != nil {
p.UpdateLastUsage()
}
return
}
func (p *peer) UpdateLastUsage() { func (p *peer) UpdateLastUsage() {
atomic.StoreInt64(&p.lastUsage, time.Now().Unix()) atomic.StoreInt64(&p.lastUsage, time.Now().Unix())
} }

View File

@ -279,11 +279,9 @@ func (s *streamPool) handleMessageLoop() {
if err != nil { if err != nil {
return return
} }
go func() {
if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil { if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil {
log.Warn("handle message error", zap.Error(err)) log.Warn("handle message error", zap.Error(err))
} }
}()
} }
} }