Fix waiters in streampool

This commit is contained in:
mcrakhman 2022-12-27 19:59:27 +01:00
parent f86337ffc8
commit fa076ee83f
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B

View File

@ -246,59 +246,61 @@ func (s *streamPool) Close() (err error) {
}
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
log.With(zap.String("replyId", peerId)).Debug("reading stream from peer")
defer s.wg.Done()
log := log.With(zap.String("peerId", peerId))
defer func() {
log.Debug("stopped reading stream from peer")
s.removePeer(peerId, stream)
s.wg.Done()
}()
log.Debug("started reading stream from peer")
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
limiter <- struct{}{}
}
process := func(msg *spacesyncproto.ObjectSyncMessage) {
s.lastUsage.Store(time.Now().Unix())
log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)).
Debug("getting message with reply id")
if msg.ReplyId == "" {
s.messageHandler(stream.Context(), peerId, msg)
return
log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId))
log.Debug("getting message with reply id")
err = s.messageHandler(stream.Context(), peerId, msg)
if err != nil {
log.With(zap.Error(err)).Debug("message handling failed")
}
s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId]
if !exists {
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id not exists")
s.waitersMx.Unlock()
s.messageHandler(stream.Context(), peerId, msg)
return
}
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id exists")
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
waiter.ch <- msg
}
Loop:
for {
var msg *spacesyncproto.ObjectSyncMessage
msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix())
if err != nil {
break
return
}
if msg.ReplyId != "" {
s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId]
if exists {
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
waiter.ch <- msg
continue
}
s.waitersMx.Unlock()
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
}
select {
case <-limiter:
case <-stream.Context().Done():
log.Debug("stream context done")
break Loop
return
}
go func() {
go func(msg *spacesyncproto.ObjectSyncMessage) {
process(msg)
limiter <- struct{}{}
}()
}(msg)
}
log.With(zap.String("peerId", peerId)).Debug("stopped reading stream from peer")
s.removePeer(peerId, stream)
return
}
func (s *streamPool) removePeer(peerId string, stream spacesyncproto.SpaceStream) (err error) {