diff --git a/go.mod b/go.mod index aa746768..f07d5928 100644 --- a/go.mod +++ b/go.mod @@ -29,6 +29,7 @@ require ( github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multihash v0.2.3 github.com/prometheus/client_golang v1.16.0 + github.com/quic-go/quic-go v0.36.2 github.com/stretchr/testify v1.8.4 github.com/tyler-smith/go-bip39 v1.1.0 github.com/zeebo/blake3 v0.2.3 @@ -53,8 +54,11 @@ require ( github.com/fogleman/gg v1.3.0 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect + github.com/golang/mock v1.6.0 // indirect github.com/golang/protobuf v1.5.3 // indirect + github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/ipfs/bbloom v0.0.4 // indirect github.com/ipfs/go-bitfield v1.1.0 // indirect @@ -83,6 +87,7 @@ require ( github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-multistream v0.4.1 // indirect github.com/multiformats/go-varint v0.0.7 // indirect + github.com/onsi/ginkgo/v2 v2.11.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect @@ -90,6 +95,8 @@ require ( github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.44.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect + github.com/quic-go/qtls-go1-19 v0.3.2 // indirect + github.com/quic-go/qtls-go1-20 v0.2.2 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect github.com/zeebo/errs v1.3.0 // indirect @@ -97,8 +104,10 @@ require ( go.opentelemetry.io/otel/trace v1.7.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/image v0.6.0 // indirect + golang.org/x/mod v0.12.0 // indirect golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.10.0 // indirect + golang.org/x/tools v0.11.0 // indirect google.golang.org/protobuf v1.30.0 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/go.sum b/go.sum index 28c4b775..099f2ed9 100644 --- a/go.sum +++ b/go.sum @@ -45,6 +45,7 @@ github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= +github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y= github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8= github.com/goccy/go-graphviz v0.1.1 h1:MGrsnzBxTyt7KG8FhHsFPDTGvF7UaQMmSa6A610DqPg= @@ -54,6 +55,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69 github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= +github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= @@ -63,6 +65,7 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA= +github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= @@ -213,8 +216,9 @@ github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY= -github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= +github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= +github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -234,8 +238,11 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo= github.com/quic-go/qtls-go1-19 v0.3.2 h1:tFxjCFcTQzK+oMxG6Zcvp4Dq8dx4yD3dDiIiyc86Z5U= +github.com/quic-go/qtls-go1-19 v0.3.2/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05RMAlajtnyOI= github.com/quic-go/qtls-go1-20 v0.2.2 h1:WLOPx6OY/hxtTxKV1Zrq20FtXtDEkeY00CGQm8GEa3E= +github.com/quic-go/qtls-go1-20 v0.2.2/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM= github.com/quic-go/quic-go v0.36.2 h1:ZX/UNQ4gvpCv2RmwdbA6lrRjF6EBm5yZ7TMoT4NQVrA= +github.com/quic-go/quic-go v0.36.2/go.mod h1:zPetvwDlILVxt15n3hr3Gf/I3mDf7LpLKPhR4Ez0AZQ= github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2EptUrfOPWU= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= @@ -247,6 +254,7 @@ github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= @@ -312,6 +320,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc= +golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= @@ -365,10 +374,12 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8= +golang.org/x/tools v0.11.0/go.mod h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/net/secureservice/secureservice.go b/net/secureservice/secureservice.go index f5a1aa78..86b5e07b 100644 --- a/net/secureservice/secureservice.go +++ b/net/secureservice/secureservice.go @@ -34,9 +34,10 @@ func New() SecureService { type SecureService interface { SecureOutbound(ctx context.Context, conn net.Conn) (cctx context.Context, err error) + HandshakeOutbound(ctx context.Context, conn io.ReadWriteCloser, peerId string) (cctx context.Context, err error) SecureInbound(ctx context.Context, conn net.Conn) (cctx context.Context, err error) HandshakeInbound(ctx context.Context, conn io.ReadWriteCloser, remotePeerId string) (cctx context.Context, err error) - ServerTlsConfig() (*tls.Config, error) + TlsConfig() (*tls.Config, <-chan crypto.PubKey, error) app.Component } @@ -115,7 +116,10 @@ func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (cctx if err != nil { return nil, handshake.HandshakeError{Err: err} } - peerId := sc.RemotePeer().String() + return s.HandshakeOutbound(ctx, sc, sc.RemotePeer().String()) +} + +func (s *secureService) HandshakeOutbound(ctx context.Context, conn io.ReadWriteCloser, peerId string) (cctx context.Context, err error) { confTypes := s.nodeconf.NodeTypes(peerId) var checker handshake.CredentialChecker if len(confTypes) > 0 { @@ -123,23 +127,23 @@ func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (cctx } else { checker = s.noVerifyChecker } - res, err := handshake.OutgoingHandshake(ctx, sc, sc.RemotePeer().String(), checker) + res, err := handshake.OutgoingHandshake(ctx, conn, peerId, checker) if err != nil { return nil, err } cctx = context.Background() - cctx = peer.CtxWithPeerId(cctx, sc.RemotePeer().String()) + cctx = peer.CtxWithPeerId(cctx, peerId) cctx = peer.CtxWithIdentity(cctx, res.Identity) cctx = peer.CtxWithClientVersion(cctx, res.ClientVersion) return cctx, nil } -func (s *secureService) ServerTlsConfig() (*tls.Config, error) { +func (s *secureService) TlsConfig() (*tls.Config, <-chan crypto.PubKey, error) { p2pIdn, err := libp2ptls.NewIdentity(s.key) if err != nil { - return nil, err + return nil, nil, err } - conf, _ := p2pIdn.ConfigForPeer("") + conf, keyCh := p2pIdn.ConfigForPeer("") conf.NextProtos = []string{"anysync"} - return conf, nil + return conf, keyCh, nil } diff --git a/net/transport/quic/conn.go b/net/transport/quic/conn.go index 3154bdec..5ed704af 100644 --- a/net/transport/quic/conn.go +++ b/net/transport/quic/conn.go @@ -5,48 +5,74 @@ import ( "github.com/anyproto/any-sync/net/transport" "github.com/quic-go/quic-go" "net" - "time" ) func newConn(cctx context.Context, qconn quic.Connection) transport.MultiConn { - return &quicMultiConn{qconn} + return &quicMultiConn{ + cctx: cctx, + Connection: qconn, + } } type quicMultiConn struct { + cctx context.Context quic.Connection } func (q *quicMultiConn) Context() context.Context { - //TODO implement me - panic("implement me") + return q.cctx } func (q *quicMultiConn) Accept() (conn net.Conn, err error) { - //TODO implement me - panic("implement me") + stream, err := q.Connection.AcceptStream(context.Background()) + if err != nil { + return nil, err + } + return quicNetConn{ + Stream: stream, + localAddr: q.LocalAddr(), + remoteAddr: q.RemoteAddr(), + }, nil } func (q *quicMultiConn) Open(ctx context.Context) (conn net.Conn, err error) { - //TODO implement me - panic("implement me") -} - -func (q *quicMultiConn) LastUsage() time.Time { - //TODO implement me - panic("implement me") + stream, err := q.OpenStreamSync(ctx) + if err != nil { + return nil, err + } + return quicNetConn{ + Stream: stream, + localAddr: q.LocalAddr(), + remoteAddr: q.RemoteAddr(), + }, nil } func (q *quicMultiConn) Addr() string { - //TODO implement me - panic("implement me") + return q.RemoteAddr().String() } func (q *quicMultiConn) IsClosed() bool { - //TODO implement me - panic("implement me") + select { + case <-q.Connection.Context().Done(): + return true + default: + return false + } } func (q *quicMultiConn) Close() error { - //TODO implement me - panic("implement me") + return q.Connection.CloseWithError(2, "") +} + +type quicNetConn struct { + quic.Stream + localAddr, remoteAddr net.Addr +} + +func (q quicNetConn) LocalAddr() net.Addr { + return q.localAddr +} + +func (q quicNetConn) RemoteAddr() net.Addr { + return q.remoteAddr } diff --git a/net/transport/quic/quic.go b/net/transport/quic/quic.go index 797c28a2..5024e38e 100644 --- a/net/transport/quic/quic.go +++ b/net/transport/quic/quic.go @@ -7,6 +7,7 @@ import ( "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/net/secureservice" "github.com/anyproto/any-sync/net/transport" + libp2crypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" "github.com/quic-go/quic-go" @@ -60,7 +61,7 @@ func (q *quicTransport) Run(ctx context.Context) (err error) { if q.accepter == nil { return fmt.Errorf("can't run service without accepter") } - tlConf, err := q.secure.ServerTlsConfig() + tlConf, _, err := q.secure.TlsConfig() if err != nil { return } @@ -79,8 +80,45 @@ func (q *quicTransport) Run(ctx context.Context) (err error) { } func (q *quicTransport) Dial(ctx context.Context, addr string) (mc transport.MultiConn, err error) { - //TODO implement me - panic("implement me") + tlsConf, keyCh, err := q.secure.TlsConfig() + if err != nil { + return nil, err + } + qConn, err := quic.DialAddr(ctx, addr, tlsConf, q.quicConf) + if err != nil { + return nil, err + } + var remotePubKey libp2crypto.PubKey + select { + case remotePubKey = <-keyCh: + default: + } + if remotePubKey == nil { + _ = qConn.CloseWithError(1, "") + return nil, fmt.Errorf("libp2p tls handshake bug: no key") + } + + remotePeerId, err := peer.IDFromPublicKey(remotePubKey) + if err != nil { + _ = qConn.CloseWithError(1, "") + return nil, err + } + + stream, err := qConn.OpenStreamSync(ctx) + if err != nil { + _ = qConn.CloseWithError(1, "") + return nil, err + } + defer func() { + _ = stream.Close() + }() + + cctx, err := q.secure.HandshakeOutbound(ctx, stream, remotePeerId.String()) + if err != nil { + return nil, err + } + + return newConn(cctx, qConn), nil } func (q *quicTransport) acceptLoop(ctx context.Context, list *quic.Listener) { diff --git a/net/transport/quic/quic_test.go b/net/transport/quic/quic_test.go new file mode 100644 index 00000000..2d47aa8c --- /dev/null +++ b/net/transport/quic/quic_test.go @@ -0,0 +1,255 @@ +package quic + +import ( + "bytes" + "context" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/net/secureservice" + "github.com/anyproto/any-sync/net/transport" + "github.com/anyproto/any-sync/nodeconf" + "github.com/anyproto/any-sync/nodeconf/mock_nodeconf" + "github.com/anyproto/any-sync/testutil/accounttest" + "github.com/anyproto/any-sync/testutil/testnodeconf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "io" + "net" + "sync" + "testing" + "time" +) + +var ctx = context.Background() + +func TestQuicTransport_Dial(t *testing.T) { + fxS := newFixture(t) + defer fxS.finish(t) + fxC := newFixture(t) + defer fxC.finish(t) + + mcC, err := fxC.Dial(ctx, fxS.addr) + require.NoError(t, err) + var mcS transport.MultiConn + select { + case mcS = <-fxS.accepter.mcs: + case <-time.After(time.Second * 5): + require.True(t, false, "timeout") + } + + var ( + sData string + acceptErr error + copyErr error + done = make(chan struct{}) + ) + + go func() { + defer close(done) + conn, serr := mcS.Accept() + if serr != nil { + acceptErr = serr + return + } + buf := bytes.NewBuffer(nil) + _, copyErr = io.Copy(buf, conn) + sData = buf.String() + return + }() + + conn, err := mcC.Open(ctx) + require.NoError(t, err) + data := "some data" + _, err = conn.Write([]byte(data)) + require.NoError(t, err) + require.NoError(t, conn.Close()) + <-done + + assert.NoError(t, acceptErr) + assert.Equal(t, data, sData) + assert.NoError(t, copyErr) +} + +// no deadline - 69100 rps +// common write deadline - 66700 rps +// subconn write deadline - 67100 rps +func TestWriteBenchReuse(t *testing.T) { + t.Skip() + var ( + numSubConn = 10 + numWrites = 10000 + ) + + fxS := newFixture(t) + defer fxS.finish(t) + fxC := newFixture(t) + defer fxC.finish(t) + + mcC, err := fxC.Dial(ctx, fxS.addr) + require.NoError(t, err) + mcS := <-fxS.accepter.mcs + + go func() { + for i := 0; i < numSubConn; i++ { + conn, err := mcS.Accept() + require.NoError(t, err) + go func(sc net.Conn) { + var b = make([]byte, 1024) + for { + n, _ := sc.Read(b) + if n > 0 { + sc.Write(b[:n]) + } else { + break + } + } + }(conn) + } + }() + + var wg sync.WaitGroup + wg.Add(numSubConn) + st := time.Now() + for i := 0; i < numSubConn; i++ { + conn, err := mcC.Open(ctx) + require.NoError(t, err) + go func(sc net.Conn) { + defer sc.Close() + defer wg.Done() + for j := 0; j < numWrites; j++ { + var b = []byte("some data some data some data some data some data some data some data some data some data") + sc.Write(b) + sc.Read(b) + } + }(conn) + } + wg.Wait() + dur := time.Since(st) + t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds()) +} + +func TestWriteBenchNew(t *testing.T) { + t.Skip() + var ( + numSubConn = 10 + numWrites = 10000 + ) + + fxS := newFixture(t) + defer fxS.finish(t) + fxC := newFixture(t) + defer fxC.finish(t) + + mcC, err := fxC.Dial(ctx, fxS.addr) + require.NoError(t, err) + mcS := <-fxS.accepter.mcs + + go func() { + for i := 0; i < numSubConn; i++ { + require.NoError(t, err) + go func() { + var b = make([]byte, 1024) + for { + conn, _ := mcS.Accept() + n, _ := conn.Read(b) + if n > 0 { + conn.Write(b[:n]) + } else { + _ = conn.Close() + break + } + conn.Close() + } + }() + } + }() + + var wg sync.WaitGroup + wg.Add(numSubConn) + st := time.Now() + for i := 0; i < numSubConn; i++ { + go func() { + defer wg.Done() + for j := 0; j < numWrites; j++ { + sc, err := mcC.Open(ctx) + require.NoError(t, err) + var b = []byte("some data some data some data some data some data some data some data some data some data") + sc.Write(b) + sc.Read(b) + sc.Close() + } + }() + } + wg.Wait() + dur := time.Since(st) + t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds()) +} + +type fixture struct { + *quicTransport + a *app.App + ctrl *gomock.Controller + mockNodeConf *mock_nodeconf.MockService + acc *accounttest.AccountTestService + accepter *testAccepter + addr string +} + +func newFixture(t *testing.T) *fixture { + fx := &fixture{ + quicTransport: New().(*quicTransport), + ctrl: gomock.NewController(t), + acc: &accounttest.AccountTestService{}, + accepter: &testAccepter{mcs: make(chan transport.MultiConn, 100)}, + a: new(app.App), + } + + fx.mockNodeConf = mock_nodeconf.NewMockService(fx.ctrl) + fx.mockNodeConf.EXPECT().Init(gomock.Any()) + fx.mockNodeConf.EXPECT().Name().Return(nodeconf.CName).AnyTimes() + fx.mockNodeConf.EXPECT().Run(ctx) + fx.mockNodeConf.EXPECT().Close(ctx) + fx.mockNodeConf.EXPECT().NodeTypes(gomock.Any()).Return([]nodeconf.NodeType{nodeconf.NodeTypeTree}).AnyTimes() + fx.a.Register(fx.acc).Register(newTestConf()).Register(fx.mockNodeConf).Register(secureservice.New()).Register(fx.quicTransport).Register(fx.accepter) + require.NoError(t, fx.a.Start(ctx)) + fx.addr = fx.listeners[0].Addr().String() + return fx +} + +func (fx *fixture) finish(t *testing.T) { + require.NoError(t, fx.a.Close(ctx)) + fx.ctrl.Finish() +} + +func newTestConf() *testConf { + return &testConf{testnodeconf.GenNodeConfig(1)} +} + +type testConf struct { + *testnodeconf.Config +} + +func (c *testConf) GetQuic() Config { + return Config{ + ListenAddrs: []string{"127.0.0.1:0"}, + WriteTimeoutSec: 10, + DialTimeoutSec: 10, + } +} + +type testAccepter struct { + err error + mcs chan transport.MultiConn +} + +func (t *testAccepter) Accept(mc transport.MultiConn) (err error) { + t.mcs <- mc + return t.err +} + +func (t *testAccepter) Init(a *app.App) (err error) { + a.MustComponent(CName).(transport.Transport).SetAccepter(t) + return nil +} + +func (t *testAccepter) Name() (name string) { return "testAccepter" }