diff --git a/commonspace/object/tree/objecttree/objecttree_test.go b/commonspace/object/tree/objecttree/objecttree_test.go index 0dd255f8..1ac1a844 100644 --- a/commonspace/object/tree/objecttree/objecttree_test.go +++ b/commonspace/object/tree/objecttree/objecttree_test.go @@ -3,6 +3,9 @@ package objecttree import ( "context" "fmt" + "testing" + "time" + "github.com/anyproto/any-sync/commonspace/object/accountdata" "github.com/anyproto/any-sync/commonspace/object/acl/list" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" @@ -10,8 +13,6 @@ import ( "github.com/gogo/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "testing" - "time" ) type testTreeContext struct { @@ -123,6 +124,7 @@ func TestObjectTree(t *testing.T) { require.NoError(t, err) require.GreaterOrEqual(t, start.Unix(), ch.Timestamp) require.LessOrEqual(t, end.Unix(), ch.Timestamp) + require.Equal(t, res.Added[0].Id, oTree.(*objectTree).tree.lastIteratedHeadId) }) t.Run("timestamp is set correctly", func(t *testing.T) { someTs := time.Now().Add(time.Hour).Unix() @@ -139,6 +141,7 @@ func TestObjectTree(t *testing.T) { ch, err := oTree.(*objectTree).changeBuilder.Unmarshall(res.Added[0], true) require.NoError(t, err) require.Equal(t, ch.Timestamp, someTs) + require.Equal(t, res.Added[0].Id, oTree.(*objectTree).tree.lastIteratedHeadId) }) }) diff --git a/commonspace/object/tree/objecttree/tree.go b/commonspace/object/tree/objecttree/tree.go index 51a3b444..1a070e4c 100644 --- a/commonspace/object/tree/objecttree/tree.go +++ b/commonspace/object/tree/objecttree/tree.go @@ -82,6 +82,7 @@ func (t *Tree) AddMergedHead(c *Change) error { } } t.headIds = []string{c.Id} + t.lastIteratedHeadId = c.Id return nil } diff --git a/commonspace/object/tree/objecttree/tree_test.go b/commonspace/object/tree/objecttree/tree_test.go index 4bda3527..3fe0649c 100644 --- a/commonspace/object/tree/objecttree/tree_test.go +++ b/commonspace/object/tree/objecttree/tree_test.go @@ -2,10 +2,12 @@ package objecttree import ( "fmt" - "github.com/stretchr/testify/assert" "math/rand" "testing" "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func newChange(id string, snapshotId string, prevIds ...string) *Change { @@ -26,6 +28,17 @@ func newSnapshot(id, snapshotId string, prevIds ...string) *Change { } } +func TestTree_AddMergedHead(t *testing.T) { + tr := new(Tree) + _, _ = tr.Add( + newSnapshot("root", ""), + newChange("one", "root", "root"), + ) + require.Equal(t, tr.lastIteratedHeadId, "one") + tr.AddMergedHead(newChange("two", "root", "one")) + require.Equal(t, tr.lastIteratedHeadId, "two") +} + func TestTree_Add(t *testing.T) { t.Run("add first el", func(t *testing.T) { tr := new(Tree) diff --git a/go.mod b/go.mod index 7d036e80..925e5cd6 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/ipfs/go-ipld-format v0.5.0 github.com/ipfs/go-merkledag v0.11.0 github.com/ipfs/go-unixfs v0.4.6 - github.com/libp2p/go-libp2p v0.28.0 + github.com/libp2p/go-libp2p v0.28.1 github.com/mr-tron/base58 v1.2.0 github.com/multiformats/go-multibase v0.2.0 github.com/multiformats/go-multihash v0.2.3 diff --git a/go.sum b/go.sum index 6f95ef34..e36885d7 100644 --- a/go.sum +++ b/go.sum @@ -166,8 +166,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c= -github.com/libp2p/go-libp2p v0.28.0 h1:zO8cY98nJiPzZpFv5w5gqqb8aVzt4ukQ0nVOSaaKhJ8= -github.com/libp2p/go-libp2p v0.28.0/go.mod h1:s3Xabc9LSwOcnv9UD4nORnXKTsWkPMkIMB/JIGXVnzk= +github.com/libp2p/go-libp2p v0.28.1 h1:YurK+ZAI6cKfASLJBVFkpVBdl3wGhFi6fusOt725ii8= +github.com/libp2p/go-libp2p v0.28.1/go.mod h1:s3Xabc9LSwOcnv9UD4nORnXKTsWkPMkIMB/JIGXVnzk= github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s= github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0= github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA= @@ -214,6 +214,7 @@ github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8= github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU= github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY= +github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.9.7 h1:06xGQy5www2oN160RtEZoTvnP2sPhEfePYmCDc2szss= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= diff --git a/net/peer/limiter.go b/net/peer/limiter.go new file mode 100644 index 00000000..4c8e1589 --- /dev/null +++ b/net/peer/limiter.go @@ -0,0 +1,18 @@ +package peer + +import ( + "time" +) + +type limiter struct { + startThreshold int + slowDownStep time.Duration +} + +func (l limiter) wait(count int) <-chan time.Time { + if count > l.startThreshold { + wait := l.slowDownStep * time.Duration(count-l.startThreshold) + return time.After(wait) + } + return nil +} diff --git a/net/peer/peer.go b/net/peer/peer.go index 5bd81a8b..25ffbb20 100644 --- a/net/peer/peer.go +++ b/net/peer/peer.go @@ -6,6 +6,7 @@ import ( "io" "net" "sync" + "sync/atomic" "time" "github.com/anyproto/any-sync/app/logger" @@ -36,8 +37,15 @@ func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) { active: map[*subConn]struct{}{}, MultiConn: mc, ctrl: ctrl, - created: time.Now(), + limiter: limiter{ + // start throttling after 10 sub conns + startThreshold: 10, + slowDownStep: time.Millisecond * 100, + }, + subConnRelease: make(chan drpc.Conn), + created: time.Now(), } + pr.acceptCtx, pr.acceptCtxCancel = context.WithCancel(context.Background()) if pr.id, err = CtxPeerId(ctx); err != nil { return } @@ -71,13 +79,22 @@ type peer struct { ctrl connCtrl // drpc conn pool - inactive []*subConn - active map[*subConn]struct{} + // outgoing + inactive []*subConn + active map[*subConn]struct{} + subConnRelease chan drpc.Conn + openingWaitCount atomic.Int32 + + incomingCount atomic.Int32 + acceptCtx context.Context + + acceptCtxCancel context.CancelFunc + + limiter limiter mu sync.Mutex created time.Time - transport.MultiConn } @@ -88,7 +105,20 @@ func (p *peer) Id() string { func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) { p.mu.Lock() if len(p.inactive) == 0 { + wait := p.limiter.wait(len(p.active) + int(p.openingWaitCount.Load())) p.mu.Unlock() + if wait != nil { + p.openingWaitCount.Add(1) + defer p.openingWaitCount.Add(-1) + // throttle new connection opening + select { + case <-ctx.Done(): + return nil, ctx.Err() + case dconn := <-p.subConnRelease: + return dconn, nil + case <-wait: + } + } dconn, err := p.openDrpcConn(ctx) if err != nil { return nil, err @@ -111,6 +141,21 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) { } func (p *peer) ReleaseDrpcConn(conn drpc.Conn) { + // do nothing if it's closed connection + select { + case <-conn.Closed(): + return + default: + } + + // try to send this connection to acquire if anyone is waiting for it + select { + case p.subConnRelease <- conn: + return + default: + } + + // return to pool p.mu.Lock() defer p.mu.Unlock() sc, ok := conn.(*subConn) @@ -163,12 +208,21 @@ func (p *peer) acceptLoop() { } }() for { + if wait := p.limiter.wait(int(p.incomingCount.Load())); wait != nil { + select { + case <-wait: + case <-p.acceptCtx.Done(): + return + } + } conn, err := p.Accept() if err != nil { exitErr = err return } go func() { + p.incomingCount.Add(1) + defer p.incomingCount.Add(-1) serveErr := p.serve(conn) if serveErr != io.EOF && serveErr != transport.ErrConnClosed { log.InfoCtx(p.Context(), "serve connection error", zap.Error(serveErr)) diff --git a/net/peer/peer_test.go b/net/peer/peer_test.go index ac06f8d6..7aa8428c 100644 --- a/net/peer/peer_test.go +++ b/net/peer/peer_test.go @@ -12,6 +12,8 @@ import ( "io" "net" _ "net/http/pprof" + "storj.io/drpc" + "storj.io/drpc/drpcconn" "testing" "time" ) @@ -19,32 +21,86 @@ import ( var ctx = context.Background() func TestPeer_AcquireDrpcConn(t *testing.T) { + t.Run("generic", func(t *testing.T) { + fx := newFixture(t, "p1") + defer fx.finish() + in, out := net.Pipe() + go func() { + handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker) + }() + defer out.Close() + fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil) + dc, err := fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + assert.NotEmpty(t, dc) + defer dc.Close() + + assert.Len(t, fx.active, 1) + assert.Len(t, fx.inactive, 0) + + fx.ReleaseDrpcConn(dc) + + assert.Len(t, fx.active, 0) + assert.Len(t, fx.inactive, 1) + + dc, err = fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + assert.NotEmpty(t, dc) + assert.Len(t, fx.active, 1) + assert.Len(t, fx.inactive, 0) + }) + t.Run("closed sub conn", func(t *testing.T) { + fx := newFixture(t, "p1") + defer fx.finish() + + closedIn, _ := net.Pipe() + dc := drpcconn.New(closedIn) + fx.ReleaseDrpcConn(&subConn{Conn: dc}) + dc.Close() + + in, out := net.Pipe() + go func() { + handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker) + }() + defer out.Close() + fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil) + _, err := fx.AcquireDrpcConn(ctx) + require.NoError(t, err) + }) +} + +func TestPeer_DrpcConn_OpenThrottling(t *testing.T) { fx := newFixture(t, "p1") defer fx.finish() - in, out := net.Pipe() + + acquire := func() (func(), drpc.Conn, error) { + in, out := net.Pipe() + go func() { + _, err := handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker) + require.NoError(t, err) + }() + + fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil) + dconn, err := fx.AcquireDrpcConn(ctx) + return func() { out.Close() }, dconn, err + } + + var conCount = fx.limiter.startThreshold + 3 + var conns []drpc.Conn + for i := 0; i < conCount; i++ { + cc, dc, err := acquire() + require.NoError(t, err) + defer cc() + conns = append(conns, dc) + } + go func() { - handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker) + time.Sleep(fx.limiter.slowDownStep) + fx.ReleaseDrpcConn(conns[0]) + conns = conns[1:] }() - defer out.Close() - fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil) - dc, err := fx.AcquireDrpcConn(ctx) + _, err := fx.AcquireDrpcConn(ctx) require.NoError(t, err) - assert.NotEmpty(t, dc) - defer dc.Close() - - assert.Len(t, fx.active, 1) - assert.Len(t, fx.inactive, 0) - - fx.ReleaseDrpcConn(dc) - - assert.Len(t, fx.active, 0) - assert.Len(t, fx.inactive, 1) - - dc, err = fx.AcquireDrpcConn(ctx) - require.NoError(t, err) - assert.NotEmpty(t, dc) - assert.Len(t, fx.active, 1) - assert.Len(t, fx.inactive, 0) } func TestPeerAccept(t *testing.T) { @@ -63,6 +119,26 @@ func TestPeerAccept(t *testing.T) { assert.NoError(t, <-outHandshakeCh) } +func TestPeer_DrpcConn_AcceptThrottling(t *testing.T) { + fx := newFixture(t, "p1") + defer fx.finish() + + var conCount = fx.limiter.startThreshold + 3 + for i := 0; i < conCount; i++ { + in, out := net.Pipe() + defer out.Close() + + var outHandshakeCh = make(chan error) + go func() { + outHandshakeCh <- handshake.OutgoingProtoHandshake(ctx, out, handshakeproto.ProtoType_DRPC) + }() + fx.acceptCh <- acceptedConn{conn: in} + cn := <-fx.testCtrl.serveConn + assert.Equal(t, in, cn) + assert.NoError(t, <-outHandshakeCh) + } +} + func TestPeer_TryClose(t *testing.T) { t.Run("not close in first minute", func(t *testing.T) { fx := newFixture(t, "p1") diff --git a/net/pool/poolservice.go b/net/pool/poolservice.go index 2f84e5d0..0c574eb4 100644 --- a/net/pool/poolservice.go +++ b/net/pool/poolservice.go @@ -49,8 +49,8 @@ func (p *poolService) Init(a *app.App) (err error) { return p.dialer.Dial(ctx, id) }, ocache.WithLogger(log.Sugar()), - ocache.WithGCPeriod(time.Minute), - ocache.WithTTL(time.Minute*5), + ocache.WithGCPeriod(time.Minute/2), + ocache.WithTTL(time.Minute), ocache.WithPrometheus(p.metricReg, "netpool", "outgoing"), ) p.pool.incoming = ocache.New( @@ -58,8 +58,8 @@ func (p *poolService) Init(a *app.App) (err error) { return nil, ocache.ErrNotExists }, ocache.WithLogger(log.Sugar()), - ocache.WithGCPeriod(time.Minute), - ocache.WithTTL(time.Minute*5), + ocache.WithGCPeriod(time.Minute/2), + ocache.WithTTL(time.Minute), ocache.WithPrometheus(p.metricReg, "netpool", "incoming"), ) return nil diff --git a/net/rpc/rpctest/multiconntest/multiconntest.go b/net/rpc/rpctest/multiconntest/multiconntest.go new file mode 100644 index 00000000..a99d1083 --- /dev/null +++ b/net/rpc/rpctest/multiconntest/multiconntest.go @@ -0,0 +1,29 @@ +package multiconntest + +import ( + "context" + "github.com/anyproto/any-sync/net/connutil" + "github.com/anyproto/any-sync/net/transport" + yamux2 "github.com/anyproto/any-sync/net/transport/yamux" + "github.com/hashicorp/yamux" + "net" +) + +func MultiConnPair(peerServCtx, peerClientCtx context.Context) (serv, client transport.MultiConn) { + sc, cc := net.Pipe() + var servConn = make(chan transport.MultiConn, 1) + go func() { + sess, err := yamux.Server(sc, yamux.DefaultConfig()) + if err != nil { + panic(err) + } + servConn <- yamux2.NewMultiConn(peerServCtx, connutil.NewLastUsageConn(sc), "", sess) + }() + sess, err := yamux.Client(cc, yamux.DefaultConfig()) + if err != nil { + panic(err) + } + client = yamux2.NewMultiConn(peerClientCtx, connutil.NewLastUsageConn(cc), "", sess) + serv = <-servConn + return +} diff --git a/net/rpc/rpctest/peer.go b/net/rpc/rpctest/peer.go index a5fef8be..902547bb 100644 --- a/net/rpc/rpctest/peer.go +++ b/net/rpc/rpctest/peer.go @@ -2,29 +2,11 @@ package rpctest import ( "context" - "github.com/anyproto/any-sync/net/connutil" "github.com/anyproto/any-sync/net/peer" + "github.com/anyproto/any-sync/net/rpc/rpctest/multiconntest" "github.com/anyproto/any-sync/net/transport" - yamux2 "github.com/anyproto/any-sync/net/transport/yamux" - "github.com/hashicorp/yamux" - "net" ) func MultiConnPair(peerIdServ, peerIdClient string) (serv, client transport.MultiConn) { - sc, cc := net.Pipe() - var servConn = make(chan transport.MultiConn, 1) - go func() { - sess, err := yamux.Server(sc, yamux.DefaultConfig()) - if err != nil { - panic(err) - } - servConn <- yamux2.NewMultiConn(peer.CtxWithPeerId(context.Background(), peerIdServ), connutil.NewLastUsageConn(sc), "", sess) - }() - sess, err := yamux.Client(cc, yamux.DefaultConfig()) - if err != nil { - panic(err) - } - client = yamux2.NewMultiConn(peer.CtxWithPeerId(context.Background(), peerIdClient), connutil.NewLastUsageConn(cc), "", sess) - serv = <-servConn - return + return multiconntest.MultiConnPair(peer.CtxWithPeerId(context.Background(), peerIdServ), peer.CtxWithPeerId(context.Background(), peerIdClient)) } diff --git a/net/secureservice/handshake/proto.go b/net/secureservice/handshake/proto.go index 45e95ab5..6cd82110 100644 --- a/net/secureservice/handshake/proto.go +++ b/net/secureservice/handshake/proto.go @@ -11,19 +11,20 @@ type ProtoChecker struct { AllowedProtoTypes []handshakeproto.ProtoType } -func OutgoingProtoHandshake(ctx context.Context, conn net.Conn, pt handshakeproto.ProtoType) (err error) { +func OutgoingProtoHandshake(ctx context.Context, conn net.Conn, pt handshakeproto.ProtoType) error { if ctx == nil { ctx = context.Background() } h := newHandshake() done := make(chan struct{}) + var err error go func() { defer close(done) err = outgoingProtoHandshake(h, conn, pt) }() select { case <-done: - return + return err case <-ctx.Done(): _ = conn.Close() return ctx.Err() @@ -54,19 +55,23 @@ func outgoingProtoHandshake(h *handshake, conn net.Conn, pt handshakeproto.Proto return HandshakeError{e: msg.ack.Error} } -func IncomingProtoHandshake(ctx context.Context, conn net.Conn, pt ProtoChecker) (protoType handshakeproto.ProtoType, err error) { +func IncomingProtoHandshake(ctx context.Context, conn net.Conn, pt ProtoChecker) (handshakeproto.ProtoType, error) { if ctx == nil { ctx = context.Background() } h := newHandshake() done := make(chan struct{}) + var ( + protoType handshakeproto.ProtoType + err error + ) go func() { defer close(done) protoType, err = incomingProtoHandshake(h, conn, pt) }() select { case <-done: - return + return protoType, err case <-ctx.Done(): _ = conn.Close() return 0, ctx.Err() diff --git a/net/transport/yamux/conn.go b/net/transport/yamux/conn.go index 0013f2bb..6a473796 100644 --- a/net/transport/yamux/conn.go +++ b/net/transport/yamux/conn.go @@ -6,6 +6,7 @@ import ( "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/transport" "github.com/hashicorp/yamux" + "io" "net" "time" ) @@ -48,7 +49,7 @@ func (y *yamuxConn) Addr() string { func (y *yamuxConn) Accept() (conn net.Conn, err error) { if conn, err = y.Session.Accept(); err != nil { - if err == yamux.ErrSessionShutdown { + if err == yamux.ErrSessionShutdown || err == io.EOF { err = transport.ErrConnClosed } return diff --git a/net/transport/yamux/yamux_test.go b/net/transport/yamux/yamux_test.go index 02e1c322..9b209054 100644 --- a/net/transport/yamux/yamux_test.go +++ b/net/transport/yamux/yamux_test.go @@ -30,8 +30,12 @@ func TestYamuxTransport_Dial(t *testing.T) { mcC, err := fxC.Dial(ctx, fxS.addr) require.NoError(t, err) - require.Len(t, fxS.accepter.mcs, 1) - mcS := <-fxS.accepter.mcs + var mcS transport.MultiConn + select { + case mcS = <-fxS.accepter.mcs: + case <-time.After(time.Second * 5): + require.True(t, false, "timeout") + } var ( sData string @@ -69,11 +73,11 @@ func TestYamuxTransport_Dial(t *testing.T) { // no deadline - 69100 rps // common write deadline - 66700 rps // subconn write deadline - 67100 rps -func TestWriteBench(t *testing.T) { +func TestWriteBenchReuse(t *testing.T) { t.Skip() var ( numSubConn = 10 - numWrites = 100000 + numWrites = 10000 ) fxS := newFixture(t) @@ -124,6 +128,63 @@ func TestWriteBench(t *testing.T) { t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds()) } +func TestWriteBenchNew(t *testing.T) { + t.Skip() + var ( + numSubConn = 10 + numWrites = 10000 + ) + + fxS := newFixture(t) + defer fxS.finish(t) + fxC := newFixture(t) + defer fxC.finish(t) + + mcC, err := fxC.Dial(ctx, fxS.addr) + require.NoError(t, err) + mcS := <-fxS.accepter.mcs + + go func() { + for i := 0; i < numSubConn; i++ { + require.NoError(t, err) + go func() { + var b = make([]byte, 1024) + for { + conn, _ := mcS.Accept() + n, _ := conn.Read(b) + if n > 0 { + conn.Write(b[:n]) + } else { + _ = conn.Close() + break + } + conn.Close() + } + }() + } + }() + + var wg sync.WaitGroup + wg.Add(numSubConn) + st := time.Now() + for i := 0; i < numSubConn; i++ { + go func() { + defer wg.Done() + for j := 0; j < numWrites; j++ { + sc, err := mcC.Open(ctx) + require.NoError(t, err) + var b = []byte("some data some data some data some data some data some data some data some data some data") + sc.Write(b) + sc.Read(b) + sc.Close() + } + }() + } + wg.Wait() + dur := time.Since(st) + t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds()) +} + type fixture struct { *yamuxTransport a *app.App