Add timeout when waiting in streampool
This commit is contained in:
parent
2085e3ae40
commit
5b93c78ebd
@ -16,7 +16,10 @@ import (
|
|||||||
var ErrEmptyPeer = errors.New("don't have such a peer")
|
var ErrEmptyPeer = errors.New("don't have such a peer")
|
||||||
var ErrStreamClosed = errors.New("stream is already closed")
|
var ErrStreamClosed = errors.New("stream is already closed")
|
||||||
|
|
||||||
const maxSimultaneousOperationsPerStream = 10
|
var maxSimultaneousOperationsPerStream = 10
|
||||||
|
var syncWaitPeriod = 2 * time.Second
|
||||||
|
|
||||||
|
var ErrSyncTimeout = errors.New("too long wait on sync receive")
|
||||||
|
|
||||||
// StreamPool can be made generic to work with different streams
|
// StreamPool can be made generic to work with different streams
|
||||||
type StreamPool interface {
|
type StreamPool interface {
|
||||||
@ -79,7 +82,7 @@ func (s *streamPool) SendSync(
|
|||||||
|
|
||||||
s.waitersMx.Lock()
|
s.waitersMx.Lock()
|
||||||
waiter := responseWaiter{
|
waiter := responseWaiter{
|
||||||
ch: make(chan *spacesyncproto.ObjectSyncMessage),
|
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
|
||||||
}
|
}
|
||||||
s.waiters[msg.TrackingId] = waiter
|
s.waiters[msg.TrackingId] = waiter
|
||||||
s.waitersMx.Unlock()
|
s.waitersMx.Unlock()
|
||||||
@ -88,10 +91,20 @@ func (s *streamPool) SendSync(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.With("trackingId", msg.TrackingId).Debug("waiting for id")
|
delay := time.NewTimer(syncWaitPeriod)
|
||||||
// TODO: limit wait time here and remove the waiter
|
select {
|
||||||
reply = <-waiter.ch
|
case <-delay.C:
|
||||||
log.With("trackingId", msg.TrackingId).Debug("finished waiting for id")
|
s.waitersMx.Lock()
|
||||||
|
delete(s.waiters, msg.TrackingId)
|
||||||
|
s.waitersMx.Unlock()
|
||||||
|
|
||||||
|
log.With("trackingId", msg.TrackingId).Error("time elapsed when waiting")
|
||||||
|
err = ErrSyncTimeout
|
||||||
|
case reply = <-waiter.ch:
|
||||||
|
if !delay.Stop() {
|
||||||
|
<-delay.C
|
||||||
|
}
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -90,7 +90,7 @@ func (s *syncHandler) handleHeadUpdate(
|
|||||||
|
|
||||||
if fullRequest != nil {
|
if fullRequest != nil {
|
||||||
log.With("senderId", senderId).
|
log.With("senderId", senderId).
|
||||||
With("heads", update.Heads).
|
With("heads", fullRequest.GetContent().GetFullSyncRequest().Heads).
|
||||||
With("treeId", msg.TreeId).
|
With("treeId", msg.TreeId).
|
||||||
Debug("sending full sync request")
|
Debug("sending full sync request")
|
||||||
return s.syncClient.SendAsync([]string{senderId}, fullRequest)
|
return s.syncClient.SendAsync([]string{senderId}, fullRequest)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user