Some streampool fixes

This commit is contained in:
mcrakhman 2022-12-28 17:52:10 +01:00 committed by Mikhail Iudin
parent 5794ea1650
commit bf146e6676
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
2 changed files with 38 additions and 59 deletions

View File

@ -61,6 +61,12 @@ func (q *actionQueue) read() {
return return
} }
for _, msg := range actions { for _, msg := range actions {
select {
case <-q.ctx.Done():
return
default:
}
<-limiter <-limiter
go func(action ActionFunc) { go func(action ActionFunc) {
err = action() err = action()

View File

@ -247,95 +247,68 @@ func (s *streamPool) Close() (err error) {
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
var ( var (
msg *spacesyncproto.ObjectSyncMessage
mx = sync.Mutex{}
maxWaitMsgs = 500
queue = make([]*spacesyncproto.ObjectSyncMessage, 0, 10)
readers = 0
log = log.With(zap.String("peerId", peerId)) log = log.With(zap.String("peerId", peerId))
queue = NewActionQueue()
) )
queue.Run()
defer func() { defer func() {
log.Debug("stopped reading stream from peer") log.Debug("stopped reading stream from peer")
s.removePeer(peerId, stream) s.removePeer(peerId, stream)
queue.Close()
s.wg.Done() s.wg.Done()
}() }()
log.Debug("started reading stream from peer") log.Debug("started reading stream from peer")
readQueue := func() { stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool {
defer func() {
mx.Lock()
readers--
mx.Unlock()
}()
for {
select {
case <-stream.Context().Done():
return
default:
}
mx.Lock()
if len(queue) == 0 {
mx.Unlock()
return
}
newEl := queue[0]
queue = queue[1:]
mx.Unlock()
log := log.With(zap.String("replyId", newEl.ReplyId), zap.String("object id", newEl.ObjectId))
log.Debug("getting message with reply id")
err = s.messageHandler(stream.Context(), peerId, newEl)
if err != nil {
log.With(zap.Error(err)).Debug("message handling failed")
}
}
}
for {
select {
case <-stream.Context().Done():
return
default:
}
msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix())
if err != nil {
return
}
if msg.ReplyId != "" {
// then we can send it directly to waiters without adding to queue or starting a reader
s.waitersMx.Lock() s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId] waiter, exists := s.waiters[msg.ReplyId]
if exists { if exists {
delete(s.waiters, msg.ReplyId) delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock() s.waitersMx.Unlock()
waiter.ch <- msg waiter.ch <- msg
continue return true
} }
s.waitersMx.Unlock() s.waitersMx.Unlock()
return false
}
process := func(msg *spacesyncproto.ObjectSyncMessage) error {
log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId))
log.Debug("getting message with reply id")
err = s.messageHandler(stream.Context(), peerId, msg)
if err != nil {
log.With(zap.Error(err)).Debug("message handling failed")
}
return nil
}
for {
select {
case <-stream.Context().Done():
return
default:
}
var msg *spacesyncproto.ObjectSyncMessage
msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix())
if err != nil {
stream.Close()
return
}
if msg.ReplyId != "" {
// then we can send it directly to waiters without adding to queue or starting a reader
if stopWaiter(msg) {
continue
}
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
} }
mx.Lock() queue.Send(func() error {
queue = append(queue, msg) return process(msg)
if len(queue) > maxWaitMsgs { })
queue = queue[len(queue)-maxWaitMsgs:]
}
// if we already have max goroutines reading the queue in parallel
if readers >= maxStreamReaders {
mx.Unlock()
continue
}
readers++
mx.Unlock()
// starting another reader
go readQueue()
} }
} }