Fix streampool waiters bug

This commit is contained in:
mcrakhman 2022-12-27 19:57:01 +01:00 committed by Mikhail Iudin
parent 17e37fb8fb
commit e18f71111a
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0

View File

@ -246,59 +246,61 @@ func (s *streamPool) Close() (err error) {
} }
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
log.With(zap.String("replyId", peerId)).Debug("reading stream from peer") log := log.With(zap.String("peerId", peerId))
defer s.wg.Done() defer func() {
log.Debug("stopped reading stream from peer")
s.removePeer(peerId, stream)
s.wg.Done()
}()
log.Debug("started reading stream from peer")
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ { for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
limiter <- struct{}{} limiter <- struct{}{}
} }
process := func(msg *spacesyncproto.ObjectSyncMessage) { process := func(msg *spacesyncproto.ObjectSyncMessage) {
s.lastUsage.Store(time.Now().Unix()) log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId))
log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)). log.Debug("getting message with reply id")
Debug("getting message with reply id") err = s.messageHandler(stream.Context(), peerId, msg)
if msg.ReplyId == "" { if err != nil {
s.messageHandler(stream.Context(), peerId, msg) log.With(zap.Error(err)).Debug("message handling failed")
return
} }
s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId]
if !exists {
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id not exists")
s.waitersMx.Unlock()
s.messageHandler(stream.Context(), peerId, msg)
return
}
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id exists")
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
waiter.ch <- msg
} }
Loop:
for { for {
var msg *spacesyncproto.ObjectSyncMessage var msg *spacesyncproto.ObjectSyncMessage
msg, err = stream.Recv() msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix()) s.lastUsage.Store(time.Now().Unix())
if err != nil { if err != nil {
break return
} }
if msg.ReplyId != "" {
s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId]
if exists {
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
waiter.ch <- msg
continue
}
s.waitersMx.Unlock()
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
}
select { select {
case <-limiter: case <-limiter:
case <-stream.Context().Done(): case <-stream.Context().Done():
log.Debug("stream context done") return
break Loop
} }
go func() {
go func(msg *spacesyncproto.ObjectSyncMessage) {
process(msg) process(msg)
limiter <- struct{}{} limiter <- struct{}{}
}() }(msg)
} }
log.With(zap.String("peerId", peerId)).Debug("stopped reading stream from peer")
s.removePeer(peerId, stream)
return
} }
func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {