From 990cbc58b648f85261d7760a51e7a574772e0e8b Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sat, 3 Jun 2023 22:41:03 +0200 Subject: [PATCH] Add sync requests handling --- commonspace/objectsync/objectsync.go | 64 ++++++++++++++++++++++++---- commonspace/space.go | 5 +++ 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 44e5308c..9e3c3eba 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -5,11 +5,13 @@ import ( "context" "fmt" "github.com/anyproto/any-sync/app" + "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/spacestate" "github.com/anyproto/any-sync/metric" "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/util/multiqueue" "github.com/cheggaaa/mb/v3" + "github.com/gogo/protobuf/proto" "sync/atomic" "time" @@ -102,18 +104,12 @@ func (s *objectSync) LastUsage() time.Time { } func (s *objectSync) HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) { - + return s.handleRequest(ctx, hm.SenderId, hm.Message) } func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) { threadId := hm.Message.ObjectId hm.ReceiveTime = time.Now() - if hm.Message.ReplyId != "" { - threadId += hm.Message.ReplyId - defer func() { - _ = s.handleQueue.CloseThread(threadId) - }() - } if hm.PeerCtx == nil { hm.PeerCtx = ctx } @@ -160,6 +156,29 @@ func (s *objectSync) processHandleMessage(msg HandleMessage) { } } +func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) { + log := log.With( + zap.String("objectId", msg.ObjectId), + zap.String("requestId", msg.RequestId), + zap.String("replyId", msg.ReplyId)) + if s.spaceIsDeleted.Load() { + log = log.With(zap.Bool("isDeleted", true)) + // preventing sync with other clients if they are not just syncing the settings tree + if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() { + return nil, spacesyncproto.ErrSpaceIsDeleted + } + } + err = s.checkEmptyFullSync(log, msg) + if err != nil { + return nil, err + } + obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) + if err != nil { + return nil, treechangeproto.ErrGetTree + } + return obj.HandleRequest(ctx, senderId, msg) +} + func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { log := log.With( zap.String("objectId", msg.ObjectId), @@ -169,9 +188,13 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp log = log.With(zap.Bool("isDeleted", true)) // preventing sync with other clients if they are not just syncing the settings tree if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() { - return fmt.Errorf("can't perform operation with object, space is deleted") + return spacesyncproto.ErrSpaceIsDeleted } } + err = s.checkEmptyFullSync(log, msg) + if err != nil { + return err + } obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId) if err != nil { return fmt.Errorf("failed to get object from cache: %w", err) @@ -186,3 +209,28 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp func (s *objectSync) CloseThread(id string) (err error) { return s.handleQueue.CloseThread(id) } + +func (s *objectSync) checkEmptyFullSync(log logger.CtxLogger, msg *spacesyncproto.ObjectSyncMessage) (err error) { + hasTree, err := s.spaceStorage.HasTree(msg.ObjectId) + if err != nil { + log.Warn("failed to execute get operation on storage has tree", zap.Error(err)) + return spacesyncproto.ErrUnexpected + } + // in this case we will try to get it from remote, unless the sender also sent us the same request :-) + if !hasTree { + treeMsg := &treechangeproto.TreeSyncMessage{} + err = proto.Unmarshal(msg.Payload, treeMsg) + if err != nil { + return nil + } + // this means that we don't have the tree locally and therefore can't return it + if s.isEmptyFullSyncRequest(treeMsg) { + return treechangeproto.ErrGetTree + } + } + return +} + +func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool { + return msg.GetContent().GetFullSyncRequest() != nil && len(msg.GetContent().GetFullSyncRequest().GetHeads()) == 0 +} diff --git a/commonspace/space.go b/commonspace/space.go index 02e6d8a9..32b1b04d 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -67,6 +67,7 @@ type Space interface { DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error) HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error) + HandleRequest(ctx context.Context, msg objectsync.HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) TryClose(objectTTL time.Duration) (close bool, err error) Close() error @@ -103,6 +104,10 @@ func (s *space) HandleMessage(ctx context.Context, msg objectsync.HandleMessage) return s.objectSync.HandleMessage(ctx, msg) } +func (s *space) HandleRequest(ctx context.Context, msg objectsync.HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) { + return s.objectSync.HandleRequest(ctx, msg) +} + func (s *space) TreeBuilder() objecttreebuilder.TreeBuilder { return s.treeBuilder }