From 9109c4de1c339338db7bed37687d1fdeefe0f66c Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 17 May 2023 13:13:23 +0200 Subject: [PATCH] Fix send error problem --- commonspace/objectsync/msgpool.go | 1 + commonspace/objectsync/objectsync.go | 42 ++++++++++++++++++++-------- 2 files changed, 31 insertions(+), 12 deletions(-) diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index acce59f7..e62820cc 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -103,6 +103,7 @@ func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *s return } log.WarnCtx(ctx, "reply id does not exist", zap.String("replyId", msg.ReplyId)) + return } return s.messageHandler(ctx, senderId, msg) } diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index f6f26630..fef1d84d 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -3,6 +3,7 @@ package objectsync import ( "context" + "fmt" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/gogo/protobuf/proto" "sync/atomic" @@ -87,51 +88,68 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp log = log.With(zap.Bool("isDeleted", true)) // preventing sync with other clients if they are not just syncing the settings tree if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() { - return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrSpaceIsDeleted, senderId, msg.ObjectId) + s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) + return fmt.Errorf("can't perform operation with object, space is deleted") } } log.DebugCtx(ctx, "handling message") hasTree, err := s.spaceStorage.HasTree(msg.ObjectId) if err != nil { - return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) + s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) + return fmt.Errorf("falied to execute get operation on storage has tree: %w", err) } // in this case we will try to get it from remote, unless the sender also sent us the same request :-) if !hasTree { treeMsg := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(msg.Payload, treeMsg) if err != nil { - return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.RequestId) + s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.RequestId) + return fmt.Errorf("failed to unmarshall tree sync message: %w", err) } // this means that we don't have the tree locally and therefore can't return it if s.isEmptyFullSyncRequest(treeMsg) { - return s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.RequestId) + err = treechangeproto.ErrGetTree + s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.RequestId) + return fmt.Errorf("failed to get tree from storage on full sync: %w", err) } } obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) if err != nil { - log.WarnCtx(ctx, "failed to get object", zap.Error(err)) // TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync - return s.unmarshallSendError(ctx, msg, err, senderId, msg.ObjectId) + s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) + return fmt.Errorf("failed to get object from cache: %w", err) } - return obj.HandleMessage(ctx, senderId, msg) + // TODO: unmarshall earlier + err = obj.HandleMessage(ctx, senderId, msg) + if err != nil { + s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId) + return fmt.Errorf("failed to handle message: %w", err) + } + return } func (s *objectSync) SyncClient() SyncClient { return s.syncClient } -func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) (err error) { +func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) { unmarshalled := &treechangeproto.TreeSyncMessage{} - err = proto.Unmarshal(msg.Payload, unmarshalled) + err := proto.Unmarshal(msg.Payload, unmarshalled) if err != nil { return } - return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.RequestId) + s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.RequestId) } -func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) (err error) { +func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) { + // we don't send errors if have no reply id, this can lead to bugs and also nobody needs this error + if replyId == "" { + return + } resp := treechangeproto.WrapError(respErr, root) - return s.syncClient.SendWithReply(ctx, senderId, objectId, resp, replyId) + if err := s.syncClient.SendWithReply(ctx, senderId, objectId, resp, replyId); err != nil { + log.InfoCtx(ctx, "failed to send error to client") + } } func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool {