diff --git a/net/connutil/timeout.go b/net/connutil/timeout.go index 057c7b8c..4c4f4c78 100644 --- a/net/connutil/timeout.go +++ b/net/connutil/timeout.go @@ -21,7 +21,13 @@ func NewTimeout(conn net.Conn, timeout time.Duration) *TimeoutConn { return &TimeoutConn{conn, timeout} } -func (c *TimeoutConn) Write(p []byte) (n int, err error) { +func (c *TimeoutConn) Write(p []byte) (n int, retErr error) { + log.Debug("start write", zap.Int("n", len(p))) + defer func() { + if retErr != nil { + log.Debug("conn write error", zap.Int("n", n), zap.Error(retErr)) + } + }() for { if c.timeout != 0 { if e := c.Conn.SetWriteDeadline(time.Now().Add(c.timeout)); e != nil { @@ -47,6 +53,7 @@ func (c *TimeoutConn) Write(p []byte) (n int, err error) { } log.Debug("connection timed out", zap.String("remoteAddr", c.RemoteAddr().String())) } + log.Debug("conn write", zap.Int("n", n)) return n, err } } diff --git a/net/transport/yamux/conn.go b/net/transport/yamux/conn.go index a3406bfa..77c38764 100644 --- a/net/transport/yamux/conn.go +++ b/net/transport/yamux/conn.go @@ -29,7 +29,7 @@ func (y *yamuxConn) Open(ctx context.Context) (conn net.Conn, err error) { if conn, err = y.Session.Open(); err != nil { return } - conn = connutil.NewTimeout(conn, time.Second*10) + //conn = connutil.NewTimeout(conn, time.Second*10) return } @@ -52,6 +52,6 @@ func (y *yamuxConn) Accept() (conn net.Conn, err error) { } return } - conn = connutil.NewTimeout(conn, time.Second*10) + //conn = connutil.NewTimeout(conn, time.Second*10) return } diff --git a/testutil/nettest/net_test.go b/testutil/nettest/net_test.go new file mode 100644 index 00000000..006d9dde --- /dev/null +++ b/testutil/nettest/net_test.go @@ -0,0 +1,114 @@ +package nettest + +import ( + "context" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/coordinator/coordinatorclient" + "github.com/anyproto/any-sync/coordinator/coordinatorproto" + "github.com/anyproto/any-sync/coordinator/nodeconfsource" + "github.com/anyproto/any-sync/net/peerservice" + "github.com/anyproto/any-sync/net/pool" + "github.com/anyproto/any-sync/net/rpc" + "github.com/anyproto/any-sync/net/rpc/server" + "github.com/anyproto/any-sync/net/secureservice" + "github.com/anyproto/any-sync/net/transport/yamux" + "github.com/anyproto/any-sync/nodeconf" + "github.com/anyproto/any-sync/nodeconf/nodeconfstore" + "github.com/anyproto/any-sync/testutil/accounttest" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "net/http" + _ "net/http/pprof" + "testing" + "time" +) + +var ctx = context.Background() + +func init() { + go http.ListenAndServe(":6061", nil) +} + +func TestNet(t *testing.T) { + l := logger.NewNamed("test") + pl := pool.New() + a := new(app.App) + a.Register(&accounttest.AccountTestService{}). + Register(pl). + Register(peerservice.New()). + Register(yamux.New()). + Register(nodeconf.New()). + Register(nodeconfstore.New()). + Register(nodeconfsource.New()). + Register(server.New()). + Register(secureservice.New()). + Register(coordinatorclient.New()). + Register(&testConfig{}) + require.NoError(t, a.Start(ctx)) + defer a.Close(ctx) + + p, err := pl.Get(ctx, "12D3KooWB4hmEo7YAdWzAaFpjyk4npkcwrPm2kRigsWu3MP9Xdmg") + require.NoError(t, err) + + for i := 0; i < 1000000; i++ { + dc, err := p.AcquireDrpcConn(ctx) + require.NoError(t, err) + cl := coordinatorproto.NewDRPCCoordinatorClient(dc) + time.Sleep(time.Second) + res, err := cl.NetworkConfiguration(ctx, &coordinatorproto.NetworkConfigurationRequest{}) + if err != nil { + l.Warn("req error", zap.Error(err)) + } else { + + l.Info("req success", zap.String("nid", res.NetworkId)) + } + p.ReleaseDrpcConn(dc) + } + +} + +type testConfig struct { +} + +func (t *testConfig) GetDrpc() rpc.Config { + return rpc.Config{ + Stream: rpc.StreamConfig{ + MaxMsgSizeMb: 1, + }, + } +} + +func (t *testConfig) GetNodeConfStorePath() string { + return "/tmp" +} + +func (t *testConfig) GetNodeConf() nodeconf.Configuration { + return nodeconf.Configuration{ + Id: "", + NetworkId: "", + Nodes: []nodeconf.Node{ + { + PeerId: "12D3KooWB4hmEo7YAdWzAaFpjyk4npkcwrPm2kRigsWu3MP9Xdmg", + Addresses: []string{"ec2-3-65-13-48.eu-central-1.compute.amazonaws.com:443"}, + Types: []nodeconf.NodeType{nodeconf.NodeTypeTree}, + }, + }, + CreationTime: time.Now(), + } +} + +func (t *testConfig) GetYamux() yamux.Config { + return yamux.Config{ + WriteTimeoutSec: 10, + DialTimeoutSec: 10, + } +} + +func (t *testConfig) Init(a *app.App) (err error) { + return nil +} + +func (t *testConfig) Name() (name string) { + return "config" +}