Fix send error problem

This commit is contained in:
mcrakhman 2023-05-17 13:13:23 +02:00 committed by Mikhail Iudin
parent 743a7b8d2f
commit 6e48b2dcff
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
2 changed files with 31 additions and 12 deletions

View File

@ -103,6 +103,7 @@ func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *s
return
}
log.WarnCtx(ctx, "reply id does not exist", zap.String("replyId", msg.ReplyId))
return
}
return s.messageHandler(ctx, senderId, msg)
}

View File

@ -3,6 +3,7 @@ package objectsync
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/gogo/protobuf/proto"
"sync/atomic"
@ -87,51 +88,68 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
log = log.With(zap.Bool("isDeleted", true))
// preventing sync with other clients if they are not just syncing the settings tree
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrSpaceIsDeleted, senderId, msg.ObjectId)
s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId)
return fmt.Errorf("can't perform operation with object, space is deleted")
}
}
log.DebugCtx(ctx, "handling message")
hasTree, err := s.spaceStorage.HasTree(msg.ObjectId)
if err != nil {
return s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId)
s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId)
return fmt.Errorf("falied to execute get operation on storage has tree: %w", err)
}
// in this case we will try to get it from remote, unless the sender also sent us the same request :-)
if !hasTree {
treeMsg := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, treeMsg)
if err != nil {
return s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.RequestId)
s.sendError(ctx, nil, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId, msg.RequestId)
return fmt.Errorf("failed to unmarshall tree sync message: %w", err)
}
// this means that we don't have the tree locally and therefore can't return it
if s.isEmptyFullSyncRequest(treeMsg) {
return s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.RequestId)
err = treechangeproto.ErrGetTree
s.sendError(ctx, nil, treechangeproto.ErrGetTree, senderId, msg.ObjectId, msg.RequestId)
return fmt.Errorf("failed to get tree from storage on full sync: %w", err)
}
}
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
if err != nil {
log.WarnCtx(ctx, "failed to get object", zap.Error(err))
// TODO: write tests for object sync https://linear.app/anytype/issue/GO-1299/write-tests-for-commonspaceobjectsync
return s.unmarshallSendError(ctx, msg, err, senderId, msg.ObjectId)
s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId)
return fmt.Errorf("failed to get object from cache: %w", err)
}
return obj.HandleMessage(ctx, senderId, msg)
// TODO: unmarshall earlier
err = obj.HandleMessage(ctx, senderId, msg)
if err != nil {
s.unmarshallSendError(ctx, msg, spacesyncproto.ErrUnexpected, senderId, msg.ObjectId)
return fmt.Errorf("failed to handle message: %w", err)
}
return
}
func (s *objectSync) SyncClient() SyncClient {
return s.syncClient
}
func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) (err error) {
func (s *objectSync) unmarshallSendError(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage, respErr error, senderId, objectId string) {
unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(msg.Payload, unmarshalled)
err := proto.Unmarshal(msg.Payload, unmarshalled)
if err != nil {
return
}
return s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.RequestId)
s.sendError(ctx, unmarshalled.RootChange, respErr, senderId, objectId, msg.RequestId)
}
func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) (err error) {
func (s *objectSync) sendError(ctx context.Context, root *treechangeproto.RawTreeChangeWithId, respErr error, senderId, objectId, replyId string) {
// we don't send errors if have no reply id, this can lead to bugs and also nobody needs this error
if replyId == "" {
return
}
resp := treechangeproto.WrapError(respErr, root)
return s.syncClient.SendWithReply(ctx, senderId, objectId, resp, replyId)
if err := s.syncClient.SendWithReply(ctx, senderId, objectId, resp, replyId); err != nil {
log.InfoCtx(ctx, "failed to send error to client")
}
}
func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool {