Merge branch 'main' into acl-change
# Conflicts: # net/peer/peer.go
This commit is contained in:
commit
53e9c4ab02
@ -3,6 +3,9 @@ package objecttree
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/anyproto/any-sync/commonspace/object/accountdata"
|
"github.com/anyproto/any-sync/commonspace/object/accountdata"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
"github.com/anyproto/any-sync/commonspace/object/acl/list"
|
||||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||||
@ -10,8 +13,6 @@ import (
|
|||||||
"github.com/gogo/protobuf/proto"
|
"github.com/gogo/protobuf/proto"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type testTreeContext struct {
|
type testTreeContext struct {
|
||||||
@ -123,6 +124,7 @@ func TestObjectTree(t *testing.T) {
|
|||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.GreaterOrEqual(t, start.Unix(), ch.Timestamp)
|
require.GreaterOrEqual(t, start.Unix(), ch.Timestamp)
|
||||||
require.LessOrEqual(t, end.Unix(), ch.Timestamp)
|
require.LessOrEqual(t, end.Unix(), ch.Timestamp)
|
||||||
|
require.Equal(t, res.Added[0].Id, oTree.(*objectTree).tree.lastIteratedHeadId)
|
||||||
})
|
})
|
||||||
t.Run("timestamp is set correctly", func(t *testing.T) {
|
t.Run("timestamp is set correctly", func(t *testing.T) {
|
||||||
someTs := time.Now().Add(time.Hour).Unix()
|
someTs := time.Now().Add(time.Hour).Unix()
|
||||||
@ -139,6 +141,7 @@ func TestObjectTree(t *testing.T) {
|
|||||||
ch, err := oTree.(*objectTree).changeBuilder.Unmarshall(res.Added[0], true)
|
ch, err := oTree.(*objectTree).changeBuilder.Unmarshall(res.Added[0], true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Equal(t, ch.Timestamp, someTs)
|
require.Equal(t, ch.Timestamp, someTs)
|
||||||
|
require.Equal(t, res.Added[0].Id, oTree.(*objectTree).tree.lastIteratedHeadId)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@ -82,6 +82,7 @@ func (t *Tree) AddMergedHead(c *Change) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.headIds = []string{c.Id}
|
t.headIds = []string{c.Id}
|
||||||
|
t.lastIteratedHeadId = c.Id
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -2,10 +2,12 @@ package objecttree
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func newChange(id string, snapshotId string, prevIds ...string) *Change {
|
func newChange(id string, snapshotId string, prevIds ...string) *Change {
|
||||||
@ -26,6 +28,17 @@ func newSnapshot(id, snapshotId string, prevIds ...string) *Change {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTree_AddMergedHead(t *testing.T) {
|
||||||
|
tr := new(Tree)
|
||||||
|
_, _ = tr.Add(
|
||||||
|
newSnapshot("root", ""),
|
||||||
|
newChange("one", "root", "root"),
|
||||||
|
)
|
||||||
|
require.Equal(t, tr.lastIteratedHeadId, "one")
|
||||||
|
tr.AddMergedHead(newChange("two", "root", "one"))
|
||||||
|
require.Equal(t, tr.lastIteratedHeadId, "two")
|
||||||
|
}
|
||||||
|
|
||||||
func TestTree_Add(t *testing.T) {
|
func TestTree_Add(t *testing.T) {
|
||||||
t.Run("add first el", func(t *testing.T) {
|
t.Run("add first el", func(t *testing.T) {
|
||||||
tr := new(Tree)
|
tr := new(Tree)
|
||||||
|
|||||||
2
go.mod
2
go.mod
@ -25,7 +25,7 @@ require (
|
|||||||
github.com/ipfs/go-ipld-format v0.5.0
|
github.com/ipfs/go-ipld-format v0.5.0
|
||||||
github.com/ipfs/go-merkledag v0.11.0
|
github.com/ipfs/go-merkledag v0.11.0
|
||||||
github.com/ipfs/go-unixfs v0.4.6
|
github.com/ipfs/go-unixfs v0.4.6
|
||||||
github.com/libp2p/go-libp2p v0.28.0
|
github.com/libp2p/go-libp2p v0.28.1
|
||||||
github.com/mr-tron/base58 v1.2.0
|
github.com/mr-tron/base58 v1.2.0
|
||||||
github.com/multiformats/go-multibase v0.2.0
|
github.com/multiformats/go-multibase v0.2.0
|
||||||
github.com/multiformats/go-multihash v0.2.3
|
github.com/multiformats/go-multihash v0.2.3
|
||||||
|
|||||||
5
go.sum
5
go.sum
@ -166,8 +166,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
|||||||
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
|
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
|
||||||
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
|
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
|
||||||
github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c=
|
github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c=
|
||||||
github.com/libp2p/go-libp2p v0.28.0 h1:zO8cY98nJiPzZpFv5w5gqqb8aVzt4ukQ0nVOSaaKhJ8=
|
github.com/libp2p/go-libp2p v0.28.1 h1:YurK+ZAI6cKfASLJBVFkpVBdl3wGhFi6fusOt725ii8=
|
||||||
github.com/libp2p/go-libp2p v0.28.0/go.mod h1:s3Xabc9LSwOcnv9UD4nORnXKTsWkPMkIMB/JIGXVnzk=
|
github.com/libp2p/go-libp2p v0.28.1/go.mod h1:s3Xabc9LSwOcnv9UD4nORnXKTsWkPMkIMB/JIGXVnzk=
|
||||||
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
|
github.com/libp2p/go-libp2p-asn-util v0.3.0 h1:gMDcMyYiZKkocGXDQ5nsUQyquC9+H+iLEQHwOCZ7s8s=
|
||||||
github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0=
|
github.com/libp2p/go-libp2p-record v0.2.0 h1:oiNUOCWno2BFuxt3my4i1frNrt7PerzB3queqa1NkQ0=
|
||||||
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
|
github.com/libp2p/go-libp2p-testing v0.12.0 h1:EPvBb4kKMWO29qP4mZGyhVzUyR25dvfUIK5WDu6iPUA=
|
||||||
@ -214,6 +214,7 @@ github.com/multiformats/go-varint v0.0.5/go.mod h1:3Ls8CIEsrijN6+B7PbrXRPxHRPuXS
|
|||||||
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
|
github.com/multiformats/go-varint v0.0.7 h1:sWSGR+f/eu5ABZA2ZpYKBILXTTs9JWpdEM/nEGOHFS8=
|
||||||
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
|
github.com/multiformats/go-varint v0.0.7/go.mod h1:r8PUYw/fD/SjBCiKOoDlGF6QawOELpZAu9eioSos/OU=
|
||||||
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY=
|
github.com/nfnt/resize v0.0.0-20160724205520-891127d8d1b5 h1:BvoENQQU+fZ9uukda/RzCAL/191HHwJA5b13R6diVlY=
|
||||||
|
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
|
||||||
github.com/onsi/ginkgo/v2 v2.9.7 h1:06xGQy5www2oN160RtEZoTvnP2sPhEfePYmCDc2szss=
|
github.com/onsi/ginkgo/v2 v2.9.7 h1:06xGQy5www2oN160RtEZoTvnP2sPhEfePYmCDc2szss=
|
||||||
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
|
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
|
||||||
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
|
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
|
||||||
|
|||||||
18
net/peer/limiter.go
Normal file
18
net/peer/limiter.go
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
package peer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type limiter struct {
|
||||||
|
startThreshold int
|
||||||
|
slowDownStep time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l limiter) wait(count int) <-chan time.Time {
|
||||||
|
if count > l.startThreshold {
|
||||||
|
wait := l.slowDownStep * time.Duration(count-l.startThreshold)
|
||||||
|
return time.After(wait)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/anyproto/any-sync/app/logger"
|
"github.com/anyproto/any-sync/app/logger"
|
||||||
@ -36,8 +37,15 @@ func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) {
|
|||||||
active: map[*subConn]struct{}{},
|
active: map[*subConn]struct{}{},
|
||||||
MultiConn: mc,
|
MultiConn: mc,
|
||||||
ctrl: ctrl,
|
ctrl: ctrl,
|
||||||
created: time.Now(),
|
limiter: limiter{
|
||||||
|
// start throttling after 10 sub conns
|
||||||
|
startThreshold: 10,
|
||||||
|
slowDownStep: time.Millisecond * 100,
|
||||||
|
},
|
||||||
|
subConnRelease: make(chan drpc.Conn),
|
||||||
|
created: time.Now(),
|
||||||
}
|
}
|
||||||
|
pr.acceptCtx, pr.acceptCtxCancel = context.WithCancel(context.Background())
|
||||||
if pr.id, err = CtxPeerId(ctx); err != nil {
|
if pr.id, err = CtxPeerId(ctx); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -71,13 +79,22 @@ type peer struct {
|
|||||||
ctrl connCtrl
|
ctrl connCtrl
|
||||||
|
|
||||||
// drpc conn pool
|
// drpc conn pool
|
||||||
inactive []*subConn
|
// outgoing
|
||||||
active map[*subConn]struct{}
|
inactive []*subConn
|
||||||
|
active map[*subConn]struct{}
|
||||||
|
subConnRelease chan drpc.Conn
|
||||||
|
openingWaitCount atomic.Int32
|
||||||
|
|
||||||
|
incomingCount atomic.Int32
|
||||||
|
acceptCtx context.Context
|
||||||
|
|
||||||
|
acceptCtxCancel context.CancelFunc
|
||||||
|
|
||||||
|
limiter limiter
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
created time.Time
|
created time.Time
|
||||||
|
|
||||||
transport.MultiConn
|
transport.MultiConn
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,7 +105,20 @@ func (p *peer) Id() string {
|
|||||||
func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
|
func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
if len(p.inactive) == 0 {
|
if len(p.inactive) == 0 {
|
||||||
|
wait := p.limiter.wait(len(p.active) + int(p.openingWaitCount.Load()))
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
|
if wait != nil {
|
||||||
|
p.openingWaitCount.Add(1)
|
||||||
|
defer p.openingWaitCount.Add(-1)
|
||||||
|
// throttle new connection opening
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, ctx.Err()
|
||||||
|
case dconn := <-p.subConnRelease:
|
||||||
|
return dconn, nil
|
||||||
|
case <-wait:
|
||||||
|
}
|
||||||
|
}
|
||||||
dconn, err := p.openDrpcConn(ctx)
|
dconn, err := p.openDrpcConn(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -111,6 +141,21 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) ReleaseDrpcConn(conn drpc.Conn) {
|
func (p *peer) ReleaseDrpcConn(conn drpc.Conn) {
|
||||||
|
// do nothing if it's closed connection
|
||||||
|
select {
|
||||||
|
case <-conn.Closed():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// try to send this connection to acquire if anyone is waiting for it
|
||||||
|
select {
|
||||||
|
case p.subConnRelease <- conn:
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// return to pool
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
sc, ok := conn.(*subConn)
|
sc, ok := conn.(*subConn)
|
||||||
@ -163,12 +208,21 @@ func (p *peer) acceptLoop() {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
for {
|
for {
|
||||||
|
if wait := p.limiter.wait(int(p.incomingCount.Load())); wait != nil {
|
||||||
|
select {
|
||||||
|
case <-wait:
|
||||||
|
case <-p.acceptCtx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
conn, err := p.Accept()
|
conn, err := p.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exitErr = err
|
exitErr = err
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
go func() {
|
go func() {
|
||||||
|
p.incomingCount.Add(1)
|
||||||
|
defer p.incomingCount.Add(-1)
|
||||||
serveErr := p.serve(conn)
|
serveErr := p.serve(conn)
|
||||||
if serveErr != io.EOF && serveErr != transport.ErrConnClosed {
|
if serveErr != io.EOF && serveErr != transport.ErrConnClosed {
|
||||||
log.InfoCtx(p.Context(), "serve connection error", zap.Error(serveErr))
|
log.InfoCtx(p.Context(), "serve connection error", zap.Error(serveErr))
|
||||||
|
|||||||
@ -12,6 +12,8 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
_ "net/http/pprof"
|
_ "net/http/pprof"
|
||||||
|
"storj.io/drpc"
|
||||||
|
"storj.io/drpc/drpcconn"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -19,32 +21,86 @@ import (
|
|||||||
var ctx = context.Background()
|
var ctx = context.Background()
|
||||||
|
|
||||||
func TestPeer_AcquireDrpcConn(t *testing.T) {
|
func TestPeer_AcquireDrpcConn(t *testing.T) {
|
||||||
|
t.Run("generic", func(t *testing.T) {
|
||||||
|
fx := newFixture(t, "p1")
|
||||||
|
defer fx.finish()
|
||||||
|
in, out := net.Pipe()
|
||||||
|
go func() {
|
||||||
|
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
|
||||||
|
}()
|
||||||
|
defer out.Close()
|
||||||
|
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
|
||||||
|
dc, err := fx.AcquireDrpcConn(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotEmpty(t, dc)
|
||||||
|
defer dc.Close()
|
||||||
|
|
||||||
|
assert.Len(t, fx.active, 1)
|
||||||
|
assert.Len(t, fx.inactive, 0)
|
||||||
|
|
||||||
|
fx.ReleaseDrpcConn(dc)
|
||||||
|
|
||||||
|
assert.Len(t, fx.active, 0)
|
||||||
|
assert.Len(t, fx.inactive, 1)
|
||||||
|
|
||||||
|
dc, err = fx.AcquireDrpcConn(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.NotEmpty(t, dc)
|
||||||
|
assert.Len(t, fx.active, 1)
|
||||||
|
assert.Len(t, fx.inactive, 0)
|
||||||
|
})
|
||||||
|
t.Run("closed sub conn", func(t *testing.T) {
|
||||||
|
fx := newFixture(t, "p1")
|
||||||
|
defer fx.finish()
|
||||||
|
|
||||||
|
closedIn, _ := net.Pipe()
|
||||||
|
dc := drpcconn.New(closedIn)
|
||||||
|
fx.ReleaseDrpcConn(&subConn{Conn: dc})
|
||||||
|
dc.Close()
|
||||||
|
|
||||||
|
in, out := net.Pipe()
|
||||||
|
go func() {
|
||||||
|
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
|
||||||
|
}()
|
||||||
|
defer out.Close()
|
||||||
|
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
|
||||||
|
_, err := fx.AcquireDrpcConn(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPeer_DrpcConn_OpenThrottling(t *testing.T) {
|
||||||
fx := newFixture(t, "p1")
|
fx := newFixture(t, "p1")
|
||||||
defer fx.finish()
|
defer fx.finish()
|
||||||
in, out := net.Pipe()
|
|
||||||
|
acquire := func() (func(), drpc.Conn, error) {
|
||||||
|
in, out := net.Pipe()
|
||||||
|
go func() {
|
||||||
|
_, err := handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}()
|
||||||
|
|
||||||
|
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
|
||||||
|
dconn, err := fx.AcquireDrpcConn(ctx)
|
||||||
|
return func() { out.Close() }, dconn, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var conCount = fx.limiter.startThreshold + 3
|
||||||
|
var conns []drpc.Conn
|
||||||
|
for i := 0; i < conCount; i++ {
|
||||||
|
cc, dc, err := acquire()
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer cc()
|
||||||
|
conns = append(conns, dc)
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
handshake.IncomingProtoHandshake(ctx, out, defaultProtoChecker)
|
time.Sleep(fx.limiter.slowDownStep)
|
||||||
|
fx.ReleaseDrpcConn(conns[0])
|
||||||
|
conns = conns[1:]
|
||||||
}()
|
}()
|
||||||
defer out.Close()
|
_, err := fx.AcquireDrpcConn(ctx)
|
||||||
fx.mc.EXPECT().Open(gomock.Any()).Return(in, nil)
|
|
||||||
dc, err := fx.AcquireDrpcConn(ctx)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.NotEmpty(t, dc)
|
|
||||||
defer dc.Close()
|
|
||||||
|
|
||||||
assert.Len(t, fx.active, 1)
|
|
||||||
assert.Len(t, fx.inactive, 0)
|
|
||||||
|
|
||||||
fx.ReleaseDrpcConn(dc)
|
|
||||||
|
|
||||||
assert.Len(t, fx.active, 0)
|
|
||||||
assert.Len(t, fx.inactive, 1)
|
|
||||||
|
|
||||||
dc, err = fx.AcquireDrpcConn(ctx)
|
|
||||||
require.NoError(t, err)
|
|
||||||
assert.NotEmpty(t, dc)
|
|
||||||
assert.Len(t, fx.active, 1)
|
|
||||||
assert.Len(t, fx.inactive, 0)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestPeerAccept(t *testing.T) {
|
func TestPeerAccept(t *testing.T) {
|
||||||
@ -63,6 +119,26 @@ func TestPeerAccept(t *testing.T) {
|
|||||||
assert.NoError(t, <-outHandshakeCh)
|
assert.NoError(t, <-outHandshakeCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPeer_DrpcConn_AcceptThrottling(t *testing.T) {
|
||||||
|
fx := newFixture(t, "p1")
|
||||||
|
defer fx.finish()
|
||||||
|
|
||||||
|
var conCount = fx.limiter.startThreshold + 3
|
||||||
|
for i := 0; i < conCount; i++ {
|
||||||
|
in, out := net.Pipe()
|
||||||
|
defer out.Close()
|
||||||
|
|
||||||
|
var outHandshakeCh = make(chan error)
|
||||||
|
go func() {
|
||||||
|
outHandshakeCh <- handshake.OutgoingProtoHandshake(ctx, out, handshakeproto.ProtoType_DRPC)
|
||||||
|
}()
|
||||||
|
fx.acceptCh <- acceptedConn{conn: in}
|
||||||
|
cn := <-fx.testCtrl.serveConn
|
||||||
|
assert.Equal(t, in, cn)
|
||||||
|
assert.NoError(t, <-outHandshakeCh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestPeer_TryClose(t *testing.T) {
|
func TestPeer_TryClose(t *testing.T) {
|
||||||
t.Run("not close in first minute", func(t *testing.T) {
|
t.Run("not close in first minute", func(t *testing.T) {
|
||||||
fx := newFixture(t, "p1")
|
fx := newFixture(t, "p1")
|
||||||
|
|||||||
@ -49,8 +49,8 @@ func (p *poolService) Init(a *app.App) (err error) {
|
|||||||
return p.dialer.Dial(ctx, id)
|
return p.dialer.Dial(ctx, id)
|
||||||
},
|
},
|
||||||
ocache.WithLogger(log.Sugar()),
|
ocache.WithLogger(log.Sugar()),
|
||||||
ocache.WithGCPeriod(time.Minute),
|
ocache.WithGCPeriod(time.Minute/2),
|
||||||
ocache.WithTTL(time.Minute*5),
|
ocache.WithTTL(time.Minute),
|
||||||
ocache.WithPrometheus(p.metricReg, "netpool", "outgoing"),
|
ocache.WithPrometheus(p.metricReg, "netpool", "outgoing"),
|
||||||
)
|
)
|
||||||
p.pool.incoming = ocache.New(
|
p.pool.incoming = ocache.New(
|
||||||
@ -58,8 +58,8 @@ func (p *poolService) Init(a *app.App) (err error) {
|
|||||||
return nil, ocache.ErrNotExists
|
return nil, ocache.ErrNotExists
|
||||||
},
|
},
|
||||||
ocache.WithLogger(log.Sugar()),
|
ocache.WithLogger(log.Sugar()),
|
||||||
ocache.WithGCPeriod(time.Minute),
|
ocache.WithGCPeriod(time.Minute/2),
|
||||||
ocache.WithTTL(time.Minute*5),
|
ocache.WithTTL(time.Minute),
|
||||||
ocache.WithPrometheus(p.metricReg, "netpool", "incoming"),
|
ocache.WithPrometheus(p.metricReg, "netpool", "incoming"),
|
||||||
)
|
)
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
29
net/rpc/rpctest/multiconntest/multiconntest.go
Normal file
29
net/rpc/rpctest/multiconntest/multiconntest.go
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
package multiconntest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"github.com/anyproto/any-sync/net/connutil"
|
||||||
|
"github.com/anyproto/any-sync/net/transport"
|
||||||
|
yamux2 "github.com/anyproto/any-sync/net/transport/yamux"
|
||||||
|
"github.com/hashicorp/yamux"
|
||||||
|
"net"
|
||||||
|
)
|
||||||
|
|
||||||
|
func MultiConnPair(peerServCtx, peerClientCtx context.Context) (serv, client transport.MultiConn) {
|
||||||
|
sc, cc := net.Pipe()
|
||||||
|
var servConn = make(chan transport.MultiConn, 1)
|
||||||
|
go func() {
|
||||||
|
sess, err := yamux.Server(sc, yamux.DefaultConfig())
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
servConn <- yamux2.NewMultiConn(peerServCtx, connutil.NewLastUsageConn(sc), "", sess)
|
||||||
|
}()
|
||||||
|
sess, err := yamux.Client(cc, yamux.DefaultConfig())
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
client = yamux2.NewMultiConn(peerClientCtx, connutil.NewLastUsageConn(cc), "", sess)
|
||||||
|
serv = <-servConn
|
||||||
|
return
|
||||||
|
}
|
||||||
@ -2,29 +2,11 @@ package rpctest
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/anyproto/any-sync/net/connutil"
|
|
||||||
"github.com/anyproto/any-sync/net/peer"
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
|
"github.com/anyproto/any-sync/net/rpc/rpctest/multiconntest"
|
||||||
"github.com/anyproto/any-sync/net/transport"
|
"github.com/anyproto/any-sync/net/transport"
|
||||||
yamux2 "github.com/anyproto/any-sync/net/transport/yamux"
|
|
||||||
"github.com/hashicorp/yamux"
|
|
||||||
"net"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func MultiConnPair(peerIdServ, peerIdClient string) (serv, client transport.MultiConn) {
|
func MultiConnPair(peerIdServ, peerIdClient string) (serv, client transport.MultiConn) {
|
||||||
sc, cc := net.Pipe()
|
return multiconntest.MultiConnPair(peer.CtxWithPeerId(context.Background(), peerIdServ), peer.CtxWithPeerId(context.Background(), peerIdClient))
|
||||||
var servConn = make(chan transport.MultiConn, 1)
|
|
||||||
go func() {
|
|
||||||
sess, err := yamux.Server(sc, yamux.DefaultConfig())
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
servConn <- yamux2.NewMultiConn(peer.CtxWithPeerId(context.Background(), peerIdServ), connutil.NewLastUsageConn(sc), "", sess)
|
|
||||||
}()
|
|
||||||
sess, err := yamux.Client(cc, yamux.DefaultConfig())
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
client = yamux2.NewMultiConn(peer.CtxWithPeerId(context.Background(), peerIdClient), connutil.NewLastUsageConn(cc), "", sess)
|
|
||||||
serv = <-servConn
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -11,19 +11,20 @@ type ProtoChecker struct {
|
|||||||
AllowedProtoTypes []handshakeproto.ProtoType
|
AllowedProtoTypes []handshakeproto.ProtoType
|
||||||
}
|
}
|
||||||
|
|
||||||
func OutgoingProtoHandshake(ctx context.Context, conn net.Conn, pt handshakeproto.ProtoType) (err error) {
|
func OutgoingProtoHandshake(ctx context.Context, conn net.Conn, pt handshakeproto.ProtoType) error {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
h := newHandshake()
|
h := newHandshake()
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
var err error
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
err = outgoingProtoHandshake(h, conn, pt)
|
err = outgoingProtoHandshake(h, conn, pt)
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
return
|
return err
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
return ctx.Err()
|
return ctx.Err()
|
||||||
@ -54,19 +55,23 @@ func outgoingProtoHandshake(h *handshake, conn net.Conn, pt handshakeproto.Proto
|
|||||||
return HandshakeError{e: msg.ack.Error}
|
return HandshakeError{e: msg.ack.Error}
|
||||||
}
|
}
|
||||||
|
|
||||||
func IncomingProtoHandshake(ctx context.Context, conn net.Conn, pt ProtoChecker) (protoType handshakeproto.ProtoType, err error) {
|
func IncomingProtoHandshake(ctx context.Context, conn net.Conn, pt ProtoChecker) (handshakeproto.ProtoType, error) {
|
||||||
if ctx == nil {
|
if ctx == nil {
|
||||||
ctx = context.Background()
|
ctx = context.Background()
|
||||||
}
|
}
|
||||||
h := newHandshake()
|
h := newHandshake()
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
|
var (
|
||||||
|
protoType handshakeproto.ProtoType
|
||||||
|
err error
|
||||||
|
)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(done)
|
defer close(done)
|
||||||
protoType, err = incomingProtoHandshake(h, conn, pt)
|
protoType, err = incomingProtoHandshake(h, conn, pt)
|
||||||
}()
|
}()
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
return
|
return protoType, err
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
return 0, ctx.Err()
|
return 0, ctx.Err()
|
||||||
|
|||||||
@ -6,6 +6,7 @@ import (
|
|||||||
"github.com/anyproto/any-sync/net/peer"
|
"github.com/anyproto/any-sync/net/peer"
|
||||||
"github.com/anyproto/any-sync/net/transport"
|
"github.com/anyproto/any-sync/net/transport"
|
||||||
"github.com/hashicorp/yamux"
|
"github.com/hashicorp/yamux"
|
||||||
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -48,7 +49,7 @@ func (y *yamuxConn) Addr() string {
|
|||||||
|
|
||||||
func (y *yamuxConn) Accept() (conn net.Conn, err error) {
|
func (y *yamuxConn) Accept() (conn net.Conn, err error) {
|
||||||
if conn, err = y.Session.Accept(); err != nil {
|
if conn, err = y.Session.Accept(); err != nil {
|
||||||
if err == yamux.ErrSessionShutdown {
|
if err == yamux.ErrSessionShutdown || err == io.EOF {
|
||||||
err = transport.ErrConnClosed
|
err = transport.ErrConnClosed
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|||||||
@ -30,8 +30,12 @@ func TestYamuxTransport_Dial(t *testing.T) {
|
|||||||
|
|
||||||
mcC, err := fxC.Dial(ctx, fxS.addr)
|
mcC, err := fxC.Dial(ctx, fxS.addr)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, fxS.accepter.mcs, 1)
|
var mcS transport.MultiConn
|
||||||
mcS := <-fxS.accepter.mcs
|
select {
|
||||||
|
case mcS = <-fxS.accepter.mcs:
|
||||||
|
case <-time.After(time.Second * 5):
|
||||||
|
require.True(t, false, "timeout")
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
sData string
|
sData string
|
||||||
@ -69,11 +73,11 @@ func TestYamuxTransport_Dial(t *testing.T) {
|
|||||||
// no deadline - 69100 rps
|
// no deadline - 69100 rps
|
||||||
// common write deadline - 66700 rps
|
// common write deadline - 66700 rps
|
||||||
// subconn write deadline - 67100 rps
|
// subconn write deadline - 67100 rps
|
||||||
func TestWriteBench(t *testing.T) {
|
func TestWriteBenchReuse(t *testing.T) {
|
||||||
t.Skip()
|
t.Skip()
|
||||||
var (
|
var (
|
||||||
numSubConn = 10
|
numSubConn = 10
|
||||||
numWrites = 100000
|
numWrites = 10000
|
||||||
)
|
)
|
||||||
|
|
||||||
fxS := newFixture(t)
|
fxS := newFixture(t)
|
||||||
@ -124,6 +128,63 @@ func TestWriteBench(t *testing.T) {
|
|||||||
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
|
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestWriteBenchNew(t *testing.T) {
|
||||||
|
t.Skip()
|
||||||
|
var (
|
||||||
|
numSubConn = 10
|
||||||
|
numWrites = 10000
|
||||||
|
)
|
||||||
|
|
||||||
|
fxS := newFixture(t)
|
||||||
|
defer fxS.finish(t)
|
||||||
|
fxC := newFixture(t)
|
||||||
|
defer fxC.finish(t)
|
||||||
|
|
||||||
|
mcC, err := fxC.Dial(ctx, fxS.addr)
|
||||||
|
require.NoError(t, err)
|
||||||
|
mcS := <-fxS.accepter.mcs
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for i := 0; i < numSubConn; i++ {
|
||||||
|
require.NoError(t, err)
|
||||||
|
go func() {
|
||||||
|
var b = make([]byte, 1024)
|
||||||
|
for {
|
||||||
|
conn, _ := mcS.Accept()
|
||||||
|
n, _ := conn.Read(b)
|
||||||
|
if n > 0 {
|
||||||
|
conn.Write(b[:n])
|
||||||
|
} else {
|
||||||
|
_ = conn.Close()
|
||||||
|
break
|
||||||
|
}
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(numSubConn)
|
||||||
|
st := time.Now()
|
||||||
|
for i := 0; i < numSubConn; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for j := 0; j < numWrites; j++ {
|
||||||
|
sc, err := mcC.Open(ctx)
|
||||||
|
require.NoError(t, err)
|
||||||
|
var b = []byte("some data some data some data some data some data some data some data some data some data")
|
||||||
|
sc.Write(b)
|
||||||
|
sc.Read(b)
|
||||||
|
sc.Close()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
dur := time.Since(st)
|
||||||
|
t.Logf("%.2f req per sec", float64(numWrites*numSubConn)/dur.Seconds())
|
||||||
|
}
|
||||||
|
|
||||||
type fixture struct {
|
type fixture struct {
|
||||||
*yamuxTransport
|
*yamuxTransport
|
||||||
a *app.App
|
a *app.App
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user