From 9dc3c807fb79da0de52529c603b50ebf372ecdeb Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 30 Jan 2023 17:03:31 +0300 Subject: [PATCH] stream config --- net/streampool/streampool_test.go | 7 ++++++- net/streampool/streampoolservice.go | 19 +++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/net/streampool/streampool_test.go b/net/streampool/streampool_test.go index c67377cc..2ab15d26 100644 --- a/net/streampool/streampool_test.go +++ b/net/streampool/streampool_test.go @@ -200,7 +200,12 @@ func newFixture(t *testing.T) *fixture { require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh)) fx.tp = rpctest.NewTestPool().WithServer(ts) fx.th = &testHandler{} - fx.StreamPool = New().NewStreamPool(fx.th) + fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{ + SendQueueWorkers: 4, + SendQueueSize: 10, + DialQueueWorkers: 1, + DialQueueSize: 10, + }) return fx } diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index fe6c9ce2..25070497 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -13,23 +13,34 @@ func New() Service { return new(service) } +type StreamConfig struct { + // SendQueueWorkers how many workers will write message to streams + SendQueueWorkers int + // SendQueueSize size of the queue for write + SendQueueSize int + // DialQueueWorkers how many workers will dial to peers + DialQueueWorkers int + // DialQueueSize size of the dial queue + DialQueueSize int +} + type Service interface { - NewStreamPool(h StreamHandler) StreamPool + NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool app.Component } type service struct { } -func (s *service) NewStreamPool(h StreamHandler) StreamPool { +func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool { sp := &streamPool{ handler: h, streamIdsByPeer: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - exec: newExecPool(10, 100), - dial: newExecPool(4, 100), + exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize), + dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize), } return sp }