Merge pull request #35 from anyproto/fix-raw-loader

This commit is contained in:
Mikhail Rakhmanov 2023-06-22 19:31:02 +02:00 committed by GitHub
commit 0095a34167
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -2,10 +2,11 @@ package objecttree
import ( import (
"context" "context"
"time"
"github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/object/tree/treestorage"
"github.com/anyproto/any-sync/util/slice" "github.com/anyproto/any-sync/util/slice"
"time"
) )
type rawChangeLoader struct { type rawChangeLoader struct {
@ -22,6 +23,7 @@ type rawCacheEntry struct {
change *Change change *Change
rawChange *treechangeproto.RawTreeChangeWithId rawChange *treechangeproto.RawTreeChangeWithId
position int position int
removed bool
} }
func newStorageLoader(treeStorage treestorage.TreeStorage, changeBuilder ChangeBuilder) *rawChangeLoader { func newStorageLoader(treeStorage treestorage.TreeStorage, changeBuilder ChangeBuilder) *rawChangeLoader {
@ -126,7 +128,6 @@ func (r *rawChangeLoader) loadFromStorage(commonSnapshot string, heads, breakpoi
if err != nil { if err != nil {
continue continue
} }
entry.position = -1
r.cache[b] = entry r.cache[b] = entry
existingBreakpoints = append(existingBreakpoints, b) existingBreakpoints = append(existingBreakpoints, b)
} }
@ -135,8 +136,7 @@ func (r *rawChangeLoader) loadFromStorage(commonSnapshot string, heads, breakpoi
dfs := func( dfs := func(
commonSnapshot string, commonSnapshot string,
heads []string, heads []string,
startCounter int, shouldVisit func(entry rawCacheEntry, mapExists bool) bool,
shouldVisit func(counter int, mapExists bool) bool,
visit func(entry rawCacheEntry) rawCacheEntry) bool { visit func(entry rawCacheEntry) rawCacheEntry) bool {
// resetting stack // resetting stack
@ -150,7 +150,7 @@ func (r *rawChangeLoader) loadFromStorage(commonSnapshot string, heads, breakpoi
r.idStack = r.idStack[:len(r.idStack)-1] r.idStack = r.idStack[:len(r.idStack)-1]
entry, exists := r.cache[id] entry, exists := r.cache[id]
if !shouldVisit(entry.position, exists) { if !shouldVisit(entry, exists) {
continue continue
} }
if id == commonSnapshot { if id == commonSnapshot {
@ -159,7 +159,6 @@ func (r *rawChangeLoader) loadFromStorage(commonSnapshot string, heads, breakpoi
} }
if !exists { if !exists {
entry, err = r.loadEntry(id) entry, err = r.loadEntry(id)
entry.position = -1
if err != nil { if err != nil {
continue continue
} }
@ -174,7 +173,7 @@ func (r *rawChangeLoader) loadFromStorage(commonSnapshot string, heads, breakpoi
break break
} }
prevEntry, exists := r.cache[prev] prevEntry, exists := r.cache[prev]
if !shouldVisit(prevEntry.position, exists) { if !shouldVisit(prevEntry, exists) {
continue continue
} }
r.idStack = append(r.idStack, prev) r.idStack = append(r.idStack, prev)
@ -187,8 +186,8 @@ func (r *rawChangeLoader) loadFromStorage(commonSnapshot string, heads, breakpoi
r.idStack = append(r.idStack, heads...) r.idStack = append(r.idStack, heads...)
var buffer []*treechangeproto.RawTreeChangeWithId var buffer []*treechangeproto.RawTreeChangeWithId
rootVisited := dfs(commonSnapshot, heads, 0, rootVisited := dfs(commonSnapshot, heads,
func(counter int, mapExists bool) bool { func(_ rawCacheEntry, mapExists bool) bool {
return !mapExists return !mapExists
}, },
func(entry rawCacheEntry) rawCacheEntry { func(entry rawCacheEntry) rawCacheEntry {
@ -213,11 +212,13 @@ func (r *rawChangeLoader) loadFromStorage(commonSnapshot string, heads, breakpoi
} }
// marking all visited as nil // marking all visited as nil
dfs(commonSnapshot, existingBreakpoints, len(buffer), dfs(commonSnapshot, existingBreakpoints,
func(counter int, mapExists bool) bool { func(entry rawCacheEntry, mapExists bool) bool {
return !mapExists || counter < len(buffer) // only going through already loaded changes
return mapExists && !entry.removed
}, },
func(entry rawCacheEntry) rawCacheEntry { func(entry rawCacheEntry) rawCacheEntry {
entry.removed = true
if entry.position != -1 { if entry.position != -1 {
buffer[entry.position] = nil buffer[entry.position] = nil
} }
@ -248,6 +249,7 @@ func (r *rawChangeLoader) loadEntry(id string) (entry rawCacheEntry, err error)
entry = rawCacheEntry{ entry = rawCacheEntry{
change: change, change: change,
rawChange: rawChange, rawChange: rawChange,
position: -1,
} }
return return
} }