drpc conn config

This commit is contained in:
Sergey Cherepanov 2023-06-08 13:10:30 +02:00
parent c7828d0671
commit 33cbdd06a6
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
4 changed files with 31 additions and 2 deletions

View File

@ -6,6 +6,7 @@ import (
"github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/app/ocache"
"github.com/anyproto/any-sync/net/connutil" "github.com/anyproto/any-sync/net/connutil"
"github.com/anyproto/any-sync/net/rpc"
"github.com/anyproto/any-sync/net/secureservice/handshake" "github.com/anyproto/any-sync/net/secureservice/handshake"
"github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto" "github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto"
"github.com/anyproto/any-sync/net/transport" "github.com/anyproto/any-sync/net/transport"
@ -14,6 +15,9 @@ import (
"net" "net"
"storj.io/drpc" "storj.io/drpc"
"storj.io/drpc/drpcconn" "storj.io/drpc/drpcconn"
"storj.io/drpc/drpcmanager"
"storj.io/drpc/drpcstream"
"storj.io/drpc/drpcwire"
"sync" "sync"
"time" "time"
) )
@ -22,6 +26,7 @@ var log = logger.NewNamed("common.net.peer")
type connCtrl interface { type connCtrl interface {
ServeConn(ctx context.Context, conn net.Conn) (err error) ServeConn(ctx context.Context, conn net.Conn) (err error)
DrpcConfig() rpc.Config
} }
func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) { func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) {
@ -127,8 +132,14 @@ func (p *peer) openDrpcConn(ctx context.Context) (dconn *subConn, err error) {
return nil, err return nil, err
} }
tconn := connutil.NewLastUsageConn(conn) tconn := connutil.NewLastUsageConn(conn)
bufSize := p.ctrl.DrpcConfig().Stream.MaxMsgSizeMb * (1 << 20)
return &subConn{ return &subConn{
Conn: drpcconn.New(tconn), Conn: drpcconn.NewWithOptions(conn, drpcconn.Options{
Manager: drpcmanager.Options{
Reader: drpcwire.ReaderOptions{MaximumBufferSize: bufSize},
Stream: drpcstream.Options{MaximumBufferSize: bufSize},
},
}),
LastUsageConn: tconn, LastUsageConn: tconn,
}, nil }, nil
} }

View File

@ -2,6 +2,7 @@ package peer
import ( import (
"context" "context"
"github.com/anyproto/any-sync/net/rpc"
"github.com/anyproto/any-sync/net/secureservice/handshake" "github.com/anyproto/any-sync/net/secureservice/handshake"
"github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto" "github.com/anyproto/any-sync/net/secureservice/handshake/handshakeproto"
"github.com/anyproto/any-sync/net/transport/mock_transport" "github.com/anyproto/any-sync/net/transport/mock_transport"
@ -176,6 +177,10 @@ type testCtrl struct {
closeCh chan struct{} closeCh chan struct{}
} }
func (t *testCtrl) DrpcConfig() rpc.Config {
return rpc.Config{Stream: rpc.StreamConfig{MaxMsgSizeMb: 10}}
}
func (t *testCtrl) ServeConn(ctx context.Context, conn net.Conn) (err error) { func (t *testCtrl) ServeConn(ctx context.Context, conn net.Conn) (err error) {
t.serveConn <- conn t.serveConn <- conn
<-t.closeCh <-t.closeCh

View File

@ -3,6 +3,7 @@ package rpctest
import ( import (
"context" "context"
"github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/net/rpc"
"github.com/anyproto/any-sync/net/rpc/server" "github.com/anyproto/any-sync/net/rpc/server"
"net" "net"
"storj.io/drpc/drpcmux" "storj.io/drpc/drpcmux"
@ -41,3 +42,7 @@ func (ts *TestServer) Close(ctx context.Context) (err error) {
func (s *TestServer) ServeConn(ctx context.Context, conn net.Conn) (err error) { func (s *TestServer) ServeConn(ctx context.Context, conn net.Conn) (err error) {
return s.Server.ServeOne(ctx, conn) return s.Server.ServeOne(ctx, conn)
} }
func (s *TestServer) DrpcConfig() rpc.Config {
return rpc.Config{Stream: rpc.StreamConfig{MaxMsgSizeMb: 10}}
}

View File

@ -12,6 +12,7 @@ import (
"storj.io/drpc/drpcmanager" "storj.io/drpc/drpcmanager"
"storj.io/drpc/drpcmux" "storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver" "storj.io/drpc/drpcserver"
"storj.io/drpc/drpcstream"
"storj.io/drpc/drpcwire" "storj.io/drpc/drpcwire"
) )
@ -25,6 +26,7 @@ func New() DRPCServer {
type DRPCServer interface { type DRPCServer interface {
ServeConn(ctx context.Context, conn net.Conn) (err error) ServeConn(ctx context.Context, conn net.Conn) (err error)
DrpcConfig() rpc.Config
app.Component app.Component
drpc.Mux drpc.Mux
} }
@ -52,8 +54,10 @@ func (s *drpcServer) Init(a *app.App) (err error) {
if s.metric != nil { if s.metric != nil {
handler = s.metric.WrapDRPCHandler(s) handler = s.metric.WrapDRPCHandler(s)
} }
bufSize := s.config.Stream.MaxMsgSizeMb * (1 << 20)
s.drpcServer = drpcserver.NewWithOptions(handler, drpcserver.Options{Manager: drpcmanager.Options{ s.drpcServer = drpcserver.NewWithOptions(handler, drpcserver.Options{Manager: drpcmanager.Options{
Reader: drpcwire.ReaderOptions{MaximumBufferSize: s.config.Stream.MaxMsgSizeMb * (1 << 20)}, Reader: drpcwire.ReaderOptions{MaximumBufferSize: bufSize},
Stream: drpcstream.Options{MaximumBufferSize: bufSize},
}}) }})
return return
} }
@ -63,3 +67,7 @@ func (s *drpcServer) ServeConn(ctx context.Context, conn net.Conn) (err error) {
l.Debug("drpc serve peer") l.Debug("drpc serve peer")
return s.drpcServer.ServeOne(ctx, conn) return s.drpcServer.ServeOne(ctx, conn)
} }
func (s *drpcServer) DrpcConfig() rpc.Config {
return s.config
}