diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index c139845f..f3d872a7 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -16,7 +16,10 @@ import ( var ErrEmptyPeer = errors.New("don't have such a peer") var ErrStreamClosed = errors.New("stream is already closed") -const maxSimultaneousOperationsPerStream = 10 +var maxSimultaneousOperationsPerStream = 10 +var syncWaitPeriod = 2 * time.Second + +var ErrSyncTimeout = errors.New("too long wait on sync receive") // StreamPool can be made generic to work with different streams type StreamPool interface { @@ -79,7 +82,7 @@ func (s *streamPool) SendSync( s.waitersMx.Lock() waiter := responseWaiter{ - ch: make(chan *spacesyncproto.ObjectSyncMessage), + ch: make(chan *spacesyncproto.ObjectSyncMessage, 1), } s.waiters[msg.TrackingId] = waiter s.waitersMx.Unlock() @@ -88,10 +91,20 @@ func (s *streamPool) SendSync( if err != nil { return } - log.With("trackingId", msg.TrackingId).Debug("waiting for id") - // TODO: limit wait time here and remove the waiter - reply = <-waiter.ch - log.With("trackingId", msg.TrackingId).Debug("finished waiting for id") + delay := time.NewTimer(syncWaitPeriod) + select { + case <-delay.C: + s.waitersMx.Lock() + delete(s.waiters, msg.TrackingId) + s.waitersMx.Unlock() + + log.With("trackingId", msg.TrackingId).Error("time elapsed when waiting") + err = ErrSyncTimeout + case reply = <-waiter.ch: + if !delay.Stop() { + <-delay.C + } + } return } diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/syncservice/synchandler.go index 16879b36..32c1b00e 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/syncservice/synchandler.go @@ -90,7 +90,7 @@ func (s *syncHandler) handleHeadUpdate( if fullRequest != nil { log.With("senderId", senderId). - With("heads", update.Heads). + With("heads", fullRequest.GetContent().GetFullSyncRequest().Heads). With("treeId", msg.TreeId). Debug("sending full sync request") return s.syncClient.SendAsync([]string{senderId}, fullRequest)