From 2f1ba8c7059c6aa18fd1796bbf2852f12963ce67 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Wed, 7 Jun 2023 13:31:40 +0200 Subject: [PATCH] wip: quic transport --- net/transport/quic/config.go | 12 +++ net/transport/quic/conn.go | 52 +++++++++++++ net/transport/quic/quic.go | 144 +++++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+) create mode 100644 net/transport/quic/config.go create mode 100644 net/transport/quic/conn.go create mode 100644 net/transport/quic/quic.go diff --git a/net/transport/quic/config.go b/net/transport/quic/config.go new file mode 100644 index 00000000..a08dbbda --- /dev/null +++ b/net/transport/quic/config.go @@ -0,0 +1,12 @@ +package quic + +type configGetter interface { + GetQuic() Config +} + +type Config struct { + ListenAddrs []string `yaml:"listenAddrs"` + WriteTimeoutSec int `yaml:"writeTimeoutSec"` + DialTimeoutSec int `yaml:"dialTimeoutSec"` + MaxStreams int64 `yaml:"maxStreams"` +} diff --git a/net/transport/quic/conn.go b/net/transport/quic/conn.go new file mode 100644 index 00000000..3154bdec --- /dev/null +++ b/net/transport/quic/conn.go @@ -0,0 +1,52 @@ +package quic + +import ( + "context" + "github.com/anyproto/any-sync/net/transport" + "github.com/quic-go/quic-go" + "net" + "time" +) + +func newConn(cctx context.Context, qconn quic.Connection) transport.MultiConn { + return &quicMultiConn{qconn} +} + +type quicMultiConn struct { + quic.Connection +} + +func (q *quicMultiConn) Context() context.Context { + //TODO implement me + panic("implement me") +} + +func (q *quicMultiConn) Accept() (conn net.Conn, err error) { + //TODO implement me + panic("implement me") +} + +func (q *quicMultiConn) Open(ctx context.Context) (conn net.Conn, err error) { + //TODO implement me + panic("implement me") +} + +func (q *quicMultiConn) LastUsage() time.Time { + //TODO implement me + panic("implement me") +} + +func (q *quicMultiConn) Addr() string { + //TODO implement me + panic("implement me") +} + +func (q *quicMultiConn) IsClosed() bool { + //TODO implement me + panic("implement me") +} + +func (q *quicMultiConn) Close() error { + //TODO implement me + panic("implement me") +} diff --git a/net/transport/quic/quic.go b/net/transport/quic/quic.go new file mode 100644 index 00000000..797c28a2 --- /dev/null +++ b/net/transport/quic/quic.go @@ -0,0 +1,144 @@ +package quic + +import ( + "context" + "fmt" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/net/secureservice" + "github.com/anyproto/any-sync/net/transport" + "github.com/libp2p/go-libp2p/core/peer" + libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" + "github.com/quic-go/quic-go" + "go.uber.org/zap" + "net" + "time" +) + +const CName = "net.transport.quic" + +var log = logger.NewNamed(CName) + +func New() Quic { + return new(quicTransport) +} + +type Quic interface { + transport.Transport + app.ComponentRunnable +} + +type quicTransport struct { + secure secureservice.SecureService + accepter transport.Accepter + conf Config + quicConf *quic.Config + listeners []*quic.Listener + listCtx context.Context + listCtxCancel context.CancelFunc +} + +func (q *quicTransport) Init(a *app.App) (err error) { + q.secure = a.MustComponent(secureservice.CName).(secureservice.SecureService) + q.conf = a.MustComponent("config").(configGetter).GetQuic() + q.quicConf = &quic.Config{ + HandshakeIdleTimeout: time.Duration(q.conf.DialTimeoutSec) * time.Second, + MaxIncomingStreams: q.conf.MaxStreams, + } + return +} + +func (q *quicTransport) Name() (name string) { + return CName +} + +func (q *quicTransport) SetAccepter(accepter transport.Accepter) { + q.accepter = accepter +} + +func (q *quicTransport) Run(ctx context.Context) (err error) { + if q.accepter == nil { + return fmt.Errorf("can't run service without accepter") + } + tlConf, err := q.secure.ServerTlsConfig() + if err != nil { + return + } + for _, listAddr := range q.conf.ListenAddrs { + list, err := quic.ListenAddr(listAddr, tlConf, q.quicConf) + if err != nil { + return err + } + q.listeners = append(q.listeners, list) + } + q.listCtx, q.listCtxCancel = context.WithCancel(context.Background()) + for _, list := range q.listeners { + go q.acceptLoop(q.listCtx, list) + } + return +} + +func (q *quicTransport) Dial(ctx context.Context, addr string) (mc transport.MultiConn, err error) { + //TODO implement me + panic("implement me") +} + +func (q *quicTransport) acceptLoop(ctx context.Context, list *quic.Listener) { + l := log.With(zap.String("localAddr", list.Addr().String())) + l.Info("quic listener started") + defer func() { + l.Debug("quic listener stopped") + }() + for { + conn, err := list.Accept(ctx) + if err != nil { + if err != net.ErrClosed { + l.Error("listener closed with error", zap.Error(err)) + } + return + } + go func() { + if err := q.accept(conn); err != nil { + l.Info("accept error", zap.Error(err)) + } + }() + } +} + +func (q *quicTransport) accept(conn quic.Connection) (err error) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(q.conf.DialTimeoutSec)) + defer cancel() + remotePubKey, err := libp2ptls.PubKeyFromCertChain(conn.ConnectionState().TLS.PeerCertificates) + if err != nil { + return err + } + remotePeerId, err := peer.IDFromPublicKey(remotePubKey) + if err != nil { + return err + } + + // wait new stream for any handshake + stream, err := conn.AcceptStream(ctx) + if err != nil { + return err + } + defer func() { + _ = stream.Close() + }() + cctx, err := q.secure.HandshakeInbound(ctx, stream, remotePeerId.String()) + if err != nil { + return + } + mc := newConn(cctx, conn) + return q.accepter.Accept(mc) +} + +func (q *quicTransport) Close(ctx context.Context) (err error) { + if q.listCtx != nil { + q.listCtxCancel() + } + for _, lis := range q.listeners { + _ = lis.Close() + } + return +}