From 246e0aa6749b36aed1bbf1bec670208ef8611dd9 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Wed, 7 Sep 2022 21:02:42 +0200 Subject: [PATCH] WIP algorithm --- pkg/acl/tree/objecttree.go | 59 ++++---- pkg/acl/tree/objecttree_test.go | 112 +++++++++------- pkg/acl/tree/rawloader.go | 231 ++++++++++++++++++++++++++++++++ pkg/acl/tree/tree.go | 10 +- pkg/acl/tree/treereduce.go | 4 +- 5 files changed, 337 insertions(+), 79 deletions(-) create mode 100644 pkg/acl/tree/rawloader.go diff --git a/pkg/acl/tree/objecttree.go b/pkg/acl/tree/objecttree.go index f12f1d1b..b56616ca 100644 --- a/pkg/acl/tree/objecttree.go +++ b/pkg/acl/tree/objecttree.go @@ -60,7 +60,7 @@ type ObjectTree interface { IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error SnapshotPath() []string - ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error) + ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*aclpb.RawChange, error) Storage() storage.TreeStorage DebugDump() (string, error) @@ -511,11 +511,11 @@ func (ot *objectTree) SnapshotPath() []string { return path } -func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) { +func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath, theirHeads []string) ([]*aclpb.RawChange, error) { var ( needFullDocument = len(theirPath) == 0 ourPath = ot.SnapshotPath() - // by default returning everything we have + // by default returning everything we have from start commonSnapshot = ourPath[len(ourPath)-1] err error ) @@ -528,40 +528,38 @@ func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.R } } - log.With( - zap.Strings("heads", ot.tree.Heads()), - zap.String("breakpoint", commonSnapshot), - zap.String("id", ot.id)). 
- Debug("getting all changes from common snapshot") - if commonSnapshot == ot.tree.RootId() { - return ot.getChangesFromTree() + return ot.getChangesFromTree(theirHeads) } else { - return ot.getChangesFromDB(commonSnapshot, needFullDocument) + return ot.getChangesFromDB(commonSnapshot, theirHeads, needFullDocument) } } -func (ot *objectTree) getChangesFromTree() (rawChanges []*aclpb.RawChange, err error) { - ot.tree.dfsPrev(ot.tree.HeadsChanges(), func(ch *Change) bool { - var marshalled []byte - marshalled, err = ch.Content.Marshal() - if err != nil { - return false - } +func (ot *objectTree) getChangesFromTree(theirHeads []string) (rawChanges []*aclpb.RawChange, err error) { + ot.tree.dfsPrev( + ot.tree.HeadsChanges(), + theirHeads, + func(ch *Change) bool { + var marshalled []byte + marshalled, err = ch.Content.Marshal() + if err != nil { + return false + } - raw := &aclpb.RawChange{ - Payload: marshalled, - Signature: ch.Signature(), - Id: ch.Id, - } - rawChanges = append(rawChanges, raw) - return true - }, func(changes []*Change) {}) + raw := &aclpb.RawChange{ + Payload: marshalled, + Signature: ch.Signature(), + Id: ch.Id, + } + rawChanges = append(rawChanges, raw) + return true + }, + func(changes []*Change) {}) return } -func (ot *objectTree) getChangesFromDB(commonSnapshot string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) { +func (ot *objectTree) getChangesFromDB(commonSnapshot string, theirHeads []string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) { load := func(id string) (*Change, error) { raw, err := ot.treeStorage.GetRawChange(context.Background(), id) if err != nil { @@ -576,8 +574,13 @@ func (ot *objectTree) getChangesFromDB(commonSnapshot string, needStartSnapshot rawChanges = append(rawChanges, raw) return ch, nil } + // setting breakpoints to include head and common snapshot + // that means that we will ignore all changes which start at theirHeads + breakpoints := make([]string, 0, 
len(theirHeads)+1) + breakpoints = append(breakpoints, commonSnapshot) + breakpoints = append(breakpoints, theirHeads...) - _, err = ot.treeBuilder.dfs(ot.tree.Heads(), commonSnapshot, load) + _, err = ot.treeBuilder.dfs(ot.tree.Heads(), breakpoints, load) if err != nil { return } diff --git a/pkg/acl/tree/objecttree_test.go b/pkg/acl/tree/objecttree_test.go index 7f5c7ce8..5acfd5be 100644 --- a/pkg/acl/tree/objecttree_test.go +++ b/pkg/acl/tree/objecttree_test.go @@ -271,7 +271,7 @@ func TestObjectTree(t *testing.T) { assert.Equal(t, true, objTree.(*objectTree).snapshotPathIsActual()) }) - t.Run("changes after common snapshot from tree", func(t *testing.T) { + t.Run("changes after common snapshot complex", func(t *testing.T) { ctx := prepareTreeContext(t, aclList) changeCreator := ctx.changeCreator objTree := ctx.objTree @@ -289,19 +289,43 @@ func TestObjectTree(t *testing.T) { require.NoError(t, err, "adding changes should be without error") require.Equal(t, "0", objTree.Root().Id) - changeIds := make(map[string]struct{}) - changes, err := objTree.ChangesAfterCommonSnapshot([]string{"3", "0"}) - for _, ch := range changes { - changeIds[ch.Id] = struct{}{} - } + t.Run("changes from tree", func(t *testing.T) { + changes, err := objTree.ChangesAfterCommonSnapshot([]string{"3", "0"}, []string{}) + require.NoError(t, err, "changes after common snapshot should be without error") - for _, raw := range rawChanges { - _, ok := changeIds[raw.Id] - assert.Equal(t, true, ok) - } + changeIds := make(map[string]struct{}) + for _, ch := range changes { + changeIds[ch.Id] = struct{}{} + } + + for _, raw := range rawChanges { + _, ok := changeIds[raw.Id] + assert.Equal(t, true, ok) + } + }) + + t.Run("changes from tree after first", func(t *testing.T) { + changes, err := objTree.ChangesAfterCommonSnapshot([]string{"3", "0"}, []string{"1"}) + require.NoError(t, err, "changes after common snapshot should be without error") + + changeIds := make(map[string]struct{}) + for _, ch := 
range changes { + changeIds[ch.Id] = struct{}{} + } + + for _, raw := range rawChanges { + if raw.Id == "1" { + _, ok := changeIds[raw.Id] + assert.Equal(t, false, ok) + continue + } + _, ok := changeIds[raw.Id] + assert.Equal(t, true, ok) + } + }) }) - t.Run("changes after common snapshot from db", func(t *testing.T) { + t.Run("changes after common snapshot simple", func(t *testing.T) { ctx := prepareTreeContext(t, aclList) changeCreator := ctx.changeCreator objTree := ctx.objTree @@ -316,45 +340,37 @@ func TestObjectTree(t *testing.T) { require.NoError(t, err, "adding changes should be without error") require.Equal(t, "3", objTree.Root().Id) - changeIds := make(map[string]struct{}) - changes, err := objTree.ChangesAfterCommonSnapshot([]string{"0"}) - for _, ch := range changes { - changeIds[ch.Id] = struct{}{} - } + t.Run("changes from db", func(t *testing.T) { + changes, err := objTree.ChangesAfterCommonSnapshot([]string{"0"}, []string{}) + require.NoError(t, err, "changes after common snapshot should be without error") - for _, raw := range rawChanges { - _, ok := changeIds[raw.Id] + changeIds := make(map[string]struct{}) + for _, ch := range changes { + changeIds[ch.Id] = struct{}{} + } + + for _, raw := range rawChanges { + _, ok := changeIds[raw.Id] + assert.Equal(t, true, ok) + } + }) + + t.Run("changes from db with empty path", func(t *testing.T) { + changes, err := objTree.ChangesAfterCommonSnapshot([]string{}, []string{}) + require.NoError(t, err, "changes after common snapshot should be without error") + + changeIds := make(map[string]struct{}) + for _, ch := range changes { + changeIds[ch.Id] = struct{}{} + } + + for _, raw := range rawChanges { + _, ok := changeIds[raw.Id] + assert.Equal(t, true, ok) + } + _, ok := changeIds["0"] assert.Equal(t, true, ok) - } - }) - - t.Run("changes after common snapshot from db, they have empty path", func(t *testing.T) { - ctx := prepareTreeContext(t, aclList) - changeCreator := ctx.changeCreator - objTree := 
ctx.objTree - - rawChanges := []*aclpb.RawChange{ - changeCreator.createRaw("1", aclList.Head().Id, "0", false, "0"), - changeCreator.createRaw("2", aclList.Head().Id, "0", false, "1"), - changeCreator.createRaw("3", aclList.Head().Id, "0", true, "2"), - } - - _, err := objTree.AddRawChanges(context.Background(), rawChanges...) - require.NoError(t, err, "adding changes should be without error") - require.Equal(t, "3", objTree.Root().Id) - - changeIds := make(map[string]struct{}) - changes, err := objTree.ChangesAfterCommonSnapshot([]string{}) - for _, ch := range changes { - changeIds[ch.Id] = struct{}{} - } - - for _, raw := range rawChanges { - _, ok := changeIds[raw.Id] - assert.Equal(t, true, ok) - } - _, ok := changeIds["0"] - assert.Equal(t, true, ok) + }) }) t.Run("add new changes related to previous snapshot", func(t *testing.T) {
diff --git a/pkg/acl/tree/rawloader.go b/pkg/acl/tree/rawloader.go
new file mode 100644
index 00000000..473b2017
--- /dev/null
+++ b/pkg/acl/tree/rawloader.go
@@ -0,0 +1,296 @@
+package tree
+
+import (
+	"context"
+	"time"
+
+	"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
+	"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
+)
+
+// rawChangeLoader gathers raw changes either from an in-memory Tree or from
+// storage, leaving out the changes which lie at or behind a set of breakpoints.
+type rawChangeLoader struct {
+	treeStorage   storage.TreeStorage
+	changeBuilder ChangeBuilder
+
+	// buffers
+	idStack []string
+	cache   map[string]rawCacheEntry
+}
+
+// rawCacheEntry keeps a stored raw change together with its converted form.
+type rawCacheEntry struct {
+	change    *Change
+	rawChange *aclpb.RawChange
+}
+
+// newRawChangeLoader creates a loader which reads from treeStorage and converts
+// raw changes with changeBuilder.
+func newRawChangeLoader(treeStorage storage.TreeStorage, changeBuilder ChangeBuilder) *rawChangeLoader {
+	return &rawChangeLoader{
+		treeStorage:   treeStorage,
+		changeBuilder: changeBuilder,
+		// loadEntry writes into the cache, so it has to be allocated up front
+		cache: make(map[string]rawCacheEntry),
+	}
+}
+
+// LoadFromTree collects the changes of t which dfsPrev reaches from the heads
+// when it stops at breakpoints. If that walk got down to the root, the changes
+// reachable from the breakpoints are removed again. The remaining changes are
+// returned in their raw form.
+func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb.RawChange, error) {
+	var stack []*Change
+	for _, h := range t.headIds {
+		stack = append(stack, t.attached[h])
+	}
+
+	convert := func(chs []*Change) ([]*aclpb.RawChange, error) {
+		var rawChanges []*aclpb.RawChange
+		for _, ch := range chs {
+			marshalled, err := ch.Content.Marshal()
+			if err != nil {
+				// not handing out a half-converted list together with an error
+				return nil, err
+			}
+
+			rawChanges = append(rawChanges, &aclpb.RawChange{
+				Payload:   marshalled,
+				Signature: ch.Signature(),
+				Id:        ch.Id,
+			})
+		}
+		return rawChanges, nil
+	}
+
+	// getting all changes that we visit
+	var results []*Change
+	rootVisited := false
+	t.dfsPrev(
+		stack,
+		breakpoints,
+		func(ch *Change) bool {
+			results = append(results, ch)
+			return true
+		},
+		func(visited []*Change) {
+			if t.root.visited {
+				rootVisited = true
+			}
+		},
+	)
+
+	// if we stopped at breakpoints or there are no breakpoints
+	if !rootVisited || len(breakpoints) == 0 {
+		return convert(results)
+	}
+
+	// now starting from breakpoints, leaving out the ones which the tree
+	// doesn't have (dfsPrev ignores those as well)
+	stack = stack[:0]
+	for _, h := range breakpoints {
+		if ch, ok := t.attached[h]; ok {
+			stack = append(stack, ch)
+		}
+	}
+
+	// doing another dfs to get all changes before breakpoints, we need to exclude them from results
+	t.dfsPrev(
+		stack,
+		[]string{},
+		func(ch *Change) bool {
+			return true
+		},
+		func(visited []*Change) {
+			// storing the compacted slice back, otherwise results keeps its old length
+			results = discardFromSlice(results, func(change *Change) bool {
+				return change.visited
+			})
+		},
+	)
+
+	// only the changes which were not reached from the breakpoints are left
+	return convert(results)
+}
+
+// loadEntry returns the cache entry for id. On a cache miss the raw change is
+// read from storage with a 30 second timeout, converted and put into the cache.
+func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) {
+	if cached, ok := r.cache[id]; ok {
+		return cached, nil
+	}
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
+	defer cancel()
+
+	rawChange, err := r.treeStorage.GetRawChange(ctx, id)
+	if err != nil {
+		return rawCacheEntry{}, err
+	}
+
+	change, err := r.changeBuilder.ConvertFromRaw(rawChange)
+	if err != nil {
+		return rawCacheEntry{}, err
+	}
+	entry = rawCacheEntry{
+		change:    change,
+		rawChange: rawChange,
+	}
+	// a loader which was not built by newRawChangeLoader has a nil cache
+	if r.cache == nil {
+		r.cache = make(map[string]rawCacheEntry)
+	}
+	r.cache[id] = entry
+	return entry, nil
+}
+
+// LoadFromStorage walks the stored changes from heads towards commonSnapshot,
+// stopping at breakpoints, and returns the raw changes it loaded on the way.
+// If the walk got down to commonSnapshot, a second walk from the breakpoints
+// removes the changes preceding them from the result. commonSnapshot and the
+// breakpoints themselves are never returned.
+//
+// A change which can't be loaded fails the call, except for the breakpoints:
+// NOTE(review): they presumably are heads of the other side and may be absent
+// from our storage, so they are skipped like before - confirm with the callers.
+func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoints []string) ([]*aclpb.RawChange, error) {
+	// initializing buffers
+	r.idStack = r.idStack[:0]
+
+	// bufPosMap keeps for an id either -1 (breakpoint or common snapshot, never
+	// buffered), the position of the change in buffer, or a value which is not
+	// less than len(buffer) after the second pass has handled the change
+	bufPosMap := make(map[string]int, len(breakpoints)+1)
+	isBreakpoint := make(map[string]struct{}, len(breakpoints))
+	for _, breakpoint := range breakpoints {
+		bufPosMap[breakpoint] = -1
+		isBreakpoint[breakpoint] = struct{}{}
+	}
+	bufPosMap[commonSnapshot] = -1
+
+	dfs := func(
+		shouldVisit func(counter int, mapExists bool) bool,
+		visit func(prevCounter int, entry rawCacheEntry) int,
+		canSkip func(id string) bool,
+	) (bool, error) {
+		commonSnapshotVisited := false
+		for len(r.idStack) > 0 {
+			id := r.idStack[len(r.idStack)-1]
+			r.idStack = r.idStack[:len(r.idStack)-1]
+
+			// never going behind the common snapshot, even if it is a breakpoint
+			if id == commonSnapshot {
+				continue
+			}
+
+			cnt, exists := bufPosMap[id]
+			if !shouldVisit(cnt, exists) {
+				continue
+			}
+			if !exists {
+				// the zero value would look like position 0 of the buffer
+				cnt = -1
+			}
+
+			entry, err := r.loadEntry(id)
+			if err != nil {
+				if canSkip(id) {
+					continue
+				}
+				return false, err
+			}
+
+			// setting the counter when we visit
+			bufPosMap[id] = visit(cnt, entry)
+
+			for _, prev := range entry.change.PreviousIds {
+				if prev == commonSnapshot {
+					// the other previous ids still have to be checked
+					commonSnapshotVisited = true
+					continue
+				}
+				cnt, exists = bufPosMap[prev]
+				if !shouldVisit(cnt, exists) {
+					continue
+				}
+				r.idStack = append(r.idStack, prev)
+			}
+		}
+		return commonSnapshotVisited, nil
+	}
+
+	// preparing first pass
+	r.idStack = append(r.idStack, heads...)
+	var buffer []*aclpb.RawChange
+
+	rootVisited, err := dfs(
+		func(counter int, mapExists bool) bool {
+			return !mapExists
+		},
+		func(_ int, entry rawCacheEntry) int {
+			buffer = append(buffer, entry.rawChange)
+			return len(buffer) - 1
+		},
+		func(string) bool {
+			return false
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	// checking if we stopped at breakpoints
+	if !rootVisited || len(breakpoints) == 0 {
+		return buffer, nil
+	}
+
+	// resetting stack
+	r.idStack = r.idStack[:0]
+	r.idStack = append(r.idStack, breakpoints...)
+
+	// marking all visited as nil
+	_, err = dfs(
+		func(counter int, mapExists bool) bool {
+			return !mapExists || counter < len(buffer)
+		},
+		func(discardedPosition int, entry rawCacheEntry) int {
+			if discardedPosition != -1 {
+				buffer[discardedPosition] = nil
+			}
+			return len(buffer) + 1
+		},
+		func(id string) bool {
+			_, ok := isBreakpoint[id]
+			return ok
+		},
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	// storing the compacted slice back, otherwise the nils stay in the result
+	buffer = discardFromSlice(buffer, func(change *aclpb.RawChange) bool {
+		return change == nil
+	})
+	return buffer, nil
+}
+
+// discardFromSlice removes the elements for which isDiscarded returns true,
+// keeping the order of the others. The backing array of elements is reused,
+// so the caller has to continue with the returned slice.
+func discardFromSlice[T any](elements []T, isDiscarded func(T) bool) []T {
+	kept := elements[:0]
+	for _, element := range elements {
+		if !isDiscarded(element) {
+			kept = append(kept, element)
+		}
+	}
+
+	// dropping the references which are left in the unused tail
+	var zero T
+	for i := len(kept); i < len(elements); i++ {
+		elements[i] = zero
+	}
+	return kept
+}
diff --git a/pkg/acl/tree/tree.go b/pkg/acl/tree/tree.go index c15b15d1..549d7f84 100644 --- a/pkg/acl/tree/tree.go +++ b/pkg/acl/tree/tree.go @@ -271,9 +271,17 @@ func (t *Tree) after(id1, id2 string) (found bool) { return } -func (t *Tree) dfsPrev(stack []*Change, visit func(ch *Change) (isContinue bool), afterVisit func([]*Change)) { +func (t *Tree) dfsPrev(stack []*Change, breakpoints []string, visit func(ch *Change) (isContinue bool), afterVisit func([]*Change)) { t.visitedBuf = t.visitedBuf[:0] + // setting breakpoints as visited + for _, breakpoint := range breakpoints { + if ch, ok := t.attached[breakpoint]; ok { + ch.visited = true + t.visitedBuf = append(t.visitedBuf, ch) + } + } + defer func() { afterVisit(t.visitedBuf) for _, ch := range t.visitedBuf { diff --git a/pkg/acl/tree/treereduce.go b/pkg/acl/tree/treereduce.go index 7677f1cb..98532339 100644 --- a/pkg/acl/tree/treereduce.go +++ b/pkg/acl/tree/treereduce.go @@ -18,9 +18,9 @@ func (t *Tree) checkRoot(change *Change) (total int) { stack = append(stack, t.attached[h]) } - change.visited = true t.dfsPrev( stack, + []string{change.Id}, func(ch *Change) bool { total += 1 return true @@ -31,7 +31,6 @@ func (t *Tree)
checkRoot(change *Change) (total int) { } }, ) - change.visited = false return } @@ -46,6 +45,7 @@ func (t *Tree) makeRootAndRemove(start *Change) { t.dfsPrev( stack, + []string{}, func(ch *Change) bool { return true },