stream config

This commit is contained in:
Sergey Cherepanov 2023-01-30 17:03:31 +03:00
parent 78067eaa4c
commit 9dc3c807fb
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
2 changed files with 21 additions and 5 deletions

View File

@ -200,7 +200,12 @@ func newFixture(t *testing.T) *fixture {
require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh)) require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh))
fx.tp = rpctest.NewTestPool().WithServer(ts) fx.tp = rpctest.NewTestPool().WithServer(ts)
fx.th = &testHandler{} fx.th = &testHandler{}
fx.StreamPool = New().NewStreamPool(fx.th) fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{
SendQueueWorkers: 4,
SendQueueSize: 10,
DialQueueWorkers: 1,
DialQueueSize: 10,
})
return fx return fx
} }

View File

@ -13,23 +13,34 @@ func New() Service {
return new(service) return new(service)
} }
type StreamConfig struct {
// SendQueueWorkers how many workers will write message to streams
SendQueueWorkers int
// SendQueueSize size of the queue for write
SendQueueSize int
// DialQueueWorkers how many workers will dial to peers
DialQueueWorkers int
// DialQueueSize size of the dial queue
DialQueueSize int
}
type Service interface { type Service interface {
NewStreamPool(h StreamHandler) StreamPool NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool
app.Component app.Component
} }
type service struct { type service struct {
} }
func (s *service) NewStreamPool(h StreamHandler) StreamPool { func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
sp := &streamPool{ sp := &streamPool{
handler: h, handler: h,
streamIdsByPeer: map[string][]uint32{}, streamIdsByPeer: map[string][]uint32{},
streamIdsByTag: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{},
streams: map[uint32]*stream{}, streams: map[uint32]*stream{},
opening: map[string]*openingProcess{}, opening: map[string]*openingProcess{},
exec: newExecPool(10, 100), exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize),
dial: newExecPool(4, 100), dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize),
} }
return sp return sp
} }