Change headsync logic
This commit is contained in:
parent
a5e2bea04c
commit
5b553f1a8d
@ -137,16 +137,19 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
|
|||||||
|
|
||||||
totalLen := len(newIds) + len(changedIds) + len(removedIds)
|
totalLen := len(newIds) + len(changedIds) + len(removedIds)
|
||||||
// not syncing ids which were removed through settings document
|
// not syncing ids which were removed through settings document
|
||||||
filteredIds := d.deletionState.FilterJoin(newIds, changedIds, removedIds)
|
missingIds := d.deletionState.Filter(newIds)
|
||||||
|
existingIds := append(d.deletionState.Filter(removedIds), d.deletionState.Filter(changedIds)...)
|
||||||
|
|
||||||
d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
|
d.syncStatus.RemoveAllExcept(p.Id(), existingIds, stateCounter)
|
||||||
|
|
||||||
d.syncTrees(ctx, p.Id(), filteredIds)
|
|
||||||
|
|
||||||
|
err = d.cache.SyncTrees(ctx, existingIds, missingIds)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
|
||||||
zap.Int("changedIds", len(changedIds)),
|
zap.Int("changedIds", len(changedIds)),
|
||||||
zap.Int("removedIds", len(removedIds)),
|
zap.Int("removedIds", len(removedIds)),
|
||||||
zap.Int("already deleted ids", totalLen-len(filteredIds)),
|
zap.Int("already deleted ids", totalLen-len(existingIds)-len(missingIds)),
|
||||||
zap.String("peerId", p.Id()),
|
zap.String("peerId", p.Id()),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
|||||||
@ -135,12 +135,10 @@ func TestDiffSyncer_Sync(t *testing.T) {
|
|||||||
diffMock.EXPECT().
|
diffMock.EXPECT().
|
||||||
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(spaceId, clientMock))).
|
||||||
Return([]string{"new"}, []string{"changed"}, nil, nil)
|
Return([]string{"new"}, []string{"changed"}, nil, nil)
|
||||||
delState.EXPECT().FilterJoin(gomock.Any()).Return([]string{"new", "changed"})
|
delState.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1)
|
||||||
for _, arg := range []string{"new", "changed"} {
|
delState.EXPECT().Filter([]string{"changed"}).Return([]string{"changed"}).Times(1)
|
||||||
cacheMock.EXPECT().
|
delState.EXPECT().Filter(nil).Return(nil).Times(1)
|
||||||
GetTree(gomock.Any(), spaceId, arg).
|
cacheMock.EXPECT().SyncTrees(gomock.Any(), []string{"changed"}, []string{"new"}).Return(nil)
|
||||||
Return(nil, nil)
|
|
||||||
}
|
|
||||||
require.NoError(t, diffSyncer.Sync(ctx))
|
require.NoError(t, diffSyncer.Sync(ctx))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@ -29,7 +29,6 @@ type TreeHeads struct {
|
|||||||
|
|
||||||
type HeadSync interface {
|
type HeadSync interface {
|
||||||
Init(objectIds []string, deletionState settingsstate.ObjectDeletionState)
|
Init(objectIds []string, deletionState settingsstate.ObjectDeletionState)
|
||||||
Run()
|
|
||||||
|
|
||||||
UpdateHeads(id string, heads []string)
|
UpdateHeads(id string, heads []string)
|
||||||
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
|
HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error)
|
||||||
@ -49,7 +48,6 @@ type headSync struct {
|
|||||||
syncer DiffSyncer
|
syncer DiffSyncer
|
||||||
configuration nodeconf.NodeConf
|
configuration nodeconf.NodeConf
|
||||||
spaceIsDeleted *atomic.Bool
|
spaceIsDeleted *atomic.Bool
|
||||||
isRunning bool
|
|
||||||
|
|
||||||
syncPeriod int
|
syncPeriod int
|
||||||
}
|
}
|
||||||
@ -95,11 +93,7 @@ func NewHeadSync(
|
|||||||
func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDeletionState) {
|
func (d *headSync) Init(objectIds []string, deletionState settingsstate.ObjectDeletionState) {
|
||||||
d.fillDiff(objectIds)
|
d.fillDiff(objectIds)
|
||||||
d.syncer.Init(deletionState)
|
d.syncer.Init(deletionState)
|
||||||
}
|
|
||||||
|
|
||||||
func (d *headSync) Run() {
|
|
||||||
d.periodicSync.Run()
|
d.periodicSync.Run()
|
||||||
d.isRunning = true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
|
func (d *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) {
|
||||||
@ -141,9 +135,7 @@ func (d *headSync) RemoveObjects(ids []string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (d *headSync) Close() (err error) {
|
func (d *headSync) Close() (err error) {
|
||||||
if d.isRunning {
|
d.periodicSync.Close()
|
||||||
d.periodicSync.Close()
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -51,7 +51,6 @@ func TestDiffService(t *testing.T) {
|
|||||||
storageMock.EXPECT().WriteSpaceHash(hash)
|
storageMock.EXPECT().WriteSpaceHash(hash)
|
||||||
pSyncMock.EXPECT().Run()
|
pSyncMock.EXPECT().Run()
|
||||||
service.Init([]string{initId}, delState)
|
service.Init([]string{initId}, delState)
|
||||||
service.Run()
|
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("update heads", func(t *testing.T) {
|
t.Run("update heads", func(t *testing.T) {
|
||||||
@ -65,9 +64,7 @@ func TestDiffService(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("close", func(t *testing.T) {
|
t.Run("close", func(t *testing.T) {
|
||||||
pSyncMock.EXPECT().Run()
|
|
||||||
pSyncMock.EXPECT().Close()
|
pSyncMock.EXPECT().Close()
|
||||||
service.Run()
|
|
||||||
service.Close()
|
service.Close()
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@ -134,3 +134,17 @@ func (mr *MockTreeManagerMockRecorder) Run(arg0 interface{}) *gomock.Call {
|
|||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockTreeManager)(nil).Run), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockTreeManager)(nil).Run), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SyncTrees mocks base method.
|
||||||
|
func (m *MockTreeManager) SyncTrees(arg0 context.Context, arg1, arg2 []string) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "SyncTrees", arg0, arg1, arg2)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// SyncTrees indicates an expected call of SyncTrees.
|
||||||
|
func (mr *MockTreeManagerMockRecorder) SyncTrees(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncTrees", reflect.TypeOf((*MockTreeManager)(nil).SyncTrees), arg0, arg1, arg2)
|
||||||
|
}
|
||||||
|
|||||||
@ -14,4 +14,5 @@ type TreeManager interface {
|
|||||||
GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error)
|
GetTree(ctx context.Context, spaceId, treeId string) (objecttree.ObjectTree, error)
|
||||||
MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error
|
MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error
|
||||||
DeleteTree(ctx context.Context, spaceId, treeId string) error
|
DeleteTree(ctx context.Context, spaceId, treeId string) error
|
||||||
|
SyncTrees(ctx context.Context, exiting, removed []string) error
|
||||||
}
|
}
|
||||||
|
|||||||
@ -16,7 +16,7 @@ type ObjectDeletionState interface {
|
|||||||
GetQueued() (ids []string)
|
GetQueued() (ids []string)
|
||||||
Delete(id string) (err error)
|
Delete(id string) (err error)
|
||||||
Exists(id string) bool
|
Exists(id string) bool
|
||||||
FilterJoin(ids ...[]string) (filtered []string)
|
Filter(ids []string) (filtered []string)
|
||||||
}
|
}
|
||||||
|
|
||||||
type objectDeletionState struct {
|
type objectDeletionState struct {
|
||||||
@ -113,19 +113,14 @@ func (st *objectDeletionState) Exists(id string) bool {
|
|||||||
return st.exists(id)
|
return st.exists(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (st *objectDeletionState) FilterJoin(ids ...[]string) (filtered []string) {
|
func (st *objectDeletionState) Filter(ids []string) (filtered []string) {
|
||||||
st.RLock()
|
st.RLock()
|
||||||
defer st.RUnlock()
|
defer st.RUnlock()
|
||||||
filter := func(ids []string) {
|
for _, id := range ids {
|
||||||
for _, id := range ids {
|
if !st.exists(id) {
|
||||||
if !st.exists(id) {
|
filtered = append(filtered, id)
|
||||||
filtered = append(filtered, id)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, arr := range ids {
|
|
||||||
filter(arr)
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -80,8 +80,8 @@ func TestDeletionState_FilterJoin(t *testing.T) {
|
|||||||
fx.delState.queued["id1"] = struct{}{}
|
fx.delState.queued["id1"] = struct{}{}
|
||||||
fx.delState.queued["id2"] = struct{}{}
|
fx.delState.queued["id2"] = struct{}{}
|
||||||
|
|
||||||
filtered := fx.delState.FilterJoin([]string{"id1"}, []string{"id3", "id2"}, []string{"id4"})
|
filtered := fx.delState.Filter([]string{"id3", "id2"})
|
||||||
require.Equal(t, []string{"id3", "id4"}, filtered)
|
require.Equal(t, []string{"id3"}, filtered)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeletionState_AddObserver(t *testing.T) {
|
func TestDeletionState_AddObserver(t *testing.T) {
|
||||||
|
|||||||
@ -87,22 +87,18 @@ func (mr *MockObjectDeletionStateMockRecorder) Exists(arg0 interface{}) *gomock.
|
|||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockObjectDeletionState)(nil).Exists), arg0)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Exists", reflect.TypeOf((*MockObjectDeletionState)(nil).Exists), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterJoin mocks base method.
|
// Filter mocks base method.
|
||||||
func (m *MockObjectDeletionState) FilterJoin(arg0 ...[]string) []string {
|
func (m *MockObjectDeletionState) Filter(arg0 []string) []string {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
varargs := []interface{}{}
|
ret := m.ctrl.Call(m, "Filter", arg0)
|
||||||
for _, a := range arg0 {
|
|
||||||
varargs = append(varargs, a)
|
|
||||||
}
|
|
||||||
ret := m.ctrl.Call(m, "FilterJoin", varargs...)
|
|
||||||
ret0, _ := ret[0].([]string)
|
ret0, _ := ret[0].([]string)
|
||||||
return ret0
|
return ret0
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterJoin indicates an expected call of FilterJoin.
|
// Filter indicates an expected call of Filter.
|
||||||
func (mr *MockObjectDeletionStateMockRecorder) FilterJoin(arg0 ...interface{}) *gomock.Call {
|
func (mr *MockObjectDeletionStateMockRecorder) Filter(arg0 interface{}) *gomock.Call {
|
||||||
mr.mock.ctrl.T.Helper()
|
mr.mock.ctrl.T.Helper()
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FilterJoin", reflect.TypeOf((*MockObjectDeletionState)(nil).FilterJoin), arg0...)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Filter", reflect.TypeOf((*MockObjectDeletionState)(nil).Filter), arg0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetQueued mocks base method.
|
// GetQueued mocks base method.
|
||||||
|
|||||||
@ -224,6 +224,10 @@ type mockTreeManager struct {
|
|||||||
markedIds []string
|
markedIds []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *mockTreeManager) SyncTrees(ctx context.Context, exiting, removed []string) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error {
|
func (t *mockTreeManager) MarkTreeDeleted(ctx context.Context, spaceId, treeId string) error {
|
||||||
t.markedIds = append(t.markedIds, treeId)
|
t.markedIds = append(t.markedIds, treeId)
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user