From a48e8f357afc2c772d63d570065c30f835a3f13c Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 15 Sep 2022 14:11:49 +0200 Subject: [PATCH] Update peer reading logic --- common/commonspace/rpchandler.go | 10 +++++-- common/commonspace/space.go | 2 +- common/commonspace/syncservice/streampool.go | 22 +++++++++++++--- common/commonspace/syncservice/syncservice.go | 26 ++++++++++++------- 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/common/commonspace/rpchandler.go b/common/commonspace/rpchandler.go index e94df0b8..b0d0be69 100644 --- a/common/commonspace/rpchandler.go +++ b/common/commonspace/rpchandler.go @@ -19,6 +19,12 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR return remotediff.HandlerRangeRequest(ctx, r.s.diff, req) } -func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { - return r.s.SyncService().StreamPool().AddStream(stream) +func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) { + err = r.s.SyncService().StreamPool().AddAndReadStream(stream) + if err != nil { + return + } + + <-stream.Context().Done() + return } diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 58b4391e..757b1f32 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -151,5 +151,5 @@ func (s *space) getPeers(ctx context.Context) (peers []peer.Peer, err error) { func (s *space) Close() error { s.periodicSync.Close() - return nil + return s.syncService.Close() } diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 47d3063c..6b402294 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -18,9 +18,10 @@ const maxSimultaneousOperationsPerStream = 10 // StreamPool can be made generic to work with different streams type StreamPool interface { - AddStream(stream spacesyncproto.SpaceStream) (err error) + AddAndReadStream(stream spacesyncproto.SpaceStream) (err error) HasStream(peerId string) bool SyncClient + Close() (err error) } type SyncClient interface { @@ -34,12 +35,14 @@ type streamPool struct { sync.Mutex peerStreams map[string]spacesyncproto.SpaceStream messageHandler MessageHandler + wg *sync.WaitGroup } func newStreamPool(messageHandler MessageHandler) StreamPool { return &streamPool{ peerStreams: make(map[string]spacesyncproto.SpaceStream), messageHandler: messageHandler, + wg: &sync.WaitGroup{}, } } @@ -104,7 +107,7 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) ( return nil } -func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) { +func (s *streamPool) AddAndReadStream(stream spacesyncproto.SpaceStream) (err error) { s.Lock() peerId, err := getPeerIdFromStream(stream) if err != nil { @@ -113,12 +116,25 @@ func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) { } s.peerStreams[peerId] = stream + s.wg.Add(1) s.Unlock() - return s.readPeerLoop(peerId, stream) + go s.readPeerLoop(peerId, stream) + return +} + +func (s *streamPool) Close() (err error) { + s.Lock() + wg := s.wg + s.Unlock() + if wg != nil { + wg.Wait() + } + return nil } func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { + defer s.wg.Done() limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) for i := 0; i < maxSimultaneousOperationsPerStream; i++ { limiter <- struct{}{} diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 4570ce85..c3f85101 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -11,6 +11,7 @@ import ( type SyncService interface { NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) StreamPool() StreamPool + Close() (err error) } type syncService struct { @@ -20,6 +21,10 @@ type syncService struct { spaceId string } +func (s *syncService) Close() (err error) { + return s.streamPool.Close() +} + func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { msg := spacesyncproto.WrapHeadUpdate(update, header, treeId) peers, err := s.configuration.AllPeers(context.Background(), s.spaceId) @@ -27,17 +32,18 @@ func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, heade return } for _, peer := range peers { - if !s.streamPool.HasStream(peer.Id()) { - cl := spacesyncproto.NewDRPCSpaceClient(peer) - stream, err := cl.Stream(ctx) - if err != nil { - continue - } + if s.streamPool.HasStream(peer.Id()) { + continue + } + cl := spacesyncproto.NewDRPCSpaceClient(peer) + stream, err := cl.Stream(ctx) + if err != nil { + continue + } - s.streamPool.AddStream(stream) - if err != nil { - continue - } + err = s.streamPool.AddAndReadStream(stream) + if err != nil { + continue } } return s.streamPool.BroadcastAsync(msg)