use multiqueue for stream + MessageQueueId interface
This commit is contained in:
parent
34c6ce261e
commit
3d4ac4d130
@ -3,7 +3,7 @@ package streampool
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"github.com/anytypeio/any-sync/util/multiqueue"
|
||||
"go.uber.org/zap"
|
||||
"storj.io/drpc"
|
||||
"sync/atomic"
|
||||
@ -17,12 +17,16 @@ type stream struct {
|
||||
streamId uint32
|
||||
closed atomic.Bool
|
||||
l logger.CtxLogger
|
||||
queue *mb.MB[drpc.Message]
|
||||
queue multiqueue.MultiQueue[drpc.Message]
|
||||
tags []string
|
||||
}
|
||||
|
||||
func (sr *stream) write(msg drpc.Message) (err error) {
|
||||
return sr.queue.TryAdd(msg)
|
||||
var queueId string
|
||||
if qId, ok := msg.(MessageQueueId); ok {
|
||||
queueId = qId.MessageQueueId()
|
||||
}
|
||||
return sr.queue.Add(sr.stream.Context(), queueId, msg)
|
||||
}
|
||||
|
||||
func (sr *stream) readLoop() error {
|
||||
@ -45,20 +49,13 @@ func (sr *stream) readLoop() error {
|
||||
}
|
||||
}
|
||||
|
||||
func (sr *stream) writeLoop() error {
|
||||
defer func() {
|
||||
func (sr *stream) writeToStream(msg drpc.Message) {
|
||||
if err := sr.stream.MsgSend(msg, EncodingProto); err != nil {
|
||||
sr.l.Warn("msg send error", zap.Error(err))
|
||||
sr.streamClose()
|
||||
}()
|
||||
sr.l.Debug("stream write started")
|
||||
for {
|
||||
msg, err := sr.queue.WaitOne(sr.stream.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = sr.stream.MsgSend(msg, EncodingProto); err != nil {
|
||||
return err
|
||||
}
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (sr *stream) streamClose() {
|
||||
|
||||
@ -4,7 +4,7 @@ import (
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/pool"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"github.com/anytypeio/any-sync/util/multiqueue"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"golang.org/x/net/context"
|
||||
@ -25,6 +25,10 @@ type StreamHandler interface {
|
||||
// PeerGetter should dial or return cached peers
|
||||
type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)
|
||||
|
||||
type MessageQueueId interface {
|
||||
MessageQueueId() string
|
||||
}
|
||||
|
||||
// StreamPool keeps and read streams
|
||||
type StreamPool interface {
|
||||
// AddStream adds new outgoing stream into the pool
|
||||
@ -115,19 +119,14 @@ func (s *streamPool) addStream(drpcStream drpc.Stream, tags ...string) (*stream,
|
||||
pool: s,
|
||||
streamId: streamId,
|
||||
l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)),
|
||||
queue: mb.New[drpc.Message](queueSize),
|
||||
tags: tags,
|
||||
}
|
||||
st.queue = multiqueue.New[drpc.Message](st.writeToStream, s.writeQueueSize)
|
||||
s.streams[streamId] = st
|
||||
s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId)
|
||||
for _, tag := range tags {
|
||||
s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId)
|
||||
}
|
||||
go func() {
|
||||
if err := st.writeLoop(); err != nil {
|
||||
st.l.Info("stream closed", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
return st, nil
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user