Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d72189a41 | ||
|
|
f00bcfbc78 | ||
|
|
2f1ba8c705 |
9
go.mod
9
go.mod
@ -29,6 +29,7 @@ require (
|
||||
github.com/multiformats/go-multibase v0.2.0
|
||||
github.com/multiformats/go-multihash v0.2.3
|
||||
github.com/prometheus/client_golang v1.16.0
|
||||
github.com/quic-go/quic-go v0.36.2
|
||||
github.com/stretchr/testify v1.8.4
|
||||
github.com/tyler-smith/go-bip39 v1.1.0
|
||||
github.com/zeebo/blake3 v0.2.3
|
||||
@ -53,8 +54,11 @@ require (
|
||||
github.com/fogleman/gg v1.3.0 // indirect
|
||||
github.com/go-logr/logr v1.2.4 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 // indirect
|
||||
github.com/golang/mock v1.6.0 // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 // indirect
|
||||
github.com/hashicorp/golang-lru v0.5.4 // indirect
|
||||
github.com/ipfs/bbloom v0.0.4 // indirect
|
||||
github.com/ipfs/go-bitfield v1.1.0 // indirect
|
||||
@ -83,6 +87,7 @@ require (
|
||||
github.com/multiformats/go-multicodec v0.9.0 // indirect
|
||||
github.com/multiformats/go-multistream v0.4.1 // indirect
|
||||
github.com/multiformats/go-varint v0.0.7 // indirect
|
||||
github.com/onsi/ginkgo/v2 v2.11.0 // indirect
|
||||
github.com/opentracing/opentracing-go v1.2.0 // indirect
|
||||
github.com/pkg/errors v0.9.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
@ -90,6 +95,8 @@ require (
|
||||
github.com/prometheus/client_model v0.4.0 // indirect
|
||||
github.com/prometheus/common v0.44.0 // indirect
|
||||
github.com/prometheus/procfs v0.10.1 // indirect
|
||||
github.com/quic-go/qtls-go1-19 v0.3.2 // indirect
|
||||
github.com/quic-go/qtls-go1-20 v0.2.2 // indirect
|
||||
github.com/spaolacci/murmur3 v1.1.0 // indirect
|
||||
github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect
|
||||
github.com/zeebo/errs v1.3.0 // indirect
|
||||
@ -97,8 +104,10 @@ require (
|
||||
go.opentelemetry.io/otel/trace v1.7.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
golang.org/x/image v0.6.0 // indirect
|
||||
golang.org/x/mod v0.12.0 // indirect
|
||||
golang.org/x/sync v0.3.0 // indirect
|
||||
golang.org/x/sys v0.10.0 // indirect
|
||||
golang.org/x/tools v0.11.0 // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
lukechampine.com/blake3 v1.2.1 // indirect
|
||||
)
|
||||
|
||||
13
go.sum
13
go.sum
@ -45,6 +45,7 @@ github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbV
|
||||
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
|
||||
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
|
||||
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
|
||||
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
|
||||
github.com/goccy/go-graphviz v0.1.1 h1:MGrsnzBxTyt7KG8FhHsFPDTGvF7UaQMmSa6A610DqPg=
|
||||
@ -54,6 +55,7 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 h1:DACJavvAHhabrF08vX0COfcOBJRhZ8lUbR+ZWIs0Y5g=
|
||||
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
|
||||
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
|
||||
github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs=
|
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
|
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
|
||||
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
|
||||
@ -63,6 +65,7 @@ github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
|
||||
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8 h1:n6vlPhxsA+BW/XsS5+uqi7GyzaLa5MH7qlSLBZtRdiA=
|
||||
github.com/google/pprof v0.0.0-20230705174524-200ffdc848b8/go.mod h1:Jh3hGz2jkYak8qXPD19ryItVnUgpgeqzdkY/D0EaeuA=
|
||||
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
|
||||
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
@ -213,8 +216,9 @@ github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS
|
||||
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
|
||||
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
|
||||
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY=
|
||||
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
||||
github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU=
|
||||
github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM=
|
||||
github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc=
|
||||
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
|
||||
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
|
||||
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
|
||||
@ -234,8 +238,11 @@ github.com/prometheus/procfs v0.10.1 h1:kYK1Va/YMlutzCGazswoHKo//tZVlFpKYh+Pymzi
|
||||
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
|
||||
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
|
||||
github.com/quic-go/qtls-go1-19 v0.3.2 h1:tFxjCFcTQzK+oMxG6Zcvp4Dq8dx4yD3dDiIiyc86Z5U=
|
||||
github.com/quic-go/qtls-go1-19 v0.3.2/go.mod h1:ySOI96ew8lnoKPtSqx2BlI5wCpUVPT05RMAlajtnyOI=
|
||||
github.com/quic-go/qtls-go1-20 v0.2.2 h1:WLOPx6OY/hxtTxKV1Zrq20FtXtDEkeY00CGQm8GEa3E=
|
||||
github.com/quic-go/qtls-go1-20 v0.2.2/go.mod h1:JKtK6mjbAVcUTN/9jZpvLbGxvdWIKS8uT7EiStoU1SM=
|
||||
github.com/quic-go/quic-go v0.36.2 h1:ZX/UNQ4gvpCv2RmwdbA6lrRjF6EBm5yZ7TMoT4NQVrA=
|
||||
github.com/quic-go/quic-go v0.36.2/go.mod h1:zPetvwDlILVxt15n3hr3Gf/I3mDf7LpLKPhR4Ez0AZQ=
|
||||
github.com/quic-go/webtransport-go v0.5.3 h1:5XMlzemqB4qmOlgIus5zB45AcZ2kCgCy2EptUrfOPWU=
|
||||
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
|
||||
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
|
||||
@ -247,6 +254,7 @@ github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
|
||||
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
|
||||
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
|
||||
@ -312,6 +320,7 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
|
||||
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
|
||||
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
|
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
|
||||
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
|
||||
@ -365,10 +374,12 @@ golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtn
|
||||
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
|
||||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
|
||||
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
|
||||
golang.org/x/tools v0.11.0 h1:EMCa6U9S2LtZXLAMoWiR/R8dAQFRqbAitmbJ2UKhoi8=
|
||||
golang.org/x/tools v0.11.0/go.mod h1:anzJrxPjNtfgiYQYirP2CPGzGLxrH2u2QBhn6Bf3qY8=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
||||
@ -34,9 +34,10 @@ func New() SecureService {
|
||||
|
||||
type SecureService interface {
|
||||
SecureOutbound(ctx context.Context, conn net.Conn) (cctx context.Context, err error)
|
||||
HandshakeOutbound(ctx context.Context, conn io.ReadWriteCloser, peerId string) (cctx context.Context, err error)
|
||||
SecureInbound(ctx context.Context, conn net.Conn) (cctx context.Context, err error)
|
||||
HandshakeInbound(ctx context.Context, conn io.ReadWriteCloser, remotePeerId string) (cctx context.Context, err error)
|
||||
ServerTlsConfig() (*tls.Config, error)
|
||||
TlsConfig() (*tls.Config, <-chan crypto.PubKey, error)
|
||||
app.Component
|
||||
}
|
||||
|
||||
@ -115,7 +116,10 @@ func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (cctx
|
||||
if err != nil {
|
||||
return nil, handshake.HandshakeError{Err: err}
|
||||
}
|
||||
peerId := sc.RemotePeer().String()
|
||||
return s.HandshakeOutbound(ctx, sc, sc.RemotePeer().String())
|
||||
}
|
||||
|
||||
func (s *secureService) HandshakeOutbound(ctx context.Context, conn io.ReadWriteCloser, peerId string) (cctx context.Context, err error) {
|
||||
confTypes := s.nodeconf.NodeTypes(peerId)
|
||||
var checker handshake.CredentialChecker
|
||||
if len(confTypes) > 0 {
|
||||
@ -123,23 +127,23 @@ func (s *secureService) SecureOutbound(ctx context.Context, conn net.Conn) (cctx
|
||||
} else {
|
||||
checker = s.noVerifyChecker
|
||||
}
|
||||
res, err := handshake.OutgoingHandshake(ctx, sc, sc.RemotePeer().String(), checker)
|
||||
res, err := handshake.OutgoingHandshake(ctx, conn, peerId, checker)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cctx = context.Background()
|
||||
cctx = peer.CtxWithPeerId(cctx, sc.RemotePeer().String())
|
||||
cctx = peer.CtxWithPeerId(cctx, peerId)
|
||||
cctx = peer.CtxWithIdentity(cctx, res.Identity)
|
||||
cctx = peer.CtxWithClientVersion(cctx, res.ClientVersion)
|
||||
return cctx, nil
|
||||
}
|
||||
|
||||
func (s *secureService) ServerTlsConfig() (*tls.Config, error) {
|
||||
func (s *secureService) TlsConfig() (*tls.Config, <-chan crypto.PubKey, error) {
|
||||
p2pIdn, err := libp2ptls.NewIdentity(s.key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, nil, err
|
||||
}
|
||||
conf, _ := p2pIdn.ConfigForPeer("")
|
||||
conf, keyCh := p2pIdn.ConfigForPeer("")
|
||||
conf.NextProtos = []string{"anysync"}
|
||||
return conf, nil
|
||||
return conf, keyCh, nil
|
||||
}
|
||||
|
||||
12
net/transport/quic/config.go
Normal file
12
net/transport/quic/config.go
Normal file
@ -0,0 +1,12 @@
|
||||
package quic
|
||||
|
||||
type configGetter interface {
|
||||
GetQuic() Config
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
ListenAddrs []string `yaml:"listenAddrs"`
|
||||
WriteTimeoutSec int `yaml:"writeTimeoutSec"`
|
||||
DialTimeoutSec int `yaml:"dialTimeoutSec"`
|
||||
MaxStreams int64 `yaml:"maxStreams"`
|
||||
}
|
||||
78
net/transport/quic/conn.go
Normal file
78
net/transport/quic/conn.go
Normal file
@ -0,0 +1,78 @@
|
||||
package quic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anyproto/any-sync/net/transport"
|
||||
"github.com/quic-go/quic-go"
|
||||
"net"
|
||||
)
|
||||
|
||||
func newConn(cctx context.Context, qconn quic.Connection) transport.MultiConn {
|
||||
return &quicMultiConn{
|
||||
cctx: cctx,
|
||||
Connection: qconn,
|
||||
}
|
||||
}
|
||||
|
||||
type quicMultiConn struct {
|
||||
cctx context.Context
|
||||
quic.Connection
|
||||
}
|
||||
|
||||
func (q *quicMultiConn) Context() context.Context {
|
||||
return q.cctx
|
||||
}
|
||||
|
||||
func (q *quicMultiConn) Accept() (conn net.Conn, err error) {
|
||||
stream, err := q.Connection.AcceptStream(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return quicNetConn{
|
||||
Stream: stream,
|
||||
localAddr: q.LocalAddr(),
|
||||
remoteAddr: q.RemoteAddr(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *quicMultiConn) Open(ctx context.Context) (conn net.Conn, err error) {
|
||||
stream, err := q.OpenStreamSync(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return quicNetConn{
|
||||
Stream: stream,
|
||||
localAddr: q.LocalAddr(),
|
||||
remoteAddr: q.RemoteAddr(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (q *quicMultiConn) Addr() string {
|
||||
return q.RemoteAddr().String()
|
||||
}
|
||||
|
||||
func (q *quicMultiConn) IsClosed() bool {
|
||||
select {
|
||||
case <-q.Connection.Context().Done():
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (q *quicMultiConn) Close() error {
|
||||
return q.Connection.CloseWithError(2, "")
|
||||
}
|
||||
|
||||
type quicNetConn struct {
|
||||
quic.Stream
|
||||
localAddr, remoteAddr net.Addr
|
||||
}
|
||||
|
||||
func (q quicNetConn) LocalAddr() net.Addr {
|
||||
return q.localAddr
|
||||
}
|
||||
|
||||
func (q quicNetConn) RemoteAddr() net.Addr {
|
||||
return q.remoteAddr
|
||||
}
|
||||
182
net/transport/quic/quic.go
Normal file
182
net/transport/quic/quic.go
Normal file
@ -0,0 +1,182 @@
|
||||
package quic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/app/logger"
|
||||
"github.com/anyproto/any-sync/net/secureservice"
|
||||
"github.com/anyproto/any-sync/net/transport"
|
||||
libp2crypto "github.com/libp2p/go-libp2p/core/crypto"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
|
||||
"github.com/quic-go/quic-go"
|
||||
"go.uber.org/zap"
|
||||
"net"
|
||||
"time"
|
||||
)
|
||||
|
||||
const CName = "net.transport.quic"
|
||||
|
||||
var log = logger.NewNamed(CName)
|
||||
|
||||
func New() Quic {
|
||||
return new(quicTransport)
|
||||
}
|
||||
|
||||
type Quic interface {
|
||||
transport.Transport
|
||||
app.ComponentRunnable
|
||||
}
|
||||
|
||||
type quicTransport struct {
|
||||
secure secureservice.SecureService
|
||||
accepter transport.Accepter
|
||||
conf Config
|
||||
quicConf *quic.Config
|
||||
listeners []*quic.Listener
|
||||
listCtx context.Context
|
||||
listCtxCancel context.CancelFunc
|
||||
}
|
||||
|
||||
func (q *quicTransport) Init(a *app.App) (err error) {
|
||||
q.secure = a.MustComponent(secureservice.CName).(secureservice.SecureService)
|
||||
q.conf = a.MustComponent("config").(configGetter).GetQuic()
|
||||
q.quicConf = &quic.Config{
|
||||
HandshakeIdleTimeout: time.Duration(q.conf.DialTimeoutSec) * time.Second,
|
||||
MaxIncomingStreams: q.conf.MaxStreams,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (q *quicTransport) Name() (name string) {
|
||||
return CName
|
||||
}
|
||||
|
||||
func (q *quicTransport) SetAccepter(accepter transport.Accepter) {
|
||||
q.accepter = accepter
|
||||
}
|
||||
|
||||
func (q *quicTransport) Run(ctx context.Context) (err error) {
|
||||
if q.accepter == nil {
|
||||
return fmt.Errorf("can't run service without accepter")
|
||||
}
|
||||
tlConf, _, err := q.secure.TlsConfig()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
for _, listAddr := range q.conf.ListenAddrs {
|
||||
list, err := quic.ListenAddr(listAddr, tlConf, q.quicConf)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
q.listeners = append(q.listeners, list)
|
||||
}
|
||||
q.listCtx, q.listCtxCancel = context.WithCancel(context.Background())
|
||||
for _, list := range q.listeners {
|
||||
go q.acceptLoop(q.listCtx, list)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (q *quicTransport) Dial(ctx context.Context, addr string) (mc transport.MultiConn, err error) {
|
||||
tlsConf, keyCh, err := q.secure.TlsConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
qConn, err := quic.DialAddr(ctx, addr, tlsConf, q.quicConf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var remotePubKey libp2crypto.PubKey
|
||||
select {
|
||||
case remotePubKey = <-keyCh:
|
||||
default:
|
||||
}
|
||||
if remotePubKey == nil {
|
||||
_ = qConn.CloseWithError(1, "")
|
||||
return nil, fmt.Errorf("libp2p tls handshake bug: no key")
|
||||
}
|
||||
|
||||
remotePeerId, err := peer.IDFromPublicKey(remotePubKey)
|
||||
if err != nil {
|
||||
_ = qConn.CloseWithError(1, "")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
stream, err := qConn.OpenStreamSync(ctx)
|
||||
if err != nil {
|
||||
_ = qConn.CloseWithError(1, "")
|
||||
return nil, err
|
||||
}
|
||||
defer func() {
|
||||
_ = stream.Close()
|
||||
}()
|
||||
|
||||
cctx, err := q.secure.HandshakeOutbound(ctx, stream, remotePeerId.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newConn(cctx, qConn), nil
|
||||
}
|
||||
|
||||
func (q *quicTransport) acceptLoop(ctx context.Context, list *quic.Listener) {
|
||||
l := log.With(zap.String("localAddr", list.Addr().String()))
|
||||
l.Info("quic listener started")
|
||||
defer func() {
|
||||
l.Debug("quic listener stopped")
|
||||
}()
|
||||
for {
|
||||
conn, err := list.Accept(ctx)
|
||||
if err != nil {
|
||||
if err != net.ErrClosed {
|
||||
l.Error("listener closed with error", zap.Error(err))
|
||||
}
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
if err := q.accept(conn); err != nil {
|
||||
l.Info("accept error", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (q *quicTransport) accept(conn quic.Connection) (err error) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(q.conf.DialTimeoutSec))
|
||||
defer cancel()
|
||||
remotePubKey, err := libp2ptls.PubKeyFromCertChain(conn.ConnectionState().TLS.PeerCertificates)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
remotePeerId, err := peer.IDFromPublicKey(remotePubKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// wait new stream for any handshake
|
||||
stream, err := conn.AcceptStream(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
_ = stream.Close()
|
||||
}()
|
||||
cctx, err := q.secure.HandshakeInbound(ctx, stream, remotePeerId.String())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
mc := newConn(cctx, conn)
|
||||
return q.accepter.Accept(mc)
|
||||
}
|
||||
|
||||
func (q *quicTransport) Close(ctx context.Context) (err error) {
|
||||
if q.listCtx != nil {
|
||||
q.listCtxCancel()
|
||||
}
|
||||
for _, lis := range q.listeners {
|
||||
_ = lis.Close()
|
||||
}
|
||||
return
|
||||
}
|
||||
255
net/transport/quic/quic_test.go
Normal file
255
net/transport/quic/quic_test.go
Normal file
@ -0,0 +1,255 @@
|
||||
package quic
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/net/secureservice"
|
||||
"github.com/anyproto/any-sync/net/transport"
|
||||
"github.com/anyproto/any-sync/nodeconf"
|
||||
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
|
||||
"github.com/anyproto/any-sync/testutil/accounttest"
|
||||
"github.com/anyproto/any-sync/testutil/testnodeconf"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/mock/gomock"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ctx = context.Background()
|
||||
|
||||
func TestQuicTransport_Dial(t *testing.T) {
|
||||
fxS := newFixture(t)
|
||||
defer fxS.finish(t)
|
||||
fxC := newFixture(t)
|
||||
defer fxC.finish(t)
|
||||
|
||||
mcC, err := fxC.Dial(ctx, fxS.addr)
|
||||
require.NoError(t, err)
|
||||
var mcS transport.MultiConn
|
||||
select {
|
||||
case mcS = <-fxS.accepter.mcs:
|
||||
case <-time.After(time.Second * 5):
|
||||
require.True(t, false, "timeout")
|
||||
}
|
||||
|
||||
var (
|
||||
sData string
|
||||
acceptErr error
|
||||
copyErr error
|
||||
done = make(chan struct{})
|
||||
)
|
||||
|
||||
go func() {
|
||||
defer close(done)
|
||||
conn, serr := mcS.Accept()
|
||||
if serr != nil {
|
||||
acceptErr = serr
|
||||
return
|
||||
}
|
||||
buf := bytes.NewBuffer(nil)
|
||||
_, copyErr = io.Copy(buf, conn)
|
||||
sData = buf.String()
|
||||
return
|
||||
}()
|
||||
|
||||
conn, err := mcC.Open(ctx)
|
||||
require.NoError(t, err)
|
||||
data := "some data"
|
||||
_, err = conn.Write([]byte(data))
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, conn.Close())
|
||||
<-done
|
||||
|
||||
assert.NoError(t, acceptErr)
|
||||
assert.Equal(t, data, sData)
|
||||
assert.NoError(t, copyErr)
|
||||
}
|
||||
|
||||
// no deadline - 69100 rps
|
||||
// common write deadline - 66700 rps
|
||||
// subconn write deadline - 67100 rps
|
||||
func TestWriteBenchReuse(t *testing.T) {
|
||||
t.Skip()
|
||||
var (
|
||||
numSubConn = 10
|
||||
numWrites = 10000
|
||||
)
|
||||
|
||||
fxS := newFixture(t)
|
||||
defer fxS.finish(t)
|
||||
fxC := newFixture(t)
|
||||
defer fxC.finish(t)
|
||||
|
||||
mcC, err := fxC.Dial(ctx, fxS.addr)
|
||||
require.NoError(t, err)
|
||||
mcS := <-fxS.accepter.mcs
|
||||
|
||||
go func() {
|
||||
for i := 0; i < numSubConn; i++ {
|
||||
conn, err := mcS.Accept()
|
||||
require.NoError(t, err)
|
||||
go func(sc net.Conn) {
|
||||
var b = make([]byte, 1024)
|
||||
for {
|
||||
n, _ := sc.Read(b)
|
||||
if n > 0 {
|
||||
sc.Write(b[:n])
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}(conn)
|
||||
}
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numSubConn)
|
||||
st := time.Now()
|
||||
for i := 0; i < numSubConn; i++ {
|
||||
conn, err := mcC.Open(ctx)
|
||||
require.NoError(t, err)
|
||||
go func(sc net.Conn) {
|
||||
defer sc.Close()
|
||||
defer wg.Done()
|
||||
for j := 0; j < numWrites; j++ {
|
||||
var b = []byte("some data some data some data some data some data some data some data some data some data")
|
||||
sc.Write(b)
|
||||
sc.Read(b)
|
||||
}
|
||||
}(conn)
|
||||
}
|
||||
wg.Wait()
|
||||
dur := time.Since(st)
|
||||
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
|
||||
}
|
||||
|
||||
func TestWriteBenchNew(t *testing.T) {
|
||||
t.Skip()
|
||||
var (
|
||||
numSubConn = 10
|
||||
numWrites = 10000
|
||||
)
|
||||
|
||||
fxS := newFixture(t)
|
||||
defer fxS.finish(t)
|
||||
fxC := newFixture(t)
|
||||
defer fxC.finish(t)
|
||||
|
||||
mcC, err := fxC.Dial(ctx, fxS.addr)
|
||||
require.NoError(t, err)
|
||||
mcS := <-fxS.accepter.mcs
|
||||
|
||||
go func() {
|
||||
for i := 0; i < numSubConn; i++ {
|
||||
require.NoError(t, err)
|
||||
go func() {
|
||||
var b = make([]byte, 1024)
|
||||
for {
|
||||
conn, _ := mcS.Accept()
|
||||
n, _ := conn.Read(b)
|
||||
if n > 0 {
|
||||
conn.Write(b[:n])
|
||||
} else {
|
||||
_ = conn.Close()
|
||||
break
|
||||
}
|
||||
conn.Close()
|
||||
}
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(numSubConn)
|
||||
st := time.Now()
|
||||
for i := 0; i < numSubConn; i++ {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for j := 0; j < numWrites; j++ {
|
||||
sc, err := mcC.Open(ctx)
|
||||
require.NoError(t, err)
|
||||
var b = []byte("some data some data some data some data some data some data some data some data some data")
|
||||
sc.Write(b)
|
||||
sc.Read(b)
|
||||
sc.Close()
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
dur := time.Since(st)
|
||||
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
|
||||
}
|
||||
|
||||
type fixture struct {
|
||||
*quicTransport
|
||||
a *app.App
|
||||
ctrl *gomock.Controller
|
||||
mockNodeConf *mock_nodeconf.MockService
|
||||
acc *accounttest.AccountTestService
|
||||
accepter *testAccepter
|
||||
addr string
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T) *fixture {
|
||||
fx := &fixture{
|
||||
quicTransport: New().(*quicTransport),
|
||||
ctrl: gomock.NewController(t),
|
||||
acc: &accounttest.AccountTestService{},
|
||||
accepter: &testAccepter{mcs: make(chan transport.MultiConn, 100)},
|
||||
a: new(app.App),
|
||||
}
|
||||
|
||||
fx.mockNodeConf = mock_nodeconf.NewMockService(fx.ctrl)
|
||||
fx.mockNodeConf.EXPECT().Init(gomock.Any())
|
||||
fx.mockNodeConf.EXPECT().Name().Return(nodeconf.CName).AnyTimes()
|
||||
fx.mockNodeConf.EXPECT().Run(ctx)
|
||||
fx.mockNodeConf.EXPECT().Close(ctx)
|
||||
fx.mockNodeConf.EXPECT().NodeTypes(gomock.Any()).Return([]nodeconf.NodeType{nodeconf.NodeTypeTree}).AnyTimes()
|
||||
fx.a.Register(fx.acc).Register(newTestConf()).Register(fx.mockNodeConf).Register(secureservice.New()).Register(fx.quicTransport).Register(fx.accepter)
|
||||
require.NoError(t, fx.a.Start(ctx))
|
||||
fx.addr = fx.listeners[0].Addr().String()
|
||||
return fx
|
||||
}
|
||||
|
||||
func (fx *fixture) finish(t *testing.T) {
|
||||
require.NoError(t, fx.a.Close(ctx))
|
||||
fx.ctrl.Finish()
|
||||
}
|
||||
|
||||
func newTestConf() *testConf {
|
||||
return &testConf{testnodeconf.GenNodeConfig(1)}
|
||||
}
|
||||
|
||||
type testConf struct {
|
||||
*testnodeconf.Config
|
||||
}
|
||||
|
||||
func (c *testConf) GetQuic() Config {
|
||||
return Config{
|
||||
ListenAddrs: []string{"127.0.0.1:0"},
|
||||
WriteTimeoutSec: 10,
|
||||
DialTimeoutSec: 10,
|
||||
}
|
||||
}
|
||||
|
||||
type testAccepter struct {
|
||||
err error
|
||||
mcs chan transport.MultiConn
|
||||
}
|
||||
|
||||
func (t *testAccepter) Accept(mc transport.MultiConn) (err error) {
|
||||
t.mcs <- mc
|
||||
return t.err
|
||||
}
|
||||
|
||||
func (t *testAccepter) Init(a *app.App) (err error) {
|
||||
a.MustComponent(CName).(transport.Transport).SetAccepter(t)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *testAccepter) Name() (name string) { return "testAccepter" }
|
||||
Loading…
x
Reference in New Issue
Block a user