From d256bfbc4cd1900de02a2edaafe4312f628eb04a Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Wed, 18 Jan 2023 13:44:50 +0300 Subject: [PATCH] PoolService still keeps the default Pool but with ability to create new pools --- net/pool/pool.go | 37 +---------------------- net/pool/pool_test.go | 8 ++--- net/pool/poolservice.go | 67 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 72 insertions(+), 40 deletions(-) create mode 100644 net/pool/poolservice.go diff --git a/net/pool/pool.go b/net/pool/pool.go index 0b202106..e5a8cc1d 100644 --- a/net/pool/pool.go +++ b/net/pool/pool.go @@ -3,31 +3,16 @@ package pool import ( "context" "errors" - "github.com/anytypeio/any-sync/app" - "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/ocache" - "github.com/anytypeio/any-sync/metric" "github.com/anytypeio/any-sync/net/dialer" "github.com/anytypeio/any-sync/net/peer" - "github.com/prometheus/client_golang/prometheus" "math/rand" - "time" ) -const ( - CName = "common.net.pool" -) - -var log = logger.NewNamed(CName) - var ( ErrUnableToConnect = errors.New("unable to connect") ) -func New() Pool { - return &pool{} -} - // Pool creates and caches outgoing connection type Pool interface { // Get lookups to peer in existing connections or creates and cache new one @@ -38,8 +23,6 @@ type Pool interface { GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) DialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) - - app.ComponentRunnable } type pool struct { @@ -47,24 +30,6 @@ type pool struct { dialer dialer.Dialer } -func (p *pool) Init(a *app.App) (err error) { - p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer) - var reg *prometheus.Registry - if m := a.Component(metric.CName); m != nil { - reg = m.(metric.Metric).Registry() - } - p.cache = ocache.New( - func(ctx context.Context, id string) (value ocache.Object, err error) { - return p.dialer.Dial(ctx, id) - }, - ocache.WithLogger(log.Sugar()), - ocache.WithGCPeriod(time.Minute), - ocache.WithTTL(time.Minute*5), - ocache.WithPrometheus(reg, "netpool", "cache"), - ) - return nil -} - func (p *pool) Name() (name string) { return CName } @@ -84,7 +49,7 @@ func (p *pool) Get(ctx context.Context, id string) (peer.Peer, error) { default: return pr, nil } - p.cache.Remove(id) + _, _ = p.cache.Remove(id) return p.Get(ctx, id) } diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index d96af5f5..94fde050 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -123,11 +123,11 @@ func TestPool_GetOneOf(t *testing.T) { func newFixture(t *testing.T) *fixture { fx := &fixture{ - Pool: New(), - Dialer: &dialerMock{}, + Service: New(), + Dialer: &dialerMock{}, } a := new(app.App) - a.Register(fx.Pool) + a.Register(fx.Service) a.Register(fx.Dialer) require.NoError(t, a.Start(context.Background())) fx.a = a @@ -140,7 +140,7 @@ func (fx *fixture) Finish() { } type fixture struct { - Pool + Service Dialer *dialerMock a *app.App t *testing.T diff --git a/net/pool/poolservice.go b/net/pool/poolservice.go new file mode 100644 index 00000000..f00e5ea3 --- /dev/null +++ b/net/pool/poolservice.go @@ -0,0 +1,67 @@ +package pool + +import ( + "context" + "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/app/logger" + "github.com/anytypeio/any-sync/app/ocache" + "github.com/anytypeio/any-sync/metric" + "github.com/anytypeio/any-sync/net/dialer" + "github.com/prometheus/client_golang/prometheus" + "time" +) + +const ( + CName = "common.net.pool" +) + +var log = logger.NewNamed(CName) + +func New() Service { + return &poolService{} +} + +type Service interface { + Pool + NewPool(name string) Pool + app.ComponentRunnable +} + +type poolService struct { + *pool + dialer dialer.Dialer + metricReg *prometheus.Registry +} + +func (p *poolService) Init(a *app.App) (err error) { + p.pool = &pool{} + p.dialer = a.MustComponent(dialer.CName).(dialer.Dialer) + if m := a.Component(metric.CName); m != nil { + p.metricReg = m.(metric.Metric).Registry() + } + p.pool.cache = ocache.New( + func(ctx context.Context, id string) (value ocache.Object, err error) { + return p.dialer.Dial(ctx, id) + }, + ocache.WithLogger(log.Sugar()), + ocache.WithGCPeriod(time.Minute), + ocache.WithTTL(time.Minute*5), + ocache.WithPrometheus(p.metricReg, "netpool", "default"), + ) + return nil +} + +func (p *poolService) NewPool(name string) Pool { + return &pool{ + dialer: p.dialer, + cache: ocache.New( + func(ctx context.Context, id string) (value ocache.Object, err error) { + return p.dialer.Dial(ctx, id) + }, + ocache.WithLogger(log.Sugar()), + ocache.WithGCPeriod(time.Minute), + ocache.WithTTL(time.Minute*5), + ocache.WithPrometheus(p.metricReg, "netpool", name), + ), + } +}