Update diffsyncer to directly sync settings tree
This commit is contained in:
parent
dea506aa42
commit
a124dbc7ea
@ -121,6 +121,10 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
|
newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff)
|
||||||
err = rpcerr.Unwrap(err)
|
err = rpcerr.Unwrap(err)
|
||||||
if err != nil && err != spacesyncproto.ErrSpaceMissing {
|
if err != nil && err != spacesyncproto.ErrSpaceMissing {
|
||||||
|
if err == spacesyncproto.ErrSpaceIsDeleted {
|
||||||
|
d.log.Debug("got space deleted while syncing")
|
||||||
|
d.syncTrees(ctx, p.Id(), []string{d.storage.SpaceSettingsId()})
|
||||||
|
}
|
||||||
d.syncStatus.SetNodesOnline(p.Id(), false)
|
d.syncStatus.SetNodesOnline(p.Id(), false)
|
||||||
return fmt.Errorf("diff error: %v", err)
|
return fmt.Errorf("diff error: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@ -207,7 +207,7 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
require.NoError(t, diffSyncer.Sync(ctx))
|
require.NoError(t, diffSyncer.Sync(ctx))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("diff syncer sync other error", func(t *testing.T) {
|
t.Run("diff syncer sync unexpected", func(t *testing.T) {
|
||||||
peerManagerMock.EXPECT().
|
peerManagerMock.EXPECT().
|
||||||
GetResponsiblePeers(gomock.Any()).
|
GetResponsiblePeers(gomock.Any()).
|
||||||
Return([]peer.Peer{mockPeer{}}, nil)
|
Return([]peer.Peer{mockPeer{}}, nil)
|
||||||
@ -217,4 +217,19 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
|
|
||||||
require.NoError(t, diffSyncer.Sync(ctx))
|
require.NoError(t, diffSyncer.Sync(ctx))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.Run("diff syncer sync space is deleted error", func(t *testing.T) {
|
||||||
|
peerManagerMock.EXPECT().
|
||||||
|
GetResponsiblePeers(gomock.Any()).
|
||||||
|
Return([]peer.Peer{mockPeer{}}, nil)
|
||||||
|
diffMock.EXPECT().
|
||||||
|
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
||||||
|
Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted)
|
||||||
|
stMock.EXPECT().SpaceSettingsId().Return("settingsId")
|
||||||
|
cacheMock.EXPECT().
|
||||||
|
GetTree(gomock.Any(), spaceId, "settingsId").
|
||||||
|
Return(nil, nil)
|
||||||
|
|
||||||
|
require.NoError(t, diffSyncer.Sync(ctx))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -7,6 +7,7 @@ import (
|
|||||||
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
|
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
|
||||||
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
||||||
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
"github.com/anytypeio/any-sync/commonspace/peermanager"
|
||||||
|
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||||
"github.com/anytypeio/any-sync/nodeconf"
|
"github.com/anytypeio/any-sync/nodeconf"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
@ -32,6 +33,7 @@ type objectSync struct {
|
|||||||
messagePool MessagePool
|
messagePool MessagePool
|
||||||
objectGetter syncobjectgetter.SyncObjectGetter
|
objectGetter syncobjectgetter.SyncObjectGetter
|
||||||
configuration nodeconf.Configuration
|
configuration nodeconf.Configuration
|
||||||
|
spaceStorage spacestorage.SpaceStorage
|
||||||
|
|
||||||
syncCtx context.Context
|
syncCtx context.Context
|
||||||
cancelSync context.CancelFunc
|
cancelSync context.CancelFunc
|
||||||
@ -43,13 +45,15 @@ func NewObjectSync(
|
|||||||
spaceIsDeleted *atomic.Bool,
|
spaceIsDeleted *atomic.Bool,
|
||||||
configuration nodeconf.Configuration,
|
configuration nodeconf.Configuration,
|
||||||
peerManager peermanager.PeerManager,
|
peerManager peermanager.PeerManager,
|
||||||
objectGetter syncobjectgetter.SyncObjectGetter) ObjectSync {
|
objectGetter syncobjectgetter.SyncObjectGetter,
|
||||||
|
storage spacestorage.SpaceStorage) ObjectSync {
|
||||||
syncCtx, cancel := context.WithCancel(context.Background())
|
syncCtx, cancel := context.WithCancel(context.Background())
|
||||||
os := newObjectSync(
|
os := newObjectSync(
|
||||||
spaceId,
|
spaceId,
|
||||||
spaceIsDeleted,
|
spaceIsDeleted,
|
||||||
configuration,
|
configuration,
|
||||||
objectGetter,
|
objectGetter,
|
||||||
|
storage,
|
||||||
syncCtx,
|
syncCtx,
|
||||||
cancel)
|
cancel)
|
||||||
msgPool := newMessagePool(peerManager, os.handleMessage)
|
msgPool := newMessagePool(peerManager, os.handleMessage)
|
||||||
@ -62,11 +66,13 @@ func newObjectSync(
|
|||||||
spaceIsDeleted *atomic.Bool,
|
spaceIsDeleted *atomic.Bool,
|
||||||
configuration nodeconf.Configuration,
|
configuration nodeconf.Configuration,
|
||||||
objectGetter syncobjectgetter.SyncObjectGetter,
|
objectGetter syncobjectgetter.SyncObjectGetter,
|
||||||
|
spaceStorage spacestorage.SpaceStorage,
|
||||||
syncCtx context.Context,
|
syncCtx context.Context,
|
||||||
cancel context.CancelFunc,
|
cancel context.CancelFunc,
|
||||||
) *objectSync {
|
) *objectSync {
|
||||||
return &objectSync{
|
return &objectSync{
|
||||||
objectGetter: objectGetter,
|
objectGetter: objectGetter,
|
||||||
|
spaceStorage: spaceStorage,
|
||||||
spaceId: spaceId,
|
spaceId: spaceId,
|
||||||
syncCtx: syncCtx,
|
syncCtx: syncCtx,
|
||||||
cancelSync: cancel,
|
cancelSync: cancel,
|
||||||
@ -95,8 +101,8 @@ func (s *objectSync) handleMessage(ctx context.Context, senderId string, msg *sp
|
|||||||
log := log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId))
|
log := log.With(zap.String("objectId", msg.ObjectId), zap.String("replyId", msg.ReplyId))
|
||||||
if s.spaceIsDeleted.Load() {
|
if s.spaceIsDeleted.Load() {
|
||||||
log = log.With(zap.Bool("isDeleted", true))
|
log = log.With(zap.Bool("isDeleted", true))
|
||||||
// preventing sync with other clients
|
// preventing sync with other clients if they are not just syncing the settings tree
|
||||||
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) {
|
if !slices.Contains(s.configuration.NodeIds(s.spaceId), senderId) && msg.ObjectId != s.spaceStorage.SpaceSettingsId() {
|
||||||
return spacesyncproto.ErrSpaceIsDeleted
|
return spacesyncproto.ErrSpaceIsDeleted
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -148,7 +148,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, lastConfiguration, st, peerManager, getter, syncStatus, s.credentialProvider, log)
|
headSync := headsync.NewHeadSync(id, spaceIsDeleted, s.config.SyncPeriod, lastConfiguration, st, peerManager, getter, syncStatus, s.credentialProvider, log)
|
||||||
objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, lastConfiguration, peerManager, getter)
|
objectSync := objectsync.NewObjectSync(id, spaceIsDeleted, lastConfiguration, peerManager, getter, st)
|
||||||
sp := &space{
|
sp := &space{
|
||||||
id: id,
|
id: id,
|
||||||
objectSync: objectSync,
|
objectSync: objectSync,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user