WIP algorithm

This commit is contained in:
mcrakhman 2022-09-07 21:02:42 +02:00 committed by Mikhail Iudin
parent dec7cc6ee3
commit 246e0aa674
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
5 changed files with 337 additions and 79 deletions

View File

@ -60,7 +60,7 @@ type ObjectTree interface {
IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
ChangesAfterCommonSnapshot(snapshotPath, heads []string) ([]*aclpb.RawChange, error)
Storage() storage.TreeStorage
DebugDump() (string, error)
@ -511,11 +511,11 @@ func (ot *objectTree) SnapshotPath() []string {
return path
}
func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) {
func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath, theirHeads []string) ([]*aclpb.RawChange, error) {
var (
needFullDocument = len(theirPath) == 0
ourPath = ot.SnapshotPath()
// by default returning everything we have
// by default returning everything we have from start
commonSnapshot = ourPath[len(ourPath)-1]
err error
)
@ -528,21 +528,18 @@ func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.R
}
}
log.With(
zap.Strings("heads", ot.tree.Heads()),
zap.String("breakpoint", commonSnapshot),
zap.String("id", ot.id)).
Debug("getting all changes from common snapshot")
if commonSnapshot == ot.tree.RootId() {
return ot.getChangesFromTree()
return ot.getChangesFromTree(theirHeads)
} else {
return ot.getChangesFromDB(commonSnapshot, needFullDocument)
return ot.getChangesFromDB(commonSnapshot, theirHeads, needFullDocument)
}
}
func (ot *objectTree) getChangesFromTree() (rawChanges []*aclpb.RawChange, err error) {
ot.tree.dfsPrev(ot.tree.HeadsChanges(), func(ch *Change) bool {
func (ot *objectTree) getChangesFromTree(theirHeads []string) (rawChanges []*aclpb.RawChange, err error) {
ot.tree.dfsPrev(
ot.tree.HeadsChanges(),
theirHeads,
func(ch *Change) bool {
var marshalled []byte
marshalled, err = ch.Content.Marshal()
if err != nil {
@ -556,12 +553,13 @@ func (ot *objectTree) getChangesFromTree() (rawChanges []*aclpb.RawChange, err e
}
rawChanges = append(rawChanges, raw)
return true
}, func(changes []*Change) {})
},
func(changes []*Change) {})
return
}
func (ot *objectTree) getChangesFromDB(commonSnapshot string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) {
func (ot *objectTree) getChangesFromDB(commonSnapshot string, theirHeads []string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) {
load := func(id string) (*Change, error) {
raw, err := ot.treeStorage.GetRawChange(context.Background(), id)
if err != nil {
@ -576,8 +574,13 @@ func (ot *objectTree) getChangesFromDB(commonSnapshot string, needStartSnapshot
rawChanges = append(rawChanges, raw)
return ch, nil
}
// setting breakpoints to include head and common snapshot
// that means that we will ignore all changes which start at theirHeads
breakpoints := make([]string, 0, len(theirHeads)+1)
breakpoints = append(breakpoints, commonSnapshot)
breakpoints = append(breakpoints, theirHeads...)
_, err = ot.treeBuilder.dfs(ot.tree.Heads(), commonSnapshot, load)
_, err = ot.treeBuilder.dfs(ot.tree.Heads(), breakpoints, load)
if err != nil {
return
}

View File

@ -271,7 +271,7 @@ func TestObjectTree(t *testing.T) {
assert.Equal(t, true, objTree.(*objectTree).snapshotPathIsActual())
})
t.Run("changes after common snapshot from tree", func(t *testing.T) {
t.Run("changes after common snapshot complex", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator
objTree := ctx.objTree
@ -289,8 +289,11 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, "0", objTree.Root().Id)
t.Run("changes from tree", func(t *testing.T) {
changes, err := objTree.ChangesAfterCommonSnapshot([]string{"3", "0"}, []string{})
require.NoError(t, err, "changes after common snapshot should be without error")
changeIds := make(map[string]struct{})
changes, err := objTree.ChangesAfterCommonSnapshot([]string{"3", "0"})
for _, ch := range changes {
changeIds[ch.Id] = struct{}{}
}
@ -301,7 +304,28 @@ func TestObjectTree(t *testing.T) {
}
})
t.Run("changes after common snapshot from db", func(t *testing.T) {
t.Run("changes from tree after first", func(t *testing.T) {
changes, err := objTree.ChangesAfterCommonSnapshot([]string{"3", "0"}, []string{"1"})
require.NoError(t, err, "changes after common snapshot should be without error")
changeIds := make(map[string]struct{})
for _, ch := range changes {
changeIds[ch.Id] = struct{}{}
}
for _, raw := range rawChanges {
if raw.Id == "1" {
_, ok := changeIds[raw.Id]
assert.Equal(t, false, ok)
continue
}
_, ok := changeIds[raw.Id]
assert.Equal(t, true, ok)
}
})
})
t.Run("changes after common snapshot simple", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator
objTree := ctx.objTree
@ -316,8 +340,11 @@ func TestObjectTree(t *testing.T) {
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, "3", objTree.Root().Id)
t.Run("changes from db", func(t *testing.T) {
changes, err := objTree.ChangesAfterCommonSnapshot([]string{"0"}, []string{})
require.NoError(t, err, "changes after common snapshot should be without error")
changeIds := make(map[string]struct{})
changes, err := objTree.ChangesAfterCommonSnapshot([]string{"0"})
for _, ch := range changes {
changeIds[ch.Id] = struct{}{}
}
@ -328,23 +355,11 @@ func TestObjectTree(t *testing.T) {
}
})
t.Run("changes after common snapshot from db, they have empty path", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)
changeCreator := ctx.changeCreator
objTree := ctx.objTree
rawChanges := []*aclpb.RawChange{
changeCreator.createRaw("1", aclList.Head().Id, "0", false, "0"),
changeCreator.createRaw("2", aclList.Head().Id, "0", false, "1"),
changeCreator.createRaw("3", aclList.Head().Id, "0", true, "2"),
}
_, err := objTree.AddRawChanges(context.Background(), rawChanges...)
require.NoError(t, err, "adding changes should be without error")
require.Equal(t, "3", objTree.Root().Id)
t.Run("changes from db with empty path", func(t *testing.T) {
changes, err := objTree.ChangesAfterCommonSnapshot([]string{}, []string{})
require.NoError(t, err, "changes after common snapshot should be without error")
changeIds := make(map[string]struct{})
changes, err := objTree.ChangesAfterCommonSnapshot([]string{})
for _, ch := range changes {
changeIds[ch.Id] = struct{}{}
}
@ -356,6 +371,7 @@ func TestObjectTree(t *testing.T) {
_, ok := changeIds["0"]
assert.Equal(t, true, ok)
})
})
t.Run("add new changes related to previous snapshot", func(t *testing.T) {
ctx := prepareTreeContext(t, aclList)

231
pkg/acl/tree/rawloader.go Normal file
View File

@ -0,0 +1,231 @@
package tree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"time"
)
type rawChangeLoader struct {
treeStorage storage.TreeStorage
changeBuilder ChangeBuilder
// buffers
idStack []string
cache map[string]rawCacheEntry
}
type rawCacheEntry struct {
change *Change
rawChange *aclpb.RawChange
}
func newRawChangeLoader(treeStorage storage.TreeStorage, changeBuilder ChangeBuilder) *rawChangeLoader {
return &rawChangeLoader{
treeStorage: treeStorage,
changeBuilder: changeBuilder,
}
}
func (r *rawChangeLoader) LoadFromTree(t *Tree, breakpoints []string) ([]*aclpb.RawChange, error) {
var stack []*Change
for _, h := range t.headIds {
stack = append(stack, t.attached[h])
}
convert := func(chs []*Change) (rawChanges []*aclpb.RawChange, err error) {
for _, ch := range chs {
var marshalled []byte
marshalled, err = ch.Content.Marshal()
if err != nil {
return
}
raw := &aclpb.RawChange{
Payload: marshalled,
Signature: ch.Signature(),
Id: ch.Id,
}
rawChanges = append(rawChanges, raw)
}
return
}
// getting all changes that we visit
var results []*Change
rootVisited := false
t.dfsPrev(
stack,
breakpoints,
func(ch *Change) bool {
results = append(results, ch)
return true
},
func(visited []*Change) {
if t.root.visited {
rootVisited = true
}
},
)
// if we stopped at breakpoints or there are no breakpoints
if !rootVisited || len(breakpoints) == 0 {
return convert(results)
}
// now starting from breakpoints
stack = stack[:0]
for _, h := range breakpoints {
stack = append(stack, t.attached[h])
}
// doing another dfs to get all changes before breakpoints, we need to exclude them from results
t.dfsPrev(
stack,
[]string{},
func(ch *Change) bool {
return true
},
func(visited []*Change) {
discardFromSlice(results, func(change *Change) bool {
return change.visited
})
},
)
// otherwise we want to exclude everything that wasn't in breakpoints
return convert(results)
}
func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error) {
var ok bool
if entry, ok = r.cache[id]; ok {
return
}
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
rawChange, err := r.treeStorage.GetRawChange(ctx, id)
if err != nil {
return
}
change, err := r.changeBuilder.ConvertFromRaw(rawChange)
if err != nil {
return
}
entry = rawCacheEntry{
change: change,
rawChange: rawChange,
}
r.cache[id] = entry
return
}
func (r *rawChangeLoader) LoadFromStorage(commonSnapshot string, heads, breakpoints []string) ([]*aclpb.RawChange, error) {
// initializing buffers
r.idStack = r.idStack[:0]
// updating map
bufPosMap := make(map[string]int)
for _, breakpoint := range breakpoints {
bufPosMap[breakpoint] = -1
}
bufPosMap[commonSnapshot] = -1
dfs := func(
commonSnapshot string,
heads []string,
startCounter int,
shouldVisit func(counter int, mapExists bool) bool,
visit func(prevCounter int, entry rawCacheEntry) int) bool {
commonSnapshotVisited := false
for len(r.idStack) > 0 {
id := r.idStack[len(r.idStack)-1]
r.idStack = r.idStack[:len(r.idStack)-1]
cnt, exists := bufPosMap[id]
if !shouldVisit(cnt, exists) {
continue
}
entry, err := r.loadEntry(id)
if err != nil {
continue
}
// setting the counter when we visit
bufPosMap[id] = visit(cnt, entry)
for _, prev := range entry.change.PreviousIds {
if prev == commonSnapshot {
commonSnapshotVisited = true
break
}
cnt, exists = bufPosMap[prev]
if !shouldVisit(cnt, exists) {
continue
}
r.idStack = append(r.idStack, prev)
}
}
return commonSnapshotVisited
}
// preparing first pass
r.idStack = append(r.idStack, heads...)
var buffer []*aclpb.RawChange
rootVisited := dfs(commonSnapshot, heads, 0,
func(counter int, mapExists bool) bool {
return !mapExists
},
func(_ int, entry rawCacheEntry) int {
buffer = append(buffer, entry.rawChange)
return len(buffer) - 1
})
// checking if we stopped at breakpoints
if !rootVisited || len(breakpoints) == 0 {
return buffer, nil
}
// resetting stack
r.idStack = r.idStack[:0]
r.idStack = append(r.idStack, breakpoints...)
// marking all visited as nil
dfs(commonSnapshot, heads, len(buffer),
func(counter int, mapExists bool) bool {
return !mapExists || counter < len(buffer)
},
func(discardedPosition int, entry rawCacheEntry) int {
if discardedPosition != -1 {
buffer[discardedPosition] = nil
}
return len(buffer) + 1
})
discardFromSlice(buffer, func(change *aclpb.RawChange) bool {
return change == nil
})
return buffer, nil
}
func discardFromSlice[T any](elements []T, isDiscarded func(T) bool) {
var (
finishedIdx = 0
currentIdx = 0
)
for currentIdx < len(elements) {
if !isDiscarded(elements[currentIdx]) && finishedIdx != currentIdx {
elements[finishedIdx] = elements[currentIdx]
finishedIdx++
}
currentIdx++
}
elements = elements[:finishedIdx]
}

View File

@ -271,9 +271,17 @@ func (t *Tree) after(id1, id2 string) (found bool) {
return
}
func (t *Tree) dfsPrev(stack []*Change, visit func(ch *Change) (isContinue bool), afterVisit func([]*Change)) {
func (t *Tree) dfsPrev(stack []*Change, breakpoints []string, visit func(ch *Change) (isContinue bool), afterVisit func([]*Change)) {
t.visitedBuf = t.visitedBuf[:0]
// setting breakpoints as visited
for _, breakpoint := range breakpoints {
if ch, ok := t.attached[breakpoint]; ok {
ch.visited = true
t.visitedBuf = append(t.visitedBuf, ch)
}
}
defer func() {
afterVisit(t.visitedBuf)
for _, ch := range t.visitedBuf {

View File

@ -18,9 +18,9 @@ func (t *Tree) checkRoot(change *Change) (total int) {
stack = append(stack, t.attached[h])
}
change.visited = true
t.dfsPrev(
stack,
[]string{change.Id},
func(ch *Change) bool {
total += 1
return true
@ -31,7 +31,6 @@ func (t *Tree) checkRoot(change *Change) (total int) {
}
},
)
change.visited = false
return
}
@ -46,6 +45,7 @@ func (t *Tree) makeRootAndRemove(start *Change) {
t.dfsPrev(
stack,
[]string{},
func(ch *Change) bool {
return true
},