commit
3cfa70c291
1
Makefile
1
Makefile
@ -25,6 +25,7 @@ deps:
|
|||||||
go mod download
|
go mod download
|
||||||
go build -o deps storj.io/drpc/cmd/protoc-gen-go-drpc
|
go build -o deps storj.io/drpc/cmd/protoc-gen-go-drpc
|
||||||
go build -o deps github.com/gogo/protobuf/protoc-gen-gogofaster
|
go build -o deps github.com/gogo/protobuf/protoc-gen-gogofaster
|
||||||
|
go build -o deps github.com/golang/mock/mockgen
|
||||||
|
|
||||||
test:
|
test:
|
||||||
go test ./... --cover
|
go test ./... --cover
|
||||||
|
|||||||
@ -35,6 +35,7 @@ func NewPeer(mc transport.MultiConn, ctrl connCtrl) (p Peer, err error) {
|
|||||||
active: map[*subConn]struct{}{},
|
active: map[*subConn]struct{}{},
|
||||||
MultiConn: mc,
|
MultiConn: mc,
|
||||||
ctrl: ctrl,
|
ctrl: ctrl,
|
||||||
|
created: time.Now(),
|
||||||
}
|
}
|
||||||
if pr.id, err = CtxPeerId(ctx); err != nil {
|
if pr.id, err = CtxPeerId(ctx); err != nil {
|
||||||
return
|
return
|
||||||
@ -74,6 +75,8 @@ type peer struct {
|
|||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
||||||
|
created time.Time
|
||||||
|
|
||||||
transport.MultiConn
|
transport.MultiConn
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,6 +98,12 @@ func (p *peer) AcquireDrpcConn(ctx context.Context) (drpc.Conn, error) {
|
|||||||
idx := len(p.inactive) - 1
|
idx := len(p.inactive) - 1
|
||||||
res := p.inactive[idx]
|
res := p.inactive[idx]
|
||||||
p.inactive = p.inactive[:idx]
|
p.inactive = p.inactive[:idx]
|
||||||
|
select {
|
||||||
|
case <-res.Closed():
|
||||||
|
p.mu.Unlock()
|
||||||
|
return p.AcquireDrpcConn(ctx)
|
||||||
|
default:
|
||||||
|
}
|
||||||
p.active[res] = struct{}{}
|
p.active[res] = struct{}{}
|
||||||
p.mu.Unlock()
|
p.mu.Unlock()
|
||||||
return res, nil
|
return res, nil
|
||||||
@ -184,14 +193,14 @@ func (p *peer) serve(conn net.Conn) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
|
func (p *peer) TryClose(objectTTL time.Duration) (res bool, err error) {
|
||||||
p.gc(objectTTL)
|
aliveCount := p.gc(objectTTL)
|
||||||
if time.Now().Sub(p.LastUsage()) < objectTTL {
|
if aliveCount == 0 && p.created.Add(time.Minute).Before(time.Now()) {
|
||||||
return false, nil
|
return true, p.Close()
|
||||||
}
|
}
|
||||||
return true, p.Close()
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) gc(ttl time.Duration) {
|
func (p *peer) gc(ttl time.Duration) (aliveCount int) {
|
||||||
p.mu.Lock()
|
p.mu.Lock()
|
||||||
defer p.mu.Unlock()
|
defer p.mu.Unlock()
|
||||||
minLastUsage := time.Now().Add(-ttl)
|
minLastUsage := time.Now().Add(-ttl)
|
||||||
@ -232,6 +241,7 @@ func (p *peer) gc(ttl time.Duration) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return len(p.active) + len(p.inactive)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *peer) Close() (err error) {
|
func (p *peer) Close() (err error) {
|
||||||
|
|||||||
@ -64,11 +64,9 @@ func TestPeerAccept(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPeer_TryClose(t *testing.T) {
|
func TestPeer_TryClose(t *testing.T) {
|
||||||
t.Run("ttl", func(t *testing.T) {
|
t.Run("not close in first minute", func(t *testing.T) {
|
||||||
fx := newFixture(t, "p1")
|
fx := newFixture(t, "p1")
|
||||||
defer fx.finish()
|
defer fx.finish()
|
||||||
lu := time.Now()
|
|
||||||
fx.mc.EXPECT().LastUsage().Return(lu)
|
|
||||||
res, err := fx.TryClose(time.Second)
|
res, err := fx.TryClose(time.Second)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.False(t, res)
|
assert.False(t, res)
|
||||||
@ -76,8 +74,7 @@ func TestPeer_TryClose(t *testing.T) {
|
|||||||
t.Run("close", func(t *testing.T) {
|
t.Run("close", func(t *testing.T) {
|
||||||
fx := newFixture(t, "p1")
|
fx := newFixture(t, "p1")
|
||||||
defer fx.finish()
|
defer fx.finish()
|
||||||
lu := time.Now().Add(-time.Second * 2)
|
fx.peer.created = fx.peer.created.Add(-time.Minute * 2)
|
||||||
fx.mc.EXPECT().LastUsage().Return(lu)
|
|
||||||
res, err := fx.TryClose(time.Second)
|
res, err := fx.TryClose(time.Second)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
assert.True(t, res)
|
assert.True(t, res)
|
||||||
@ -85,8 +82,7 @@ func TestPeer_TryClose(t *testing.T) {
|
|||||||
t.Run("gc", func(t *testing.T) {
|
t.Run("gc", func(t *testing.T) {
|
||||||
fx := newFixture(t, "p1")
|
fx := newFixture(t, "p1")
|
||||||
defer fx.finish()
|
defer fx.finish()
|
||||||
now := time.Now()
|
//now := time.Now()
|
||||||
fx.mc.EXPECT().LastUsage().Return(now.Add(time.Millisecond * 100))
|
|
||||||
|
|
||||||
// make one inactive
|
// make one inactive
|
||||||
in, out := net.Pipe()
|
in, out := net.Pipe()
|
||||||
|
|||||||
@ -8,7 +8,6 @@ import (
|
|||||||
context "context"
|
context "context"
|
||||||
net "net"
|
net "net"
|
||||||
reflect "reflect"
|
reflect "reflect"
|
||||||
time "time"
|
|
||||||
|
|
||||||
transport "github.com/anyproto/any-sync/net/transport"
|
transport "github.com/anyproto/any-sync/net/transport"
|
||||||
gomock "github.com/golang/mock/gomock"
|
gomock "github.com/golang/mock/gomock"
|
||||||
@ -158,20 +157,6 @@ func (mr *MockMultiConnMockRecorder) IsClosed() *gomock.Call {
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsClosed", reflect.TypeOf((*MockMultiConn)(nil).IsClosed))
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsClosed", reflect.TypeOf((*MockMultiConn)(nil).IsClosed))
|
||||||
}
|
}
|
||||||
|
|
||||||
// LastUsage mocks base method.
|
|
||||||
func (m *MockMultiConn) LastUsage() time.Time {
|
|
||||||
m.ctrl.T.Helper()
|
|
||||||
ret := m.ctrl.Call(m, "LastUsage")
|
|
||||||
ret0, _ := ret[0].(time.Time)
|
|
||||||
return ret0
|
|
||||||
}
|
|
||||||
|
|
||||||
// LastUsage indicates an expected call of LastUsage.
|
|
||||||
func (mr *MockMultiConnMockRecorder) LastUsage() *gomock.Call {
|
|
||||||
mr.mock.ctrl.T.Helper()
|
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastUsage", reflect.TypeOf((*MockMultiConn)(nil).LastUsage))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Open mocks base method.
|
// Open mocks base method.
|
||||||
func (m *MockMultiConn) Open(arg0 context.Context) (net.Conn, error) {
|
func (m *MockMultiConn) Open(arg0 context.Context) (net.Conn, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|||||||
@ -5,7 +5,6 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"net"
|
"net"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -29,8 +28,6 @@ type MultiConn interface {
|
|||||||
Accept() (conn net.Conn, err error)
|
Accept() (conn net.Conn, err error)
|
||||||
// Open opens new sub connection
|
// Open opens new sub connection
|
||||||
Open(ctx context.Context) (conn net.Conn, err error)
|
Open(ctx context.Context) (conn net.Conn, err error)
|
||||||
// LastUsage returns the time of the last connection activity
|
|
||||||
LastUsage() time.Time
|
|
||||||
// Addr returns remote peer address
|
// Addr returns remote peer address
|
||||||
Addr() string
|
Addr() string
|
||||||
// IsClosed returns true when connection is closed
|
// IsClosed returns true when connection is closed
|
||||||
|
|||||||
@ -5,7 +5,8 @@ type configGetter interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
ListenAddrs []string `yaml:"listenAddrs"`
|
ListenAddrs []string `yaml:"listenAddrs"`
|
||||||
WriteTimeoutSec int `yaml:"writeTimeoutSec"`
|
WriteTimeoutSec int `yaml:"writeTimeoutSec"`
|
||||||
DialTimeoutSec int `yaml:"dialTimeoutSec"`
|
DialTimeoutSec int `yaml:"dialTimeoutSec"`
|
||||||
|
KeepAlivePeriodSec int `yaml:"keepAlivePeriodSec"`
|
||||||
}
|
}
|
||||||
|
|||||||
@ -51,8 +51,16 @@ func (y *yamuxTransport) Init(a *app.App) (err error) {
|
|||||||
if y.conf.WriteTimeoutSec <= 0 {
|
if y.conf.WriteTimeoutSec <= 0 {
|
||||||
y.conf.WriteTimeoutSec = 10
|
y.conf.WriteTimeoutSec = 10
|
||||||
}
|
}
|
||||||
|
|
||||||
y.yamuxConf = yamux.DefaultConfig()
|
y.yamuxConf = yamux.DefaultConfig()
|
||||||
y.yamuxConf.EnableKeepAlive = false
|
if y.conf.KeepAlivePeriodSec < 0 {
|
||||||
|
y.yamuxConf.EnableKeepAlive = false
|
||||||
|
} else {
|
||||||
|
y.yamuxConf.EnableKeepAlive = true
|
||||||
|
if y.conf.KeepAlivePeriodSec != 0 {
|
||||||
|
y.yamuxConf.KeepAliveInterval = time.Duration(y.conf.KeepAlivePeriodSec) * time.Second
|
||||||
|
}
|
||||||
|
}
|
||||||
y.yamuxConf.StreamOpenTimeout = time.Duration(y.conf.DialTimeoutSec) * time.Second
|
y.yamuxConf.StreamOpenTimeout = time.Duration(y.conf.DialTimeoutSec) * time.Second
|
||||||
y.yamuxConf.ConnectionWriteTimeout = time.Duration(y.conf.WriteTimeoutSec) * time.Second
|
y.yamuxConf.ConnectionWriteTimeout = time.Duration(y.conf.WriteTimeoutSec) * time.Second
|
||||||
y.listCtx, y.listCtxCancel = context.WithCancel(context.Background())
|
y.listCtx, y.listCtxCancel = context.WithCancel(context.Background())
|
||||||
@ -106,7 +114,7 @@ func (y *yamuxTransport) Dial(ctx context.Context, addr string) (mc transport.Mu
|
|||||||
_ = conn.Close()
|
_ = conn.Close()
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
luc := connutil.NewLastUsageConn(conn)
|
luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second))
|
||||||
sess, err := yamux.Client(luc, y.yamuxConf)
|
sess, err := yamux.Client(luc, y.yamuxConf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -152,7 +160,7 @@ func (y *yamuxTransport) accept(conn net.Conn) {
|
|||||||
log.Warn("incoming connection handshake error", zap.Error(err))
|
log.Warn("incoming connection handshake error", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
luc := connutil.NewLastUsageConn(conn)
|
luc := connutil.NewLastUsageConn(connutil.NewTimeout(conn, time.Duration(y.conf.WriteTimeoutSec)*time.Second))
|
||||||
sess, err := yamux.Server(luc, y.yamuxConf)
|
sess, err := yamux.Server(luc, y.yamuxConf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("incoming connection yamux session error", zap.Error(err))
|
log.Warn("incoming connection yamux session error", zap.Error(err))
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user