Some streampool fixes
This commit is contained in:
parent
98b8584efc
commit
a103c960a4
@ -61,6 +61,12 @@ func (q *actionQueue) read() {
|
||||
return
|
||||
}
|
||||
for _, msg := range actions {
|
||||
select {
|
||||
case <-q.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
<-limiter
|
||||
go func(action ActionFunc) {
|
||||
err = action()
|
||||
|
||||
@ -247,50 +247,41 @@ func (s *streamPool) Close() (err error) {
|
||||
|
||||
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
var (
|
||||
msg *spacesyncproto.ObjectSyncMessage
|
||||
mx = sync.Mutex{}
|
||||
maxWaitMsgs = 500
|
||||
queue = make([]*spacesyncproto.ObjectSyncMessage, 0, 10)
|
||||
readers = 0
|
||||
log = log.With(zap.String("peerId", peerId))
|
||||
log = log.With(zap.String("peerId", peerId))
|
||||
queue = NewActionQueue()
|
||||
)
|
||||
queue.Run()
|
||||
|
||||
defer func() {
|
||||
log.Debug("stopped reading stream from peer")
|
||||
s.removePeer(peerId, stream)
|
||||
queue.Close()
|
||||
s.wg.Done()
|
||||
}()
|
||||
|
||||
log.Debug("started reading stream from peer")
|
||||
|
||||
readQueue := func() {
|
||||
defer func() {
|
||||
mx.Lock()
|
||||
readers--
|
||||
mx.Unlock()
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
mx.Lock()
|
||||
if len(queue) == 0 {
|
||||
mx.Unlock()
|
||||
return
|
||||
}
|
||||
newEl := queue[0]
|
||||
queue = queue[1:]
|
||||
mx.Unlock()
|
||||
|
||||
log := log.With(zap.String("replyId", newEl.ReplyId), zap.String("object id", newEl.ObjectId))
|
||||
log.Debug("getting message with reply id")
|
||||
err = s.messageHandler(stream.Context(), peerId, newEl)
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Debug("message handling failed")
|
||||
}
|
||||
stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool {
|
||||
s.waitersMx.Lock()
|
||||
waiter, exists := s.waiters[msg.ReplyId]
|
||||
if exists {
|
||||
delete(s.waiters, msg.ReplyId)
|
||||
s.waitersMx.Unlock()
|
||||
waiter.ch <- msg
|
||||
return true
|
||||
}
|
||||
s.waitersMx.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
process := func(msg *spacesyncproto.ObjectSyncMessage) error {
|
||||
log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId))
|
||||
log.Debug("getting message with reply id")
|
||||
err = s.messageHandler(stream.Context(), peerId, msg)
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Debug("message handling failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
@ -299,43 +290,25 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyn
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
var msg *spacesyncproto.ObjectSyncMessage
|
||||
msg, err = stream.Recv()
|
||||
s.lastUsage.Store(time.Now().Unix())
|
||||
if err != nil {
|
||||
stream.Close()
|
||||
return
|
||||
}
|
||||
|
||||
if msg.ReplyId != "" {
|
||||
// then we can send it directly to waiters without adding to queue or starting a reader
|
||||
s.waitersMx.Lock()
|
||||
waiter, exists := s.waiters[msg.ReplyId]
|
||||
if exists {
|
||||
delete(s.waiters, msg.ReplyId)
|
||||
s.waitersMx.Unlock()
|
||||
waiter.ch <- msg
|
||||
if stopWaiter(msg) {
|
||||
continue
|
||||
}
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
|
||||
}
|
||||
|
||||
mx.Lock()
|
||||
queue = append(queue, msg)
|
||||
if len(queue) > maxWaitMsgs {
|
||||
queue = queue[len(queue)-maxWaitMsgs:]
|
||||
}
|
||||
// if we already have max goroutines reading the queue in parallel
|
||||
if readers >= maxStreamReaders {
|
||||
mx.Unlock()
|
||||
continue
|
||||
}
|
||||
readers++
|
||||
mx.Unlock()
|
||||
|
||||
// starting another reader
|
||||
go readQueue()
|
||||
queue.Send(func() error {
|
||||
return process(msg)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user