From f6d5ea64952c81859fc604eb4cad6adb255b4aa5 Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Mon, 30 Jan 2023 16:43:52 +0300 Subject: [PATCH] dial pool --- commonspace/space.go | 9 ++++++++- net/streampool/sendpool.go | 16 ++++++++-------- net/streampool/streampool.go | 12 ++++-------- net/streampool/streampoolservice.go | 3 ++- 4 files changed, 22 insertions(+), 18 deletions(-) diff --git a/commonspace/space.go b/commonspace/space.go index b4ab1da3..378490d3 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -27,6 +27,7 @@ import ( "github.com/anytypeio/any-sync/util/keys/asymmetric/signingkey" "github.com/anytypeio/any-sync/util/multiqueue" "github.com/anytypeio/any-sync/util/slice" + "github.com/cheggaaa/mb/v3" "github.com/zeebo/errs" "go.uber.org/zap" "strconv" @@ -359,7 +360,13 @@ func (s *space) HandleMessage(ctx context.Context, hm HandleMessage) (err error) _ = s.handleQueue.CloseThread(threadId) }() } - return s.handleQueue.Add(ctx, threadId, hm) + err = s.handleQueue.Add(ctx, threadId, hm) + if err == mb.ErrOverflowed { + log.InfoCtx(ctx, "queue overflowed", zap.String("spaceId", s.id), zap.String("objectId", threadId)) + // skip overflowed error + return nil + } + return } func (s *space) handleMessage(msg HandleMessage) { diff --git a/net/streampool/sendpool.go b/net/streampool/sendpool.go index 899da5c5..5bf075e7 100644 --- a/net/streampool/sendpool.go +++ b/net/streampool/sendpool.go @@ -6,11 +6,11 @@ import ( "go.uber.org/zap" ) -// newStreamSender creates new sendPool +// newExecPool creates new execPool // workers - how many processes will execute tasks // maxSize - limit for queue size -func newStreamSender(workers, maxSize int) *sendPool { - ss := &sendPool{ +func newExecPool(workers, maxSize int) *execPool { + ss := &execPool{ batch: mb.New[func()](maxSize), } for i := 0; i < workers; i++ { @@ -19,16 +19,16 @@ func newStreamSender(workers, maxSize int) *sendPool { return ss } -// sendPool needed for parallel execution of the incoming send tasks -type sendPool struct { +// execPool needed for parallel execution of the incoming send tasks +type execPool struct { batch *mb.MB[func()] } -func (ss *sendPool) Add(ctx context.Context, f ...func()) (err error) { +func (ss *execPool) Add(ctx context.Context, f ...func()) (err error) { return ss.batch.Add(ctx, f...) } -func (ss *sendPool) sendLoop() { +func (ss *execPool) sendLoop() { for { f, err := ss.batch.WaitOne(context.Background()) if err != nil { @@ -39,6 +39,6 @@ func (ss *sendPool) sendLoop() { } } -func (ss *sendPool) Close() (err error) { +func (ss *execPool) Close() (err error) { return ss.batch.Close() } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index a846ef51..fb53cd69 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -50,8 +50,9 @@ type streamPool struct { streamIdsByTag map[string][]uint32 streams map[uint32]*stream opening map[string]*openingProcess - exec *sendPool - mu sync.RWMutex + exec *execPool + dial *execPool + mu sync.Mutex lastStreamId uint32 } @@ -59,11 +60,6 @@ type openingProcess struct { ch chan struct{} err error } -type handleMessage struct { - ctx context.Context - msg drpc.Message - peerId string -} func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error { st := s.addStream(peerId, drpcStream, tags...) @@ -108,7 +104,7 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter Peer } } } - return s.exec.Add(ctx, func() { + return s.dial.Add(ctx, func() { peers, dialErr := peerGetter(ctx) if dialErr != nil { log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr)) diff --git a/net/streampool/streampoolservice.go b/net/streampool/streampoolservice.go index e4d5965f..fe6c9ce2 100644 --- a/net/streampool/streampoolservice.go +++ b/net/streampool/streampoolservice.go @@ -28,7 +28,8 @@ func (s *service) NewStreamPool(h StreamHandler) StreamPool { streamIdsByTag: map[string][]uint32{}, streams: map[uint32]*stream{}, opening: map[string]*openingProcess{}, - exec: newStreamSender(10, 100), + exec: newExecPool(10, 100), + dial: newExecPool(4, 100), } return sp }