Fix sync bugs (connection with different nodes on stream and diff)
This commit is contained in:
parent
86359af3be
commit
c81724c36f
@ -2,16 +2,20 @@ package syncservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"time"
|
||||
)
|
||||
|
||||
type StreamChecker interface {
|
||||
CheckResponsiblePeers()
|
||||
CheckPeerConnection(peerId string) (err error)
|
||||
}
|
||||
|
||||
type streamChecker struct {
|
||||
@ -72,27 +76,58 @@ func (s *streamChecker) CheckResponsiblePeers() {
|
||||
}
|
||||
|
||||
for _, p := range newPeers {
|
||||
stream, err := s.clientFactory.Client(p).Stream(s.syncCtx)
|
||||
err := s.createStream(p)
|
||||
if err != nil {
|
||||
err = rpcerr.Unwrap(err)
|
||||
s.log.Error("failed to open stream", zap.Error(err))
|
||||
// so here probably the request is failed because there is no such space,
|
||||
// but diffService should handle such cases by sending pushSpace
|
||||
continue
|
||||
}
|
||||
// sending empty message for the server to understand from which space is it coming
|
||||
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
|
||||
if err != nil {
|
||||
err = rpcerr.Unwrap(err)
|
||||
s.log.Error("failed to send first message to stream", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
err = s.streamPool.AddAndReadStreamAsync(stream)
|
||||
if err != nil {
|
||||
s.log.Error("failed to read from stream async", zap.Error(err))
|
||||
log.With(zap.Error(err)).Error("failed to create stream")
|
||||
continue
|
||||
}
|
||||
s.log.Debug("reading stream for", zap.String("id", p.Id()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamChecker) CheckPeerConnection(peerId string) (err error) {
|
||||
if s.streamPool.HasActiveStream(peerId) {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
configuration = s.connector.Configuration()
|
||||
pool = s.connector.Pool()
|
||||
)
|
||||
nodeIds := configuration.NodeIds(s.spaceId)
|
||||
// we don't know the address of the peer
|
||||
if !slices.Contains(nodeIds, peerId) {
|
||||
err = fmt.Errorf("don't know the address of peer %s", peerId)
|
||||
return
|
||||
}
|
||||
|
||||
newPeer, err := pool.Dial(s.syncCtx, peerId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.createStream(newPeer)
|
||||
}
|
||||
|
||||
func (s *streamChecker) createStream(p peer.Peer) (err error) {
|
||||
stream, err := s.clientFactory.Client(p).Stream(s.syncCtx)
|
||||
if err != nil {
|
||||
// so here probably the request is failed because there is no such space,
|
||||
// but diffService should handle such cases by sending pushSpace
|
||||
err = fmt.Errorf("failed to open stream: %w", rpcerr.Unwrap(err))
|
||||
return
|
||||
}
|
||||
|
||||
// sending empty message for the server to understand from which space is it coming
|
||||
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to send first message to stream: %w", rpcerr.Unwrap(err))
|
||||
return
|
||||
}
|
||||
err = s.streamPool.AddAndReadStreamAsync(stream)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to read from stream async: %w", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -125,7 +125,7 @@ func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyn
|
||||
streams := getStreams()
|
||||
s.Unlock()
|
||||
|
||||
log.With(zap.String("objectId", message.ObjectId), zap.Int("peers", len(streams))).
|
||||
log.With(zap.String("objectId", message.ObjectId), zap.Int("existing peers len", len(streams)), zap.Strings("wanted peers", peers)).
|
||||
Debug("sending message to peers")
|
||||
for _, stream := range streams {
|
||||
err = stream.Send(message)
|
||||
|
||||
@ -50,6 +50,11 @@ func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (e
|
||||
}
|
||||
|
||||
func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) {
|
||||
err = s.checker.CheckPeerConnection(peerId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, replyId)
|
||||
if err != nil {
|
||||
return
|
||||
@ -62,6 +67,7 @@ func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.Tr
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.configuration.IsResponsible(s.spaceId) {
|
||||
s.checker.CheckResponsiblePeers()
|
||||
return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg)
|
||||
|
||||
@ -138,16 +138,23 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
|
||||
objMsg, err := marshallTreeMessage(newTreeRequest, id, "")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = deps.SyncService.StreamChecker().CheckPeerConnection(peerId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := deps.SyncService.StreamPool().SendSync(peerId, objMsg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
msg = &treechangeproto.TreeSyncMessage{}
|
||||
err = proto.Unmarshal(resp.Payload, msg)
|
||||
return
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
|
||||
type ConfConnector interface {
|
||||
Configuration() Configuration
|
||||
Pool() pool.Pool
|
||||
GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error)
|
||||
DialInactiveResponsiblePeers(ctx context.Context, spaceId string, activeNodeIds []string) ([]peer.Peer, error)
|
||||
}
|
||||
@ -23,9 +24,14 @@ func NewConfConnector(conf Configuration, pool pool.Pool) ConfConnector {
|
||||
}
|
||||
|
||||
func (s *confConnector) Configuration() Configuration {
|
||||
// TODO: think about rewriting this, because these deps should not be exposed
|
||||
return s.conf
|
||||
}
|
||||
|
||||
func (s *confConnector) Pool() pool.Pool {
|
||||
return s.pool
|
||||
}
|
||||
|
||||
func (s *confConnector) GetResponsiblePeers(ctx context.Context, spaceId string) ([]peer.Peer, error) {
|
||||
return s.connectOneOrMany(ctx, spaceId, nil, s.pool.Get, s.pool.GetOneOf)
|
||||
}
|
||||
@ -62,7 +68,14 @@ func (s *confConnector) connectOneOrMany(
|
||||
} else if len(activeNodeIds) == 0 {
|
||||
// that means that all connected ids
|
||||
var p peer.Peer
|
||||
p, err = connectOneOf(ctx, allNodes)
|
||||
p, err = s.pool.GetOneOf(ctx, allNodes)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// if we are dialling someone, we want to dial to the same peer which we cached
|
||||
// thus communication through streams and through diff will go to the same node
|
||||
p, err = connectOne(ctx, p.Id())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
|
||||
app "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
|
||||
peer "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
|
||||
pool "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
|
||||
nodeconf "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
)
|
||||
@ -252,3 +253,17 @@ func (mr *MockConfConnectorMockRecorder) GetResponsiblePeers(arg0, arg1 interfac
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetResponsiblePeers", reflect.TypeOf((*MockConfConnector)(nil).GetResponsiblePeers), arg0, arg1)
|
||||
}
|
||||
|
||||
// Pool mocks base method.
|
||||
func (m *MockConfConnector) Pool() pool.Pool {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Pool")
|
||||
ret0, _ := ret[0].(pool.Pool)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Pool indicates an expected call of Pool.
|
||||
func (mr *MockConfConnectorMockRecorder) Pool() *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Pool", reflect.TypeOf((*MockConfConnector)(nil).Pool))
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user