From e97e6b68c6c0f80c416a75da18bad2c1ec3ba907 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 9 Jun 2023 19:14:23 +0200 Subject: [PATCH 1/2] peer: gc active sub conn --- net/peer/peer.go | 7 +++++++ net/peer/peer_test.go | 13 +++++++++++++ 2 files changed, 20 insertions(+) diff --git a/net/peer/peer.go b/net/peer/peer.go index bd0d9b34..2a2c50b0 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -222,8 +222,15 @@ func (p *peer) gc(ttl time.Duration) { select { case <-act.Closed(): delete(p.active, act) + continue default: } + if act.LastUsage().Before(minLastUsage) { + log.Warn("close active connection because no activity", zap.String("peerId", p.id), zap.String("addr", p.Addr())) + _ = act.Close() + delete(p.active, act) + continue + } } } diff --git a/net/peer/peer_test.go b/net/peer/peer_test.go index c0046923..67d0f65d 100644 --- a/net/peer/peer_test.go +++ b/net/peer/peer_test.go @@ -105,6 +105,7 @@ func TestPeer_TryClose(t *testing.T) { }() defer out2.Close() fx.mc.EXPECT().Open(gomock.Any()).Return(in2, nil) + fx.mc.EXPECT().Addr().Return("") dc2, err := fx.AcquireDrpcConn(ctx) require.NoError(t, err) _ = dc2.Close() @@ -118,6 +119,18 @@ func TestPeer_TryClose(t *testing.T) { fx.mc.EXPECT().Open(gomock.Any()).Return(in3, nil) dc3, err := fx.AcquireDrpcConn(ctx) require.NoError(t, err) + + // make another active, should be removed by ttl + in4, out4 := net.Pipe() + go func() { + handshake.IncomingProtoHandshake(ctx, out4, defaultProtoChecker) + }() + defer out4.Close() + fx.mc.EXPECT().Open(gomock.Any()).Return(in4, nil) + dc4, err := fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + defer dc4.Close() + fx.ReleaseDrpcConn(dc3) _ = dc3.Close() fx.ReleaseDrpcConn(dc) From 89afc03218203d0e41c85e9f6f90c2c6c938e851 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 9 Jun 2023 19:28:09 +0200 Subject: [PATCH 2/2] fix HandshakeIncompatibleVersion test --- net/secureservice/secureservice.go | 4 +++- net/secureservice/secureservice_test.go | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/net/secureservice/secureservice.go b/net/secureservice/secureservice.go index 4c62bcdb..4487e9bc 100644 --- a/net/secureservice/secureservice.go +++ b/net/secureservice/secureservice.go @@ -53,7 +53,9 @@ type secureService struct { } func (s *secureService) Init(a *app.App) (err error) { - s.protoVersion = ProtoVersion + if s.protoVersion == 0 { + s.protoVersion = ProtoVersion + } account := a.MustComponent(commonaccount.CName).(commonaccount.Service) peerKey, err := account.Account().PeerKey.Raw() if err != nil { diff --git a/net/secureservice/secureservice_test.go b/net/secureservice/secureservice_test.go index 6aee985d..86735f66 100644 --- a/net/secureservice/secureservice_test.go +++ b/net/secureservice/secureservice_test.go @@ -57,7 +57,7 @@ func TestHandshake(t *testing.T) { func TestHandshakeIncompatibleVersion(t *testing.T) { nc := testnodeconf.GenNodeConfig(2) - fxS := newFixture(t, nc, nc.GetAccountService(0), 0) + fxS := newFixture(t, nc, nc.GetAccountService(0), 1) defer fxS.Finish(t) sc, cc := net.Pipe() @@ -72,7 +72,7 @@ func TestHandshakeIncompatibleVersion(t *testing.T) { ar.ctx, ar.err = fxS.SecureInbound(ctx, sc) resCh <- ar }() - fxC := newFixture(t, nc, nc.GetAccountService(1), 1) + fxC := newFixture(t, nc, nc.GetAccountService(1), 2) defer fxC.Finish(t) _, err := fxC.SecureOutbound(ctx, cc) require.Equal(t, handshake.ErrIncompatibleVersion, err)