Merge pull request #16 from anyproto/peer-active-subconn-gc

peer: gc active sub conn
This commit is contained in:
Sergey Cherepanov 2023-06-09 19:38:36 +02:00 committed by GitHub
commit fb1df54941
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 25 additions and 3 deletions

View File

@ -222,8 +222,15 @@ func (p *peer) gc(ttl time.Duration) {
select { select {
case <-act.Closed(): case <-act.Closed():
delete(p.active, act) delete(p.active, act)
continue
default: default:
} }
if act.LastUsage().Before(minLastUsage) {
log.Warn("close active connection because no activity", zap.String("peerId", p.id), zap.String("addr", p.Addr()))
_ = act.Close()
delete(p.active, act)
continue
}
} }
} }

View File

@ -105,6 +105,7 @@ func TestPeer_TryClose(t *testing.T) {
}() }()
defer out2.Close() defer out2.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in2, nil) fx.mc.EXPECT().Open(gomock.Any()).Return(in2, nil)
fx.mc.EXPECT().Addr().Return("")
dc2, err := fx.AcquireDrpcConn(ctx) dc2, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err) require.NoError(t, err)
_ = dc2.Close() _ = dc2.Close()
@ -118,6 +119,18 @@ func TestPeer_TryClose(t *testing.T) {
fx.mc.EXPECT().Open(gomock.Any()).Return(in3, nil) fx.mc.EXPECT().Open(gomock.Any()).Return(in3, nil)
dc3, err := fx.AcquireDrpcConn(ctx) dc3, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err) require.NoError(t, err)
// make another active, should be removed by ttl
in4, out4 := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out4, defaultProtoChecker)
}()
defer out4.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in4, nil)
dc4, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
defer dc4.Close()
fx.ReleaseDrpcConn(dc3) fx.ReleaseDrpcConn(dc3)
_ = dc3.Close() _ = dc3.Close()
fx.ReleaseDrpcConn(dc) fx.ReleaseDrpcConn(dc)

View File

@ -53,7 +53,9 @@ type secureService struct {
} }
func (s *secureService) Init(a *app.App) (err error) { func (s *secureService) Init(a *app.App) (err error) {
s.protoVersion = ProtoVersion if s.protoVersion == 0 {
s.protoVersion = ProtoVersion
}
account := a.MustComponent(commonaccount.CName).(commonaccount.Service) account := a.MustComponent(commonaccount.CName).(commonaccount.Service)
peerKey, err := account.Account().PeerKey.Raw() peerKey, err := account.Account().PeerKey.Raw()
if err != nil { if err != nil {

View File

@ -57,7 +57,7 @@ func TestHandshake(t *testing.T) {
func TestHandshakeIncompatibleVersion(t *testing.T) { func TestHandshakeIncompatibleVersion(t *testing.T) {
nc := testnodeconf.GenNodeConfig(2) nc := testnodeconf.GenNodeConfig(2)
fxS := newFixture(t, nc, nc.GetAccountService(0), 0) fxS := newFixture(t, nc, nc.GetAccountService(0), 1)
defer fxS.Finish(t) defer fxS.Finish(t)
sc, cc := net.Pipe() sc, cc := net.Pipe()
@ -72,7 +72,7 @@ func TestHandshakeIncompatibleVersion(t *testing.T) {
ar.ctx, ar.err = fxS.SecureInbound(ctx, sc) ar.ctx, ar.err = fxS.SecureInbound(ctx, sc)
resCh <- ar resCh <- ar
}() }()
fxC := newFixture(t, nc, nc.GetAccountService(1), 1) fxC := newFixture(t, nc, nc.GetAccountService(1), 2)
defer fxC.Finish(t) defer fxC.Finish(t)
_, err := fxC.SecureOutbound(ctx, cc) _, err := fxC.SecureOutbound(ctx, cc)
require.Equal(t, handshake.ErrIncompatibleVersion, err) require.Equal(t, handshake.ErrIncompatibleVersion, err)