peer subConn gc

This commit is contained in:
Sergey Cherepanov 2023-06-05 21:23:41 +02:00
parent 9d4945c733
commit af9d71d16e
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
2 changed files with 109 additions and 9 deletions

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/app/ocache"
"github.com/anyproto/any-sync/net/connutil"
"github.com/anyproto/any-sync/net/secureservice/handshake" "github.com/anyproto/any-sync/net/secureservice/handshake"
"github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto" "github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto"
"github.com/anyproto/any-sync/net/transport" "github.com/anyproto/any-sync/net/transport"
@ -25,7 +26,7 @@ type connCtrl interface {
func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) { func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) {
ctx := mc.Context() ctx := mc.Context()
pr := &peer{ pr := &peer{
active: map[drpc.Conn]struct{}{}, active: map[*subConn]struct{}{},
MultiConn: mc, MultiConn: mc,
ctrl: ctrl, ctrl: ctrl,
} }
@ -51,14 +52,19 @@ type Peer interface {
ocache.Object ocache.Object
} }
type subConn struct {
drpc.Conn
*connutil.LastUsageConn
}
type peer struct { type peer struct {
id string id string
ctrl connCtrl ctrl connCtrl
// drpc conn pool // drpc conn pool
inactive []drpc.Conn inactive []*subConn
active map[drpc.Conn]struct{} active map[*subConn]struct{}
mu sync.Mutex mu sync.Mutex
@ -91,10 +97,14 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
func (p *peer) ReleaseDrpcConn(conn drpc.Conn) { func (p *peer) ReleaseDrpcConn(conn drpc.Conn) {
p.mu.Lock() p.mu.Lock()
defer p.mu.Unlock() defer p.mu.Unlock()
if _, ok := p.active[conn]; ok { sc, ok := conn.(*subConn)
delete(p.active, conn) if !ok {
return
} }
p.inactive = append(p.inactive, conn) if _, ok = p.active[sc]; ok {
delete(p.active, sc)
}
p.inactive = append(p.inactive, sc)
return return
} }
@ -107,7 +117,7 @@ func (p *peer) DoDrpc(ctx context.Context, do func(conn drpc.Conn) error) error
return do(conn) return do(conn)
} }
func (p *peer) openDrpcConn(ctx context.Context) (dconn drpc.Conn, err error) { func (p *peer) openDrpcConn(ctx context.Context) (dconn *subConn, err error) {
conn, err := p.Open(ctx) conn, err := p.Open(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
@ -115,8 +125,11 @@ func (p *peer) openDrpcConn(ctx context.Context) (dconn drpc.Conn, err error) {
if err = handshake.OutgoingProtoHandshake(ctx, conn, handshakeproto.ProtoType_DRPC); err != nil { if err = handshake.OutgoingProtoHandshake(ctx, conn, handshakeproto.ProtoType_DRPC); err != nil {
return nil, err return nil, err
} }
dconn = drpcconn.New(conn) tconn := connutil.NewLastUsageConn(conn)
return return &subConn{
Conn: drpcconn.New(tconn),
LastUsageConn: tconn,
}, nil
} }
func (p *peer) acceptLoop() { func (p *peer) acceptLoop() {
@ -159,12 +172,49 @@ func (p *peer) serve(conn net.Conn) (err error) {
} }
func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) { func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
p.gc(objectTTL)
if time.Now().Sub(p.LastUsage()) < objectTTL { if time.Now().Sub(p.LastUsage()) < objectTTL {
return false, nil return false, nil
} }
return true, p.Close() return true, p.Close()
} }
func (p *peer) gc(ttl time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
minLastUsage := time.Now().Add(-ttl)
var hasClosed bool
for i, in := range p.inactive {
select {
case <-in.Closed():
p.inactive[i] = nil
hasClosed = true
default:
}
if in.LastUsage().Before(minLastUsage) {
_ = in.Close()
p.inactive[i] = nil
hasClosed = true
}
}
if hasClosed {
inactive := p.inactive
p.inactive = p.inactive[:0]
for _, in := range inactive {
if in != nil {
p.inactive = append(p.inactive, in)
}
}
}
for act := range p.active {
select {
case <-act.Closed():
delete(p.active, act)
default:
}
}
}
func (p *peer) Close() (err error) { func (p *peer) Close() (err error) {
log.Debug("peer close", zap.String("peerId", p.id)) log.Debug("peer close", zap.String("peerId", p.id))
return p.MultiConn.Close() return p.MultiConn.Close()

View File

@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"io" "io"
"net" "net"
_ "net/http/pprof"
"testing" "testing"
"time" "time"
) )
@ -20,6 +21,9 @@ func TestPeer_AcquireDrpcConn(t *testing.T) {
fx := newFixture(t, "p1") fx := newFixture(t, "p1")
defer fx.finish() defer fx.finish()
in, out := net.Pipe() in, out := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
}()
defer out.Close() defer out.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil) fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
dc, err := fx.AcquireDrpcConn(ctx) dc, err := fx.AcquireDrpcConn(ctx)
@ -77,6 +81,52 @@ func TestPeer_TryClose(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
assert.True(t, res) assert.True(t, res)
}) })
t.Run("gc", func(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
now := time.Now()
fx.mc.EXPECT().LastUsage().Return(now.Add(time.Millisecond * 100))
// make one inactive
in, out := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
}()
defer out.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
dc, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
// make one active but closed
in2, out2 := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out2, defaultProtoChecker)
}()
defer out2.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in2, nil)
dc2, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
_ = dc2.Close()
// make one inactive and closed
in3, out3 := net.Pipe()
go func() {
handshake.IncomingProtoHandshake(ctx, out3, defaultProtoChecker)
}()
defer out3.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in3, nil)
dc3, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
fx.ReleaseDrpcConn(dc3)
_ = dc3.Close()
fx.ReleaseDrpcConn(dc)
time.Sleep(time.Millisecond * 100)
res, err := fx.TryClose(time.Millisecond * 50)
require.NoError(t, err)
assert.False(t, res)
})
} }
type acceptedConn struct { type acceptedConn struct {