From af9d71d16ef446f0698c8017e463a0f6be993c23 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 5 Jun 2023 21:23:41 +0200 Subject: [PATCH] peer subConn gc --- net/peer/peer.go | 68 +++++++++++++++++++++++++++++++++++++------ net/peer/peer_test.go | 50 +++++++++++++++++++++++++++++++ 2 files changed, 109 insertions(+), 9 deletions(-) diff --git a/net/peer/peer.go b/net/peer/peer.go index ab0ae236..7f42260c 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -4,6 +4,7 @@ import ( "context" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/ocache" + "github.com/anyproto/any-sync/net/connutil" "github.com/anyproto/any-sync/net/secureservice/handshake" "github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto" "github.com/anyproto/any-sync/net/transport" @@ -25,7 +26,7 @@ type connCtrl interface { func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) { ctx := mc.Context() pr := &peer{ - active: map[drpc.Conn]struct{}{}, + active: map[*subConn]struct{}{}, MultiConn: mc, ctrl: ctrl, } @@ -51,14 +52,19 @@ type Peer interface { ocache.Object } +type subConn struct { + drpc.Conn + *connutil.LastUsageConn +} + type peer struct { id string ctrl connCtrl // drpc conn pool - inactive []drpc.Conn - active map[drpc.Conn]struct{} + inactive []*subConn + active map[*subConn]struct{} mu sync.Mutex @@ -91,10 +97,14 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) { func (p *peer) ReleaseDrpcConn(conn drpc.Conn) { p.mu.Lock() defer p.mu.Unlock() - if _, ok := p.active[conn]; ok { - delete(p.active, conn) + sc, ok := conn.(*subConn) + if !ok { + return } - p.inactive = append(p.inactive, conn) + if _, ok = p.active[sc]; ok { + delete(p.active, sc) + } + p.inactive = append(p.inactive, sc) return } @@ -107,7 +117,7 @@ func (p *peer) DoDrpc(ctx context.Context, do func(conn drpc.Conn) error) error return do(conn) } -func (p *peer) openDrpcConn(ctx context.Context) (dconn drpc.Conn, err error) { +func (p *peer) openDrpcConn(ctx context.Context) (dconn *subConn, err error) { conn, err := p.Open(ctx) if err != nil { return nil, err @@ -115,8 +125,11 @@ func (p *peer) openDrpcConn(ctx context.Context) (dconn drpc.Conn, err error) { if err = handshake.OutgoingProtoHandshake(ctx, conn, handshakeproto.ProtoType_DRPC); err != nil { return nil, err } - dconn = drpcconn.New(conn) - return + tconn := connutil.NewLastUsageConn(conn) + return &subConn{ + Conn: drpcconn.New(tconn), + LastUsageConn: tconn, + }, nil } func (p *peer) acceptLoop() { @@ -159,12 +172,49 @@ func (p *peer) serve(conn net.Conn) (err error) { } func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) { + p.gc(objectTTL) if time.Now().Sub(p.LastUsage()) < objectTTL { return false, nil } return true, p.Close() } +func (p *peer) gc(ttl time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + minLastUsage := time.Now().Add(-ttl) + var hasClosed bool + for i, in := range p.inactive { + select { + case <-in.Closed(): + p.inactive[i] = nil + hasClosed = true + default: + } + if in.LastUsage().Before(minLastUsage) { + _ = in.Close() + p.inactive[i] = nil + hasClosed = true + } + } + if hasClosed { + inactive := p.inactive + p.inactive = p.inactive[:0] + for _, in := range inactive { + if in != nil { + p.inactive = append(p.inactive, in) + } + } + } + for act := range p.active { + select { + case <-act.Closed(): + delete(p.active, act) + default: + } + } +} + func (p *peer) Close() (err error) { log.Debug("peer close", zap.String("peerId", p.id)) return p.MultiConn.Close() diff --git a/net/peer/peer_test.go b/net/peer/peer_test.go index 00a14252..ac1ff28b 100644 --- a/net/peer/peer_test.go +++ b/net/peer/peer_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" "io" "net" + _ "net/http/pprof" "testing" "time" ) @@ -20,6 +21,9 @@ func TestPeer_AcquireDrpcConn(t *testing.T) { fx := newFixture(t, "p1") defer fx.finish() in, out := net.Pipe() + go func() { + handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker) + }() defer out.Close() fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil) dc, err := fx.AcquireDrpcConn(ctx) @@ -77,6 +81,52 @@ func TestPeer_TryClose(t *testing.T) { require.NoError(t, err) assert.True(t, res) }) + t.Run("gc", func(t *testing.T) { + fx := newFixture(t, "p1") + defer fx.finish() + now := time.Now() + fx.mc.EXPECT().LastUsage().Return(now.Add(time.Millisecond * 100)) + + // make one inactive + in, out := net.Pipe() + go func() { + handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker) + }() + defer out.Close() + fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil) + dc, err := fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + + // make one active but closed + in2, out2 := net.Pipe() + go func() { + handshake.IncomingProtoHandshake(ctx, out2, defaultProtoChecker) + }() + defer out2.Close() + fx.mc.EXPECT().Open(gomock.Any()).Return(in2, nil) + dc2, err := fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + _ = dc2.Close() + + // make one inactive and closed + in3, out3 := net.Pipe() + go func() { + handshake.IncomingProtoHandshake(ctx, out3, defaultProtoChecker) + }() + defer out3.Close() + fx.mc.EXPECT().Open(gomock.Any()).Return(in3, nil) + dc3, err := fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + fx.ReleaseDrpcConn(dc3) + _ = dc3.Close() + fx.ReleaseDrpcConn(dc) + + time.Sleep(time.Millisecond * 100) + + res, err := fx.TryClose(time.Millisecond * 50) + require.NoError(t, err) + assert.False(t, res) + }) } type acceptedConn struct {