Stream pool and synctree fixes
This commit is contained in:
parent
500725ffee
commit
453967fad8
@ -98,7 +98,7 @@ func (s *streamPool) SendSync(
|
|||||||
delete(s.waiters, msg.ReplyId)
|
delete(s.waiters, msg.ReplyId)
|
||||||
s.waitersMx.Unlock()
|
s.waitersMx.Unlock()
|
||||||
|
|
||||||
log.With("trackingId", msg.ReplyId).Error("time elapsed when waiting")
|
log.With("replyId", msg.ReplyId).Error("time elapsed when waiting")
|
||||||
err = ErrSyncTimeout
|
err = ErrSyncTimeout
|
||||||
case reply = <-waiter.ch:
|
case reply = <-waiter.ch:
|
||||||
if !delay.Stop() {
|
if !delay.Stop() {
|
||||||
@ -226,17 +226,17 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
|
|||||||
s.messageHandler(stream.Context(), peerId, msg)
|
s.messageHandler(stream.Context(), peerId, msg)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.With("trackingId", msg.ReplyId).Debug("getting message with tracking id")
|
log.With("replyId", msg.ReplyId).Debug("getting message with reply id")
|
||||||
s.waitersMx.Lock()
|
s.waitersMx.Lock()
|
||||||
waiter, exists := s.waiters[msg.ReplyId]
|
waiter, exists := s.waiters[msg.ReplyId]
|
||||||
|
|
||||||
if !exists {
|
if !exists {
|
||||||
log.With("trackingId", msg.ReplyId).Debug("tracking id not exists")
|
log.With("replyId", msg.ReplyId).Debug("reply id not exists")
|
||||||
s.waitersMx.Unlock()
|
s.waitersMx.Unlock()
|
||||||
s.messageHandler(stream.Context(), peerId, msg)
|
s.messageHandler(stream.Context(), peerId, msg)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.With("trackingId", msg.ReplyId).Debug("tracking id exists")
|
log.With("replyId", msg.ReplyId).Debug("reply id exists")
|
||||||
|
|
||||||
delete(s.waiters, msg.ReplyId)
|
delete(s.waiters, msg.ReplyId)
|
||||||
s.waitersMx.Unlock()
|
s.waitersMx.Unlock()
|
||||||
|
|||||||
@ -125,7 +125,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
|
|||||||
}
|
}
|
||||||
|
|
||||||
resp, err := deps.StreamPool.SendSync(peerId, objMsg)
|
resp, err := deps.StreamPool.SendSync(peerId, objMsg)
|
||||||
if resp != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
msg = &treechangeproto.TreeSyncMessage{}
|
msg = &treechangeproto.TreeSyncMessage{}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user