From 82eb76c1ea2ee373198a19f2039d3440763ee8c1 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 3 Dec 2022 17:54:10 +0100 Subject: [PATCH] Fix syncservice loop --- common/commonspace/syncservice/syncservice.go | 23 +++++++----- common/nodeconf/confconnector.go | 36 ++++++++++++++----- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/common/commonspace/syncservice/syncservice.go b/common/commonspace/syncservice/syncservice.go index 1dfd9243..ed9f6427 100644 --- a/common/commonspace/syncservice/syncservice.go +++ b/common/commonspace/syncservice/syncservice.go @@ -97,17 +97,24 @@ func (s *syncService) HandleMessage(ctx context.Context, senderId string, messag func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { defer close(s.streamLoopDone) checkResponsiblePeers := func() { - s.log.Debug("dialing responsible peers") - respPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId) + var ( + activeNodeIds []string + configuration = s.connector.Configuration() + ) + for _, nodeId := range configuration.NodeIds(s.spaceId) { + if s.streamPool.HasActiveStream(nodeId) { + s.log.Debug("has active stream for", zap.String("id", nodeId)) + activeNodeIds = append(activeNodeIds, nodeId) + continue + } + } + newPeers, err := s.connector.DialResponsiblePeers(ctx, s.spaceId, activeNodeIds) if err != nil { s.log.Error("failed to dial peers", zap.Error(err)) return } - for _, p := range respPeers { - if s.streamPool.HasActiveStream(p.Id()) { - s.log.Debug("has active stream for", zap.String("id", p.Id())) - continue - } + + for _, p := range newPeers { stream, err := s.clientFactory.Client(p).Stream(ctx) if err != nil { err = rpcerr.Unwrap(err) @@ -123,7 +130,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) { s.log.Errorf("failed to send first message to stream: %v", err) continue } - s.log.Debug("continue reading stream for", zap.String("id", p.Id())) + s.log.Debug("reading stream for", zap.String("id", p.Id())) s.streamPool.AddAndReadStreamAsync(stream) } } diff --git a/common/nodeconf/confconnector.go b/common/nodeconf/confconnector.go index bb2c302a..e970b327 100644 --- a/common/nodeconf/confconnector.go +++ b/common/nodeconf/confconnector.go @@ -4,11 +4,13 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/slice" ) type ConfConnector interface { + Configuration() Configuration GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) - DialResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) + DialResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) } type confConnector struct { @@ -20,21 +22,36 @@ func NewConfConnector(conf Configuration, pool pool.Pool) ConfConnector { return &confConnector{conf: conf, pool: pool} } -func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) { - return s.connectOneOrMany(ctx, spaceId, s.pool.Get, s.pool.GetOneOf) +func (s *confConnector) Configuration() Configuration { + return s.conf } -func (s *confConnector) DialResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) { - return s.connectOneOrMany(ctx, spaceId, s.pool.Dial, s.pool.DialOneOf) +func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) { + return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get, s.pool.GetOneOf) +} + +func (s *confConnector) DialResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) { + return s.connectOneOrMany(ctx, spaceId, activeNodeIds, s.pool.Dial, s.pool.DialOneOf) } func (s *confConnector) connectOneOrMany( - ctx context.Context, spaceId string, + ctx context.Context, + spaceId string, + activeNodeIds []string, connectOne func(context.Context, string) (peer.Peer, error), connectOneOf func(context.Context, []string) (peer.Peer, error)) (peers []peer.Peer, err error) { - allNodes := s.conf.NodeIds(spaceId) + var ( + inactiveNodeIds []string + allNodes = s.conf.NodeIds(spaceId) + ) + for _, id := range allNodes { + if slice.FindPos(activeNodeIds, id) == -1 { + inactiveNodeIds = append(inactiveNodeIds, id) + } + } + if s.conf.IsResponsible(spaceId) { - for _, id := range allNodes { + for _, id := range inactiveNodeIds { var p peer.Peer p, err = connectOne(ctx, id) if err != nil { @@ -42,7 +59,8 @@ func (s *confConnector) connectOneOrMany( } peers = append(peers, p) } - } else { + } else if len(activeNodeIds) == 0 { + // that means that all connected ids var p peer.Peer p, err = connectOneOf(ctx, allNodes) if err != nil {