diff --git a/commonspace/requestmanager/requestmanager.go b/commonspace/requestmanager/requestmanager.go index ac4351c0..d72ea394 100644 --- a/commonspace/requestmanager/requestmanager.go +++ b/commonspace/requestmanager/requestmanager.go @@ -38,19 +38,21 @@ type MessageHandler interface { type requestManager struct { sync.Mutex - pools map[string]*streampool.ExecPool - peerPool pool.Pool - workers int - queueSize int - handler MessageHandler - ctx context.Context - cancel context.CancelFunc + pools map[string]*streampool.ExecPool + peerPool pool.Pool + workers int + queueSize int + handler MessageHandler + ctx context.Context + cancel context.CancelFunc + clientFactory spacesyncproto.ClientFactory } func (r *requestManager) Init(a *app.App) (err error) { r.ctx, r.cancel = context.WithCancel(context.Background()) r.handler = a.MustComponent(objectsync.CName).(MessageHandler) r.peerPool = a.MustComponent(pool.CName).(pool.Pool) + r.clientFactory = spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) return } @@ -89,18 +91,24 @@ func (r *requestManager) QueueRequest(peerId string, req *spacesyncproto.ObjectS // TODO: for later think when many clients are there, // we need to close pools for inactive clients return pl.TryAdd(func() { - ctx := r.ctx - resp, err := r.doRequest(ctx, peerId, req) - if err != nil { - log.Warn("failed to send request", zap.Error(err)) - return - } - ctx = peer.CtxWithPeerId(ctx, peerId) - _ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{ - SenderId: peerId, - Message: resp, - PeerCtx: ctx, - }) + doRequestAndHandle(r, peerId, req) + }) +} + +var doRequestAndHandle = (*requestManager).requestAndHandle + +func (r *requestManager) requestAndHandle(peerId string, req *spacesyncproto.ObjectSyncMessage) { + ctx := r.ctx + resp, err := r.doRequest(ctx, peerId, req) + if err != nil { + log.Warn("failed to send request", zap.Error(err)) + return + } + ctx = peer.CtxWithPeerId(ctx, peerId) + _ = r.handler.HandleMessage(ctx, objectsync.HandleMessage{ + SenderId: peerId, + Message: resp, + PeerCtx: ctx, }) } @@ -110,7 +118,7 @@ func (r *requestManager) doRequest(ctx context.Context, peerId string, msg *spac return } err = pr.DoDrpc(ctx, func(conn drpc.Conn) error { - cl := spacesyncproto.NewDRPCSpaceSyncClient(conn) + cl := r.clientFactory.Client(conn) resp, err = cl.ObjectSync(ctx, msg) return err }) diff --git a/commonspace/requestmanager/requestmanager_test.go b/commonspace/requestmanager/requestmanager_test.go index e5e89798..1bdaa884 100644 --- a/commonspace/requestmanager/requestmanager_test.go +++ b/commonspace/requestmanager/requestmanager_test.go @@ -1 +1,97 @@ package requestmanager + +import ( + "context" + "github.com/anyproto/any-sync/commonspace/objectsync" + "github.com/anyproto/any-sync/commonspace/objectsync/mock_objectsync" + "github.com/anyproto/any-sync/commonspace/spacesyncproto" + "github.com/anyproto/any-sync/commonspace/spacesyncproto/mock_spacesyncproto" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/peer/mock_peer" + "github.com/anyproto/any-sync/net/pool/mock_pool" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "storj.io/drpc" + "storj.io/drpc/drpcconn" + "testing" +) + +type fixture struct { + requestManager *requestManager + messageHandlerMock *mock_objectsync.MockObjectSync + peerPoolMock *mock_pool.MockPool + clientMock *mock_spacesyncproto.MockDRPCSpaceSyncClient + ctrl *gomock.Controller +} + +func newFixture(t *testing.T) *fixture { + ctrl := gomock.NewController(t) + manager := New().(*requestManager) + peerPoolMock := mock_pool.NewMockPool(ctrl) + messageHandlerMock := mock_objectsync.NewMockObjectSync(ctrl) + clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl) + manager.peerPool = peerPoolMock + manager.handler = messageHandlerMock + manager.clientFactory = spacesyncproto.ClientFactoryFunc(func(cc drpc.Conn) spacesyncproto.DRPCSpaceSyncClient { + return clientMock + }) + manager.ctx, manager.cancel = context.WithCancel(context.Background()) + return &fixture{ + requestManager: manager, + messageHandlerMock: messageHandlerMock, + peerPoolMock: peerPoolMock, + clientMock: clientMock, + ctrl: ctrl, + } +} + +func (fx *fixture) stop() { + fx.ctrl.Finish() +} + +func TestRequestManager_Request(t *testing.T) { + ctx := context.Background() + + t.Run("send request", func(t *testing.T) { + fx := newFixture(t) + defer fx.stop() + + peerId := "peerId" + peerMock := mock_peer.NewMockPeer(fx.ctrl) + conn := &drpcconn.Conn{} + msg := &spacesyncproto.ObjectSyncMessage{} + resp := &spacesyncproto.ObjectSyncMessage{} + fx.peerPoolMock.EXPECT().Get(ctx, peerId).Return(peerMock, nil) + fx.clientMock.EXPECT().ObjectSync(ctx, msg).Return(resp, nil) + peerMock.EXPECT().DoDrpc(ctx, gomock.Any()).DoAndReturn(func(ctx context.Context, drpcHandler func(conn drpc.Conn) error) { + drpcHandler(conn) + }).Return(nil) + res, err := fx.requestManager.SendRequest(ctx, peerId, msg) + require.NoError(t, err) + require.Equal(t, resp, res) + }) + + t.Run("request and handle", func(t *testing.T) { + fx := newFixture(t) + defer fx.stop() + ctx = fx.requestManager.ctx + + peerId := "peerId" + peerMock := mock_peer.NewMockPeer(fx.ctrl) + conn := &drpcconn.Conn{} + msg := &spacesyncproto.ObjectSyncMessage{} + resp := &spacesyncproto.ObjectSyncMessage{} + fx.peerPoolMock.EXPECT().Get(ctx, peerId).Return(peerMock, nil) + fx.clientMock.EXPECT().ObjectSync(ctx, msg).Return(resp, nil) + peerMock.EXPECT().DoDrpc(ctx, gomock.Any()).DoAndReturn(func(ctx context.Context, drpcHandler func(conn drpc.Conn) error) { + drpcHandler(conn) + }).Return(nil) + fx.messageHandlerMock.EXPECT().HandleMessage(gomock.Any(), gomock.Any()).DoAndReturn(func(ctx context.Context, msg objectsync.HandleMessage) { + require.Equal(t, peerId, msg.SenderId) + require.Equal(t, resp, msg.Message) + pId, _ := peer.CtxPeerId(msg.PeerCtx) + require.Equal(t, peerId, pId) + }).Return(nil) + fx.requestManager.requestAndHandle(peerId, msg) + }) +} diff --git a/net/peer/mock_peer/mock_peer.go b/net/peer/mock_peer/mock_peer.go new file mode 100644 index 00000000..dc0a5b6a --- /dev/null +++ b/net/peer/mock_peer/mock_peer.go @@ -0,0 +1,149 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/anyproto/any-sync/net/peer (interfaces: Peer) + +// Package mock_peer is a generated GoMock package. +package mock_peer + +import ( + context "context" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" + drpc "storj.io/drpc" +) + +// MockPeer is a mock of Peer interface. +type MockPeer struct { + ctrl *gomock.Controller + recorder *MockPeerMockRecorder +} + +// MockPeerMockRecorder is the mock recorder for MockPeer. +type MockPeerMockRecorder struct { + mock *MockPeer +} + +// NewMockPeer creates a new mock instance. +func NewMockPeer(ctrl *gomock.Controller) *MockPeer { + mock := &MockPeer{ctrl: ctrl} + mock.recorder = &MockPeerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPeer) EXPECT() *MockPeerMockRecorder { + return m.recorder +} + +// AcquireDrpcConn mocks base method. +func (m *MockPeer) AcquireDrpcConn(arg0 context.Context) (drpc.Conn, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AcquireDrpcConn", arg0) + ret0, _ := ret[0].(drpc.Conn) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// AcquireDrpcConn indicates an expected call of AcquireDrpcConn. +func (mr *MockPeerMockRecorder) AcquireDrpcConn(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AcquireDrpcConn", reflect.TypeOf((*MockPeer)(nil).AcquireDrpcConn), arg0) +} + +// Close mocks base method. +func (m *MockPeer) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockPeerMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPeer)(nil).Close)) +} + +// Context mocks base method. +func (m *MockPeer) Context() context.Context { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Context") + ret0, _ := ret[0].(context.Context) + return ret0 +} + +// Context indicates an expected call of Context. +func (mr *MockPeerMockRecorder) Context() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Context", reflect.TypeOf((*MockPeer)(nil).Context)) +} + +// DoDrpc mocks base method. +func (m *MockPeer) DoDrpc(arg0 context.Context, arg1 func(drpc.Conn) error) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DoDrpc", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DoDrpc indicates an expected call of DoDrpc. +func (mr *MockPeerMockRecorder) DoDrpc(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DoDrpc", reflect.TypeOf((*MockPeer)(nil).DoDrpc), arg0, arg1) +} + +// Id mocks base method. +func (m *MockPeer) Id() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Id") + ret0, _ := ret[0].(string) + return ret0 +} + +// Id indicates an expected call of Id. +func (mr *MockPeerMockRecorder) Id() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Id", reflect.TypeOf((*MockPeer)(nil).Id)) +} + +// IsClosed mocks base method. +func (m *MockPeer) IsClosed() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsClosed") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsClosed indicates an expected call of IsClosed. +func (mr *MockPeerMockRecorder) IsClosed() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsClosed", reflect.TypeOf((*MockPeer)(nil).IsClosed)) +} + +// ReleaseDrpcConn mocks base method. +func (m *MockPeer) ReleaseDrpcConn(arg0 drpc.Conn) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "ReleaseDrpcConn", arg0) +} + +// ReleaseDrpcConn indicates an expected call of ReleaseDrpcConn. +func (mr *MockPeerMockRecorder) ReleaseDrpcConn(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReleaseDrpcConn", reflect.TypeOf((*MockPeer)(nil).ReleaseDrpcConn), arg0) +} + +// TryClose mocks base method. +func (m *MockPeer) TryClose(arg0 time.Duration) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TryClose", arg0) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TryClose indicates an expected call of TryClose. +func (mr *MockPeerMockRecorder) TryClose(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TryClose", reflect.TypeOf((*MockPeer)(nil).TryClose), arg0) +} diff --git a/net/peer/peer.go b/net/peer/peer.go index 7f42260c..300243c4 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_peer/mock_peer.go github.com/anyproto/any-sync/net/peer Peer package peer import ( diff --git a/net/pool/mock_pool/mock_pool.go b/net/pool/mock_pool/mock_pool.go new file mode 100644 index 00000000..be884903 --- /dev/null +++ b/net/pool/mock_pool/mock_pool.go @@ -0,0 +1,80 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/anyproto/any-sync/net/pool (interfaces: Pool) + +// Package mock_pool is a generated GoMock package. +package mock_pool + +import ( + context "context" + reflect "reflect" + + peer "github.com/anyproto/any-sync/net/peer" + gomock "github.com/golang/mock/gomock" +) + +// MockPool is a mock of Pool interface. +type MockPool struct { + ctrl *gomock.Controller + recorder *MockPoolMockRecorder +} + +// MockPoolMockRecorder is the mock recorder for MockPool. +type MockPoolMockRecorder struct { + mock *MockPool +} + +// NewMockPool creates a new mock instance. +func NewMockPool(ctrl *gomock.Controller) *MockPool { + mock := &MockPool{ctrl: ctrl} + mock.recorder = &MockPoolMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPool) EXPECT() *MockPoolMockRecorder { + return m.recorder +} + +// AddPeer mocks base method. +func (m *MockPool) AddPeer(arg0 context.Context, arg1 peer.Peer) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddPeer", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddPeer indicates an expected call of AddPeer. +func (mr *MockPoolMockRecorder) AddPeer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPeer", reflect.TypeOf((*MockPool)(nil).AddPeer), arg0, arg1) +} + +// Get mocks base method. +func (m *MockPool) Get(arg0 context.Context, arg1 string) (peer.Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", arg0, arg1) + ret0, _ := ret[0].(peer.Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockPoolMockRecorder) Get(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockPool)(nil).Get), arg0, arg1) +} + +// GetOneOf mocks base method. +func (m *MockPool) GetOneOf(arg0 context.Context, arg1 []string) (peer.Peer, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetOneOf", arg0, arg1) + ret0, _ := ret[0].(peer.Peer) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetOneOf indicates an expected call of GetOneOf. +func (mr *MockPoolMockRecorder) GetOneOf(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetOneOf", reflect.TypeOf((*MockPool)(nil).GetOneOf), arg0, arg1) +} diff --git a/net/pool/pool.go b/net/pool/pool.go index 37f8328e..7e936e1b 100644 --- a/net/pool/pool.go +++ b/net/pool/pool.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_pool/mock_pool.go github.com/anyproto/any-sync/net/pool Pool package pool import (