Stop syncing with clients after deletion

This commit is contained in:
mcrakhman 2023-02-24 00:10:40 +01:00 committed by Mikhail Iudin
parent 05d76811b6
commit b2f8f83518
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0

View File

@ -11,9 +11,11 @@ import (
"github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/nodeconf" "github.com/anytypeio/any-sync/nodeconf"
"github.com/anytypeio/any-sync/util/periodicsync" "github.com/anytypeio/any-sync/util/periodicsync"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/slices"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -43,6 +45,7 @@ type headSync struct {
diff ldiff.Diff diff ldiff.Diff
log logger.CtxLogger log logger.CtxLogger
syncer DiffSyncer syncer DiffSyncer
configuration nodeconf.Configuration
spaceIsDeleted *atomic.Bool spaceIsDeleted *atomic.Bool
syncPeriod int syncPeriod int
@ -64,6 +67,7 @@ func NewHeadSync(
factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) factory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l) syncer := newDiffSyncer(spaceId, diff, peerManager, cache, storage, factory, syncStatus, l)
sync := func(ctx context.Context) (err error) { sync := func(ctx context.Context) (err error) {
// for clients cancelling sync the sync process
if spaceIsDeleted.Load() && !configuration.IsResponsible(spaceId) { if spaceIsDeleted.Load() && !configuration.IsResponsible(spaceId) {
return spacesyncproto.ErrSpaceIsDeleted return spacesyncproto.ErrSpaceIsDeleted
} }
@ -79,6 +83,7 @@ func NewHeadSync(
diff: diff, diff: diff,
log: log, log: log,
syncPeriod: syncPeriod, syncPeriod: syncPeriod,
configuration: configuration,
spaceIsDeleted: spaceIsDeleted, spaceIsDeleted: spaceIsDeleted,
} }
} }
@ -90,6 +95,16 @@ func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDe
} }
func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
if d.spaceIsDeleted.Load() {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
return nil, err
}
// stop receiving all request for sync from clients
if !slices.Contains(d.configuration.NodeIds(d.spaceId), peerId) {
return nil, spacesyncproto.ErrSpaceIsDeleted
}
}
return HandleRangeRequest(ctx, d.diff, req) return HandleRangeRequest(ctx, d.diff, req)
} }