From 7958b43da6636f742f848b30e98d19f28b8da4a9 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Fri, 26 May 2023 20:06:22 +0200 Subject: [PATCH] peer service --- net/dialer/dialer.go | 137 --------------------------------- net/peer/peer.go | 75 +++--------------- net/peerservice/peerservice.go | 107 +++++++++++++++++++++++++ net/pool/pool.go | 22 ++---- net/pool/pool_test.go | 39 ++++++---- net/pool/poolservice.go | 12 ++- net/transport/transport.go | 4 + net/transport/yamux/conn.go | 5 ++ net/transport/yamux/yamux.go | 2 + 9 files changed, 167 insertions(+), 236 deletions(-) delete mode 100644 net/dialer/dialer.go create mode 100644 net/peerservice/peerservice.go diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go deleted file mode 100644 index f585c1b4..00000000 --- a/net/dialer/dialer.go +++ /dev/null @@ -1,137 +0,0 @@ -package dialer - -import ( - "context" - "errors" - "fmt" - "github.com/anyproto/any-sync/app" - "github.com/anyproto/any-sync/app/logger" - net2 "github.com/anyproto/any-sync/net" - "github.com/anyproto/any-sync/net/connutil" - "github.com/anyproto/any-sync/net/peer" - "github.com/anyproto/any-sync/net/secureservice" - "github.com/anyproto/any-sync/net/secureservice/handshake" - "github.com/anyproto/any-sync/nodeconf" - "github.com/libp2p/go-libp2p/core/sec" - "go.uber.org/zap" - "net" - "storj.io/drpc" - "storj.io/drpc/drpcconn" - "storj.io/drpc/drpcmanager" - "storj.io/drpc/drpcwire" - "sync" - "time" -) - -const CName = "common.net.dialer" - -var ( - ErrAddrsNotFound = errors.New("addrs for peer not found") - ErrPeerIdIsUnexpected = errors.New("expected to connect with other peer id") -) - -var log = logger.NewNamed(CName) - -func New() Dialer { - return &dialer{peerAddrs: map[string][]string{}} -} - -type Dialer interface { - Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) - SetPeerAddrs(peerId string, addrs []string) - app.Component -} - -type dialer struct { - transport secureservice.SecureService - config net2.Config - nodeConf nodeconf.NodeConf - peerAddrs map[string][]string - - mu sync.RWMutex -} - -func (d *dialer) Init(a *app.App) (err error) { - d.transport = a.MustComponent(secureservice.CName).(secureservice.SecureService) - d.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) - d.config = a.MustComponent("config").(net2.ConfigGetter).GetNet() - return -} - -func (d *dialer) Name() (name string) { - return CName -} - -func (d *dialer) SetPeerAddrs(peerId string, addrs []string) { - d.mu.Lock() - defer d.mu.Unlock() - d.peerAddrs[peerId] = addrs -} - -func (d *dialer) getPeerAddrs(peerId string) ([]string, error) { - if addrs, ok := d.nodeConf.PeerAddresses(peerId); ok { - return addrs, nil - } - addrs, ok := d.peerAddrs[peerId] - if !ok || len(addrs) == 0 { - return nil, ErrAddrsNotFound - } - return addrs, nil -} - -func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) { - var ctxCancel context.CancelFunc - ctx, ctxCancel = context.WithTimeout(ctx, time.Second*10) - defer ctxCancel() - d.mu.RLock() - defer d.mu.RUnlock() - - addrs, err := d.getPeerAddrs(peerId) - if err != nil { - return - } - - var ( - conn drpc.Conn - sc sec.SecureConn - ) - log.InfoCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs)) - for _, addr := range addrs { - conn, sc, err = d.handshake(ctx, addr, peerId) - if err != nil { - log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err)) - } else { - break - } - } - if err != nil { - return - } - return peer.NewPeer(sc, conn), nil -} - -func (d *dialer) handshake(ctx context.Context, addr, peerId string) (conn drpc.Conn, sc sec.SecureConn, err error) { - st := time.Now() - // TODO: move dial timeout to config - tcpConn, err := net.DialTimeout("tcp", addr, time.Second*3) - if err != nil { - return nil, nil, fmt.Errorf("dialTimeout error: %v; since start: %v", err, time.Since(st)) - } - - timeoutConn := connutil.NewConn(tcpConn, time.Millisecond*time.Duration(d.config.Stream.TimeoutMilliseconds)) - sc, err = d.transport.SecureOutbound(ctx, timeoutConn) - if err != nil { - if he, ok := err.(handshake.HandshakeError); ok { - return nil, nil, he - } - return nil, nil, fmt.Errorf("tls handshaeke error: %v; since start: %v", err, time.Since(st)) - } - if peerId != sc.RemotePeer().String() { - return nil, nil, ErrPeerIdIsUnexpected - } - log.Info("connected with remote host", zap.String("serverPeer", sc.RemotePeer().String()), zap.String("addr", addr)) - conn = drpcconn.NewWithOptions(sc, drpcconn.Options{Manager: drpcmanager.Options{ - Reader: drpcwire.ReaderOptions{MaximumBufferSize: d.config.Stream.MaxMsgSizeMb * (1 << 20)}, - }}) - return conn, sc, err -} diff --git a/net/peer/peer.go b/net/peer/peer.go index 99304ebe..cfe0b636 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -1,85 +1,39 @@ package peer import ( - "context" - "sync/atomic" + "github.com/anyproto/any-sync/net/transport" "time" "github.com/anyproto/any-sync/app/logger" - "github.com/libp2p/go-libp2p/core/sec" "go.uber.org/zap" - "storj.io/drpc" ) var log = logger.NewNamed("common.net.peer") -func NewPeer(sc sec.SecureConn, conn drpc.Conn) Peer { - return &peer{ - id: sc.RemotePeer().String(), - lastUsage: time.Now().Unix(), - sc: sc, - Conn: conn, +func NewPeer(mc transport.MultiConn) (p Peer, err error) { + ctx := mc.Context() + pr := &peer{} + if pr.id, err = CtxPeerId(ctx); err != nil { + return } + return pr, nil } type Peer interface { Id() string - LastUsage() time.Time - UpdateLastUsage() - Addr() string TryClose(objectTTL time.Duration) (res bool, err error) - drpc.Conn + transport.MultiConn } type peer struct { - id string - ttl time.Duration - lastUsage int64 - sc sec.SecureConn - drpc.Conn + id string + transport.MultiConn } func (p *peer) Id() string { return p.id } -func (p *peer) LastUsage() time.Time { - select { - case <-p.Closed(): - return time.Unix(0, 0) - default: - } - return time.Unix(atomic.LoadInt64(&p.lastUsage), 0) -} - -func (p *peer) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error { - defer p.UpdateLastUsage() - return p.Conn.Invoke(ctx, rpc, enc, in, out) -} - -func (p *peer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) { - defer p.UpdateLastUsage() - return p.Conn.NewStream(ctx, rpc, enc) -} - -func (p *peer) Read(b []byte) (n int, err error) { - if n, err = p.sc.Read(b); err != nil { - p.UpdateLastUsage() - } - return -} - -func (p *peer) Write(b []byte) (n int, err error) { - if n, err = p.sc.Write(b); err != nil { - p.UpdateLastUsage() - } - return -} - -func (p *peer) UpdateLastUsage() { - atomic.StoreInt64(&p.lastUsage, time.Now().Unix()) -} - func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) { if time.Now().Sub(p.LastUsage()) < objectTTL { return false, nil @@ -87,14 +41,7 @@ func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) { return true, p.Close() } -func (p *peer) Addr() string { - if p.sc != nil { - return p.sc.RemoteAddr().String() - } - return "" -} - func (p *peer) Close() (err error) { log.Debug("peer close", zap.String("peerId", p.id)) - return p.Conn.Close() + return p.MultiConn.Close() } diff --git a/net/peerservice/peerservice.go b/net/peerservice/peerservice.go new file mode 100644 index 00000000..3394c5e8 --- /dev/null +++ b/net/peerservice/peerservice.go @@ -0,0 +1,107 @@ +package peerservice + +import ( + "context" + "errors" + "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/app/logger" + "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/pool" + "github.com/anyproto/any-sync/net/transport" + "github.com/anyproto/any-sync/net/transport/yamux" + "github.com/anyproto/any-sync/nodeconf" + "go.uber.org/zap" + "sync" +) + +const CName = "net.peerservice" + +var log = logger.NewNamed(CName) + +var ( + ErrAddrsNotFound = errors.New("addrs for peer not found") +) + +func New() PeerService { + return new(peerService) +} + +type PeerService interface { + Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) + SetPeerAddrs(peerId string, addrs []string) + transport.Accepter + app.Component +} + +type peerService struct { + yamux transport.Transport + nodeConf nodeconf.NodeConf + peerAddrs map[string][]string + pool pool.Pool + mu sync.RWMutex +} + +func (p *peerService) Init(a *app.App) (err error) { + p.yamux = a.MustComponent(yamux.CName).(transport.Transport) + p.nodeConf = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) + p.pool = a.MustComponent(pool.CName).(pool.Pool) + p.peerAddrs = map[string][]string{} + return nil +} + +func (p *peerService) Name() (name string) { + return CName +} + +func (p *peerService) Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) { + p.mu.RLock() + defer p.mu.RUnlock() + + addrs, err := p.getPeerAddrs(peerId) + if err != nil { + return + } + + var mc transport.MultiConn + log.InfoCtx(ctx, "dial", zap.String("peerId", peerId), zap.Strings("addrs", addrs)) + for _, addr := range addrs { + mc, err = p.yamux.Dial(ctx, addr) + if err != nil { + log.InfoCtx(ctx, "can't connect to host", zap.String("addr", addr), zap.Error(err)) + } else { + break + } + } + if err != nil { + return + } + return peer.NewPeer(mc) +} + +func (p *peerService) Accept(mc transport.MultiConn) (err error) { + pr, err := peer.NewPeer(mc) + if err != nil { + return err + } + if err = p.pool.AddPeer(pr); err != nil { + _ = pr.Close() + } + return +} + +func (p *peerService) SetPeerAddrs(peerId string, addrs []string) { + p.mu.Lock() + defer p.mu.Unlock() + p.peerAddrs[peerId] = addrs +} + +func (p *peerService) getPeerAddrs(peerId string) ([]string, error) { + if addrs, ok := p.nodeConf.PeerAddresses(peerId); ok { + return addrs, nil + } + addrs, ok := p.peerAddrs[peerId] + if !ok || len(addrs) == 0 { + return nil, ErrAddrsNotFound + } + return addrs, nil +} diff --git a/net/pool/pool.go b/net/pool/pool.go index bd0b58eb..f36318df 100644 --- a/net/pool/pool.go +++ b/net/pool/pool.go @@ -4,7 +4,6 @@ import ( "context" "github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/net" - "github.com/anyproto/any-sync/net/dialer" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/secureservice/handshake" "go.uber.org/zap" @@ -18,13 +17,12 @@ type Pool interface { // GetOneOf searches at least one existing connection in outgoing or creates a new one from a randomly selected id from given list GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) // AddPeer adds incoming peer to the pool - AddPeer(ctx context.Context, p peer.Peer) (err error) + AddPeer(p peer.Peer) (err error) } type pool struct { outgoing ocache.OCache incoming ocache.OCache - dialer dialer.Dialer } func (p *pool) Name() (name string) { @@ -46,36 +44,26 @@ func (p *pool) get(ctx context.Context, source ocache.OCache, id string) (peer.P return nil, err } pr := v.(peer.Peer) - select { - case <-pr.Closed(): - default: + if !pr.IsClosed() { return pr, nil } _, _ = source.Remove(ctx, id) return p.Get(ctx, id) } -func (p *pool) Dial(ctx context.Context, id string) (peer.Peer, error) { - return p.dialer.Dial(ctx, id) -} - func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) { // finding existing connection for _, peerId := range peerIds { if v, err := p.incoming.Pick(ctx, peerId); err == nil { pr := v.(peer.Peer) - select { - case <-pr.Closed(): - default: + if !pr.IsClosed() { return pr, nil } _, _ = p.incoming.Remove(ctx, peerId) } if v, err := p.outgoing.Pick(ctx, peerId); err == nil { pr := v.(peer.Peer) - select { - case <-pr.Closed(): - default: + if !pr.IsClosed() { return pr, nil } _, _ = p.outgoing.Remove(ctx, peerId) @@ -101,6 +89,6 @@ func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error return nil, lastErr } -func (p *pool) AddPeer(ctx context.Context, pr peer.Peer) (err error) { +func (p *pool) AddPeer(pr peer.Peer) (err error) { return p.incoming.Add(pr.Id(), pr) } diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index c82533e8..71bfa1c6 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -6,12 +6,11 @@ import ( "fmt" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/net" - "github.com/anyproto/any-sync/net/dialer" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/secureservice/handshake" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "storj.io/drpc" + net2 "net" "testing" "time" ) @@ -158,7 +157,7 @@ type fixture struct { t *testing.T } -var _ dialer.Dialer = (*dialerMock)(nil) +var _ dialer = (*dialerMock)(nil) type dialerMock struct { dial func(ctx context.Context, peerId string) (peer peer.Peer, err error) @@ -181,7 +180,7 @@ func (d *dialerMock) Init(a *app.App) (err error) { } func (d *dialerMock) Name() (name string) { - return dialer.CName + return "net.peerservice" } func newTestPeer(id string) *testPeer { @@ -196,6 +195,21 @@ type testPeer struct { closed chan struct{} } +func (t *testPeer) Context() context.Context { + //TODO implement me + panic("implement me") +} + +func (t *testPeer) Accept() (conn net2.Conn, err error) { + //TODO implement me + panic("implement me") +} + +func (t *testPeer) Open(ctx context.Context) (conn net2.Conn, err error) { + //TODO implement me + panic("implement me") +} + func (t *testPeer) Addr() string { return "" } @@ -224,14 +238,11 @@ func (t *testPeer) Close() error { return nil } -func (t *testPeer) Closed() <-chan struct{} { - return t.closed -} - -func (t *testPeer) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error { - return fmt.Errorf("call Invoke on test peer") -} - -func (t *testPeer) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (drpc.Stream, error) { - return nil, fmt.Errorf("call NewStream on test peer") +func (t *testPeer) IsClosed() bool { + select { + case <-t.closed: + return true + default: + return false + } } diff --git a/net/pool/poolservice.go b/net/pool/poolservice.go index 68daafc5..2f84e5d0 100644 --- a/net/pool/poolservice.go +++ b/net/pool/poolservice.go @@ -6,7 +6,7 @@ import ( "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/app/ocache" "github.com/anyproto/any-sync/metric" - "github.com/anyproto/any-sync/net/dialer" + "github.com/anyproto/any-sync/net/peer" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "time" @@ -27,16 +27,20 @@ type Service interface { app.ComponentRunnable } +type dialer interface { + Dial(ctx context.Context, peerId string) (pr peer.Peer, err error) +} + type poolService struct { // default pool *pool - dialer dialer.Dialer + dialer dialer metricReg *prometheus.Registry } func (p *poolService) Init(a *app.App) (err error) { - p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer) - p.pool = &pool{dialer: p.dialer} + p.dialer = a.MustComponent("net.peerservice").(dialer) + p.pool = &pool{} if m := a.Component(metric.CName); m != nil { p.metricReg = m.(metric.Metric).Registry() } diff --git a/net/transport/transport.go b/net/transport/transport.go index 3eba1414..127c9c06 100644 --- a/net/transport/transport.go +++ b/net/transport/transport.go @@ -25,6 +25,10 @@ type MultiConn interface { Open(ctx context.Context) (conn net.Conn, err error) // LastUsage returns the time of the last connection activity LastUsage() time.Time + // Addr returns remote peer address + Addr() string + // IsClosed returns true when connection is closed + IsClosed() bool // Close closes the connection and all sub connections Close() error } diff --git a/net/transport/yamux/conn.go b/net/transport/yamux/conn.go index e9ad7f59..541b8257 100644 --- a/net/transport/yamux/conn.go +++ b/net/transport/yamux/conn.go @@ -11,6 +11,7 @@ import ( type yamuxConn struct { ctx context.Context luConn *connutil.LastUsageConn + addr string *yamux.Session } @@ -25,3 +26,7 @@ func (y *yamuxConn) LastUsage() time.Time { func (y *yamuxConn) Context() context.Context { return y.ctx } + +func (y *yamuxConn) Addr() string { + return y.addr +} diff --git a/net/transport/yamux/yamux.go b/net/transport/yamux/yamux.go index 9d73c1f1..44729392 100644 --- a/net/transport/yamux/yamux.go +++ b/net/transport/yamux/yamux.go @@ -100,6 +100,7 @@ func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.Mu ctx: cctx, luConn: luc, Session: sess, + addr: addr, } return } @@ -151,6 +152,7 @@ func (y *yamuxTransport) accept(conn net.Conn) { ctx: cctx, luConn: luc, Session: sess, + addr: conn.RemoteAddr().String(), } if err = y.accepter.Accept(mc); err != nil { log.Warn("connection accept error", zap.Error(err))