Further changes to treebuilder etc

This commit is contained in:
mcrakhman 2022-09-05 11:48:50 +02:00 committed by Mikhail Iudin
parent 9882ebc267
commit b026d228df
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
11 changed files with 322 additions and 309 deletions

View File

@ -24,6 +24,7 @@ var ErrDocumentForbidden = errors.New("your user was forbidden access to the doc
var ErrUserAlreadyExists = errors.New("user already exists") var ErrUserAlreadyExists = errors.New("user already exists")
var ErrNoSuchRecord = errors.New("no such record") var ErrNoSuchRecord = errors.New("no such record")
var ErrInsufficientPermissions = errors.New("insufficient permissions") var ErrInsufficientPermissions = errors.New("insufficient permissions")
var ErrNoReadKey = errors.New("acl state doesn't have a read key")
type UserPermissionPair struct { type UserPermissionPair struct {
Identity string Identity string
@ -70,6 +71,14 @@ func (st *ACLState) CurrentReadKeyHash() uint64 {
return st.currentReadKeyHash return st.currentReadKeyHash
} }
func (st *ACLState) CurrentReadKey() (*symmetric.Key, error) {
key, exists := st.userReadKeys[st.currentReadKeyHash]
if !exists {
return nil, ErrNoReadKey
}
return key, nil
}
func (st *ACLState) UserReadKeys() map[uint64]*symmetric.Key { func (st *ACLState) UserReadKeys() map[uint64]*symmetric.Key {
return st.userReadKeys return st.userReadKeys
} }

View File

@ -3,8 +3,6 @@ package tree
import ( import (
"fmt" "fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/symmetric" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/symmetric"
@ -52,7 +50,7 @@ func (ch *Change) DecryptContents(key *symmetric.Key) error {
return nil return nil
} }
func NewFromRawChange(rawChange *aclpb.RawChange) (*Change, error) { func NewChangeFromRaw(rawChange *aclpb.RawChange) (*Change, error) {
unmarshalled := &aclpb.Change{} unmarshalled := &aclpb.Change{}
err := proto.Unmarshal(rawChange.Payload, unmarshalled) err := proto.Unmarshal(rawChange.Payload, unmarshalled)
if err != nil { if err != nil {
@ -64,25 +62,20 @@ func NewFromRawChange(rawChange *aclpb.RawChange) (*Change, error) {
return ch, nil return ch, nil
} }
func NewFromVerifiedRawChange( func NewVerifiedChangeFromRaw(
rawChange *aclpb.RawChange, rawChange *aclpb.RawChange,
identityKeys map[string]signingkey.PubKey, kch *keychain) (*Change, error) {
decoder keys.Decoder) (*Change, error) {
unmarshalled := &aclpb.Change{} unmarshalled := &aclpb.Change{}
err := proto.Unmarshal(rawChange.Payload, unmarshalled) err := proto.Unmarshal(rawChange.Payload, unmarshalled)
if err != nil { if err != nil {
return nil, err return nil, err
} }
identityKey, exists := identityKeys[unmarshalled.Identity] identityKey, err := kch.getOrAdd(unmarshalled.Identity)
if !exists { if err != nil {
key, err := decoder.DecodeFromString(unmarshalled.Identity) return nil, err
if err != nil {
return nil, err
}
identityKey = key.(signingkey.PubKey)
identityKeys[unmarshalled.Identity] = identityKey
} }
res, err := identityKey.Verify(rawChange.Payload, rawChange.Signature) res, err := identityKey.Verify(rawChange.Payload, rawChange.Signature)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -3,13 +3,10 @@ package tree
import ( import (
"context" "context"
"errors" "errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/proto"
"go.uber.org/zap" "go.uber.org/zap"
@ -30,7 +27,6 @@ type RWLocker interface {
var ErrHasInvalidChanges = errors.New("the change is invalid") var ErrHasInvalidChanges = errors.New("the change is invalid")
var ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot") var ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot")
var ErrTreeWithoutIdentity = errors.New("acl tree is created without identity")
type AddResultSummary int type AddResultSummary int
@ -51,13 +47,12 @@ type AddResult struct {
type DocTree interface { type DocTree interface {
RWLocker RWLocker
CommonTree CommonTree
AddContent(ctx context.Context, aclList list.ACLList, content proto.Marshaler, isSnapshot bool) (*aclpb.RawChange, error) AddContent(ctx context.Context, aclList list.ACLList, content SignableChangeContent) (*aclpb.RawChange, error)
AddRawChanges(ctx context.Context, aclList list.ACLList, changes ...*aclpb.RawChange) (AddResult, error) AddRawChanges(ctx context.Context, aclList list.ACLList, changes ...*aclpb.RawChange) (AddResult, error)
} }
type docTree struct { type docTree struct {
treeStorage storage.TreeStorage treeStorage storage.TreeStorage
accountData *account.AccountData
updateListener TreeUpdateListener updateListener TreeUpdateListener
id string id string
@ -66,45 +61,31 @@ type docTree struct {
treeBuilder *treeBuilder treeBuilder *treeBuilder
validator DocTreeValidator validator DocTreeValidator
kch *keychain
// buffers
difSnapshotBuf []*aclpb.RawChange difSnapshotBuf []*aclpb.RawChange
tmpChangesBuf []*Change tmpChangesBuf []*Change
newSnapshots []*Change
notSeenIdxBuf []int notSeenIdxBuf []int
identityKeys map[string]signingkey.PubKey
sync.RWMutex sync.RWMutex
} }
func BuildDocTreeWithIdentity(t storage.TreeStorage, acc *account.AccountData, listener TreeUpdateListener, aclList list.ACLList) (DocTree, error) { func BuildDocTree(t storage.TreeStorage, listener TreeUpdateListener, aclList list.ACLList) (DocTree, error) {
return buildDocTreeWithAccount(t, acc, acc.Decoder, listener, aclList) treeBuilder := newTreeBuilder(t)
}
func BuildDocTree(t storage.TreeStorage, decoder keys.Decoder, listener TreeUpdateListener, aclList list.ACLList) (DocTree, error) {
return buildDocTreeWithAccount(t, nil, decoder, listener, aclList)
}
func buildDocTreeWithAccount(
t storage.TreeStorage,
acc *account.AccountData,
decoder keys.Decoder,
listener TreeUpdateListener,
aclList list.ACLList) (DocTree, error) {
treeBuilder := newTreeBuilder(t, decoder)
validator := newTreeValidator() validator := newTreeValidator()
docTree := &docTree{ docTree := &docTree{
treeStorage: t, treeStorage: t,
tree: nil, tree: nil,
accountData: acc,
treeBuilder: treeBuilder, treeBuilder: treeBuilder,
validator: validator, validator: validator,
updateListener: listener, updateListener: listener,
tmpChangesBuf: make([]*Change, 0, 10), tmpChangesBuf: make([]*Change, 0, 10),
difSnapshotBuf: make([]*aclpb.RawChange, 0, 10), difSnapshotBuf: make([]*aclpb.RawChange, 0, 10),
notSeenIdxBuf: make([]int, 0, 10), notSeenIdxBuf: make([]int, 0, 10),
identityKeys: make(map[string]signingkey.PubKey), kch: newKeychain(),
} }
err := docTree.rebuildFromStorage(aclList, nil) err := docTree.rebuildFromStorage(aclList, nil)
if err != nil { if err != nil {
@ -143,13 +124,17 @@ func buildDocTreeWithAccount(
} }
func (d *docTree) rebuildFromStorage(aclList list.ACLList, newChanges []*Change) (err error) { func (d *docTree) rebuildFromStorage(aclList list.ACLList, newChanges []*Change) (err error) {
d.treeBuilder.Init(d.identityKeys) d.treeBuilder.Init(d.kch)
d.tree, err = d.treeBuilder.Build(false, newChanges) d.tree, err = d.treeBuilder.Build(newChanges)
if err != nil { if err != nil {
return err return err
} }
// during building the tree we may have marked some changes as possible roots,
// but obviously they are not roots, because of the way how we construct the tree
d.tree.clearPossibleRoots()
return d.validator.ValidateTree(d.tree, aclList) return d.validator.ValidateTree(d.tree, aclList)
} }
@ -165,152 +150,159 @@ func (d *docTree) Storage() storage.TreeStorage {
return d.treeStorage return d.treeStorage
} }
func (d *docTree) AddContent(ctx context.Context, aclList list.ACLList, content proto.Marshaler, isSnapshot bool) (*aclpb.RawChange, error) { func (d *docTree) AddContent(ctx context.Context, aclList list.ACLList, content SignableChangeContent) (rawChange *aclpb.RawChange, err error) {
if d.accountData == nil {
return nil, ErrTreeWithoutIdentity
}
defer func() { defer func() {
// TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree)
if d.updateListener != nil { if d.updateListener != nil {
d.updateListener.Update(d) d.updateListener.Update(d)
} }
}() }()
state := aclList.ACLState() state := aclList.ACLState() // special method for own keys
change := &aclpb.Change{ aclChange := &aclpb.Change{
TreeHeadIds: d.tree.Heads(), TreeHeadIds: d.tree.Heads(),
AclHeadId: aclList.Head().Id, AclHeadId: aclList.Head().Id,
SnapshotBaseId: d.tree.RootId(), SnapshotBaseId: d.tree.RootId(),
CurrentReadKeyHash: state.CurrentReadKeyHash(), CurrentReadKeyHash: state.CurrentReadKeyHash(),
Timestamp: int64(time.Now().Nanosecond()), Timestamp: int64(time.Now().Nanosecond()),
Identity: d.accountData.Identity, Identity: content.Identity,
IsSnapshot: isSnapshot, IsSnapshot: content.IsSnapshot,
} }
marshalledData, err := content.Marshal() marshalledData, err := content.Proto.Marshal()
if err != nil { if err != nil {
return nil, err return nil, err
} }
encrypted, err := state.UserReadKeys()[state.CurrentReadKeyHash()].Encrypt(marshalledData)
if err != nil {
return nil, err
}
change.ChangesData = encrypted
fullMarshalledChange, err := proto.Marshal(change) readKey, err := state.CurrentReadKey()
if err != nil { if err != nil {
return nil, err return nil, err
} }
signature, err := d.accountData.SignKey.Sign(fullMarshalledChange)
encrypted, err := readKey.Encrypt(marshalledData)
if err != nil { if err != nil {
return nil, err return nil, err
} }
aclChange.ChangesData = encrypted
fullMarshalledChange, err := proto.Marshal(aclChange)
if err != nil {
return nil, err
}
signature, err := content.Key.Sign(fullMarshalledChange)
if err != nil {
return nil, err
}
id, err := cid.NewCIDFromBytes(fullMarshalledChange) id, err := cid.NewCIDFromBytes(fullMarshalledChange)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ch := NewChange(id, change)
ch.ParsedModel = content
ch.Sign = signature
if isSnapshot { docChange := NewChange(id, aclChange)
docChange.ParsedModel = content
docChange.Sign = signature
if content.IsSnapshot {
// clearing tree, because we already fixed everything in the last snapshot // clearing tree, because we already fixed everything in the last snapshot
d.tree = &Tree{} d.tree = &Tree{}
} }
err = d.tree.AddMergedHead(ch) err = d.tree.AddMergedHead(docChange)
if err != nil { if err != nil {
panic("error in adding head") panic(err)
} }
rawCh := &aclpb.RawChange{ rawChange = &aclpb.RawChange{
Payload: fullMarshalledChange, Payload: fullMarshalledChange,
Signature: ch.Signature(), Signature: docChange.Signature(),
Id: ch.Id, Id: docChange.Id,
} }
err = d.treeStorage.AddRawChange(rawCh) err = d.treeStorage.AddRawChange(rawChange)
if err != nil { if err != nil {
return nil, err return
} }
err = d.treeStorage.SetHeads([]string{ch.Id}) err = d.treeStorage.SetHeads([]string{docChange.Id})
if err != nil { return
return nil, err
}
return rawCh, nil
} }
func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (addResult AddResult, err error) { func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (addResult AddResult, err error) {
var mode Mode var mode Mode
mode, addResult, err = d.addRawChanges(ctx, aclList, rawChanges...)
if err != nil {
return
}
// reducing tree if we have new roots
d.tree.reduceTree()
// adding to database all the added changes only after they are good
for _, ch := range addResult.Added {
err = d.treeStorage.AddRawChange(ch)
if err != nil {
return
}
}
// setting heads
err = d.treeStorage.SetHeads(d.tree.Heads())
if err != nil {
return
}
if d.updateListener == nil {
return
}
switch mode {
case Append:
d.updateListener.Update(d)
case Rebuild:
d.updateListener.Rebuild(d)
default:
break
}
return
}
func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (mode Mode, addResult AddResult, err error) {
// resetting buffers // resetting buffers
d.tmpChangesBuf = d.tmpChangesBuf[:0] d.tmpChangesBuf = d.tmpChangesBuf[:0]
d.notSeenIdxBuf = d.notSeenIdxBuf[:0] d.notSeenIdxBuf = d.notSeenIdxBuf[:0]
d.difSnapshotBuf = d.difSnapshotBuf[:0] d.difSnapshotBuf = d.difSnapshotBuf[:0]
d.newSnapshots = d.newSnapshots[:0]
prevHeads := d.tree.Heads() prevHeads := d.tree.Heads()
// TODO: if we can use new snapshot -> update tree, check if some snapshot, dfsPrev?
// filtering changes, verifying and unmarshalling them // filtering changes, verifying and unmarshalling them
for idx, ch := range rawChanges { for idx, ch := range rawChanges {
if d.HasChange(ch.Id) { if d.HasChange(ch.Id) {
continue continue
} }
// if we already added the change to invalid ones
if _, exists := d.tree.invalidChanges[ch.Id]; exists {
return AddResult{}, ErrHasInvalidChanges
}
var change *Change var change *Change
change, err = NewFromVerifiedRawChange(ch, d.identityKeys, d.treeBuilder.signingPubKeyDecoder) change, err = NewVerifiedChangeFromRaw(ch, d.kch)
if err != nil { if err != nil {
return AddResult{}, err return
} }
if change.IsSnapshot {
d.newSnapshots = append(d.newSnapshots, change)
}
d.tmpChangesBuf = append(d.tmpChangesBuf, change) d.tmpChangesBuf = append(d.tmpChangesBuf, change)
d.notSeenIdxBuf = append(d.notSeenIdxBuf, idx) d.notSeenIdxBuf = append(d.notSeenIdxBuf, idx)
} }
// if no new changes, then returning // if no new changes, then returning
if len(d.notSeenIdxBuf) == 0 { if len(d.notSeenIdxBuf) == 0 {
return AddResult{ addResult = AddResult{
OldHeads: prevHeads, OldHeads: prevHeads,
Heads: prevHeads, Heads: prevHeads,
Summary: AddResultSummaryNothing, Summary: AddResultSummaryNothing,
}, nil }
return
} }
defer func() {
if err != nil {
return
}
// adding to database all the added changes only after they are good
for _, ch := range addResult.Added {
err = d.treeStorage.AddRawChange(ch)
if err != nil {
return
}
}
// setting heads
err = d.treeStorage.SetHeads(d.tree.Heads())
if err != nil {
return
}
if d.updateListener == nil {
return
}
switch mode {
case Append:
d.updateListener.Update(d)
case Rebuild:
d.updateListener.Rebuild(d)
default:
break
}
}()
// returns changes that we added to the tree // returns changes that we added to the tree
getAddedChanges := func() []*aclpb.RawChange { getAddedChanges := func() []*aclpb.RawChange {
var added []*aclpb.RawChange var added []*aclpb.RawChange
@ -333,13 +325,28 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawCh
} }
} }
// checks if we need to go to database
isOldSnapshot := func(ch *Change) bool {
if ch.SnapshotId == d.tree.RootId() {
return false
}
for _, sn := range d.newSnapshots {
// if change refers to newly received snapshot
if ch.SnapshotId == sn.Id {
return false
}
}
return true
}
// checking if we have some changes with different snapshot and then rebuilding // checking if we have some changes with different snapshot and then rebuilding
for _, ch := range d.tmpChangesBuf { for _, ch := range d.tmpChangesBuf {
if ch.SnapshotId != d.tree.RootId() && ch.SnapshotId != "" { if isOldSnapshot(ch) {
err = d.rebuildFromStorage(aclList, d.tmpChangesBuf) err = d.rebuildFromStorage(aclList, d.tmpChangesBuf)
if err != nil { if err != nil {
rollback() // rebuilding without new changes
return AddResult{}, err d.rebuildFromStorage(aclList, nil)
return
} }
addResult = AddResult{ addResult = AddResult{
@ -348,7 +355,6 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawCh
Added: getAddedChanges(), Added: getAddedChanges(),
Summary: AddResultSummaryRebuild, Summary: AddResultSummaryRebuild,
} }
err = nil
return return
} }
} }
@ -362,7 +368,6 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawCh
Heads: prevHeads, Heads: prevHeads,
Summary: AddResultSummaryNothing, Summary: AddResultSummaryNothing,
} }
err = nil
return return
default: default:
@ -371,7 +376,8 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawCh
err = d.validator.ValidateTree(d.tree, aclList) err = d.validator.ValidateTree(d.tree, aclList)
if err != nil { if err != nil {
rollback() rollback()
return AddResult{}, ErrHasInvalidChanges err = ErrHasInvalidChanges
return
} }
addResult = AddResult{ addResult = AddResult{
@ -380,7 +386,6 @@ func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawCh
Added: getAddedChanges(), Added: getAddedChanges(),
Summary: AddResultSummaryAppend, Summary: AddResultSummaryAppend,
} }
err = nil
} }
return return
} }
@ -429,14 +434,11 @@ func (d *docTree) SnapshotPath() []string {
} }
func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) { func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) {
// TODO: think about when the clients will have their full acl tree and thus full snapshots
// but no changes after some of the snapshots
var ( var (
isNewDocument = len(theirPath) == 0 isNewDocument = len(theirPath) == 0
ourPath = d.SnapshotPath() ourPath = d.SnapshotPath()
// by default returning everything we have // by default returning everything we have
commonSnapshot = ourPath[len(ourPath)-1] // TODO: root snapshot, probably it is better to have a specific method in treestorage commonSnapshot = ourPath[len(ourPath)-1]
err error err error
) )
@ -447,16 +449,55 @@ func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
return nil, err return nil, err
} }
} }
// TODO: if the snapshot is in the tree we probably can skip going to the DB
log.With(
zap.Strings("heads", d.tree.Heads()),
zap.String("breakpoint", commonSnapshot),
zap.String("id", d.id)).
Debug("getting all changes from common snapshot")
if commonSnapshot == d.tree.RootId() {
return d.getChangesFromTree(isNewDocument)
} else {
return d.getChangesFromDB(commonSnapshot, isNewDocument)
}
}
func (d *docTree) getChangesFromTree(isNewDocument bool) (marshalledChanges []*aclpb.RawChange, err error) {
if !isNewDocument {
// ignoring root change
d.tree.Root().visited = true
}
d.tree.dfsPrev(d.tree.HeadsChanges(), func(ch *Change) bool {
var marshalled []byte
marshalled, err = ch.Content.Marshal()
if err != nil {
return false
}
raw := &aclpb.RawChange{
Payload: marshalled,
Signature: ch.Signature(),
Id: ch.Id,
}
marshalledChanges = append(marshalledChanges, raw)
return true
}, func(changes []*Change) {})
if err != nil {
return nil, err
}
return
}
func (d *docTree) getChangesFromDB(commonSnapshot string, isNewDocument bool) (marshalledChanges []*aclpb.RawChange, err error) {
var rawChanges []*aclpb.RawChange var rawChanges []*aclpb.RawChange
// using custom load function to skip verification step and save raw changes
load := func(id string) (*Change, error) { load := func(id string) (*Change, error) {
raw, err := d.treeStorage.GetRawChange(context.Background(), id) raw, err := d.treeStorage.GetRawChange(context.Background(), id)
if err != nil { if err != nil {
return nil, err return nil, err
} }
ch, err := NewFromRawChange(raw) ch, err := NewChangeFromRaw(raw)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -464,16 +505,12 @@ func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
rawChanges = append(rawChanges, raw) rawChanges = append(rawChanges, raw)
return ch, nil return ch, nil
} }
// we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes
log.With(
zap.Strings("heads", d.tree.Heads()),
zap.String("breakpoint", commonSnapshot),
zap.String("id", d.id)).
Debug("getting all changes from common snapshot")
_, err = d.treeBuilder.dfs(d.tree.Heads(), commonSnapshot, load) _, err = d.treeBuilder.dfs(d.tree.Heads(), commonSnapshot, load)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if isNewDocument { if isNewDocument {
// adding snapshot to raw changes // adding snapshot to raw changes
_, err = load(commonSnapshot) _, err = load(commonSnapshot)
@ -481,10 +518,6 @@ func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
return nil, err return nil, err
} }
} }
log.With(
zap.Int("len(changes)", len(rawChanges)),
zap.String("id", d.id)).
Debug("returning all changes after common snapshot")
return rawChanges, nil return rawChanges, nil
} }
@ -492,31 +525,3 @@ func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
func (d *docTree) DebugDump() (string, error) { func (d *docTree) DebugDump() (string, error) {
return d.tree.Graph(NoOpDescriptionParser) return d.tree.Graph(NoOpDescriptionParser)
} }
func commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) {
var i int
var j int
log.With(zap.Strings("our path", ourPath), zap.Strings("their path", theirPath)).
Debug("finding common snapshot for two paths")
OuterLoop:
// find starting point from the right
for i = len(ourPath) - 1; i >= 0; i-- {
for j = len(theirPath) - 1; j >= 0; j-- {
// most likely there would be only one comparison, because mostly the snapshot path will start from the root for nodes
if ourPath[i] == theirPath[j] {
break OuterLoop
}
}
}
if i < 0 || j < 0 {
return "", ErrNoCommonSnapshot
}
// find last common element of the sequence moving from right to left
for i >= 0 && j >= 0 {
if ourPath[i] == theirPath[j] {
i--
j--
}
}
return ourPath[i+1], nil
}

30
pkg/acl/tree/keychain.go Normal file
View File

@ -0,0 +1,30 @@
package tree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
)
type keychain struct {
decoder keys.Decoder
keys map[string]signingkey.PubKey
}
func newKeychain() *keychain {
return &keychain{
decoder: signingkey.NewEDPubKeyDecoder(),
}
}
func (k *keychain) getOrAdd(identity string) (signingkey.PubKey, error) {
if key, exists := k.keys[identity]; exists {
return key, nil
}
res, err := k.decoder.DecodeFromString(identity)
if err != nil {
return nil, err
}
k.keys[identity] = res.(signingkey.PubKey)
return res.(signingkey.PubKey), nil
}

View File

@ -0,0 +1,13 @@
package tree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"github.com/gogo/protobuf/proto"
)
type SignableChangeContent struct {
Proto proto.Marshaler
Key signingkey.PrivKey
Identity string
IsSnapshot bool
}

View File

@ -270,9 +270,16 @@ func (t *Tree) after(id1, id2 string) (found bool) {
return return
} }
func (t *Tree) dfsPrev(stack []*Change, visit func(ch *Change), afterVisit func([]*Change)) { func (t *Tree) dfsPrev(stack []*Change, visit func(ch *Change) (isContinue bool), afterVisit func([]*Change)) {
t.visitedBuf = t.visitedBuf[:0] t.visitedBuf = t.visitedBuf[:0]
defer func() {
afterVisit(t.visitedBuf)
for _, ch := range t.visitedBuf {
ch.visited = false
}
}()
for len(stack) > 0 { for len(stack) > 0 {
ch := stack[len(stack)-1] ch := stack[len(stack)-1]
stack = stack[:len(stack)-1] stack = stack[:len(stack)-1]
@ -289,11 +296,9 @@ func (t *Tree) dfsPrev(stack []*Change, visit func(ch *Change), afterVisit func(
stack = append(stack, prevCh) stack = append(stack, prevCh)
} }
} }
visit(ch) if !visit(ch) {
} return
afterVisit(t.visitedBuf) }
for _, ch := range t.visitedBuf {
ch.visited = false
} }
} }
@ -373,6 +378,14 @@ func (t *Tree) Heads() []string {
return t.headIds return t.headIds
} }
func (t *Tree) HeadsChanges() []*Change {
var heads []*Change
for _, head := range t.headIds {
heads = append(heads, t.attached[head])
}
return heads
}
func (t *Tree) String() string { func (t *Tree) String() string {
var buf = bytes.NewBuffer(nil) var buf = bytes.NewBuffer(nil)
t.Iterate(t.RootId(), func(c *Change) (isContinue bool) { t.Iterate(t.RootId(), func(c *Change) (isContinue bool) {

View File

@ -19,112 +19,55 @@ var (
) )
type treeBuilder struct { type treeBuilder struct {
cache map[string]*Change cache map[string]*Change
identityKeys map[string]signingkey.PubKey kch *keychain
signingPubKeyDecoder keys.Decoder tree *Tree
tree *Tree treeStorage storage.TreeStorage
treeStorage storage.TreeStorage
// buffers
idStack []string
loadBuffer []*Change
} }
func newTreeBuilder(t storage.TreeStorage, decoder keys.Decoder) *treeBuilder { func newTreeBuilder(t storage.TreeStorage) *treeBuilder {
return &treeBuilder{ return &treeBuilder{
signingPubKeyDecoder: decoder, treeStorage: t,
treeStorage: t,
} }
} }
func (tb *treeBuilder) Init(identityKeys map[string]signingkey.PubKey) { func (tb *treeBuilder) Init(kch *keychain) {
tb.cache = make(map[string]*Change) tb.cache = make(map[string]*Change)
tb.identityKeys = identityKeys tb.kch = kch
tb.tree = &Tree{} tb.tree = &Tree{}
} }
func (tb *treeBuilder) Build(fromStart bool, newChanges []*Change) (*Tree, error) { func (tb *treeBuilder) Build(newChanges []*Change) (*Tree, error) {
var headsAndOrphans []string var headsAndNewChanges []string
heads, err := tb.treeStorage.Heads() heads, err := tb.treeStorage.Heads()
if err != nil { if err != nil {
return nil, err return nil, err
} }
headsAndOrphans = append(headsAndOrphans, heads...) headsAndNewChanges = append(headsAndNewChanges, heads...)
tb.cache = make(map[string]*Change) tb.cache = make(map[string]*Change)
for _, ch := range newChanges { for _, ch := range newChanges {
headsAndOrphans = append(headsAndOrphans, ch.Id) headsAndNewChanges = append(headsAndNewChanges, ch.Id)
tb.cache[ch.Id] = ch tb.cache[ch.Id] = ch
} }
log.With(zap.Strings("heads", heads)).Debug("building tree") log.With(zap.Strings("heads", heads)).Debug("building tree")
if fromStart { breakpoint, err := tb.findBreakpoint(headsAndNewChanges)
if err := tb.buildTreeFromStart(headsAndOrphans); err != nil { if err != nil {
return nil, fmt.Errorf("buildTree error: %v", err) return nil, fmt.Errorf("findBreakpoint error: %v", err)
} }
} else {
breakpoint, err := tb.findBreakpoint(headsAndOrphans)
if err != nil {
return nil, fmt.Errorf("findBreakpoint error: %v", err)
}
if err = tb.buildTree(headsAndOrphans, breakpoint); err != nil { if err = tb.buildTree(headsAndNewChanges, breakpoint); err != nil {
return nil, fmt.Errorf("buildTree error: %v", err) return nil, fmt.Errorf("buildTree error: %v", err)
}
} }
return tb.tree, nil return tb.tree, nil
} }
func (tb *treeBuilder) buildTreeFromStart(heads []string) (err error) {
changes, root, err := tb.dfsFromStart(heads)
if err != nil {
return err
}
tb.tree.AddFast(root)
tb.tree.AddFast(changes...)
return
}
func (tb *treeBuilder) dfsFromStart(heads []string) (buf []*Change, root *Change, err error) {
var possibleRoots []*Change
stack := make([]string, len(heads), len(heads)*2)
copy(stack, heads)
buf = make([]*Change, 0, len(stack)*2)
uniqMap := make(map[string]struct{})
for len(stack) > 0 {
id := stack[len(stack)-1]
stack = stack[:len(stack)-1]
if _, exists := uniqMap[id]; exists {
continue
}
ch, err := tb.loadChange(id)
if err != nil {
continue
}
uniqMap[id] = struct{}{}
buf = append(buf, ch)
for _, prev := range ch.PreviousIds {
stack = append(stack, prev)
}
if len(ch.PreviousIds) == 0 {
possibleRoots = append(possibleRoots, ch)
}
}
header, err := tb.treeStorage.Header()
if err != nil {
return nil, nil, err
}
for _, r := range possibleRoots {
if r.Id == header.FirstId {
return buf, r, nil
}
}
return nil, nil, fmt.Errorf("could not find root change")
}
func (tb *treeBuilder) buildTree(heads []string, breakpoint string) (err error) { func (tb *treeBuilder) buildTree(heads []string, breakpoint string) (err error) {
ch, err := tb.loadChange(breakpoint) ch, err := tb.loadChange(breakpoint)
if err != nil { if err != nil {
@ -141,11 +84,17 @@ func (tb *treeBuilder) dfs(
heads []string, heads []string,
breakpoint string, breakpoint string,
load func(string) (*Change, error)) (buf []*Change, err error) { load func(string) (*Change, error)) (buf []*Change, err error) {
stack := make([]string, len(heads), len(heads)*2) tb.idStack = tb.idStack[:0]
tb.loadBuffer = tb.loadBuffer[:0]
buf = tb.loadBuffer
var (
stack = tb.idStack
uniqMap = map[string]struct{}{breakpoint: {}}
)
copy(stack, heads) copy(stack, heads)
buf = make([]*Change, 0, len(stack)*2)
uniqMap := map[string]struct{}{breakpoint: {}}
for len(stack) > 0 { for len(stack) > 0 {
id := stack[len(stack)-1] id := stack[len(stack)-1]
stack = stack[:len(stack)-1] stack = stack[:len(stack)-1]
@ -162,6 +111,9 @@ func (tb *treeBuilder) dfs(
buf = append(buf, ch) buf = append(buf, ch)
for _, prev := range ch.PreviousIds { for _, prev := range ch.PreviousIds {
if _, exists := uniqMap[id]; exists {
continue
}
stack = append(stack, prev) stack = append(stack, prev)
} }
} }
@ -173,7 +125,6 @@ func (tb *treeBuilder) loadChange(id string) (ch *Change, err error) {
return ch, nil return ch, nil
} }
// TODO: Add virtual changes logic
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel() defer cancel()
@ -182,8 +133,7 @@ func (tb *treeBuilder) loadChange(id string) (ch *Change, err error) {
return nil, err return nil, err
} }
// TODO: maybe we can use unverified changes here, because we shouldn't put bad changes in the DB in the first place ch, err = NewVerifiedChangeFromRaw(change, tb.identityKeys, tb.signingPubKeyDecoder)
ch, err = NewFromVerifiedRawChange(change, tb.identityKeys, tb.signingPubKeyDecoder)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -302,11 +252,9 @@ func (tb *treeBuilder) findCommonForTwoSnapshots(s1, s2 string) (s string, err e
} }
isEmptySnapshot := func(ch *Change) bool { isEmptySnapshot := func(ch *Change) bool {
// TODO: add more sophisticated checks in Change for snapshots
return !ch.IsSnapshot return !ch.IsSnapshot
} }
// TODO: can we even have empty snapshots?
// prefer not empty snapshot // prefer not empty snapshot
if isEmptySnapshot(ch1) && !isEmptySnapshot(ch2) { if isEmptySnapshot(ch1) && !isEmptySnapshot(ch2) {
log.Warnf("changes build Tree: prefer %s(not empty) over %s(empty)", s2, s1) log.Warnf("changes build Tree: prefer %s(not empty) over %s(empty)", s2, s1)
@ -316,7 +264,6 @@ func (tb *treeBuilder) findCommonForTwoSnapshots(s1, s2 string) (s string, err e
return s1, nil return s1, nil
} }
// TODO: add virtual change mechanics
// unexpected behavior - just return lesser id // unexpected behavior - just return lesser id
if s1 < s2 { if s1 < s2 {
log.Warnf("changes build Tree: prefer %s (%s<%s)", s1, s1, s2) log.Warnf("changes build Tree: prefer %s (%s<%s)", s1, s1, s2)

View File

@ -21,8 +21,9 @@ func (t *Tree) checkRoot(change *Change) (total int) {
change.visited = true change.visited = true
t.dfsPrev( t.dfsPrev(
stack, stack,
func(ch *Change) { func(ch *Change) bool {
total += 1 total += 1
return true
}, },
func(changes []*Change) { func(changes []*Change) {
if t.root.visited { if t.root.visited {
@ -45,7 +46,9 @@ func (t *Tree) makeRootAndRemove(start *Change) {
t.dfsPrev( t.dfsPrev(
stack, stack,
func(ch *Change) {}, func(ch *Change) bool {
return true
},
func(changes []*Change) { func(changes []*Change) {
for _, ch := range changes { for _, ch := range changes {
delete(t.unAttached, ch.Id) delete(t.unAttached, ch.Id)

View File

@ -10,48 +10,6 @@ import (
"time" "time"
) )
//
//func CreateNewTreeStorageWithACL(
// acc *account.AccountData,
// build func(builder list.ACLChangeBuilder) error,
// create treestorage.CreatorFunc) (treestorage.Storage, error) {
// bld := list.newACLChangeBuilder()
// bld.Init(
// list.newACLStateWithIdentity(acc.Identity, acc.EncKey, signingkey.NewEd25519PubKeyDecoder()),
// &Tree{},
// acc)
// err := build(bld)
// if err != nil {
// return nil, err
// }
//
// change, payload, err := bld.BuildAndApply()
// if err != nil {
// return nil, err
// }
//
// rawChange := &aclpb.RawChange{
// Payload: payload,
// Signature: change.Signature(),
// Id: change.CID(),
// }
// header, id, err := createTreeHeaderAndId(rawChange, treepb.TreeHeader_ACLTree, "")
// if err != nil {
// return nil, err
// }
//
// thr, err := create(id, header, []*aclpb.RawChange{rawChange})
// if err != nil {
// return nil, err
// }
//
// err = thr.SetHeads([]string{change.CID()})
// if err != nil {
// return nil, err
// }
// return thr, nil
//}
func CreateNewTreeStorage( func CreateNewTreeStorage(
acc *account.AccountData, acc *account.AccountData,
aclList list.ACLList, aclList list.ACLList,
@ -71,20 +29,29 @@ func CreateNewTreeStorage(
if err != nil { if err != nil {
return nil, err return nil, err
} }
encrypted, err := state.UserReadKeys()[state.CurrentReadKeyHash()].Encrypt(marshalledData)
readKey, err := state.CurrentReadKey()
if err != nil { if err != nil {
return nil, err return nil, err
} }
encrypted, err := readKey.Encrypt(marshalledData)
if err != nil {
return nil, err
}
change.ChangesData = encrypted change.ChangesData = encrypted
fullMarshalledChange, err := proto.Marshal(change) fullMarshalledChange, err := proto.Marshal(change)
if err != nil { if err != nil {
return nil, err return nil, err
} }
signature, err := acc.SignKey.Sign(fullMarshalledChange) signature, err := acc.SignKey.Sign(fullMarshalledChange)
if err != nil { if err != nil {
return nil, err return nil, err
} }
changeId, err := cid.NewCIDFromBytes(fullMarshalledChange) changeId, err := cid.NewCIDFromBytes(fullMarshalledChange)
if err != nil { if err != nil {
return nil, err return nil, err

27
pkg/acl/tree/util.go Normal file
View File

@ -0,0 +1,27 @@
package tree
func commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) {
var i int
var j int
OuterLoop:
// find starting point from the right
for i = len(ourPath) - 1; i >= 0; i-- {
for j = len(theirPath) - 1; j >= 0; j-- {
// most likely there would be only one comparison, because mostly the snapshot path will start from the root for nodes
if ourPath[i] == theirPath[j] {
break OuterLoop
}
}
}
if i < 0 || j < 0 {
return "", ErrNoCommonSnapshot
}
// find last common element of the sequence moving from right to left
for i >= 0 && j >= 0 {
if ourPath[i] == theirPath[j] {
i--
j--
}
}
return ourPath[i+1], nil
}

View File

@ -97,7 +97,13 @@ func (s *service) UpdateDocumentTree(ctx context.Context, id, text string) (err
defer aclTree.RUnlock() defer aclTree.RUnlock()
content := createAppendTextChange(text) content := createAppendTextChange(text)
ch, err = docTree.AddContent(ctx, aclTree, content, false) signable := tree.SignableChangeContent{
Proto: content,
Key: s.account.Account().SignKey,
Identity: s.account.Account().Identity,
IsSnapshot: false,
}
ch, err = docTree.AddContent(ctx, aclTree, signable)
if err != nil { if err != nil {
return err return err
} }