Rename to object tree

This commit is contained in:
mcrakhman 2022-09-05 13:08:49 +02:00 committed by Mikhail Iudin
parent 257b9cba8e
commit d3e62b418a
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
8 changed files with 140 additions and 143 deletions

View File

@ -64,7 +64,7 @@ func NewChangeFromRaw(rawChange *aclpb.RawChange) (*Change, error) {
return ch, nil
}
func NewVerifiedChangeFromRaw(
func newVerifiedChangeFromRaw(
rawChange *aclpb.RawChange,
kch *keychain) (*Change, error) {
unmarshalled := &aclpb.Change{}

View File

@ -1,21 +0,0 @@
package tree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
)
type CommonTree interface {
ID() string
Header() *aclpb.Header
Heads() []string
Root() *Change
Iterate(func(change *Change) bool)
IterateFrom(string, func(change *Change) bool)
HasChange(string) bool
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
Storage() storage.TreeStorage
DebugDump() (string, error)
Close() error
}

View File

@ -14,9 +14,9 @@ import (
"time"
)
type TreeUpdateListener interface {
Update(tree DocTree)
Rebuild(tree DocTree)
type ObjectTreeUpdateListener interface {
Update(tree ObjectTree)
Rebuild(tree ObjectTree)
}
type RWLocker interface {
@ -46,16 +46,33 @@ type AddResult struct {
Summary AddResultSummary
}
type DocTree interface {
type ObjectTree interface {
RWLocker
CommonTree
ID() string
Header() *aclpb.Header
Heads() []string
Root() *Change
HasChange(string) bool
Iterate(func(change *Change) bool)
IterateFrom(string, func(change *Change) bool)
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
Storage() storage.TreeStorage
DebugDump() (string, error)
AddContent(ctx context.Context, aclList list.ACLList, content SignableChangeContent) (*aclpb.RawChange, error)
AddRawChanges(ctx context.Context, aclList list.ACLList, changes ...*aclpb.RawChange) (AddResult, error)
Close() error
}
type docTree struct {
type objectTree struct {
treeStorage storage.TreeStorage
updateListener TreeUpdateListener
updateListener ObjectTreeUpdateListener
id string
header *aclpb.Header
@ -76,12 +93,12 @@ type docTree struct {
sync.RWMutex
}
func BuildDocTree(t storage.TreeStorage, listener TreeUpdateListener, aclList list.ACLList) (DocTree, error) {
treeBuilder := newTreeBuilder(t)
func BuildObjectTree(treeStorage storage.TreeStorage, listener ObjectTreeUpdateListener, aclList list.ACLList) (ObjectTree, error) {
treeBuilder := newTreeBuilder(treeStorage)
validator := newTreeValidator()
docTree := &docTree{
treeStorage: t,
objTree := &objectTree{
treeStorage: treeStorage,
tree: nil,
treeBuilder: treeBuilder,
validator: validator,
@ -91,80 +108,81 @@ func BuildDocTree(t storage.TreeStorage, listener TreeUpdateListener, aclList li
notSeenIdxBuf: make([]int, 0, 10),
kch: newKeychain(),
}
err := docTree.rebuildFromStorage(aclList, nil)
err := objTree.rebuildFromStorage(aclList, nil)
if err != nil {
return nil, err
}
storageHeads, err := t.Heads()
storageHeads, err := treeStorage.Heads()
if err != nil {
return nil, err
}
// comparing rebuilt heads with heads in storage
// in theory it can happen that we didn't set heads because the process has crashed
// therefore we want to set them later
if !slice.UnsortedEquals(storageHeads, docTree.tree.Heads()) {
log.With(zap.Strings("storage", storageHeads), zap.Strings("rebuilt", docTree.tree.Heads())).
Errorf("the heads in storage and tree are different")
err = t.SetHeads(docTree.tree.Heads())
if !slice.UnsortedEquals(storageHeads, objTree.tree.Heads()) {
log.With(zap.Strings("storage", storageHeads), zap.Strings("rebuilt", objTree.tree.Heads())).
Errorf("the heads in storage and objTree are different")
err = treeStorage.SetHeads(objTree.tree.Heads())
if err != nil {
return nil, err
}
}
docTree.id, err = t.ID()
objTree.id, err = treeStorage.ID()
if err != nil {
return nil, err
}
docTree.header, err = t.Header()
objTree.header, err = treeStorage.Header()
if err != nil {
return nil, err
}
if listener != nil {
listener.Rebuild(docTree)
listener.Rebuild(objTree)
}
return docTree, nil
return objTree, nil
}
func (d *docTree) rebuildFromStorage(aclList list.ACLList, newChanges []*Change) (err error) {
d.treeBuilder.Init(d.kch)
func (ot *objectTree) rebuildFromStorage(aclList list.ACLList, newChanges []*Change) (err error) {
ot.treeBuilder.Init(ot.kch)
d.tree, err = d.treeBuilder.Build(newChanges)
ot.tree, err = ot.treeBuilder.Build(newChanges)
if err != nil {
return
}
// during building the tree we may have marked some changes as possible roots,
// but obviously they are not roots, because of the way how we construct the tree
d.tree.clearPossibleRoots()
ot.tree.clearPossibleRoots()
return d.validator.ValidateTree(d.tree, aclList)
return ot.validator.ValidateTree(ot.tree, aclList)
}
func (d *docTree) ID() string {
return d.id
func (ot *objectTree) ID() string {
return ot.id
}
func (d *docTree) Header() *aclpb.Header {
return d.header
func (ot *objectTree) Header() *aclpb.Header {
return ot.header
}
func (d *docTree) Storage() storage.TreeStorage {
return d.treeStorage
func (ot *objectTree) Storage() storage.TreeStorage {
return ot.treeStorage
}
func (d *docTree) AddContent(ctx context.Context, aclList list.ACLList, content SignableChangeContent) (rawChange *aclpb.RawChange, err error) {
func (ot *objectTree) AddContent(ctx context.Context, aclList list.ACLList, content SignableChangeContent) (rawChange *aclpb.RawChange, err error) {
defer func() {
if d.updateListener != nil {
d.updateListener.Update(d)
if ot.updateListener != nil {
ot.updateListener.Update(ot)
}
}()
state := aclList.ACLState() // special method for own keys
aclChange := &aclpb.Change{
TreeHeadIds: d.tree.Heads(),
TreeHeadIds: ot.tree.Heads(),
AclHeadId: aclList.Head().Id,
SnapshotBaseId: d.tree.RootId(),
SnapshotBaseId: ot.tree.RootId(),
CurrentReadKeyHash: state.CurrentReadKeyHash(),
Timestamp: int64(time.Now().Nanosecond()),
Identity: content.Identity,
@ -207,9 +225,9 @@ func (d *docTree) AddContent(ctx context.Context, aclList list.ACLList, content
if content.IsSnapshot {
// clearing tree, because we already fixed everything in the last snapshot
d.tree = &Tree{}
ot.tree = &Tree{}
}
err = d.tree.AddMergedHead(docChange)
err = ot.tree.AddMergedHead(docChange)
if err != nil {
panic(err)
}
@ -219,86 +237,86 @@ func (d *docTree) AddContent(ctx context.Context, aclList list.ACLList, content
Id: docChange.Id,
}
err = d.treeStorage.AddRawChange(rawChange)
err = ot.treeStorage.AddRawChange(rawChange)
if err != nil {
return
}
err = d.treeStorage.SetHeads([]string{docChange.Id})
err = ot.treeStorage.SetHeads([]string{docChange.Id})
return
}
func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (addResult AddResult, err error) {
func (ot *objectTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (addResult AddResult, err error) {
var mode Mode
mode, addResult, err = d.addRawChanges(ctx, aclList, rawChanges...)
mode, addResult, err = ot.addRawChanges(ctx, aclList, rawChanges...)
if err != nil {
return
}
// reducing tree if we have new roots
d.tree.reduceTree()
ot.tree.reduceTree()
// adding to database all the added changes only after they are good
for _, ch := range addResult.Added {
err = d.treeStorage.AddRawChange(ch)
err = ot.treeStorage.AddRawChange(ch)
if err != nil {
return
}
}
// setting heads
err = d.treeStorage.SetHeads(d.tree.Heads())
err = ot.treeStorage.SetHeads(ot.tree.Heads())
if err != nil {
return
}
if d.updateListener == nil {
if ot.updateListener == nil {
return
}
switch mode {
case Append:
d.updateListener.Update(d)
ot.updateListener.Update(ot)
case Rebuild:
d.updateListener.Rebuild(d)
ot.updateListener.Rebuild(ot)
default:
break
}
return
}
func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (mode Mode, addResult AddResult, err error) {
func (ot *objectTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (mode Mode, addResult AddResult, err error) {
// resetting buffers
d.tmpChangesBuf = d.tmpChangesBuf[:0]
d.notSeenIdxBuf = d.notSeenIdxBuf[:0]
d.difSnapshotBuf = d.difSnapshotBuf[:0]
d.newSnapshotsBuf = d.newSnapshotsBuf[:0]
ot.tmpChangesBuf = ot.tmpChangesBuf[:0]
ot.notSeenIdxBuf = ot.notSeenIdxBuf[:0]
ot.difSnapshotBuf = ot.difSnapshotBuf[:0]
ot.newSnapshotsBuf = ot.newSnapshotsBuf[:0]
// this will be returned to client so we shouldn't use buffer here
prevHeadsCopy := make([]string, 0, len(d.tree.Heads()))
copy(prevHeadsCopy, d.tree.Heads())
prevHeadsCopy := make([]string, 0, len(ot.tree.Heads()))
copy(prevHeadsCopy, ot.tree.Heads())
// filtering changes, verifying and unmarshalling them
for idx, ch := range rawChanges {
if d.HasChange(ch.Id) {
if ot.HasChange(ch.Id) {
continue
}
var change *Change
change, err = NewVerifiedChangeFromRaw(ch, d.kch)
change, err = newVerifiedChangeFromRaw(ch, ot.kch)
if err != nil {
return
}
if change.IsSnapshot {
d.newSnapshotsBuf = append(d.newSnapshotsBuf, change)
ot.newSnapshotsBuf = append(ot.newSnapshotsBuf, change)
}
d.tmpChangesBuf = append(d.tmpChangesBuf, change)
d.notSeenIdxBuf = append(d.notSeenIdxBuf, idx)
ot.tmpChangesBuf = append(ot.tmpChangesBuf, change)
ot.notSeenIdxBuf = append(ot.notSeenIdxBuf, idx)
}
// if no new changes, then returning
if len(d.notSeenIdxBuf) == 0 {
if len(ot.notSeenIdxBuf) == 0 {
addResult = AddResult{
OldHeads: prevHeadsCopy,
Heads: prevHeadsCopy,
@ -308,17 +326,17 @@ func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawCh
}
headsCopy := func() []string {
newHeads := make([]string, 0, len(d.tree.Heads()))
copy(newHeads, d.tree.Heads())
newHeads := make([]string, 0, len(ot.tree.Heads()))
copy(newHeads, ot.tree.Heads())
return newHeads
}
// returns changes that we added to the tree
getAddedChanges := func() []*aclpb.RawChange {
var added []*aclpb.RawChange
for _, idx := range d.notSeenIdxBuf {
for _, idx := range ot.notSeenIdxBuf {
rawChange := rawChanges[idx]
if _, exists := d.tree.attached[rawChange.Id]; exists {
if _, exists := ot.tree.attached[rawChange.Id]; exists {
added = append(added, rawChange)
}
}
@ -326,21 +344,21 @@ func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawCh
}
rollback := func() {
for _, ch := range d.tmpChangesBuf {
if _, exists := d.tree.attached[ch.Id]; exists {
delete(d.tree.attached, ch.Id)
} else if _, exists := d.tree.unAttached[ch.Id]; exists {
delete(d.tree.unAttached, ch.Id)
for _, ch := range ot.tmpChangesBuf {
if _, exists := ot.tree.attached[ch.Id]; exists {
delete(ot.tree.attached, ch.Id)
} else if _, exists := ot.tree.unAttached[ch.Id]; exists {
delete(ot.tree.unAttached, ch.Id)
}
}
}
// checks if we need to go to database
isOldSnapshot := func(ch *Change) bool {
if ch.SnapshotId == d.tree.RootId() {
if ch.SnapshotId == ot.tree.RootId() {
return false
}
for _, sn := range d.newSnapshotsBuf {
for _, sn := range ot.newSnapshotsBuf {
// if change refers to newly received snapshot
if ch.SnapshotId == sn.Id {
return false
@ -350,12 +368,12 @@ func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawCh
}
// checking if we have some changes with different snapshot and then rebuilding
for _, ch := range d.tmpChangesBuf {
for _, ch := range ot.tmpChangesBuf {
if isOldSnapshot(ch) {
err = d.rebuildFromStorage(aclList, d.tmpChangesBuf)
err = ot.rebuildFromStorage(aclList, ot.tmpChangesBuf)
if err != nil {
// rebuilding without new changes
d.rebuildFromStorage(aclList, nil)
ot.rebuildFromStorage(aclList, nil)
return
}
@ -370,7 +388,7 @@ func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawCh
}
// normal mode of operation, where we don't need to rebuild from database
mode = d.tree.Add(d.tmpChangesBuf...)
mode = ot.tree.Add(ot.tmpChangesBuf...)
switch mode {
case Nothing:
addResult = AddResult{
@ -383,7 +401,7 @@ func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawCh
default:
// just rebuilding the state from start without reloading everything from tree storage
// as an optimization we could've started from current heads, but I didn't implement that
err = d.validator.ValidateTree(d.tree, aclList)
err = ot.validator.ValidateTree(ot.tree, aclList)
if err != nil {
rollback()
err = ErrHasInvalidChanges
@ -400,57 +418,57 @@ func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawCh
return
}
func (d *docTree) Iterate(f func(change *Change) bool) {
d.tree.Iterate(d.tree.RootId(), f)
func (ot *objectTree) Iterate(f func(change *Change) bool) {
ot.tree.Iterate(ot.tree.RootId(), f)
}
func (d *docTree) IterateFrom(s string, f func(change *Change) bool) {
d.tree.Iterate(s, f)
func (ot *objectTree) IterateFrom(s string, f func(change *Change) bool) {
ot.tree.Iterate(s, f)
}
func (d *docTree) HasChange(s string) bool {
_, attachedExists := d.tree.attached[s]
_, unattachedExists := d.tree.unAttached[s]
func (ot *objectTree) HasChange(s string) bool {
_, attachedExists := ot.tree.attached[s]
_, unattachedExists := ot.tree.unAttached[s]
return attachedExists || unattachedExists
}
func (d *docTree) Heads() []string {
return d.tree.Heads()
func (ot *objectTree) Heads() []string {
return ot.tree.Heads()
}
func (d *docTree) Root() *Change {
return d.tree.Root()
func (ot *objectTree) Root() *Change {
return ot.tree.Root()
}
func (d *docTree) Close() error {
func (ot *objectTree) Close() error {
return nil
}
func (d *docTree) SnapshotPath() []string {
if d.snapshotPathIsActual() {
return d.snapshotPath
func (ot *objectTree) SnapshotPath() []string {
if ot.snapshotPathIsActual() {
return ot.snapshotPath
}
var path []string
// TODO: think that the user may have not all of the snapshots locally
currentSnapshotId := d.tree.RootId()
currentSnapshotId := ot.tree.RootId()
for currentSnapshotId != "" {
sn, err := d.treeBuilder.loadChange(currentSnapshotId)
sn, err := ot.treeBuilder.loadChange(currentSnapshotId)
if err != nil {
break
}
path = append(path, currentSnapshotId)
currentSnapshotId = sn.SnapshotId
}
d.snapshotPath = path
ot.snapshotPath = path
return path
}
func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) {
func (ot *objectTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) {
var (
needFullDocument = len(theirPath) == 0
ourPath = d.SnapshotPath()
ourPath = ot.SnapshotPath()
// by default returning everything we have
commonSnapshot = ourPath[len(ourPath)-1]
err error
@ -465,20 +483,20 @@ func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
}
log.With(
zap.Strings("heads", d.tree.Heads()),
zap.Strings("heads", ot.tree.Heads()),
zap.String("breakpoint", commonSnapshot),
zap.String("id", d.id)).
zap.String("id", ot.id)).
Debug("getting all changes from common snapshot")
if commonSnapshot == d.tree.RootId() {
return d.getChangesFromTree()
if commonSnapshot == ot.tree.RootId() {
return ot.getChangesFromTree()
} else {
return d.getChangesFromDB(commonSnapshot, needFullDocument)
return ot.getChangesFromDB(commonSnapshot, needFullDocument)
}
}
func (d *docTree) getChangesFromTree() (rawChanges []*aclpb.RawChange, err error) {
d.tree.dfsPrev(d.tree.HeadsChanges(), func(ch *Change) bool {
func (ot *objectTree) getChangesFromTree() (rawChanges []*aclpb.RawChange, err error) {
ot.tree.dfsPrev(ot.tree.HeadsChanges(), func(ch *Change) bool {
var marshalled []byte
marshalled, err = ch.Content.Marshal()
if err != nil {
@ -496,9 +514,9 @@ func (d *docTree) getChangesFromTree() (rawChanges []*aclpb.RawChange, err error
return
}
func (d *docTree) getChangesFromDB(commonSnapshot string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) {
func (ot *objectTree) getChangesFromDB(commonSnapshot string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) {
load := func(id string) (*Change, error) {
raw, err := d.treeStorage.GetRawChange(context.Background(), id)
raw, err := ot.treeStorage.GetRawChange(context.Background(), id)
if err != nil {
return nil, err
}
@ -512,7 +530,7 @@ func (d *docTree) getChangesFromDB(commonSnapshot string, needStartSnapshot bool
return ch, nil
}
_, err = d.treeBuilder.dfs(d.tree.Heads(), commonSnapshot, load)
_, err = ot.treeBuilder.dfs(ot.tree.Heads(), commonSnapshot, load)
if err != nil {
return
}
@ -525,10 +543,10 @@ func (d *docTree) getChangesFromDB(commonSnapshot string, needStartSnapshot bool
return
}
func (d *docTree) snapshotPathIsActual() bool {
return len(d.snapshotPath) != 0 && d.snapshotPath[len(d.snapshotPath)-1] == d.tree.RootId()
func (ot *objectTree) snapshotPathIsActual() bool {
return len(ot.snapshotPath) != 0 && ot.snapshotPath[len(ot.snapshotPath)-1] == ot.tree.RootId()
}
func (d *docTree) DebugDump() (string, error) {
return d.tree.Graph(NoOpDescriptionParser)
func (ot *objectTree) DebugDump() (string, error) {
return ot.tree.Graph(NoOpDescriptionParser)
}

View File

@ -131,7 +131,7 @@ func (tb *treeBuilder) loadChange(id string) (ch *Change, err error) {
return nil, err
}
ch, err = NewVerifiedChangeFromRaw(change, tb.kch)
ch, err = newVerifiedChangeFromRaw(change, tb.kch)
if err != nil {
return nil, err
}

View File

@ -80,7 +80,7 @@ func (s *service) treeDump(w http.ResponseWriter, req *http.Request) {
err error
)
err = s.treeCache.Do(context.Background(), treeId, func(obj interface{}) error {
t := obj.(tree.CommonTree)
t := obj.(tree.ObjectTree)
dump, err = t.DebugDump()
if err != nil {
return err

View File

@ -84,7 +84,7 @@ func (s *service) UpdateDocumentTree(ctx context.Context, id, text string) (err
Debug("updating document")
err = s.treeCache.Do(ctx, id, func(obj interface{}) error {
docTree, ok := obj.(tree.DocTree)
docTree, ok := obj.(tree.ObjectTree)
if !ok {
return fmt.Errorf("can't update acl trees with text")
}

View File

@ -88,7 +88,7 @@ func (r *requestHandler) HandleHeadUpdate(
Debug("processing head update")
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
docTree := obj.(tree.DocTree)
docTree := obj.(tree.ObjectTree)
docTree.Lock()
defer docTree.Unlock()
@ -160,7 +160,7 @@ func (r *requestHandler) HandleFullSyncRequest(
log.Info("getting doc tree from treeCache", zap.String("treeId", treeId))
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
docTree := obj.(tree.DocTree)
docTree := obj.(tree.ObjectTree)
docTree.Lock()
defer docTree.Unlock()
@ -222,7 +222,7 @@ func (r *requestHandler) HandleFullSyncResponse(
Debug("processing full sync response")
err = r.treeCache.Do(ctx, treeId, func(obj interface{}) error {
docTree := obj.(tree.DocTree)
docTree := obj.(tree.ObjectTree)
docTree.Lock()
defer docTree.Unlock()
@ -290,7 +290,7 @@ func (r *requestHandler) HandleACLList(
return err
}
func (r *requestHandler) prepareFullSyncRequest(theirPath []string, t tree.CommonTree) (*syncproto.SyncFullRequest, error) {
func (r *requestHandler) prepareFullSyncRequest(theirPath []string, t tree.ObjectTree) (*syncproto.SyncFullRequest, error) {
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath)
if err != nil {
return nil, err
@ -306,7 +306,7 @@ func (r *requestHandler) prepareFullSyncResponse(
treeId string,
theirPath []string,
theirChanges []*aclpb.RawChange,
t tree.CommonTree) (*syncproto.SyncFullResponse, error) {
t tree.ObjectTree) (*syncproto.SyncFullResponse, error) {
// TODO: we can probably use the common snapshot calculated on the request step from previous peer
ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath)
if err != nil {

View File

@ -123,7 +123,7 @@ func (s *service) loadTree(ctx context.Context, id string) (ocache.Object, error
return nil, fmt.Errorf("incorrect type")
}
log.Info("got header", zap.String("header", header.String()))
var docTree tree.DocTree
var docTree tree.ObjectTree
// TODO: it is a question if we need to use ACLList on the first tree build, because we can think that the tree is already validated
err = s.Do(ctx, header.AclListId, func(obj interface{}) error {
aclTree := obj.(list.ACLList)