diff --git a/net/peer/peer.go b/net/peer/peer.go index 300243c4..bd0d9b34 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -6,6 +6,7 @@ import ( "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/net/connutil" + "github.com/anyproto/any-sync/net/rpc" "github.com/anyproto/any-sync/net/secureservice/handshake" "github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto" "github.com/anyproto/any-sync/net/transport" @@ -14,6 +15,9 @@ import ( "net" "storj.io/drpc" "storj.io/drpc/drpcconn" + "storj.io/drpc/drpcmanager" + "storj.io/drpc/drpcstream" + "storj.io/drpc/drpcwire" "sync" "time" ) @@ -22,6 +26,7 @@ var log = logger.NewNamed("common.net.peer") type connCtrl interface { ServeConn(ctx context.Context, conn net.Conn) (err error) + DrpcConfig() rpc.Config } func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) { @@ -127,8 +132,14 @@ func (p *peer) openDrpcConn(ctx context.Context) (dconn *subConn, err error) { return nil, err } tconn := connutil.NewLastUsageConn(conn) + bufSize := p.ctrl.DrpcConfig().Stream.MaxMsgSizeMb * (1 << 20) return &subConn{ - Conn: drpcconn.New(tconn), + Conn: drpcconn.NewWithOptions(conn, drpcconn.Options{ + Manager: drpcmanager.Options{ + Reader: drpcwire.ReaderOptions{MaximumBufferSize: bufSize}, + Stream: drpcstream.Options{MaximumBufferSize: bufSize}, + }, + }), LastUsageConn: tconn, }, nil } diff --git a/net/peer/peer_test.go b/net/peer/peer_test.go index ac1ff28b..c0046923 100644 --- a/net/peer/peer_test.go +++ b/net/peer/peer_test.go @@ -2,6 +2,7 @@ package peer import ( "context" + "github.com/anyproto/any-sync/net/rpc" "github.com/anyproto/any-sync/net/secureservice/handshake" "github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto" "github.com/anyproto/any-sync/net/transport/mock_transport" @@ -176,6 +177,10 @@ type testCtrl struct { closeCh chan struct{} } +func (t *testCtrl) DrpcConfig() rpc.Config { + return rpc.Config{Stream: rpc.StreamConfig{MaxMsgSizeMb: 10}} +} + func (t *testCtrl) ServeConn(ctx context.Context, conn net.Conn) (err error) { t.serveConn <- conn <-t.closeCh diff --git a/net/rpc/rpctest/server.go b/net/rpc/rpctest/server.go index 053187f4..5c03f3e4 100644 --- a/net/rpc/rpctest/server.go +++ b/net/rpc/rpctest/server.go @@ -3,6 +3,7 @@ package rpctest import ( "context" "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/net/rpc" "github.com/anyproto/any-sync/net/rpc/server" "net" "storj.io/drpc/drpcmux" @@ -41,3 +42,7 @@ func (ts *TestServer) Close(ctx context.Context) (err error) { func (s *TestServer) ServeConn(ctx context.Context, conn net.Conn) (err error) { return s.Server.ServeOne(ctx, conn) } + +func (s *TestServer) DrpcConfig() rpc.Config { + return rpc.Config{Stream: rpc.StreamConfig{MaxMsgSizeMb: 10}} +} diff --git a/net/rpc/server/drpcserver.go b/net/rpc/server/drpcserver.go index 48b3bfed..8fc112ca 100644 --- a/net/rpc/server/drpcserver.go +++ b/net/rpc/server/drpcserver.go @@ -12,6 +12,7 @@ import ( "storj.io/drpc/drpcmanager" "storj.io/drpc/drpcmux" "storj.io/drpc/drpcserver" + "storj.io/drpc/drpcstream" "storj.io/drpc/drpcwire" ) @@ -25,6 +26,7 @@ func New() DRPCServer { type DRPCServer interface { ServeConn(ctx context.Context, conn net.Conn) (err error) + DrpcConfig() rpc.Config app.Component drpc.Mux } @@ -52,8 +54,10 @@ func (s *drpcServer) Init(a *app.App) (err error) { if s.metric != nil { handler = s.metric.WrapDRPCHandler(s) } + bufSize := s.config.Stream.MaxMsgSizeMb * (1 << 20) s.drpcServer = drpcserver.NewWithOptions(handler, drpcserver.Options{Manager: drpcmanager.Options{ - Reader: drpcwire.ReaderOptions{MaximumBufferSize: s.config.Stream.MaxMsgSizeMb * (1 << 20)}, + Reader: drpcwire.ReaderOptions{MaximumBufferSize: bufSize}, + Stream: drpcstream.Options{MaximumBufferSize: bufSize}, }}) return } @@ -63,3 +67,7 @@ func (s *drpcServer) ServeConn(ctx context.Context, conn net.Conn) (err error) { l.Debug("drpc serve peer") return s.drpcServer.ServeOne(ctx, conn) } + +func (s *drpcServer) DrpcConfig() rpc.Config { + return s.config +}