dial pool

This commit is contained in:
Sergey Cherepanov 2023-01-30 16:43:52 +03:00 committed by Mikhail Iudin
parent b155031cb1
commit f6d5ea6495
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
4 changed files with 22 additions and 18 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey" "github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey"
"github.com/anytypeio/any-sync/util/multiqueue" "github.com/anytypeio/any-sync/util/multiqueue"
"github.com/anytypeio/any-sync/util/slice" "github.com/anytypeio/any-sync/util/slice"
"github.com/cheggaaa/mb/v3"
"github.com/zeebo/errs" "github.com/zeebo/errs"
"go.uber.org/zap" "go.uber.org/zap"
"strconv" "strconv"
@ -359,7 +360,13 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error)
_ = s.handleQueue.CloseThread(threadId) _ = s.handleQueue.CloseThread(threadId)
}() }()
} }
return s.handleQueue.Add(ctx, threadId, hm) err = s.handleQueue.Add(ctx, threadId, hm)
if err == mb.ErrOverflowed {
log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId))
// skip overflowed error
return nil
}
return
} }
func (s *space) handleMessage(msg HandleMessage) { func (s *space) handleMessage(msg HandleMessage) {

View File

@ -6,11 +6,11 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
// newStreamSender creates new sendPool // newExecPool creates new execPool
// workers - how many processes will execute tasks // workers - how many processes will execute tasks
// maxSize - limit for queue size // maxSize - limit for queue size
func newStreamSender(workers, maxSize int) *sendPool { func newExecPool(workers, maxSize int) *execPool {
ss := &sendPool{ ss := &execPool{
batch: mb.New[func()](maxSize), batch: mb.New[func()](maxSize),
} }
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
@ -19,16 +19,16 @@ func newStreamSender(workers, maxSize int) *sendPool {
return ss return ss
} }
// sendPool needed for parallel execution of the incoming send tasks // execPool needed for parallel execution of the incoming send tasks
type sendPool struct { type execPool struct {
batch *mb.MB[func()] batch *mb.MB[func()]
} }
func (ss *sendPool) Add(ctx context.Context, f ...func()) (err error) { func (ss *execPool) Add(ctx context.Context, f ...func()) (err error) {
return ss.batch.Add(ctx, f...) return ss.batch.Add(ctx, f...)
} }
func (ss *sendPool) sendLoop() { func (ss *execPool) sendLoop() {
for { for {
f, err := ss.batch.WaitOne(context.Background()) f, err := ss.batch.WaitOne(context.Background())
if err != nil { if err != nil {
@ -39,6 +39,6 @@ func (ss *sendPool) sendLoop() {
} }
} }
func (ss *sendPool) Close() (err error) { func (ss *execPool) Close() (err error) {
return ss.batch.Close() return ss.batch.Close()
} }

View File

@ -50,8 +50,9 @@ type streamPool struct {
streamIdsByTag map[string][]uint32 streamIdsByTag map[string][]uint32
streams map[uint32]*stream streams map[uint32]*stream
opening map[string]*openingProcess opening map[string]*openingProcess
exec *sendPool exec *execPool
mu sync.RWMutex dial *execPool
mu sync.Mutex
lastStreamId uint32 lastStreamId uint32
} }
@ -59,11 +60,6 @@ type openingProcess struct {
ch chan struct{} ch chan struct{}
err error err error
} }
type handleMessage struct {
ctx context.Context
msg drpc.Message
peerId string
}
func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error { func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error {
st := s.addStream(peerId, drpcStream, tags...) st := s.addStream(peerId, drpcStream, tags...)
@ -108,7 +104,7 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter Peer
} }
} }
} }
return s.exec.Add(ctx, func() { return s.dial.Add(ctx, func() {
peers, dialErr := peerGetter(ctx) peers, dialErr := peerGetter(ctx)
if dialErr != nil { if dialErr != nil {
log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr)) log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr))

View File

@ -28,7 +28,8 @@ func (s *service) NewStreamPool(h StreamHandler) StreamPool {
streamIdsByTag: map[string][]uint32{}, streamIdsByTag: map[string][]uint32{},
streams: map[uint32]*stream{}, streams: map[uint32]*stream{},
opening: map[string]*openingProcess{}, opening: map[string]*openingProcess{},
exec: newStreamSender(10, 100), exec: newExecPool(10, 100),
dial: newExecPool(4, 100),
} }
return sp return sp
} }