Update peer reading logic

This commit is contained in:
mcrakhman 2022-09-15 14:11:49 +02:00
parent 34ef43786c
commit a48e8f357a
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
4 changed files with 44 additions and 16 deletions

View File

@ -19,6 +19,12 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR
return remotediff.HandlerRangeRequest(ctx, r.s.diff, req) return remotediff.HandlerRangeRequest(ctx, r.s.diff, req)
} }
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) {
return r.s.SyncService().StreamPool().AddStream(stream) err = r.s.SyncService().StreamPool().AddAndReadStream(stream)
if err != nil {
return
}
<-stream.Context().Done()
return
} }

View File

@ -151,5 +151,5 @@ func (s *space) getPeers(ctx context.Context) (peers []peer.Peer, err error) {
func (s *space) Close() error { func (s *space) Close() error {
s.periodicSync.Close() s.periodicSync.Close()
return nil return s.syncService.Close()
} }

View File

@ -18,9 +18,10 @@ const maxSimultaneousOperationsPerStream = 10
// StreamPool can be made generic to work with different streams // StreamPool can be made generic to work with different streams
type StreamPool interface { type StreamPool interface {
AddStream(stream spacesyncproto.SpaceStream) (err error) AddAndReadStream(stream spacesyncproto.SpaceStream) (err error)
HasStream(peerId string) bool HasStream(peerId string) bool
SyncClient SyncClient
Close() (err error)
} }
type SyncClient interface { type SyncClient interface {
@ -34,12 +35,14 @@ type streamPool struct {
sync.Mutex sync.Mutex
peerStreams map[string]spacesyncproto.SpaceStream peerStreams map[string]spacesyncproto.SpaceStream
messageHandler MessageHandler messageHandler MessageHandler
wg *sync.WaitGroup
} }
func newStreamPool(messageHandler MessageHandler) StreamPool { func newStreamPool(messageHandler MessageHandler) StreamPool {
return &streamPool{ return &streamPool{
peerStreams: make(map[string]spacesyncproto.SpaceStream), peerStreams: make(map[string]spacesyncproto.SpaceStream),
messageHandler: messageHandler, messageHandler: messageHandler,
wg: &sync.WaitGroup{},
} }
} }
@ -104,7 +107,7 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (
return nil return nil
} }
func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) { func (s *streamPool) AddAndReadStream(stream spacesyncproto.SpaceStream) (err error) {
s.Lock() s.Lock()
peerId, err := getPeerIdFromStream(stream) peerId, err := getPeerIdFromStream(stream)
if err != nil { if err != nil {
@ -113,12 +116,25 @@ func (s *streamPool) AddStream(stream spacesyncproto.SpaceStream) (err error) {
} }
s.peerStreams[peerId] = stream s.peerStreams[peerId] = stream
s.wg.Add(1)
s.Unlock() s.Unlock()
return s.readPeerLoop(peerId, stream) go s.readPeerLoop(peerId, stream)
return
}
func (s *streamPool) Close() (err error) {
s.Lock()
wg := s.wg
s.Unlock()
if wg != nil {
wg.Wait()
}
return nil
} }
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) { func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
defer s.wg.Done()
limiter := make(chan struct{}, maxSimultaneousOperationsPerStream) limiter := make(chan struct{}, maxSimultaneousOperationsPerStream)
for i := 0; i < maxSimultaneousOperationsPerStream; i++ { for i := 0; i < maxSimultaneousOperationsPerStream; i++ {
limiter <- struct{}{} limiter <- struct{}{}

View File

@ -11,6 +11,7 @@ import (
type SyncService interface { type SyncService interface {
NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error)
StreamPool() StreamPool StreamPool() StreamPool
Close() (err error)
} }
type syncService struct { type syncService struct {
@ -20,6 +21,10 @@ type syncService struct {
spaceId string spaceId string
} }
func (s *syncService) Close() (err error) {
return s.streamPool.Close()
}
func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) {
msg := spacesyncproto.WrapHeadUpdate(update, header, treeId) msg := spacesyncproto.WrapHeadUpdate(update, header, treeId)
peers, err := s.configuration.AllPeers(context.Background(), s.spaceId) peers, err := s.configuration.AllPeers(context.Background(), s.spaceId)
@ -27,19 +32,20 @@ func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, heade
return return
} }
for _, peer := range peers { for _, peer := range peers {
if !s.streamPool.HasStream(peer.Id()) { if s.streamPool.HasStream(peer.Id()) {
continue
}
cl := spacesyncproto.NewDRPCSpaceClient(peer) cl := spacesyncproto.NewDRPCSpaceClient(peer)
stream, err := cl.Stream(ctx) stream, err := cl.Stream(ctx)
if err != nil { if err != nil {
continue continue
} }
s.streamPool.AddStream(stream) err = s.streamPool.AddAndReadStream(stream)
if err != nil { if err != nil {
continue continue
} }
} }
}
return s.streamPool.BroadcastAsync(msg) return s.streamPool.BroadcastAsync(msg)
} }