any-sync/net/streampool/streampoolservice.go
2023-05-26 11:29:21 +02:00

61 lines
1.3 KiB
Go

package streampool
import (
"github.com/anyproto/any-sync/app"
"github.com/anyproto/any-sync/app/logger"
"github.com/anyproto/any-sync/metric"
)
// CName is the component name of the stream pool service. It is the value
// returned by (*service).Name and is also used to name the package logger.
const CName = "common.net.streampool"
// log is the package-level logger, named after CName.
var log = logger.NewNamed(CName)
// New returns a fresh, uninitialised stream pool Service.
//
// The returned service has no metric component attached until Init is called;
// NewStreamPool tolerates that and simply skips metric registration.
func New() Service {
	return &service{}
}
// StreamConfig holds the tunables passed to Service.NewStreamPool.
type StreamConfig struct {
	// SendQueueSize is the size of the per-peer write queue.
	SendQueueSize int
	// DialQueueWorkers is how many workers will dial to peers.
	DialQueueWorkers int
	// DialQueueSize is the size of the dial queue.
	DialQueueSize int
}
// Service is the application component that creates stream pools.
type Service interface {
	// NewStreamPool creates a StreamPool that uses h as its stream handler
	// and takes its queue sizes and dial worker count from conf.
	NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool
	// Service is itself an app.Component.
	app.Component
}
// service is the default Service implementation.
type service struct {
	// metric is assigned in Init and may be nil; NewStreamPool only
	// registers metrics when it is non-nil.
	metric metric.Metric
}
// NewStreamPool builds a stream pool that dispatches to handler h.
//
// conf.DialQueueWorkers and conf.DialQueueSize size the exec pool used for
// dialing, and conf.SendQueueSize becomes the pool's write queue size. Run is
// invoked on the dial exec pool before the stream pool is returned. When a
// metric component is present on the service, the new pool is registered
// against its registry.
func (s *service) NewStreamPool(h StreamHandler, conf StreamConfig) StreamPool {
	dialPool := NewExecPool(conf.DialQueueWorkers, conf.DialQueueSize)

	pool := &streamPool{
		handler:         h,
		writeQueueSize:  conf.SendQueueSize,
		streamIdsByPeer: make(map[string][]uint32),
		streamIdsByTag:  make(map[string][]uint32),
		streams:         make(map[uint32]*stream),
		opening:         make(map[string]*openingProcess),
		dial:            dialPool,
	}

	dialPool.Run()

	// Metrics are optional: without a metric component there is nothing to
	// register against.
	if s.metric == nil {
		return pool
	}
	// NOTE(review): the empty third argument is presumably a name/prefix for
	// the registered metrics — confirm against registerMetrics.
	registerMetrics(s.metric.Registry(), pool, "")
	return pool
}
// Init looks up the metric component in the application and stores it on the
// service. It always returns nil.
func (s *service) Init(a *app.App) (err error) {
	component := a.Component(metric.CName)
	// The comma-ok result is deliberately discarded: if the component is not
	// a metric.Metric, m is nil and NewStreamPool skips metric registration.
	m, _ := component.(metric.Metric)
	s.metric = m
	return nil
}
// Name reports the component name of the service, which is CName.
func (s *service) Name() (name string) {
	name = CName
	return name
}