stream config

This commit is contained in:
Sergey Cherepanov 2023-01-30 17:03:31 +03:00 committed by Mikhail Iudin
parent f6d5ea6495
commit cf15052b76
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
2 changed files with 21 additions and 5 deletions

View File

@ -200,7 +200,12 @@ func newFixture(t *testing.T) *fixture {
require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh))
fx.tp = rpctest.NewTestPool().WithServer(ts)
fx.th = &testHandler{}
fx.StreamPool = New().NewStreamPool(fx.th)
fx.StreamPool = New().NewStreamPool(fx.th, StreamConfig{
SendQueueWorkers: 4,
SendQueueSize: 10,
DialQueueWorkers: 1,
DialQueueSize: 10,
})
return fx
}

View File

@ -13,23 +13,34 @@ func New() Service {
return new(service)
}
type StreamConfig struct {
// SendQueueWorkers how many workers will write message to streams
SendQueueWorkers int
// SendQueueSize size of the queue for write
SendQueueSize int
// DialQueueWorkers how many workers will dial to peers
DialQueueWorkers int
// DialQueueSize size of the dial queue
DialQueueSize int
}
type Service interface {
NewStreamPool(h StreamHandler) StreamPool
NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool
app.Component
}
type service struct {
}
func (s *service) NewStreamPool(h StreamHandler) StreamPool {
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
sp := &streamPool{
handler: h,
streamIdsByPeer: map[string][]uint32{},
streamIdsByTag: map[string][]uint32{},
streams: map[uint32]*stream{},
opening: map[string]*openingProcess{},
exec: newExecPool(10, 100),
dial: newExecPool(4, 100),
exec: newExecPool(conf.SendQueueWorkers, conf.SendQueueSize),
dial: newExecPool(conf.DialQueueWorkers, conf.DialQueueSize),
}
return sp
}