Send update only on remote tree or put tree when building

This commit is contained in:
mcrakhman 2023-05-11 13:40:24 +02:00 committed by Mikhail Iudin
parent a232ed76ce
commit 7feb6b0d2c
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
9 changed files with 42 additions and 38 deletions

View File

@ -199,7 +199,7 @@ func (ot *objectTree) AddContent(ctx context.Context, content SignableChangeCont
panic(err)
}
err = ot.treeStorage.AddMany([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id})
err = ot.treeStorage.AddRawChangesSetHead([]*treechangeproto.RawTreeChangeWithId{rawChange}, []string{objChange.Id})
if err != nil {
return
}
@ -296,7 +296,7 @@ func (ot *objectTree) AddRawChanges(ctx context.Context, changesPayload RawChang
addResult.Mode = Rebuild
}
err = ot.treeStorage.AddMany(addResult.Added, addResult.Heads)
err = ot.treeStorage.AddRawChangesSetHead(addResult.Added, addResult.Heads)
if err != nil {
// rolling back all changes made to inmemory state
ot.rebuildFromStorage(nil, nil)

View File

@ -65,7 +65,7 @@ func prepareContext(
treeStorage := changeCreator.CreateNewTreeStorage("0", aclList.Head().Id)
if additionalChanges != nil {
payload := additionalChanges(changeCreator)
err := treeStorage.AddMany(payload.RawChanges, payload.NewHeads)
err := treeStorage.AddRawChangesSetHead(payload.RawChanges, payload.NewHeads)
require.NoError(t, err)
}
objTree, err := objTreeBuilder(treeStorage, aclList)
@ -503,7 +503,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("5", aclList.Head().Id, "1", false, "1"),
changeCreator.CreateRaw("6", aclList.Head().Id, "1", true, "3", "4", "5"),
}
store.AddMany(storageChanges, []string{"1"})
store.AddRawChangesSetHead(storageChanges, []string{"1"})
// updating with subset of those changes to see that everything will still work
payload = RawChangesPayload{
@ -748,7 +748,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"),
changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
}
deps.treeStorage.AddMany(rawChanges, []string{"6"})
deps.treeStorage.AddRawChangesSetHead(rawChanges, []string{"6"})
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BeforeId: "6",
IncludeBeforeId: false,
@ -780,7 +780,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("5", aclList.Head().Id, "1", true, "3", "4"),
changeCreator.CreateRaw("6", aclList.Head().Id, "5", false, "5"),
}
deps.treeStorage.AddMany(rawChanges, []string{"6"})
deps.treeStorage.AddRawChangesSetHead(rawChanges, []string{"6"})
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BuildFullTree: true,
})
@ -810,7 +810,7 @@ func TestObjectTree(t *testing.T) {
changeCreator.CreateRaw("5", aclList.Head().Id, "0", false, "1"),
changeCreator.CreateRaw("6", aclList.Head().Id, "0", false, "3", "4", "5"),
}
deps.treeStorage.AddMany(rawChanges, []string{"6"})
deps.treeStorage.AddRawChangesSetHead(rawChanges, []string{"6"})
hTree, err := buildHistoryTree(deps, HistoryTreeParams{
BeforeId: "6",
IncludeBeforeId: true,

View File

@ -112,7 +112,7 @@ func testTreeMerge(t *testing.T, levels, perLevel int, hasData bool, isSnapshot
}
// generating initial tree
initialRes := genChanges(changeCreator, params)
err = storage.AddMany(initialRes.changes, initialRes.heads)
err = storage.AddRawChangesSetHead(initialRes.changes, initialRes.heads)
require.NoError(t, err)
deps := fixtureDeps{
aclList: aclList,
@ -264,7 +264,7 @@ func testTreeStorageHasExtra(t *testing.T, levels, perLevel int, hasData bool, i
// adding some changes to store, but without updating heads
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
oldHeads, _ := store.Heads()
store.AddMany(initialRes.changes, oldHeads)
store.AddRawChangesSetHead(initialRes.changes, oldHeads)
// sending those changes to other peer
fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
@ -333,8 +333,8 @@ func testTreeStorageHasExtraThreeParts(t *testing.T, levels, perLevel int, hasDa
require.True(t, slice.UnsortedEquals(res.Heads, firstPart.heads))
store := fx.handlers["peer1"].tree().Storage().(*treestorage.InMemoryTreeStorage)
oldHeads, _ := store.Heads()
store.AddMany(secondPart.changes, oldHeads)
store.AddMany(thirdPart.changes, oldHeads)
store.AddRawChangesSetHead(secondPart.changes, oldHeads)
store.AddRawChangesSetHead(thirdPart.changes, oldHeads)
var peer2Initial []*treechangeproto.RawTreeChangeWithId
peer2Initial = append(peer2Initial, firstPart.changes...)

View File

@ -76,12 +76,15 @@ type BuildDeps struct {
}
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
remoteGetter := treeRemoteGetter{treeId: id, deps: deps}
deps.TreeStorage, err = remoteGetter.getTree(ctx)
var (
remoteGetter = treeRemoteGetter{treeId: id, deps: deps}
isRemote bool
)
deps.TreeStorage, isRemote, err = remoteGetter.getTree(ctx)
if err != nil {
return
}
return buildSyncTree(ctx, true, deps)
return buildSyncTree(ctx, isRemote, deps)
}
func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, deps BuildDeps) (t SyncTree, err error) {
@ -92,7 +95,7 @@ func PutSyncTree(ctx context.Context, payload treestorage.TreeStorageCreatePaylo
return buildSyncTree(ctx, true, deps)
}
func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t SyncTree, err error) {
func buildSyncTree(ctx context.Context, sendUpdate bool, deps BuildDeps) (t SyncTree, err error) {
objTree, err := deps.BuildObjectTree(deps.TreeStorage, deps.AclList)
if err != nil {
return
@ -113,7 +116,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
syncTree.afterBuild()
syncTree.Unlock()
if isFirstBuild {
if sendUpdate {
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// send to everybody, because everybody should know that the node or client got new tree
syncTree.syncClient.Broadcast(ctx, headUpdate)

View File

@ -93,7 +93,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
defer func() {
if err != nil {
log.With(zap.Error(err)).Debug("head update finished with error")
log.ErrorCtx(ctx, "head update finished with error", zap.Error(err))
} else if fullRequest != nil {
cnt := fullRequest.Content.GetFullSyncRequest()
log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes)))
@ -168,7 +168,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
defer func() {
if err != nil {
log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error")
log.ErrorCtx(ctx, "full sync request finished with error", zap.Error(err))
s.syncClient.SendWithReply(ctx, senderId, treeId, treechangeproto.WrapError(treechangeproto.ErrFullSync, header), replyId)
return
} else if fullResponse != nil {
@ -212,7 +212,7 @@ func (s *syncTreeHandler) handleFullSyncResponse(
defer func() {
if err != nil {
log.With(zap.Error(err)).DebugCtx(ctx, "full sync response failed")
log.ErrorCtx(ctx, "full sync response failed", zap.Error(err))
} else {
log.DebugCtx(ctx, "full sync response succeeded")
}

View File

@ -90,12 +90,11 @@ Loop:
}
}
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) {
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, isRemote bool, err error) {
treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId)
if err == nil {
return
}
if err != nil && err != treestorage.ErrUnknownTreeId {
return
}
@ -109,6 +108,7 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
return
}
isRemote = true
resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync)
if err != nil {
return
@ -139,5 +139,6 @@ func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.
return
}
// now we are sure that we can save it to the storage
return t.deps.SpaceStorage.CreateTreeStorage(payload)
treeStorage, err = t.deps.SpaceStorage.CreateTreeStorage(payload)
return
}

View File

@ -22,7 +22,7 @@ func (t *InMemoryTreeStorage) SetReturnErrorOnAdd(err error) {
t.addErr = err
}
func (t *InMemoryTreeStorage) AddMany(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
func (t *InMemoryTreeStorage) AddRawChangesSetHead(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error {
t.RLock()
defer t.RUnlock()
if t.addErr != nil {

View File

@ -35,20 +35,6 @@ func (m *MockTreeStorage) EXPECT() *MockTreeStorageMockRecorder {
return m.recorder
}
// AddMany mocks base method.
func (m *MockTreeStorage) AddMany(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddMany", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// AddMany indicates an expected call of AddMany.
func (mr *MockTreeStorageMockRecorder) AddMany(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddMany", reflect.TypeOf((*MockTreeStorage)(nil).AddMany), arg0, arg1)
}
// AddRawChange mocks base method.
func (m *MockTreeStorage) AddRawChange(arg0 *treechangeproto.RawTreeChangeWithId) error {
m.ctrl.T.Helper()
@ -63,6 +49,20 @@ func (mr *MockTreeStorageMockRecorder) AddRawChange(arg0 interface{}) *gomock.Ca
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChange", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChange), arg0)
}
// AddRawChangesSetHead mocks base method.
func (m *MockTreeStorage) AddRawChangesSetHead(arg0 []*treechangeproto.RawTreeChangeWithId, arg1 []string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddRawChangesSetHead", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// AddRawChangesSetHead indicates an expected call of AddRawChangesSetHead.
func (mr *MockTreeStorageMockRecorder) AddRawChangesSetHead(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRawChangesSetHead", reflect.TypeOf((*MockTreeStorage)(nil).AddRawChangesSetHead), arg0, arg1)
}
// Delete mocks base method.
func (m *MockTreeStorage) Delete() error {
m.ctrl.T.Helper()

View File

@ -31,7 +31,7 @@ type TreeStorage interface {
Heads() ([]string, error)
SetHeads(heads []string) error
AddRawChange(change *treechangeproto.RawTreeChangeWithId) error
AddMany(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
AddRawChangesSetHead(changes []*treechangeproto.RawTreeChangeWithId, heads []string) error
GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error)
HasChange(ctx context.Context, id string) (bool, error)