Fix sending reply with empty rootchange
This commit is contained in:
parent
3b3a0199bd
commit
52816f0f44
@ -243,7 +243,7 @@ func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error)
|
|||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
|
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
|
||||||
return s.syncClient.SendWithReply(ctx, peerId, headUpdate, "")
|
return s.syncClient.SendWithReply(ctx, peerId, headUpdate.RootChange.Id, headUpdate, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncTree) afterBuild() {
|
func (s *syncTree) afterBuild() {
|
||||||
|
|||||||
@ -82,10 +82,11 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
|||||||
fullRequest *treechangeproto.TreeSyncMessage
|
fullRequest *treechangeproto.TreeSyncMessage
|
||||||
isEmptyUpdate = len(update.Changes) == 0
|
isEmptyUpdate = len(update.Changes) == 0
|
||||||
objTree = s.objTree
|
objTree = s.objTree
|
||||||
|
treeId = objTree.Id()
|
||||||
)
|
)
|
||||||
log := log.With(
|
log := log.With(
|
||||||
zap.Strings("update heads", update.Heads),
|
zap.Strings("update heads", update.Heads),
|
||||||
zap.String("treeId", objTree.Id()),
|
zap.String("treeId", treeId),
|
||||||
zap.String("spaceId", s.spaceId),
|
zap.String("spaceId", s.spaceId),
|
||||||
zap.Int("len(update changes)", len(update.Changes)))
|
zap.Int("len(update changes)", len(update.Changes)))
|
||||||
log.DebugCtx(ctx, "received head update message")
|
log.DebugCtx(ctx, "received head update message")
|
||||||
@ -118,7 +119,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId)
|
return s.syncClient.SendWithReply(ctx, senderId, treeId, fullRequest, replyId)
|
||||||
}
|
}
|
||||||
|
|
||||||
if s.alreadyHasHeads(objTree, update.Heads) {
|
if s.alreadyHasHeads(objTree, update.Heads) {
|
||||||
@ -142,7 +143,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId)
|
return s.syncClient.SendWithReply(ctx, senderId, treeId, fullRequest, replyId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncTreeHandler) handleFullSyncRequest(
|
func (s *syncTreeHandler) handleFullSyncRequest(
|
||||||
@ -154,11 +155,12 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
|||||||
fullResponse *treechangeproto.TreeSyncMessage
|
fullResponse *treechangeproto.TreeSyncMessage
|
||||||
header = s.objTree.Header()
|
header = s.objTree.Header()
|
||||||
objTree = s.objTree
|
objTree = s.objTree
|
||||||
|
treeId = s.objTree.Id()
|
||||||
)
|
)
|
||||||
|
|
||||||
log := log.With(zap.String("senderId", senderId),
|
log := log.With(zap.String("senderId", senderId),
|
||||||
zap.Strings("request heads", request.Heads),
|
zap.Strings("request heads", request.Heads),
|
||||||
zap.String("treeId", s.objTree.Id()),
|
zap.String("treeId", treeId),
|
||||||
zap.String("replyId", replyId),
|
zap.String("replyId", replyId),
|
||||||
zap.String("spaceId", s.spaceId),
|
zap.String("spaceId", s.spaceId),
|
||||||
zap.Int("len(request changes)", len(request.Changes)))
|
zap.Int("len(request changes)", len(request.Changes)))
|
||||||
@ -167,7 +169,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
|||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
|
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
|
||||||
s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId)
|
s.syncClient.SendWithReply(ctx, senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId)
|
||||||
return
|
return
|
||||||
} else if fullResponse != nil {
|
} else if fullResponse != nil {
|
||||||
cnt := fullResponse.Content.GetFullSyncResponse()
|
cnt := fullResponse.Content.GetFullSyncResponse()
|
||||||
@ -190,7 +192,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.syncClient.SendWithReply(ctx, senderId, fullResponse, replyId)
|
return s.syncClient.SendWithReply(ctx, senderId, treeId, fullResponse, replyId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncTreeHandler) handleFullSyncResponse(
|
func (s *syncTreeHandler) handleFullSyncResponse(
|
||||||
@ -199,10 +201,11 @@ func (s *syncTreeHandler) handleFullSyncResponse(
|
|||||||
response *treechangeproto.TreeFullSyncResponse) (err error) {
|
response *treechangeproto.TreeFullSyncResponse) (err error) {
|
||||||
var (
|
var (
|
||||||
objTree = s.objTree
|
objTree = s.objTree
|
||||||
|
treeId = s.objTree.Id()
|
||||||
)
|
)
|
||||||
log := log.With(
|
log := log.With(
|
||||||
zap.Strings("heads", response.Heads),
|
zap.Strings("heads", response.Heads),
|
||||||
zap.String("treeId", s.objTree.Id()),
|
zap.String("treeId", treeId),
|
||||||
zap.String("spaceId", s.spaceId),
|
zap.String("spaceId", s.spaceId),
|
||||||
zap.Int("len(changes)", len(response.Changes)))
|
zap.Int("len(changes)", len(response.Changes)))
|
||||||
log.DebugCtx(ctx, "received full sync response message")
|
log.DebugCtx(ctx, "received full sync response message")
|
||||||
|
|||||||
@ -154,7 +154,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
|
|||||||
fx.syncClientMock.EXPECT().
|
fx.syncClientMock.EXPECT().
|
||||||
CreateFullSyncRequest(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
CreateFullSyncRequest(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
||||||
Return(fullRequest, nil)
|
Return(fullRequest, nil)
|
||||||
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullRequest), gomock.Eq(""))
|
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullRequest), gomock.Eq(""))
|
||||||
|
|
||||||
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -200,7 +200,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) {
|
|||||||
fx.syncClientMock.EXPECT().
|
fx.syncClientMock.EXPECT().
|
||||||
CreateFullSyncRequest(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
CreateFullSyncRequest(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
||||||
Return(fullRequest, nil)
|
Return(fullRequest, nil)
|
||||||
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullRequest), gomock.Eq(""))
|
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullRequest), gomock.Eq(""))
|
||||||
|
|
||||||
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -266,7 +266,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
|
|||||||
fx.syncClientMock.EXPECT().
|
fx.syncClientMock.EXPECT().
|
||||||
CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
||||||
Return(fullResponse, nil)
|
Return(fullResponse, nil)
|
||||||
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullResponse), gomock.Eq(""))
|
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullResponse), gomock.Eq(""))
|
||||||
|
|
||||||
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -295,7 +295,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
|
|||||||
fx.syncClientMock.EXPECT().
|
fx.syncClientMock.EXPECT().
|
||||||
CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
||||||
Return(fullResponse, nil)
|
Return(fullResponse, nil)
|
||||||
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullResponse), gomock.Eq(""))
|
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullResponse), gomock.Eq(""))
|
||||||
|
|
||||||
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -322,7 +322,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
|
|||||||
fx.syncClientMock.EXPECT().
|
fx.syncClientMock.EXPECT().
|
||||||
CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})).
|
||||||
Return(fullResponse, nil)
|
Return(fullResponse, nil)
|
||||||
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullResponse), gomock.Eq(replyId))
|
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullResponse), gomock.Eq(replyId))
|
||||||
|
|
||||||
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
@ -357,7 +357,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) {
|
|||||||
RawChanges: []*treechangeproto.RawTreeChangeWithId{chWithId},
|
RawChanges: []*treechangeproto.RawTreeChangeWithId{chWithId},
|
||||||
})).
|
})).
|
||||||
Return(objecttree.AddResult{}, fmt.Errorf(""))
|
Return(objecttree.AddResult{}, fmt.Errorf(""))
|
||||||
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Any(), gomock.Eq(""))
|
fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Any(), gomock.Eq(""))
|
||||||
|
|
||||||
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|||||||
@ -124,15 +124,15 @@ func (mr *MockSyncClientMockRecorder) PeerManager() *gomock.Call {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SendWithReply mocks base method.
|
// SendWithReply mocks base method.
|
||||||
func (m *MockSyncClient) SendWithReply(arg0 context.Context, arg1 string, arg2 *treechangeproto.TreeSyncMessage, arg3 string) error {
|
func (m *MockSyncClient) SendWithReply(arg0 context.Context, arg1, arg2 string, arg3 *treechangeproto.TreeSyncMessage, arg4 string) error {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "SendWithReply", arg0, arg1, arg2, arg3)
|
ret := m.ctrl.Call(m, "SendWithReply", arg0, arg1, arg2, arg3, arg4)
|
||||||
ret0, _ := ret[0].(error)
|
ret0, _ := ret[0].(error)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendWithReply indicates an expected call of SendWithReply.
|
// SendWithReply indicates an expected call of SendWithReply.
|
||||||
func (mr *MockSyncClientMockRecorder) SendWithReply(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
func (mr *MockSyncClientMockRecorder) SendWithReply(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithReply", reflect.TypeOf((*MockSyncClient)(nil).SendWithReply), arg0, arg1, arg2, arg3)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithReply", reflect.TypeOf((*MockSyncClient)(nil).SendWithReply), arg0, arg1, arg2, arg3, arg4)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -84,30 +84,30 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
|||||||
log = log.With(zap.Bool("isDeleted", true))
|
log = log.With(zap.Bool("isDeleted", true))
|
||||||
// preventing sync with other clients if they are not just syncing the settings tree
|
// preventing sync with other clients if they are not just syncing the settings tree
|
||||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
||||||
return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrSpaceIsDeleted, senderId)
|
return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrSpaceIsDeleted, senderId, msg.ObjectId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.DebugCtx(ctx, "handling message")
|
log.DebugCtx(ctx, "handling message")
|
||||||
hasTree, err := s.spaceStorage.HasTree(msg.ObjectId)
|
hasTree, err := s.spaceStorage.HasTree(msg.ObjectId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId)
|
return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId)
|
||||||
}
|
}
|
||||||
// in this case we will try to get it from remote, unless the sender also sent us the same request :-)
|
// in this case we will try to get it from remote, unless the sender also sent us the same request :-)
|
||||||
if !hasTree {
|
if !hasTree {
|
||||||
treeMsg := &treechangeproto.TreeSyncMessage{}
|
treeMsg := &treechangeproto.TreeSyncMessage{}
|
||||||
err = proto.Unmarshal(msg.Payload, treeMsg)
|
err = proto.Unmarshal(msg.Payload, treeMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ReplyId)
|
return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.ReplyId)
|
||||||
}
|
}
|
||||||
// this means that we don't have the tree locally and therefore can't return it
|
// this means that we don't have the tree locally and therefore can't return it
|
||||||
if s.isEmptyFullSyncRequest(treeMsg) {
|
if s.isEmptyFullSyncRequest(treeMsg) {
|
||||||
return s.sendError(ctx, treeMsg.RootChange, treechangeproto.ErrGetTree, senderId, msg.ReplyId)
|
return s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.ReplyId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.DebugCtx(ctx, "failed to get object")
|
log.DebugCtx(ctx, "failed to get object")
|
||||||
return s.unmarshallSendError(ctx, msg, err, senderId)
|
return s.unmarshallSendError(ctx, msg, err, msg.ObjectId, senderId)
|
||||||
}
|
}
|
||||||
return obj.HandleMessage(ctx, senderId, msg)
|
return obj.HandleMessage(ctx, senderId, msg)
|
||||||
}
|
}
|
||||||
@ -116,18 +116,18 @@ func (s *objectSync) MessagePool() MessagePool {
|
|||||||
return s.messagePool
|
return s.messagePool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId string) (err error) {
|
func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) (err error) {
|
||||||
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
||||||
err = proto.Unmarshal(msg.Payload, unmarshalled)
|
err = proto.Unmarshal(msg.Payload, unmarshalled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, msg.ReplyId)
|
return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.ReplyId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, replyId string) (err error) {
|
func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) (err error) {
|
||||||
resp := treechangeproto.WrapError(respErr, root)
|
resp := treechangeproto.WrapError(respErr, root)
|
||||||
return s.syncClient.SendWithReply(ctx, senderId, resp, replyId)
|
return s.syncClient.SendWithReply(ctx, senderId, objectId, resp, replyId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool {
|
func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool {
|
||||||
|
|||||||
@ -10,7 +10,7 @@ import (
|
|||||||
type SyncClient interface {
|
type SyncClient interface {
|
||||||
RequestFactory
|
RequestFactory
|
||||||
Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
|
Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
|
||||||
SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
|
SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
|
||||||
PeerManager() peermanager.PeerManager
|
PeerManager() peermanager.PeerManager
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,8 +39,8 @@ func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyn
|
|||||||
return s.peerManager.Broadcast(ctx, objMsg)
|
return s.peerManager.Broadcast(ctx, objMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) {
|
func (s *syncClient) SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) {
|
||||||
objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, replyId)
|
objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, replyId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user