Add sync requests handling
This commit is contained in:
parent
748681d765
commit
990cbc58b6
@ -5,11 +5,13 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anyproto/any-sync/app"
|
||||
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anyproto/any-sync/commonspace/spacestate"
|
||||
"github.com/anyproto/any-sync/metric"
|
||||
"github.com/anyproto/any-sync/net/peer"
|
||||
"github.com/anyproto/any-sync/util/multiqueue"
|
||||
"github.com/cheggaaa/mb/v3"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -102,18 +104,12 @@ func (s *objectSync) LastUsage() time.Time {
|
||||
}
|
||||
|
||||
func (s *objectSync) HandleRequest(ctx context.Context, hm HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
|
||||
return s.handleRequest(ctx, hm.SenderId, hm.Message)
|
||||
}
|
||||
|
||||
func (s *objectSync) HandleMessage(ctx context.Context, hm HandleMessage) (err error) {
|
||||
threadId := hm.Message.ObjectId
|
||||
hm.ReceiveTime = time.Now()
|
||||
if hm.Message.ReplyId != "" {
|
||||
threadId += hm.Message.ReplyId
|
||||
defer func() {
|
||||
_ = s.handleQueue.CloseThread(threadId)
|
||||
}()
|
||||
}
|
||||
if hm.PeerCtx == nil {
|
||||
hm.PeerCtx = ctx
|
||||
}
|
||||
@ -160,6 +156,29 @@ func (s *objectSync) processHandleMessage(msg HandleMessage) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *objectSync) handleRequest(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (response *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
log := log.With(
|
||||
zap.String("objectId", msg.ObjectId),
|
||||
zap.String("requestId", msg.RequestId),
|
||||
zap.String("replyId", msg.ReplyId))
|
||||
if s.spaceIsDeleted.Load() {
|
||||
log = log.With(zap.Bool("isDeleted", true))
|
||||
// preventing sync with other clients if they are not just syncing the settings tree
|
||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
||||
return nil, spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
}
|
||||
err = s.checkEmptyFullSync(log, msg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
||||
if err != nil {
|
||||
return nil, treechangeproto.ErrGetTree
|
||||
}
|
||||
return obj.HandleRequest(ctx, senderId, msg)
|
||||
}
|
||||
|
||||
func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
log := log.With(
|
||||
zap.String("objectId", msg.ObjectId),
|
||||
@ -169,9 +188,13 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
||||
log = log.With(zap.Bool("isDeleted", true))
|
||||
// preventing sync with other clients if they are not just syncing the settings tree
|
||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
||||
return fmt.Errorf("can't perform operation with object, space is deleted")
|
||||
return spacesyncproto.ErrSpaceIsDeleted
|
||||
}
|
||||
}
|
||||
err = s.checkEmptyFullSync(log, msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
obj, err := s.objectGetter.GetObject(ctx, msg.ObjectId)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get object from cache: %w", err)
|
||||
@ -186,3 +209,28 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
||||
func (s *objectSync) CloseThread(id string) (err error) {
|
||||
return s.handleQueue.CloseThread(id)
|
||||
}
|
||||
|
||||
func (s *objectSync) checkEmptyFullSync(log logger.CtxLogger, msg *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
hasTree, err := s.spaceStorage.HasTree(msg.ObjectId)
|
||||
if err != nil {
|
||||
log.Warn("failed to execute get operation on storage has tree", zap.Error(err))
|
||||
return spacesyncproto.ErrUnexpected
|
||||
}
|
||||
// in this case we will try to get it from remote, unless the sender also sent us the same request :-)
|
||||
if !hasTree {
|
||||
treeMsg := &treechangeproto.TreeSyncMessage{}
|
||||
err = proto.Unmarshal(msg.Payload, treeMsg)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
// this means that we don't have the tree locally and therefore can't return it
|
||||
if s.isEmptyFullSyncRequest(treeMsg) {
|
||||
return treechangeproto.ErrGetTree
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *objectSync) isEmptyFullSyncRequest(msg *treechangeproto.TreeSyncMessage) bool {
|
||||
return msg.GetContent().GetFullSyncRequest() != nil && len(msg.GetContent().GetFullSyncRequest().GetHeads()) == 0
|
||||
}
|
||||
|
||||
@ -67,6 +67,7 @@ type Space interface {
|
||||
DeleteSpace(ctx context.Context, deleteChange *treechangeproto.RawTreeChangeWithId) (err error)
|
||||
|
||||
HandleMessage(ctx context.Context, msg objectsync.HandleMessage) (err error)
|
||||
HandleRequest(ctx context.Context, msg objectsync.HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error)
|
||||
|
||||
TryClose(objectTTL time.Duration) (close bool, err error)
|
||||
Close() error
|
||||
@ -103,6 +104,10 @@ func (s *space) HandleMessage(ctx context.Context, msg objectsync.HandleMessage)
|
||||
return s.objectSync.HandleMessage(ctx, msg)
|
||||
}
|
||||
|
||||
func (s *space) HandleRequest(ctx context.Context, msg objectsync.HandleMessage) (resp *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
return s.objectSync.HandleRequest(ctx, msg)
|
||||
}
|
||||
|
||||
func (s *space) TreeBuilder() objecttreebuilder.TreeBuilder {
|
||||
return s.treeBuilder
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user