diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 6d109110..4e612b97 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -67,7 +67,7 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn s.waiters[msg.ReplyId] = waiter s.waitersMx.Unlock() - err = s.SendPeer(ctx, peerId, msg) + err = s.SendPeer(context.Background(), peerId, msg) if err != nil { return } @@ -87,15 +87,30 @@ func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyn func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() + select { + case <-ctx.Done(): + log.Warn("ctx.Done") + default: + } return s.StreamManager.SendPeer(ctx, peerId, msg) } func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() + select { + case <-ctx.Done(): + log.Warn("ctx.Done") + default: + } return s.StreamManager.SendResponsible(ctx, msg) } func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() + select { + case <-ctx.Done(): + log.Warn("ctx.Done") + default: + } return s.StreamManager.Broadcast(ctx, msg) } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 52e44a4a..3799ae24 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -97,7 +97,7 @@ func (s *spaceService) DeriveSpace(ctx context.Context, payload SpaceDerivePaylo } func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { - st, err := s.storageProvider.SpaceStorage(id) + st, err := s.storageProvider.WaitSpaceStorage(ctx, id) if err != nil { if err != spacestorage.ErrSpaceStorageMissing { return nil, err diff --git a/commonspace/spacestorage/spacestorage.go b/commonspace/spacestorage/spacestorage.go index 348a408b..0883af9a 100644 --- a/commonspace/spacestorage/spacestorage.go +++ b/commonspace/spacestorage/spacestorage.go @@ -2,6 +2,7 @@ package spacestorage import ( + "context" "errors" "github.com/anytypeio/any-sync/app" "github.com/anytypeio/any-sync/commonspace/object/acl/aclrecordproto" @@ -51,7 +52,7 @@ type SpaceStorageCreatePayload struct { type SpaceStorageProvider interface { app.Component - SpaceStorage(id string) (SpaceStorage, error) + WaitSpaceStorage(ctx context.Context, id string) (SpaceStorage, error) SpaceExists(id string) bool CreateSpaceStorage(payload SpaceStorageCreatePayload) (SpaceStorage, error) } diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index a5dfd0a0..82fff97a 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -65,6 +65,7 @@ func (d *dialer) UpdateAddrs(addrs map[string][]string) { func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) { d.mu.RLock() defer d.mu.RUnlock() + addrs, ok := d.peerAddrs[peerId] if !ok || len(addrs) == 0 { return nil, ErrArrdsNotFound @@ -73,6 +74,7 @@ func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err erro conn drpc.Conn sc sec.SecureConn ) + log.Warn("dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs)) for _, addr := range addrs { conn, sc, err = d.handshake(ctx, addr) if err != nil { diff --git a/net/peer/peer.go b/net/peer/peer.go index 4613a800..7e55d5e0 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -2,12 +2,16 @@ package peer import ( "context" + "github.com/anytypeio/any-sync/app/logger" "github.com/libp2p/go-libp2p/core/sec" + "go.uber.org/zap" "storj.io/drpc" "sync/atomic" "time" ) +var log = logger.NewNamed("peer") + func NewPeer(sc sec.SecureConn, conn drpc.Conn) Peer { return &peer{ id: sc.RemotePeer().String(), @@ -71,3 +75,8 @@ func (p *peer) Write(b []byte) (n int, err error) { func (p *peer) UpdateLastUsage() { atomic.StoreInt64(&p.lastUsage, time.Now().Unix()) } + +func (p *peer) Close() (err error) { + log.Warn("peer close", zap.String("peerId", p.id)) + return p.Conn.Close() +} diff --git a/net/pool/poolservice.go b/net/pool/poolservice.go index f00e5ea3..c28fcf6b 100644 --- a/net/pool/poolservice.go +++ b/net/pool/poolservice.go @@ -28,6 +28,7 @@ type Service interface { } type poolService struct { + // default pool *pool dialer dialer.Dialer metricReg *prometheus.Registry diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 86d957f5..45416e34 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -27,6 +27,7 @@ func (sr *stream) readLoop() error { defer func() { sr.streamClose() }() + sr.l.Debug("stream read started") for { msg := sr.pool.handler.NewReadMessage() if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil { diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 3f51a9f6..9e6652f6 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -142,7 +142,7 @@ func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) } for _, st := range streams { if err = st.write(msg); err != nil { - st.l.Info("sendOne write error", zap.Error(err)) + st.l.Info("sendOne write error", zap.Error(err), zap.Int("streams", len(streams))) // continue with next stream continue } else { @@ -279,7 +279,7 @@ func (s *streamPool) handleMessageLoop() { if err != nil { return } - if err = s.handler.HandleMessage(hm.ctx, hm.peerId, hm.msg); err != nil { + if err = s.handler.HandleMessage(context.Background(), hm.peerId, hm.msg); err != nil { log.Warn("handle message error", zap.Error(err)) } } diff --git a/net/timeoutconn/conn.go b/net/timeoutconn/conn.go index 77f53069..5459d461 100644 --- a/net/timeoutconn/conn.go +++ b/net/timeoutconn/conn.go @@ -2,11 +2,15 @@ package timeoutconn import ( "errors" + "github.com/anytypeio/any-sync/app/logger" + "go.uber.org/zap" "net" "os" "time" ) +var log = logger.NewNamed("net.timeoutconn") + type Conn struct { net.Conn timeout time.Duration @@ -17,22 +21,32 @@ func NewConn(conn net.Conn, timeout time.Duration) *Conn { } func (c *Conn) Write(p []byte) (n int, err error) { + return c.Conn.Write(p) for { if c.timeout != 0 { - c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)) + if e := c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)); e != nil { + log.Warn("can't set write deadline", zap.String("remoteAddr", c.RemoteAddr().String())) + } + } nn, err := c.Conn.Write(p[n:]) n += nn if n < len(p) && nn > 0 && errors.Is(err, os.ErrDeadlineExceeded) { // Keep extending the deadline so long as we're making progress. + log.Debug("keep extending the deadline so long as we're making progress", zap.String("remoteAddr", c.RemoteAddr().String())) continue } if c.timeout != 0 { - c.Conn.SetWriteDeadline(time.Time{}) + if e := c.Conn.SetWriteDeadline(time.Time{}); e != nil { + log.Warn("can't set write deadline", zap.String("remoteAddr", c.RemoteAddr().String())) + } } if err != nil { // if the connection is timed out and we should close it - c.Conn.Close() + if e := c.Conn.Close(); e != nil { + log.Warn("connection close error", zap.String("remoteAddr", c.RemoteAddr().String())) + } + log.Debug("connection timed out", zap.String("remoteAddr", c.RemoteAddr().String())) } return n, err }