Update object sync error handling logic
This commit is contained in:
parent
533806893f
commit
3b3a0199bd
@ -167,7 +167,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
|||||||
defer func() {
|
defer func() {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
|
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
|
||||||
s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(treechangeproto.ErrorCodes_FullSyncRequestError, header), replyId)
|
s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId)
|
||||||
return
|
return
|
||||||
} else if fullResponse != nil {
|
} else if fullResponse != nil {
|
||||||
cnt := fullResponse.Content.GetFullSyncResponse()
|
cnt := fullResponse.Content.GetFullSyncResponse()
|
||||||
|
|||||||
@ -1,5 +1,7 @@
|
|||||||
package treechangeproto
|
package treechangeproto
|
||||||
|
|
||||||
|
import "github.com/anytypeio/any-sync/net/rpc/rpcerr"
|
||||||
|
|
||||||
func WrapHeadUpdate(update *TreeHeadUpdate, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
|
func WrapHeadUpdate(update *TreeHeadUpdate, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
|
||||||
return &TreeSyncMessage{
|
return &TreeSyncMessage{
|
||||||
Content: &TreeSyncContentValue{
|
Content: &TreeSyncContentValue{
|
||||||
@ -27,10 +29,10 @@ func WrapFullResponse(response *TreeFullSyncResponse, rootChange *RawTreeChangeW
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WrapError(code ErrorCodes, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
|
func WrapError(err error, rootChange *RawTreeChangeWithId) *TreeSyncMessage {
|
||||||
return &TreeSyncMessage{
|
return &TreeSyncMessage{
|
||||||
Content: &TreeSyncContentValue{
|
Content: &TreeSyncContentValue{
|
||||||
Value: &TreeSyncContentValue_ErrorResponse{ErrorResponse: &TreeErrorResponse{ErrCode: uint64(code)}},
|
Value: &TreeSyncContentValue_ErrorResponse{ErrorResponse: &TreeErrorResponse{ErrCode: rpcerr.Code(err)}},
|
||||||
},
|
},
|
||||||
RootChange: rootChange,
|
RootChange: rootChange,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -84,18 +84,30 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
|||||||
log = log.With(zap.Bool("isDeleted", true))
|
log = log.With(zap.Bool("isDeleted", true))
|
||||||
// preventing sync with other clients if they are not just syncing the settings tree
|
// preventing sync with other clients if they are not just syncing the settings tree
|
||||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
||||||
return spacesyncproto.ErrSpaceIsDeleted
|
return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrSpaceIsDeleted, senderId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.DebugCtx(ctx, "handling message")
|
log.DebugCtx(ctx, "handling message")
|
||||||
|
hasTree, err := s.spaceStorage.HasTree(msg.ObjectId)
|
||||||
|
if err != nil {
|
||||||
|
return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId)
|
||||||
|
}
|
||||||
|
// in this case we will try to get it from remote, unless the sender also sent us the same request :-)
|
||||||
|
if !hasTree {
|
||||||
|
treeMsg := &treechangeproto.TreeSyncMessage{}
|
||||||
|
err = proto.Unmarshal(msg.Payload, treeMsg)
|
||||||
|
if err != nil {
|
||||||
|
return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ReplyId)
|
||||||
|
}
|
||||||
|
// this means that we don't have the tree locally and therefore can't return it
|
||||||
|
if s.isEmptyFullSyncRequest(treeMsg) {
|
||||||
|
return s.sendError(ctx, treeMsg.RootChange, treechangeproto.ErrGetTree, senderId, msg.ReplyId)
|
||||||
|
}
|
||||||
|
}
|
||||||
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.DebugCtx(ctx, "failed to get object")
|
log.DebugCtx(ctx, "failed to get object")
|
||||||
respErr := s.sendErrorResponse(ctx, msg, senderId)
|
return s.unmarshallSendError(ctx, msg, err, senderId)
|
||||||
if respErr != nil {
|
|
||||||
log.DebugCtx(ctx, "failed to send error response", zap.Error(respErr))
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
return obj.HandleMessage(ctx, senderId, msg)
|
return obj.HandleMessage(ctx, senderId, msg)
|
||||||
}
|
}
|
||||||
@ -104,12 +116,20 @@ func (s *objectSync) MessagePool() MessagePool {
|
|||||||
return s.messagePool
|
return s.messagePool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *objectSync) sendErrorResponse(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, senderId string) (err error) {
|
func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId string) (err error) {
|
||||||
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
unmarshalled := &treechangeproto.TreeSyncMessage{}
|
||||||
err = proto.Unmarshal(msg.Payload, unmarshalled)
|
err = proto.Unmarshal(msg.Payload, unmarshalled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
resp := treechangeproto.WrapError(treechangeproto.ErrorCodes_GetTreeError, unmarshalled.RootChange)
|
return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, msg.ReplyId)
|
||||||
return s.syncClient.SendWithReply(ctx, senderId, resp, msg.ReplyId)
|
}
|
||||||
|
|
||||||
|
func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, replyId string) (err error) {
|
||||||
|
resp := treechangeproto.WrapError(respErr, root)
|
||||||
|
return s.syncClient.SendWithReply(ctx, senderId, resp, replyId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool {
|
||||||
|
return len(msg.GetContent().GetFullSyncRequest().GetHeads()) == 0
|
||||||
}
|
}
|
||||||
|
|||||||
@ -81,6 +81,21 @@ func (mr *MockSpaceStorageMockRecorder) CreateTreeStorage(arg0 interface{}) *gom
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTreeStorage", reflect.TypeOf((*MockSpaceStorage)(nil).CreateTreeStorage), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTreeStorage", reflect.TypeOf((*MockSpaceStorage)(nil).CreateTreeStorage), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// HasTree mocks base method.
|
||||||
|
func (m *MockSpaceStorage) HasTree(arg0 string) (bool, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "HasTree", arg0)
|
||||||
|
ret0, _ := ret[0].(bool)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasTree indicates an expected call of HasTree.
|
||||||
|
func (mr *MockSpaceStorageMockRecorder) HasTree(arg0 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HasTree", reflect.TypeOf((*MockSpaceStorage)(nil).HasTree), arg0)
|
||||||
|
}
|
||||||
|
|
||||||
// Id mocks base method.
|
// Id mocks base method.
|
||||||
func (m *MockSpaceStorage) Id() string {
|
func (m *MockSpaceStorage) Id() string {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|||||||
@ -47,6 +47,7 @@ type SpaceStorage interface {
|
|||||||
StoredIds() ([]string, error)
|
StoredIds() ([]string, error)
|
||||||
TreeRoot(id string) (*treechangeproto.RawTreeChangeWithId, error)
|
TreeRoot(id string) (*treechangeproto.RawTreeChangeWithId, error)
|
||||||
TreeStorage(id string) (treestorage.TreeStorage, error)
|
TreeStorage(id string) (treestorage.TreeStorage, error)
|
||||||
|
HasTree(id string) (bool, error)
|
||||||
CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (treestorage.TreeStorage, error)
|
CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (treestorage.TreeStorage, error)
|
||||||
WriteSpaceHash(hash string) error
|
WriteSpaceHash(hash string) error
|
||||||
ReadSpaceHash() (hash string, err error)
|
ReadSpaceHash() (hash string, err error)
|
||||||
|
|||||||
@ -24,6 +24,10 @@ func RegisterErr(err error, code uint64) error {
|
|||||||
return errWithCode
|
return errWithCode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Code(err error) uint64 {
|
||||||
|
return drpcerr.Code(err)
|
||||||
|
}
|
||||||
|
|
||||||
func Err(code uint64) error {
|
func Err(code uint64) error {
|
||||||
err, ok := errsMap[code]
|
err, ok := errsMap[code]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user