peer drpc conn pool

This commit is contained in:
Sergey Cherepanov 2023-05-29 17:56:44 +02:00
parent a898c6fc9c
commit 553ed3a64b
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
6 changed files with 316 additions and 9 deletions

View File

@ -1,7 +1,12 @@
package peer
import (
"context"
"github.com/anyproto/any-sync/app/ocache"
"github.com/anyproto/any-sync/net/transport"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"sync"
"time"
"github.com/anyproto/any-sync/app/logger"
@ -12,7 +17,10 @@ var log = logger.NewNamed("common.net.peer")
func NewPeer(mc transport.MultiConn) (p Peer, err error) {
ctx := mc.Context()
pr := &peer{}
pr := &peer{
active: map[drpc.Conn]struct{}{},
MultiConn: mc,
}
if pr.id, err = CtxPeerId(ctx); err != nil {
return
}
@ -21,12 +29,25 @@ func NewPeer(mc transport.MultiConn) (p Peer, err error) {
type Peer interface {
Id() string
AcquireDrpcConn(ctx context.Context) (drpc.Conn, error)
ReleaseDrpcConn(conn drpc.Conn)
IsClosed() bool
TryClose(objectTTL time.Duration) (res bool, err error)
transport.MultiConn
ocache.Object
}
type peer struct {
id string
// drpc conn pool
inactive []drpc.Conn
active map[drpc.Conn]struct{}
mu sync.Mutex
transport.MultiConn
}
@ -34,10 +55,44 @@ func (p *peer) Id() string {
return p.id
}
func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
p.mu.Lock()
defer p.mu.Unlock()
if len(p.inactive) == 0 {
conn, err := p.Open(ctx)
if err != nil {
return nil, err
}
dconn := drpcconn.New(conn)
p.inactive = append(p.inactive, dconn)
}
idx := len(p.inactive) - 1
res := p.inactive[idx]
p.inactive = p.inactive[:idx]
p.active[res] = struct{}{}
return res, nil
}
func (p *peer) ReleaseDrpcConn(conn drpc.Conn) {
p.mu.Lock()
defer p.mu.Unlock()
if _, ok := p.active[conn]; ok {
delete(p.active, conn)
}
p.inactive = append(p.inactive, conn)
return
}
func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
if time.Now().Sub(p.LastUsage()) < objectTTL {
return false, nil
}
p.mu.Lock()
if len(p.active) > 0 {
p.mu.Unlock()
return false, nil
}
p.mu.Unlock()
return true, p.Close()
}

62
net/peer/peer_test.go Normal file
View File

@ -0,0 +1,62 @@
package peer
import (
"context"
"github.com/anyproto/any-sync/net/transport/mock_transport"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net"
"testing"
)
var ctx = context.Background()
func TestPeer_AcquireDrpcConn(t *testing.T) {
fx := newFixture(t, "p1")
defer fx.finish()
in, out := net.Pipe()
defer out.Close()
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
dc, err := fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
assert.NotEmpty(t, dc)
defer dc.Close()
assert.Len(t, fx.active, 1)
assert.Len(t, fx.inactive, 0)
fx.ReleaseDrpcConn(dc)
assert.Len(t, fx.active, 0)
assert.Len(t, fx.inactive, 1)
dc, err = fx.AcquireDrpcConn(ctx)
require.NoError(t, err)
assert.NotEmpty(t, dc)
assert.Len(t, fx.active, 1)
assert.Len(t, fx.inactive, 0)
}
func newFixture(t *testing.T, peerId string) *fixture {
fx := &fixture{
ctrl: gomock.NewController(t),
}
fx.mc = mock_transport.NewMockMultiConn(fx.ctrl)
ctx := CtxWithPeerId(context.Background(), peerId)
fx.mc.EXPECT().Context().Return(ctx).AnyTimes()
p, err := NewPeer(fx.mc)
require.NoError(t, err)
fx.peer = p.(*peer)
return fx
}
type fixture struct {
*peer
ctrl *gomock.Controller
mc *mock_transport.MockMultiConn
}
func (fx *fixture) finish() {
fx.ctrl.Finish()
}

View File

@ -83,7 +83,7 @@ func (p *peerService) Accept(mc transport.MultiConn) (err error) {
if err != nil {
return err
}
if err = p.pool.AddPeer(pr); err != nil {
if err = p.pool.AddPeer(context.Background(), pr); err != nil {
_ = pr.Close()
}
return

View File

@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
net2 "net"
"storj.io/drpc"
"testing"
"time"
)
@ -216,6 +217,12 @@ type testPeer struct {
closed chan struct{}
}
func (t *testPeer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
return nil, fmt.Errorf("not implemented")
}
func (t *testPeer) ReleaseDrpcConn(conn drpc.Conn) {}
func (t *testPeer) Context() context.Context {
//TODO implement me
panic("implement me")
@ -239,12 +246,6 @@ func (t *testPeer) Id() string {
return t.id
}
func (t *testPeer) LastUsage() time.Time {
return time.Now()
}
func (t *testPeer) UpdateLastUsage() {}
func (t *testPeer) TryClose(objectTTL time.Duration) (res bool, err error) {
return true, t.Close()
}

View File

@ -0,0 +1,188 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anyproto/any-sync/net/transport (interfaces: Transport,MultiConn)
// Package mock_transport is a generated GoMock package.
package mock_transport
import (
context "context"
net "net"
reflect "reflect"
time "time"
transport "github.com/anyproto/any-sync/net/transport"
gomock "github.com/golang/mock/gomock"
)
// MockTransport is a mock of Transport interface.
type MockTransport struct {
ctrl *gomock.Controller
recorder *MockTransportMockRecorder
}
// MockTransportMockRecorder is the mock recorder for MockTransport.
type MockTransportMockRecorder struct {
mock *MockTransport
}
// NewMockTransport creates a new mock instance.
func NewMockTransport(ctrl *gomock.Controller) *MockTransport {
mock := &MockTransport{ctrl: ctrl}
mock.recorder = &MockTransportMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockTransport) EXPECT() *MockTransportMockRecorder {
return m.recorder
}
// Dial mocks base method.
func (m *MockTransport) Dial(arg0 context.Context, arg1 string) (transport.MultiConn, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Dial", arg0, arg1)
ret0, _ := ret[0].(transport.MultiConn)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Dial indicates an expected call of Dial.
func (mr *MockTransportMockRecorder) Dial(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Dial", reflect.TypeOf((*MockTransport)(nil).Dial), arg0, arg1)
}
// SetAccepter mocks base method.
func (m *MockTransport) SetAccepter(arg0 transport.Accepter) {
m.ctrl.T.Helper()
m.ctrl.Call(m, "SetAccepter", arg0)
}
// SetAccepter indicates an expected call of SetAccepter.
func (mr *MockTransportMockRecorder) SetAccepter(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetAccepter", reflect.TypeOf((*MockTransport)(nil).SetAccepter), arg0)
}
// MockMultiConn is a mock of MultiConn interface.
type MockMultiConn struct {
ctrl *gomock.Controller
recorder *MockMultiConnMockRecorder
}
// MockMultiConnMockRecorder is the mock recorder for MockMultiConn.
type MockMultiConnMockRecorder struct {
mock *MockMultiConn
}
// NewMockMultiConn creates a new mock instance.
func NewMockMultiConn(ctrl *gomock.Controller) *MockMultiConn {
mock := &MockMultiConn{ctrl: ctrl}
mock.recorder = &MockMultiConnMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockMultiConn) EXPECT() *MockMultiConnMockRecorder {
return m.recorder
}
// Accept mocks base method.
func (m *MockMultiConn) Accept() (net.Conn, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Accept")
ret0, _ := ret[0].(net.Conn)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Accept indicates an expected call of Accept.
func (mr *MockMultiConnMockRecorder) Accept() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Accept", reflect.TypeOf((*MockMultiConn)(nil).Accept))
}
// Addr mocks base method.
func (m *MockMultiConn) Addr() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Addr")
ret0, _ := ret[0].(string)
return ret0
}
// Addr indicates an expected call of Addr.
func (mr *MockMultiConnMockRecorder) Addr() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Addr", reflect.TypeOf((*MockMultiConn)(nil).Addr))
}
// Close mocks base method.
func (m *MockMultiConn) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockMultiConnMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockMultiConn)(nil).Close))
}
// Context mocks base method.
func (m *MockMultiConn) Context() context.Context {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Context")
ret0, _ := ret[0].(context.Context)
return ret0
}
// Context indicates an expected call of Context.
func (mr *MockMultiConnMockRecorder) Context() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockMultiConn)(nil).Context))
}
// IsClosed mocks base method.
func (m *MockMultiConn) IsClosed() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "IsClosed")
ret0, _ := ret[0].(bool)
return ret0
}
// IsClosed indicates an expected call of IsClosed.
func (mr *MockMultiConnMockRecorder) IsClosed() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsClosed", reflect.TypeOf((*MockMultiConn)(nil).IsClosed))
}
// LastUsage mocks base method.
func (m *MockMultiConn) LastUsage() time.Time {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LastUsage")
ret0, _ := ret[0].(time.Time)
return ret0
}
// LastUsage indicates an expected call of LastUsage.
func (mr *MockMultiConnMockRecorder) LastUsage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastUsage", reflect.TypeOf((*MockMultiConn)(nil).LastUsage))
}
// Open mocks base method.
func (m *MockMultiConn) Open(arg0 context.Context) (net.Conn, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Open", arg0)
ret0, _ := ret[0].(net.Conn)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// Open indicates an expected call of Open.
func (mr *MockMultiConnMockRecorder) Open(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockMultiConn)(nil).Open), arg0)
}

View File

@ -1,3 +1,4 @@
//go:generate mockgen -destination mock_transport/mock_transport.go github.com/anyproto/any-sync/net/transport Transport,MultiConn
package transport
import (