diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index 683079f5..f93de528 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -243,7 +243,7 @@ func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error) s.Lock() defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.SendWithReply(ctx, peerId, headUpdate, "") + return s.syncClient.SendWithReply(ctx, peerId, headUpdate.RootChange.Id, headUpdate, "") } func (s *syncTree) afterBuild() { diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index 7b4cb2a8..dab63e92 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -82,10 +82,11 @@ func (s *syncTreeHandler) handleHeadUpdate( fullRequest *treechangeproto.TreeSyncMessage isEmptyUpdate = len(update.Changes) == 0 objTree = s.objTree + treeId = objTree.Id() ) log := log.With( zap.Strings("update heads", update.Heads), - zap.String("treeId", objTree.Id()), + zap.String("treeId", treeId), zap.String("spaceId", s.spaceId), zap.Int("len(update changes)", len(update.Changes))) log.DebugCtx(ctx, "received head update message") @@ -118,7 +119,7 @@ func (s *syncTreeHandler) handleHeadUpdate( return } - return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId) + return s.syncClient.SendWithReply(ctx, senderId, treeId, fullRequest, replyId) } if s.alreadyHasHeads(objTree, update.Heads) { @@ -142,7 +143,7 @@ func (s *syncTreeHandler) handleHeadUpdate( return } - return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId) + return s.syncClient.SendWithReply(ctx, senderId, treeId, fullRequest, replyId) } func (s *syncTreeHandler) handleFullSyncRequest( @@ -154,11 +155,12 @@ func (s *syncTreeHandler) handleFullSyncRequest( fullResponse *treechangeproto.TreeSyncMessage header = s.objTree.Header() objTree = s.objTree + treeId = s.objTree.Id() ) log := log.With(zap.String("senderId", senderId), zap.Strings("request heads", request.Heads), - zap.String("treeId", s.objTree.Id()), + zap.String("treeId", treeId), zap.String("replyId", replyId), zap.String("spaceId", s.spaceId), zap.Int("len(request changes)", len(request.Changes))) @@ -167,7 +169,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( defer func() { if err != nil { log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error") - s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId) + s.syncClient.SendWithReply(ctx, senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId) return } else if fullResponse != nil { cnt := fullResponse.Content.GetFullSyncResponse() @@ -190,7 +192,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( return } - return s.syncClient.SendWithReply(ctx, senderId, fullResponse, replyId) + return s.syncClient.SendWithReply(ctx, senderId, treeId, fullResponse, replyId) } func (s *syncTreeHandler) handleFullSyncResponse( @@ -199,10 +201,11 @@ func (s *syncTreeHandler) handleFullSyncResponse( response *treechangeproto.TreeFullSyncResponse) (err error) { var ( objTree = s.objTree + treeId = s.objTree.Id() ) log := log.With( zap.Strings("heads", response.Heads), - zap.String("treeId", s.objTree.Id()), + zap.String("treeId", treeId), zap.String("spaceId", s.spaceId), zap.Int("len(changes)", len(response.Changes))) log.DebugCtx(ctx, "received full sync response message") diff --git a/commonspace/object/tree/synctree/synctreehandler_test.go b/commonspace/object/tree/synctree/synctreehandler_test.go index b95ffc82..401f9989 100644 --- a/commonspace/object/tree/synctree/synctreehandler_test.go +++ b/commonspace/object/tree/synctree/synctreehandler_test.go @@ -154,7 +154,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { fx.syncClientMock.EXPECT(). CreateFullSyncRequest(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})). Return(fullRequest, nil) - fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullRequest), gomock.Eq("")) + fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullRequest), gomock.Eq("")) err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg) require.NoError(t, err) @@ -200,7 +200,7 @@ func TestSyncHandler_HandleHeadUpdate(t *testing.T) { fx.syncClientMock.EXPECT(). CreateFullSyncRequest(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})). Return(fullRequest, nil) - fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullRequest), gomock.Eq("")) + fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullRequest), gomock.Eq("")) err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg) require.NoError(t, err) @@ -266,7 +266,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { fx.syncClientMock.EXPECT(). CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})). Return(fullResponse, nil) - fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullResponse), gomock.Eq("")) + fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullResponse), gomock.Eq("")) err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg) require.NoError(t, err) @@ -295,7 +295,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { fx.syncClientMock.EXPECT(). CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})). Return(fullResponse, nil) - fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullResponse), gomock.Eq("")) + fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullResponse), gomock.Eq("")) err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg) require.NoError(t, err) @@ -322,7 +322,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { fx.syncClientMock.EXPECT(). CreateFullSyncResponse(gomock.Eq(fx.objectTreeMock), gomock.Eq([]string{"h1"}), gomock.Eq([]string{"h1"})). Return(fullResponse, nil) - fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(fullResponse), gomock.Eq(replyId)) + fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Eq(fullResponse), gomock.Eq(replyId)) err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg) require.NoError(t, err) @@ -357,7 +357,7 @@ func TestSyncHandler_HandleFullSyncRequest(t *testing.T) { RawChanges: []*treechangeproto.RawTreeChangeWithId{chWithId}, })). Return(objecttree.AddResult{}, fmt.Errorf("")) - fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Any(), gomock.Eq("")) + fx.syncClientMock.EXPECT().SendWithReply(gomock.Any(), gomock.Eq(senderId), gomock.Eq(treeId), gomock.Any(), gomock.Eq("")) err := fx.syncHandler.HandleMessage(ctx, senderId, objectMsg) require.Error(t, err) diff --git a/commonspace/objectsync/mock_objectsync/mock_objectsync.go b/commonspace/objectsync/mock_objectsync/mock_objectsync.go index b82360db..aaca46a5 100644 --- a/commonspace/objectsync/mock_objectsync/mock_objectsync.go +++ b/commonspace/objectsync/mock_objectsync/mock_objectsync.go @@ -124,15 +124,15 @@ func (mr *MockSyncClientMockRecorder) PeerManager() *gomock.Call { } // SendWithReply mocks base method. -func (m *MockSyncClient) SendWithReply(arg0 context.Context, arg1 string, arg2 *treechangeproto.TreeSyncMessage, arg3 string) error { +func (m *MockSyncClient) SendWithReply(arg0 context.Context, arg1, arg2 string, arg3 *treechangeproto.TreeSyncMessage, arg4 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendWithReply", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "SendWithReply", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(error) return ret0 } // SendWithReply indicates an expected call of SendWithReply. -func (mr *MockSyncClientMockRecorder) SendWithReply(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockSyncClientMockRecorder) SendWithReply(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithReply", reflect.TypeOf((*MockSyncClient)(nil).SendWithReply), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithReply", reflect.TypeOf((*MockSyncClient)(nil).SendWithReply), arg0, arg1, arg2, arg3, arg4) } diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index e0f35893..f3c3fd74 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -84,30 +84,30 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp log = log.With(zap.Bool("isDeleted", true)) // preventing sync with other clients if they are not just syncing the settings tree if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() { - return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrSpaceIsDeleted, senderId) + return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrSpaceIsDeleted, senderId, msg.ObjectId) } } log.DebugCtx(ctx, "handling message") hasTree, err := s.spaceStorage.HasTree(msg.ObjectId) if err != nil { - return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId) + return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) } // in this case we will try to get it from remote, unless the sender also sent us the same request :-) if !hasTree { treeMsg := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, treeMsg) if err != nil { - return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ReplyId) + return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.ReplyId) } // this means that we don't have the tree locally and therefore can't return it if s.isEmptyFullSyncRequest(treeMsg) { - return s.sendError(ctx, treeMsg.RootChange, treechangeproto.ErrGetTree, senderId, msg.ReplyId) + return s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.ReplyId) } } obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) if err != nil { log.DebugCtx(ctx, "failed to get object") - return s.unmarshallSendError(ctx, msg, err, senderId) + return s.unmarshallSendError(ctx, msg, err, msg.ObjectId, senderId) } return obj.HandleMessage(ctx, senderId, msg) } @@ -116,18 +116,18 @@ func (s *objectSync) MessagePool() MessagePool { return s.messagePool } -func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId string) (err error) { +func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) (err error) { unmarshalled := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, unmarshalled) if err != nil { return } - return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, msg.ReplyId) + return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.ReplyId) } -func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, replyId string) (err error) { +func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) (err error) { resp := treechangeproto.WrapError(respErr, root) - return s.syncClient.SendWithReply(ctx, senderId, resp, replyId) + return s.syncClient.SendWithReply(ctx, senderId, objectId, resp, replyId) } func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool { diff --git a/commonspace/objectsync/syncclient.go b/commonspace/objectsync/syncclient.go index 6ef203c0..4a95cae1 100644 --- a/commonspace/objectsync/syncclient.go +++ b/commonspace/objectsync/syncclient.go @@ -10,7 +10,7 @@ import ( type SyncClient interface { RequestFactory Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) - SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) + SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) PeerManager() peermanager.PeerManager } @@ -39,8 +39,8 @@ func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyn return s.peerManager.Broadcast(ctx, objMsg) } -func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) { - objMsg, err := MarshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, replyId) +func (s *syncClient) SendWithReply(ctx context.Context, peerId, objectId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) { + objMsg, err := MarshallTreeMessage(msg, s.spaceId, objectId, replyId) if err != nil { return }