diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 9974559d..4af26f3e 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -11,9 +11,11 @@ import ( "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/syncstatus" + "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/util/periodicsync" "go.uber.org/zap" + "golang.org/x/exp/slices" "strings" "sync/atomic" "time" @@ -43,6 +45,7 @@ type headSync struct { diff ldiff.Diff log logger.CtxLogger syncer DiffSyncer + configuration nodeconf.Configuration spaceIsDeleted *atomic.Bool syncPeriod int @@ -64,6 +67,7 @@ func NewHeadSync( factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l) sync := func(ctx context.Context) (err error) { + // for clients cancelling sync the sync process if spaceIsDeleted.Load() && !configuration.IsResponsible(spaceId) { return spacesyncproto.ErrSpaceIsDeleted } @@ -79,6 +83,7 @@ func NewHeadSync( diff: diff, log: log, syncPeriod: syncPeriod, + configuration: configuration, spaceIsDeleted: spaceIsDeleted, } } @@ -90,6 +95,16 @@ func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDe } func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { + if d.spaceIsDeleted.Load() { + peerId, err := peer.CtxPeerId(ctx) + if err != nil { + return nil, err + } + // stop receiving all request for sync from clients + if !slices.Contains(d.configuration.NodeIds(d.spaceId), peerId) { + return nil, spacesyncproto.ErrSpaceIsDeleted + } + } return HandleRangeRequest(ctx, d.diff, req) }