From ca1d0a43b1e8779ca426c934438ed377b713e466 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 16 Sep 2022 11:20:43 +0200 Subject: [PATCH] Update stream configuration and sync service --- common/commonspace/rpchandler.go | 2 +- common/commonspace/space.go | 17 +----- common/commonspace/syncservice/streampool.go | 9 ++- common/commonspace/syncservice/syncservice.go | 58 ++++++++++++------- common/nodeconf/configuration.go | 16 +++++ 5 files changed, 65 insertions(+), 37 deletions(-) diff --git a/common/commonspace/rpchandler.go b/common/commonspace/rpchandler.go index be83b43c..8d8aacac 100644 --- a/common/commonspace/rpchandler.go +++ b/common/commonspace/rpchandler.go @@ -20,5 +20,5 @@ func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncR } func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpace_StreamStream) (err error) { - return r.s.SyncService().StreamPool().AddAndReadStream(stream) + return r.s.SyncService().StreamPool().AddAndReadStreamSync(stream) } diff --git a/common/commonspace/space.go b/common/commonspace/space.go index bece7ca7..5723201b 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -148,7 +148,8 @@ func (s *space) testFill() { func (s *space) sync(ctx context.Context) error { st := time.Now() - peers, err := s.getPeers(ctx) + // diffing with responsible peers according to configuration + peers, err := s.nconf.ResponsiblePeers(ctx, s.id) if err != nil { return err } @@ -168,6 +169,7 @@ func (s *space) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { if err != nil { return nil } + s.pingTreesInCache(ctx, newIds) s.pingTreesInCache(ctx, changedIds) @@ -181,19 +183,6 @@ func (s *space) pingTreesInCache(ctx context.Context, trees []string) { } } -func (s *space) getPeers(ctx context.Context) (peers []peer.Peer, err error) { - if s.nconf.IsResponsible(s.id) { - return s.nconf.AllPeers(ctx, s.id) - } else { - var p peer.Peer - p, err = s.nconf.OnePeer(ctx, s.id) - if err != nil { - return nil, err - } - return []peer.Peer{p}, nil - } -} - func (s *space) Close() error { s.periodicSync.Close() return s.syncService.Close() diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 0ccd2d6b..cbfb7424 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -17,7 +17,8 @@ const maxSimultaneousOperationsPerStream = 10 // StreamPool can be made generic to work with different streams type StreamPool interface { - AddAndReadStream(stream spacesyncproto.SpaceStream) (err error) + AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) + AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) HasStream(peerId string) bool SyncClient Close() (err error) @@ -151,7 +152,11 @@ func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) ( return nil } -func (s *streamPool) AddAndReadStream(stream spacesyncproto.SpaceStream) (err error) { +func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) { + go s.AddAndReadStreamSync(stream) +} + +func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error) { s.Lock() peerId, err := GetPeerIdFromStreamContext(stream.Context()) if err != nil { diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index c3f85101..2c155fea 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -6,6 +6,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" + "time" ) type SyncService interface { @@ -14,39 +15,56 @@ type SyncService interface { Close() (err error) } +const respPeersStreamCheckInterval = time.Second * 10 + type syncService struct { - syncHandler SyncHandler - streamPool StreamPool - configuration nodeconf.Configuration - spaceId string + syncHandler SyncHandler + streamPool StreamPool + configuration nodeconf.Configuration + spaceId string + streamLoopCtx context.Context + stopStreamLoop context.CancelFunc +} + +func (s *syncService) Run() { + s.streamLoopCtx, s.stopStreamLoop = context.WithCancel(context.Background()) + s.streamCheckLoop(s.streamLoopCtx) } func (s *syncService) Close() (err error) { + s.stopStreamLoop() return s.streamPool.Close() } func (s *syncService) NotifyHeadUpdate(ctx context.Context, treeId string, header *aclpb.TreeHeader, update *spacesyncproto.ObjectHeadUpdate) (err error) { - msg := spacesyncproto.WrapHeadUpdate(update, header, treeId) - peers, err := s.configuration.AllPeers(context.Background(), s.spaceId) - if err != nil { - return - } - for _, peer := range peers { - if s.streamPool.HasStream(peer.Id()) { - continue - } - cl := spacesyncproto.NewDRPCSpaceClient(peer) - stream, err := cl.Stream(ctx) - if err != nil { - continue - } + return s.streamPool.BroadcastAsync(spacesyncproto.WrapHeadUpdate(update, header, treeId)) +} - err = s.streamPool.AddAndReadStream(stream) +func (s *syncService) streamCheckLoop(ctx context.Context) { + for { + respPeers, err := s.configuration.ResponsiblePeers(ctx, s.spaceId) if err != nil { continue } + for _, peer := range respPeers { + if s.streamPool.HasStream(peer.Id()) { + continue + } + cl := spacesyncproto.NewDRPCSpaceClient(peer) + stream, err := cl.Stream(ctx) + if err != nil { + continue + } + + s.streamPool.AddAndReadStreamAsync(stream) + } + select { + case <-time.After(respPeersStreamCheckInterval): + break + case <-ctx.Done(): + return + } } - return s.streamPool.BroadcastAsync(msg) } func (s *syncService) StreamPool() StreamPool { diff --git a/common/nodeconf/configuration.go b/common/nodeconf/configuration.go index 5c055bbb..597d1018 100644 --- a/common/nodeconf/configuration.go +++ b/common/nodeconf/configuration.go @@ -19,6 +19,8 @@ type Configuration interface { AllPeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) // OnePeer returns one of peer for spaceId OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) + // ResponsiblePeers returns peers for the space id that are responsible for the space + ResponsiblePeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) // NodeIds returns list of peerId for given spaceId NodeIds(spaceId string) []string // IsResponsible checks if current account responsible for given spaceId @@ -51,6 +53,20 @@ func (c *configuration) AllPeers(ctx context.Context, spaceId string) (peers []p return } +func (c *configuration) ResponsiblePeers(ctx context.Context, spaceId string) (peers []peer.Peer, err error) { + if c.IsResponsible(spaceId) { + return c.AllPeers(ctx, spaceId) + } else { + var one peer.Peer + one, err = c.OnePeer(ctx, spaceId) + if err != nil { + return + } + peers = []peer.Peer{one} + return + } +} + func (c *configuration) OnePeer(ctx context.Context, spaceId string) (p peer.Peer, err error) { nodeIds := c.NodeIds(spaceId) return c.pool.GetOneOf(ctx, nodeIds)