From 553ed3a64b350fa53f81864c65a5b54d810a168d Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 29 May 2023 17:56:44 +0200 Subject: [PATCH] peer drpc conn pool --- net/peer/peer.go | 59 +++++- net/peer/peer_test.go | 62 ++++++ net/peerservice/peerservice.go | 2 +- net/pool/pool_test.go | 13 +- .../mock_transport/mock_transport.go | 188 ++++++++++++++++++ net/transport/transport.go | 1 + 6 files changed, 316 insertions(+), 9 deletions(-) create mode 100644 net/peer/peer_test.go create mode 100644 net/transport/mock_transport/mock_transport.go diff --git a/net/peer/peer.go b/net/peer/peer.go index cfe0b636..df8a2799 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -1,7 +1,12 @@ package peer import ( + "context" + "github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/net/transport" + "storj.io/drpc" + "storj.io/drpc/drpcconn" + "sync" "time" "github.com/anyproto/any-sync/app/logger" @@ -12,7 +17,10 @@ var log = logger.NewNamed("common.net.peer") func NewPeer(mc transport.MultiConn) (p Peer, err error) { ctx := mc.Context() - pr := &peer{} + pr := &peer{ + active: map[drpc.Conn]struct{}{}, + MultiConn: mc, + } if pr.id, err = CtxPeerId(ctx); err != nil { return } @@ -21,12 +29,25 @@ func NewPeer(mc transport.MultiConn) (p Peer, err error) { type Peer interface { Id() string + + AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) + ReleaseDrpcConn(conn drpc.Conn) + + IsClosed() bool + TryClose(objectTTL time.Duration) (res bool, err error) - transport.MultiConn + + ocache.Object } type peer struct { id string + + // drpc conn pool + inactive []drpc.Conn + active map[drpc.Conn]struct{} + mu sync.Mutex + transport.MultiConn } @@ -34,10 +55,44 @@ func (p *peer) Id() string { return p.id } +func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) { + p.mu.Lock() + defer p.mu.Unlock() + if len(p.inactive) == 0 { + conn, err := p.Open(ctx) + if err != nil { + return nil, err + } + dconn := drpcconn.New(conn) + p.inactive = append(p.inactive, dconn) + } + idx := len(p.inactive) - 1 + res := p.inactive[idx] + p.inactive = p.inactive[:idx] + p.active[res] = struct{}{} + return res, nil +} + +func (p *peer) ReleaseDrpcConn(conn drpc.Conn) { + p.mu.Lock() + defer p.mu.Unlock() + if _, ok := p.active[conn]; ok { + delete(p.active, conn) + } + p.inactive = append(p.inactive, conn) + return +} + func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) { if time.Now().Sub(p.LastUsage()) < objectTTL { return false, nil } + p.mu.Lock() + if len(p.active) > 0 { + p.mu.Unlock() + return false, nil + } + p.mu.Unlock() return true, p.Close() } diff --git a/net/peer/peer_test.go b/net/peer/peer_test.go new file mode 100644 index 00000000..41efc45d --- /dev/null +++ b/net/peer/peer_test.go @@ -0,0 +1,62 @@ +package peer + +import ( + "context" + "github.com/anyproto/any-sync/net/transport/mock_transport" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "net" + "testing" +) + +var ctx = context.Background() + +func TestPeer_AcquireDrpcConn(t *testing.T) { + fx := newFixture(t, "p1") + defer fx.finish() + in, out := net.Pipe() + defer out.Close() + fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil) + dc, err := fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + assert.NotEmpty(t, dc) + defer dc.Close() + + assert.Len(t, fx.active, 1) + assert.Len(t, fx.inactive, 0) + + fx.ReleaseDrpcConn(dc) + + assert.Len(t, fx.active, 0) + assert.Len(t, fx.inactive, 1) + + dc, err = fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + assert.NotEmpty(t, dc) + assert.Len(t, fx.active, 1) + assert.Len(t, fx.inactive, 0) +} + +func newFixture(t *testing.T, peerId string) *fixture { + fx := &fixture{ + ctrl: gomock.NewController(t), + } + fx.mc = mock_transport.NewMockMultiConn(fx.ctrl) + ctx := CtxWithPeerId(context.Background(), peerId) + fx.mc.EXPECT().Context().Return(ctx).AnyTimes() + p, err := NewPeer(fx.mc) + require.NoError(t, err) + fx.peer = p.(*peer) + return fx +} + +type fixture struct { + *peer + ctrl *gomock.Controller + mc *mock_transport.MockMultiConn +} + +func (fx *fixture) finish() { + fx.ctrl.Finish() +} diff --git a/net/peerservice/peerservice.go b/net/peerservice/peerservice.go index 3394c5e8..a83e6785 100644 --- a/net/peerservice/peerservice.go +++ b/net/peerservice/peerservice.go @@ -83,7 +83,7 @@ func (p *peerService) Accept(mc transport.MultiConn) (err error) { if err != nil { return err } - if err = p.pool.AddPeer(pr); err != nil { + if err = p.pool.AddPeer(context.Background(), pr); err != nil { _ = pr.Close() } return diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index b1cc0087..c31fcf69 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" net2 "net" + "storj.io/drpc" "testing" "time" ) @@ -216,6 +217,12 @@ type testPeer struct { closed chan struct{} } +func (t *testPeer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) { + return nil, fmt.Errorf("not implemented") +} + +func (t *testPeer) ReleaseDrpcConn(conn drpc.Conn) {} + func (t *testPeer) Context() context.Context { //TODO implement me panic("implement me") @@ -239,12 +246,6 @@ func (t *testPeer) Id() string { return t.id } -func (t *testPeer) LastUsage() time.Time { - return time.Now() -} - -func (t *testPeer) UpdateLastUsage() {} - func (t *testPeer) TryClose(objectTTL time.Duration) (res bool, err error) { return true, t.Close() } diff --git a/net/transport/mock_transport/mock_transport.go b/net/transport/mock_transport/mock_transport.go new file mode 100644 index 00000000..43f5572c --- /dev/null +++ b/net/transport/mock_transport/mock_transport.go @@ -0,0 +1,188 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/anyproto/any-sync/net/transport (interfaces: Transport,MultiConn) + +// Package mock_transport is a generated GoMock package. +package mock_transport + +import ( + context "context" + net "net" + reflect "reflect" + time "time" + + transport "github.com/anyproto/any-sync/net/transport" + gomock "github.com/golang/mock/gomock" +) + +// MockTransport is a mock of Transport interface. +type MockTransport struct { + ctrl *gomock.Controller + recorder *MockTransportMockRecorder +} + +// MockTransportMockRecorder is the mock recorder for MockTransport. +type MockTransportMockRecorder struct { + mock *MockTransport +} + +// NewMockTransport creates a new mock instance. +func NewMockTransport(ctrl *gomock.Controller) *MockTransport { + mock := &MockTransport{ctrl: ctrl} + mock.recorder = &MockTransportMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockTransport) EXPECT() *MockTransportMockRecorder { + return m.recorder +} + +// Dial mocks base method. +func (m *MockTransport) Dial(arg0 context.Context, arg1 string) (transport.MultiConn, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Dial", arg0, arg1) + ret0, _ := ret[0].(transport.MultiConn) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Dial indicates an expected call of Dial. +func (mr *MockTransportMockRecorder) Dial(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Dial", reflect.TypeOf((*MockTransport)(nil).Dial), arg0, arg1) +} + +// SetAccepter mocks base method. +func (m *MockTransport) SetAccepter(arg0 transport.Accepter) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetAccepter", arg0) +} + +// SetAccepter indicates an expected call of SetAccepter. +func (mr *MockTransportMockRecorder) SetAccepter(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAccepter", reflect.TypeOf((*MockTransport)(nil).SetAccepter), arg0) +} + +// MockMultiConn is a mock of MultiConn interface. +type MockMultiConn struct { + ctrl *gomock.Controller + recorder *MockMultiConnMockRecorder +} + +// MockMultiConnMockRecorder is the mock recorder for MockMultiConn. +type MockMultiConnMockRecorder struct { + mock *MockMultiConn +} + +// NewMockMultiConn creates a new mock instance. +func NewMockMultiConn(ctrl *gomock.Controller) *MockMultiConn { + mock := &MockMultiConn{ctrl: ctrl} + mock.recorder = &MockMultiConnMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockMultiConn) EXPECT() *MockMultiConnMockRecorder { + return m.recorder +} + +// Accept mocks base method. +func (m *MockMultiConn) Accept() (net.Conn, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Accept") + ret0, _ := ret[0].(net.Conn) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Accept indicates an expected call of Accept. +func (mr *MockMultiConnMockRecorder) Accept() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Accept", reflect.TypeOf((*MockMultiConn)(nil).Accept)) +} + +// Addr mocks base method. +func (m *MockMultiConn) Addr() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Addr") + ret0, _ := ret[0].(string) + return ret0 +} + +// Addr indicates an expected call of Addr. +func (mr *MockMultiConnMockRecorder) Addr() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addr", reflect.TypeOf((*MockMultiConn)(nil).Addr)) +} + +// Close mocks base method. +func (m *MockMultiConn) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockMultiConnMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockMultiConn)(nil).Close)) +} + +// Context mocks base method. +func (m *MockMultiConn) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockMultiConnMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockMultiConn)(nil).Context)) +} + +// IsClosed mocks base method. +func (m *MockMultiConn) IsClosed() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsClosed") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsClosed indicates an expected call of IsClosed. +func (mr *MockMultiConnMockRecorder) IsClosed() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsClosed", reflect.TypeOf((*MockMultiConn)(nil).IsClosed)) +} + +// LastUsage mocks base method. +func (m *MockMultiConn) LastUsage() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LastUsage") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// LastUsage indicates an expected call of LastUsage. +func (mr *MockMultiConnMockRecorder) LastUsage() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastUsage", reflect.TypeOf((*MockMultiConn)(nil).LastUsage)) +} + +// Open mocks base method. +func (m *MockMultiConn) Open(arg0 context.Context) (net.Conn, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open", arg0) + ret0, _ := ret[0].(net.Conn) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Open indicates an expected call of Open. +func (mr *MockMultiConnMockRecorder) Open(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockMultiConn)(nil).Open), arg0) +} diff --git a/net/transport/transport.go b/net/transport/transport.go index 127c9c06..7b247793 100644 --- a/net/transport/transport.go +++ b/net/transport/transport.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_transport/mock_transport.go github.com/anyproto/any-sync/net/transport Transport,MultiConn package transport import (