From b0b75ac8eaeccf724f710dd5b23ae37f52f5af72 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 21 Dec 2022 21:24:17 +0100 Subject: [PATCH] Fix sync bugs (connection with different nodes on stream and diff) --- .../commonspace/syncservice/streamchecker.go | 69 ++++++++++++++----- common/commonspace/syncservice/streampool.go | 2 +- common/commonspace/synctree/syncclient.go | 6 ++ common/commonspace/synctree/synctree.go | 7 ++ common/nodeconf/confconnector.go | 15 +++- .../nodeconf/mock_nodeconf/mock_nodeconf.go | 15 ++++ 6 files changed, 95 insertions(+), 19 deletions(-) diff --git a/common/commonspace/syncservice/streamchecker.go b/common/commonspace/syncservice/streamchecker.go index 85bd928d..1ff06f7a 100644 --- a/common/commonspace/syncservice/streamchecker.go +++ b/common/commonspace/syncservice/streamchecker.go @@ -2,16 +2,20 @@ package syncservice import ( "context" + "fmt" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/exp/slices" "time" ) type StreamChecker interface { CheckResponsiblePeers() + CheckPeerConnection(peerId string) (err error) } type streamChecker struct { @@ -72,27 +76,58 @@ func (s *streamChecker) CheckResponsiblePeers() { } for _, p := range newPeers { - stream, err := s.clientFactory.Client(p).Stream(s.syncCtx) + err := s.createStream(p) if err != nil { - err = rpcerr.Unwrap(err) - s.log.Error("failed to open stream", zap.Error(err)) - // so here probably the request is failed because there is no such space, - // but diffService should handle such cases by sending pushSpace - continue - } - // sending empty message for the server to understand from which space is it coming - err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}) - if err != nil { - err = rpcerr.Unwrap(err) - s.log.Error("failed to send first message to stream", zap.Error(err)) - continue - } - err = s.streamPool.AddAndReadStreamAsync(stream) - if err != nil { - s.log.Error("failed to read from stream async", zap.Error(err)) + log.With(zap.Error(err)).Error("failed to create stream") continue } s.log.Debug("reading stream for", zap.String("id", p.Id())) } return } + +func (s *streamChecker) CheckPeerConnection(peerId string) (err error) { + if s.streamPool.HasActiveStream(peerId) { + return + } + + var ( + configuration = s.connector.Configuration() + pool = s.connector.Pool() + ) + nodeIds := configuration.NodeIds(s.spaceId) + // we don't know the address of the peer + if !slices.Contains(nodeIds, peerId) { + err = fmt.Errorf("don't know the address of peer %s", peerId) + return + } + + newPeer, err := pool.Dial(s.syncCtx, peerId) + if err != nil { + return + } + return s.createStream(newPeer) +} + +func (s *streamChecker) createStream(p peer.Peer) (err error) { + stream, err := s.clientFactory.Client(p).Stream(s.syncCtx) + if err != nil { + // so here probably the request is failed because there is no such space, + // but diffService should handle such cases by sending pushSpace + err = fmt.Errorf("failed to open stream: %w", rpcerr.Unwrap(err)) + return + } + + // sending empty message for the server to understand from which space is it coming + err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}) + if err != nil { + err = fmt.Errorf("failed to send first message to stream: %w", rpcerr.Unwrap(err)) + return + } + err = s.streamPool.AddAndReadStreamAsync(stream) + if err != nil { + err = fmt.Errorf("failed to read from stream async: %w", err) + return + } + return +} diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 4191aab0..b1bc0f09 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -125,7 +125,7 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn streams := getStreams() s.Unlock() - log.With(zap.String("objectId", message.ObjectId), zap.Int("peers", len(streams))). + log.With(zap.String("objectId", message.ObjectId), zap.Int("existing peers len", len(streams)), zap.Strings("wanted peers", peers)). Debug("sending message to peers") for _, stream := range streams { err = stream.Send(message) diff --git a/common/commonspace/synctree/syncclient.go b/common/commonspace/synctree/syncclient.go index ee49b14c..1091bf51 100644 --- a/common/commonspace/synctree/syncclient.go +++ b/common/commonspace/synctree/syncclient.go @@ -50,6 +50,11 @@ func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (e } func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { + err = s.checker.CheckPeerConnection(peerId) + if err != nil { + return + } + objMsg, err := marshallTreeMessage(message, message.RootChange.Id, replyId) if err != nil { return @@ -62,6 +67,7 @@ func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.Tr if err != nil { return } + if s.configuration.IsResponsible(s.spaceId) { s.checker.CheckResponsiblePeers() return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg) diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 3490acbd..2dfe90d4 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -138,16 +138,23 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t if err != nil { return } + newTreeRequest := GetRequestFactory().CreateNewTreeRequest() objMsg, err := marshallTreeMessage(newTreeRequest, id, "") if err != nil { return } + err = deps.SyncService.StreamChecker().CheckPeerConnection(peerId) + if err != nil { + return + } + resp, err := deps.SyncService.StreamPool().SendSync(peerId, objMsg) if err != nil { return } + msg = &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(resp.Payload, msg) return diff --git a/common/nodeconf/confconnector.go b/common/nodeconf/confconnector.go index c797fb3f..a4ac839c 100644 --- a/common/nodeconf/confconnector.go +++ b/common/nodeconf/confconnector.go @@ -9,6 +9,7 @@ import ( type ConfConnector interface { Configuration() Configuration + Pool() pool.Pool GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error) } @@ -23,9 +24,14 @@ func NewConfConnector(conf Configuration, pool pool.Pool) ConfConnector { } func (s *confConnector) Configuration() Configuration { + // TODO: think about rewriting this, because these deps should not be exposed return s.conf } +func (s *confConnector) Pool() pool.Pool { + return s.pool +} + func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) { return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get, s.pool.GetOneOf) } @@ -62,7 +68,14 @@ func (s *confConnector) connectOneOrMany( } else if len(activeNodeIds) == 0 { // that means that all connected ids var p peer.Peer - p, err = connectOneOf(ctx, allNodes) + p, err = s.pool.GetOneOf(ctx, allNodes) + if err != nil { + return + } + + // if we are dialling someone, we want to dial to the same peer which we cached + // thus communication through streams and through diff will go to the same node + p, err = connectOne(ctx, p.Id()) if err != nil { return } diff --git a/common/nodeconf/mock_nodeconf/mock_nodeconf.go b/common/nodeconf/mock_nodeconf/mock_nodeconf.go index 88e7505d..49bc9e84 100644 --- a/common/nodeconf/mock_nodeconf/mock_nodeconf.go +++ b/common/nodeconf/mock_nodeconf/mock_nodeconf.go @@ -10,6 +10,7 @@ import ( app "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" peer "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" + pool "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool" nodeconf "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" gomock "github.com/golang/mock/gomock" ) @@ -238,3 +239,17 @@ func (mr *MockConfConnectorMockRecorder) GetResponsiblePeers(arg0, arg1 interfac mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).GetResponsiblePeers), arg0, arg1) } + +// Pool mocks base method. +func (m *MockConfConnector) Pool() pool.Pool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Pool") + ret0, _ := ret[0].(pool.Pool) + return ret0 +} + +// Pool indicates an expected call of Pool. +func (mr *MockConfConnectorMockRecorder) Pool() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pool", reflect.TypeOf((*MockConfConnector)(nil).Pool)) +}