Further work acltree, aclstate etc

This commit is contained in:
mcrakhman 2022-08-10 14:48:17 +02:00 committed by Mikhail Iudin
parent 3eedbf4188
commit 660f8d839b
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
8 changed files with 537 additions and 91 deletions

View File

@ -50,18 +50,12 @@ func newACLState() *ACLState {
}
}
func (st *ACLState) applyChange(changeWrapper *Change) (err error) {
change := changeWrapper.Content
func (st *ACLState) applyChange(change *aclpb.Change) (err error) {
aclData := &aclpb.ACLChangeACLData{}
if changeWrapper.DecryptedModel != nil {
aclData = changeWrapper.DecryptedModel.(*aclpb.ACLChangeACLData)
} else {
err = proto.Unmarshal(change.ChangesData, aclData)
if err != nil {
return
}
changeWrapper.DecryptedModel = aclData
err = proto.Unmarshal(change.ChangesData, aclData)
if err != nil {
return
}
defer func() {
@ -71,23 +65,51 @@ func (st *ACLState) applyChange(changeWrapper *Change) (err error) {
st.currentReadKeyHash = change.CurrentReadKeyHash
}()
return st.applyChangeData(aclData, change.CurrentReadKeyHash, change.Identity)
}
func (st *ACLState) applyChangeAndUpdate(changeWrapper *Change) (err error) {
change := changeWrapper.Content
aclData := &aclpb.ACLChangeACLData{}
if changeWrapper.ParsedModel != nil {
aclData = changeWrapper.ParsedModel.(*aclpb.ACLChangeACLData)
} else {
err = proto.Unmarshal(change.ChangesData, aclData)
if err != nil {
return
}
changeWrapper.ParsedModel = aclData
}
return st.applyChangeData(aclData, changeWrapper.Content.CurrentReadKeyHash, changeWrapper.Content.Identity)
}
func (st *ACLState) applyChangeData(changeData *aclpb.ACLChangeACLData, hash uint64, identity string) (err error) {
defer func() {
if err != nil {
return
}
st.currentReadKeyHash = hash
}()
// we can't check this for the user which is joining, because it will not be in our list
// the same is for the first change to be added
skipIdentityCheck := st.isUserJoin(aclData) || (st.currentReadKeyHash == 0 && st.isUserAdd(aclData, change.Identity))
skipIdentityCheck := st.isUserJoin(changeData) || (st.currentReadKeyHash == 0 && st.isUserAdd(changeData, identity))
if !skipIdentityCheck {
// we check signature when we add this to the Tree, so no need to do it here
if _, exists := st.userStates[change.Identity]; !exists {
if _, exists := st.userStates[identity]; !exists {
err = ErrNoSuchUser
return
}
if !st.hasPermission(change.Identity, aclpb.ACLChange_Admin) {
err = fmt.Errorf("user %s must have admin permissions", change.Identity)
if !st.hasPermission(identity, aclpb.ACLChange_Admin) {
err = fmt.Errorf("user %s must have admin permissions", identity)
return
}
}
for _, ch := range aclData.GetAclContent() {
for _, ch := range changeData.GetAclContent() {
if err = st.applyChangeContent(ch); err != nil {
log.Infof("error while applying changes: %v; ignore", err)
return err

View File

@ -60,11 +60,11 @@ func (sb *aclStateBuilder) BuildBefore(beforeId string) (*ACLState, bool, error)
if err == nil {
startChange = c
} else if err != ErrDocumentForbidden {
//log.Errorf("marking change %s as invalid: %v", c.Id, err)
log.Errorf("marking change %s as invalid: %v", c.Id, err)
sb.tree.RemoveInvalidChange(c.Id)
}
}()
err = state.applyChange(c)
err = state.applyChangeAndUpdate(c)
if err != nil {
return false
}

View File

@ -7,6 +7,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/asymmetric/signingkey"
"go.uber.org/zap"
"sync"
)
@ -53,7 +54,7 @@ type ACLTree interface {
ID() string
Header() *treepb.TreeHeader
ACLState() *ACLState
AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*aclpb.RawChange, error)
AddContent(ctx context.Context, f func(builder ACLChangeBuilder) error) (*aclpb.RawChange, error)
AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error)
Heads() []string
Root() *Change
@ -80,18 +81,15 @@ type aclTree struct {
treeBuilder *treeBuilder
aclStateBuilder *aclStateBuilder
changeBuilder *changeBuilder
changeBuilder *aclChangeBuilder
sync.RWMutex
}
func BuildACLTreeWithIdentity(
t treestorage.TreeStorage,
acc *account.AccountData,
listener TreeUpdateListener) (ACLTree, error) {
func BuildACLTreeWithIdentity(t treestorage.TreeStorage, acc *account.AccountData, listener TreeUpdateListener) (ACLTree, error) {
treeBuilder := newTreeBuilder(t, acc.Decoder)
aclStateBuilder := newACLStateBuilderWithIdentity(acc.Decoder, acc)
changeBuilder := newChangeBuilder()
changeBuilder := newACLChangeBuilder()
aclTree := &aclTree{
treeStorage: t,
@ -129,8 +127,44 @@ func BuildACLTreeWithIdentity(
return aclTree, nil
}
func BuildACLTree(t treestorage.TreeStorage) {
// TODO: Add logic for building without identity
func BuildACLTree(t treestorage.TreeStorage, decoder signingkey.PubKeyDecoder, listener TreeUpdateListener) (ACLTree, error) {
treeBuilder := newTreeBuilder(t, decoder)
aclStateBuilder := newACLStateBuilder()
changeBuilder := newACLChangeBuilder()
aclTree := &aclTree{
treeStorage: t,
tree: nil,
aclState: nil,
treeBuilder: treeBuilder,
aclStateBuilder: aclStateBuilder,
changeBuilder: changeBuilder,
updateListener: listener,
}
err := aclTree.rebuildFromStorage()
if err != nil {
return nil, err
}
err = aclTree.removeOrphans()
if err != nil {
return nil, err
}
err = t.SetHeads(aclTree.Heads())
if err != nil {
return nil, err
}
aclTree.id, err = t.TreeID()
if err != nil {
return nil, err
}
aclTree.header, err = t.Header()
if err != nil {
return nil, err
}
listener.Rebuild(aclTree)
return aclTree, nil
}
func (a *aclTree) removeOrphans() error {
@ -189,14 +223,16 @@ func (a *aclTree) Storage() treestorage.TreeStorage {
return a.treeStorage
}
func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuilder) error) (*aclpb.RawChange, error) {
func (a *aclTree) AddContent(ctx context.Context, build func(builder ACLChangeBuilder) error) (*aclpb.RawChange, error) {
if a.accountData == nil {
return nil, ErrTreeWithoutIdentity
}
defer func() {
// TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree)
a.updateListener.Update(a)
if a.updateListener != nil {
a.updateListener.Update(a)
}
}()
a.changeBuilder.Init(a.aclState, a.tree, a.accountData)
@ -258,13 +294,15 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha
return
}
switch mode {
case Append:
a.updateListener.Update(a)
case Rebuild:
a.updateListener.Rebuild(a)
default:
break
if a.updateListener != nil {
switch mode {
case Append:
a.updateListener.Update(a)
case Rebuild:
a.updateListener.Rebuild(a)
default:
break
}
}
}()
@ -386,7 +424,7 @@ func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawCh
// if this is non-empty request
if !isNewDocument {
commonSnapshot, err = a.commonSnapshotForTwoPaths(ourPath, theirPath)
commonSnapshot, err = commonSnapshotForTwoPaths(ourPath, theirPath)
if err != nil {
return nil, err
}
@ -437,7 +475,7 @@ func (a *aclTree) DebugDump() (string, error) {
return a.tree.Graph(ACLDescriptionParser)
}
func (a *aclTree) commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) {
func commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) {
var i int
var j int
log.With(zap.Strings("our path", ourPath), zap.Strings("their path", theirPath)).

View File

@ -23,7 +23,7 @@ type Change struct {
SnapshotId string
IsSnapshot bool
DecryptedChange []byte // TODO: check if we need it
DecryptedModel interface{}
ParsedModel interface{}
Content *aclpb.Change
Sign []byte

View File

@ -15,33 +15,25 @@ type MarshalledChange = []byte
type ACLChangeBuilder interface {
UserAdd(identity string, encryptionKey encryptionkey.PubKey, permissions aclpb.ACLChangeUserPermissions) error
AddId(id string) // TODO: this is only for testing
SetMakeSnapshot(bool) // TODO: who should decide this? probably ACLTree so we can delete it
AddId(id string) // TODO: this is only for testing
}
type ChangeBuilder interface {
ACLChangeBuilder
AddChangeContent(marshaler proto.Marshaler) // user code should be responsible for making regular snapshots
}
type changeBuilder struct {
type aclChangeBuilder struct {
aclState *ACLState
tree *Tree
acc *account.AccountData
aclData *aclpb.ACLChangeACLData
changeContent proto.Marshaler
id string
makeSnapshot bool
readKey *symmetric.Key
readKeyHash uint64
aclData *aclpb.ACLChangeACLData
id string
readKey *symmetric.Key
readKeyHash uint64
}
func newChangeBuilder() *changeBuilder {
return &changeBuilder{}
func newACLChangeBuilder() *aclChangeBuilder {
return &aclChangeBuilder{}
}
func (c *changeBuilder) Init(state *ACLState, tree *Tree, acc *account.AccountData) {
func (c *aclChangeBuilder) Init(state *ACLState, tree *Tree, acc *account.AccountData) {
c.aclState = state
c.tree = tree
c.acc = acc
@ -60,15 +52,11 @@ func (c *changeBuilder) Init(state *ACLState, tree *Tree, acc *account.AccountDa
}
}
func (c *changeBuilder) AddId(id string) {
func (c *aclChangeBuilder) AddId(id string) {
c.id = id
}
func (c *changeBuilder) SetMakeSnapshot(b bool) {
c.makeSnapshot = b
}
func (c *changeBuilder) UserAdd(identity string, encryptionKey encryptionkey.PubKey, permissions aclpb.ACLChangeUserPermissions) error {
func (c *aclChangeBuilder) UserAdd(identity string, encryptionKey encryptionkey.PubKey, permissions aclpb.ACLChangeUserPermissions) error {
var allKeys []*symmetric.Key
if c.aclState.currentReadKeyHash != 0 {
for _, key := range c.aclState.userReadKeys {
@ -105,38 +93,23 @@ func (c *changeBuilder) UserAdd(identity string, encryptionKey encryptionkey.Pub
return nil
}
func (c *changeBuilder) BuildAndApply() (*Change, []byte, error) {
aclChange := &aclpb.ACLChange{
func (c *aclChangeBuilder) BuildAndApply() (*Change, []byte, error) {
aclChange := &aclpb.Change{
TreeHeadIds: c.tree.Heads(),
AclHeadIds: c.tree.ACLHeads(),
SnapshotBaseId: c.tree.RootId(),
AclData: c.aclData,
CurrentReadKeyHash: c.readKeyHash,
Timestamp: int64(time.Now().Nanosecond()),
Identity: c.acc.Identity,
}
err := c.aclState.applyChange(aclChange)
marshalledData, err := proto.Marshal(c.aclData)
if err != nil {
return nil, nil, err
}
if c.makeSnapshot {
c.aclData.AclSnapshot = c.aclState.makeSnapshot()
}
var marshalled []byte
if c.changeContent != nil {
marshalled, err = c.changeContent.Marshal()
if err != nil {
return nil, nil, err
}
encrypted, err := c.aclState.userReadKeys[c.aclState.currentReadKeyHash].
Encrypt(marshalled)
if err != nil {
return nil, nil, err
}
aclChange.ChangesData = encrypted
aclChange.ChangesData = marshalledData
err = c.aclState.applyChange(aclChange)
if err != nil {
return nil, nil, err
}
fullMarshalledChange, err := proto.Marshal(aclChange)
@ -152,12 +125,8 @@ func (c *changeBuilder) BuildAndApply() (*Change, []byte, error) {
return nil, nil, err
}
ch := NewChange(id, aclChange)
ch.DecryptedDocumentChange = marshalled
ch.ParsedModel = c.aclData
ch.Sign = signature
return ch, fullMarshalledChange, nil
}
func (c *changeBuilder) AddChangeContent(marshaler proto.Marshaler) {
c.changeContent = marshaler
}

View File

@ -0,0 +1,19 @@
package tree
type ChangeValidator interface {
ValidateChange(change *Change) error
}
type defChangeValidator struct {
aclTree ACLTree
}
func NewDefChangeValidator(aclTree ACLTree) ChangeValidator {
return &defChangeValidator{}
}
func (c *defChangeValidator) ValidateChange(change *Change) error {
// TODO: add validation logic where we check that the change refers to correct acl heads
// that means that more recent changes should refer to more recent acl heads
return nil
}

View File

@ -20,8 +20,8 @@ func (a aclDescriptionParser) ParseChange(changeWrapper *Change) (res []string,
change := changeWrapper.Content
aclData := &aclpb.ACLChangeACLData{}
if changeWrapper.DecryptedModel != nil {
aclData = changeWrapper.DecryptedModel.(*aclpb.ACLChangeACLData)
if changeWrapper.ParsedModel != nil {
aclData = changeWrapper.ParsedModel.(*aclpb.ACLChangeACLData)
} else {
err = proto.Unmarshal(change.ChangesData, aclData)
if err != nil {

398
pkg/acl/tree/doctree.go Normal file
View File

@ -0,0 +1,398 @@
package tree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"sync"
)
type DocTree interface {
RWLocker
ID() string
Header() *treepb.TreeHeader
AddContent(ctx context.Context, content proto.Marshaler) (*aclpb.RawChange, error)
AddRawChanges(ctx context.Context, validator ChangeValidator, changes ...*aclpb.RawChange) (AddResult, error)
Heads() []string
Root() *Change
Iterate(func(change *Change) bool)
IterateFrom(string, func(change *Change) bool)
HasChange(string) bool
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
Storage() treestorage.TreeStorage
DebugDump() (string, error)
Close() error
}
type docTree struct {
treeStorage treestorage.TreeStorage
accountData *account.AccountData
updateListener TreeUpdateListener
id string
header *treepb.TreeHeader
tree *Tree
aclState *ACLState
treeBuilder *treeBuilder
sync.RWMutex
}
func BuildDocTreeWithIdentity(
t treestorage.TreeStorage,
acc *account.AccountData,
listener TreeUpdateListener) (DocTree, error) {
treeBuilder := newTreeBuilder(t, acc.Decoder)
aclStateBuilder := newACLStateBuilderWithIdentity(acc.Decoder, acc)
changeBuilder := newChangeBuilder()
docTree := &docTree{
treeStorage: t,
accountData: acc,
tree: nil,
aclState: nil,
treeBuilder: treeBuilder,
aclStateBuilder: aclStateBuilder,
changeBuilder: changeBuilder,
updateListener: listener,
}
err := docTree.rebuildFromStorage()
if err != nil {
return nil, err
}
err = docTree.removeOrphans()
if err != nil {
return nil, err
}
err = t.SetHeads(docTree.Heads())
if err != nil {
return nil, err
}
docTree.id, err = t.TreeID()
if err != nil {
return nil, err
}
docTree.header, err = t.Header()
if err != nil {
return nil, err
}
listener.Rebuild(docTree)
return docTree, nil
}
func BuildACLTree(t treestorage.TreeStorage) {
// TODO: Add logic for building without identity
}
func (a *docTree) removeOrphans() error {
// removing attached or invalid orphans
var toRemove []string
orphans, err := a.treeStorage.Orphans()
if err != nil {
return err
}
for _, orphan := range orphans {
if _, exists := a.tree.attached[orphan]; exists {
toRemove = append(toRemove, orphan)
}
if _, exists := a.tree.invalidChanges[orphan]; exists {
toRemove = append(toRemove, orphan)
}
}
return a.treeStorage.RemoveOrphans(toRemove...)
}
func (a *docTree) rebuildFromStorage() (err error) {
a.treeBuilder.Init()
a.tree, err = a.treeBuilder.Build(false)
if err != nil {
return err
}
err = a.aclStateBuilder.Init(a.tree)
if err != nil {
return err
}
a.aclState, err = a.aclStateBuilder.Build()
if err != nil {
return err
}
return nil
}
func (a *docTree) ID() string {
return a.id
}
func (a *docTree) Header() *treepb.TreeHeader {
return a.header
}
func (a *docTree) ACLState() *ACLState {
return a.aclState
}
func (a *docTree) Storage() treestorage.TreeStorage {
return a.treeStorage
}
func (a *docTree) AddContent(ctx context.Context, build func(builder ChangeBuilder) error) (*aclpb.RawChange, error) {
if a.accountData == nil {
return nil, ErrTreeWithoutIdentity
}
defer func() {
// TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree)
a.updateListener.Update(a)
}()
a.changeBuilder.Init(a.aclState, a.tree, a.accountData)
err := build(a.changeBuilder)
if err != nil {
return nil, err
}
ch, marshalled, err := a.changeBuilder.BuildAndApply()
if err != nil {
return nil, err
}
a.tree.AddFast(ch)
rawCh := &aclpb.RawChange{
Payload: marshalled,
Signature: ch.Signature(),
Id: ch.Id,
}
err = a.treeStorage.AddRawChange(rawCh)
if err != nil {
return nil, err
}
err = a.treeStorage.SetHeads([]string{ch.Id})
if err != nil {
return nil, err
}
return rawCh, nil
}
func (a *docTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawChange) (AddResult, error) {
// TODO: make proper error handling, because there are a lot of corner cases where this will break
var err error
var mode Mode
var changes []*Change // TODO: = addChangesBuf[:0] ...
for _, ch := range rawChanges {
change, err := NewFromRawChange(ch)
// TODO: think what if we will have incorrect signatures on rawChanges, how everything will work
if err != nil {
continue
}
changes = append(changes, change)
}
defer func() {
if err != nil {
return
}
err = a.removeOrphans()
if err != nil {
return
}
err = a.treeStorage.SetHeads(a.tree.Heads())
if err != nil {
return
}
switch mode {
case Append:
a.updateListener.Update(a)
case Rebuild:
a.updateListener.Rebuild(a)
default:
break
}
}()
getAddedChanges := func() []*aclpb.RawChange {
var added []*aclpb.RawChange
for _, ch := range rawChanges {
if _, exists := a.tree.attached[ch.Id]; exists {
added = append(added, ch)
}
}
return added
}
for _, ch := range changes {
err = a.treeStorage.AddChange(ch)
if err != nil {
return AddResult{}, err
}
err = a.treeStorage.AddOrphans(ch.Id)
if err != nil {
return AddResult{}, err
}
}
prevHeads := a.tree.Heads()
mode = a.tree.Add(changes...)
switch mode {
case Nothing:
return AddResult{
OldHeads: prevHeads,
Heads: prevHeads,
Summary: AddResultSummaryNothing,
}, nil
case Rebuild:
err = a.rebuildFromStorage()
if err != nil {
return AddResult{}, err
}
return AddResult{
OldHeads: prevHeads,
Heads: a.tree.Heads(),
Added: getAddedChanges(),
Summary: AddResultSummaryRebuild,
}, nil
default:
// just rebuilding the state from start without reloading everything from tree storage
// as an optimization we could've started from current heads, but I didn't implement that
a.aclState, err = a.aclStateBuilder.Build()
if err != nil {
return AddResult{}, err
}
return AddResult{
OldHeads: prevHeads,
Heads: a.tree.Heads(),
Added: getAddedChanges(),
Summary: AddResultSummaryAppend,
}, nil
}
}
func (a *docTree) Iterate(f func(change *Change) bool) {
a.tree.Iterate(a.tree.RootId(), f)
}
func (a *docTree) IterateFrom(s string, f func(change *Change) bool) {
a.tree.Iterate(s, f)
}
func (a *docTree) HasChange(s string) bool {
_, attachedExists := a.tree.attached[s]
_, unattachedExists := a.tree.unAttached[s]
_, invalidExists := a.tree.invalidChanges[s]
return attachedExists || unattachedExists || invalidExists
}
func (a *docTree) Heads() []string {
return a.tree.Heads()
}
func (a *docTree) Root() *Change {
return a.tree.Root()
}
func (a *docTree) Close() error {
return nil
}
func (a *docTree) SnapshotPath() []string {
// TODO: think about caching this
var path []string
// TODO: think that the user may have not all of the snapshots locally
currentSnapshotId := a.tree.RootId()
for currentSnapshotId != "" {
sn, err := a.treeBuilder.loadChange(currentSnapshotId)
if err != nil {
break
}
path = append(path, currentSnapshotId)
currentSnapshotId = sn.SnapshotId
}
return path
}
func (a *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) {
// TODO: think about when the clients will have their full acl tree and thus full snapshots
// but no changes after some of the snapshots
var (
isNewDocument = len(theirPath) == 0
ourPath = a.SnapshotPath()
// by default returning everything we have
commonSnapshot = ourPath[len(ourPath)-1] // TODO: root snapshot, probably it is better to have a specific method in treestorage
err error
)
// if this is non-empty request
if !isNewDocument {
commonSnapshot, err = commonSnapshotForTwoPaths(ourPath, theirPath)
if err != nil {
return nil, err
}
}
var rawChanges []*aclpb.RawChange
// using custom load function to skip verification step and save raw changes
load := func(id string) (*Change, error) {
raw, err := a.treeStorage.GetChange(context.Background(), id)
if err != nil {
return nil, err
}
aclChange, err := a.treeBuilder.makeUnverifiedACLChange(raw)
if err != nil {
return nil, err
}
ch := NewChange(id, aclChange)
rawChanges = append(rawChanges, raw)
return ch, nil
}
// we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes
log.With(
zap.Strings("heads", a.tree.Heads()),
zap.String("breakpoint", commonSnapshot),
zap.String("id", a.id)).
Debug("getting all changes from common snapshot")
_, err = a.treeBuilder.dfs(a.tree.Heads(), commonSnapshot, load)
if err != nil {
return nil, err
}
if isNewDocument {
// adding snapshot to raw changes
_, err = load(commonSnapshot)
if err != nil {
return nil, err
}
}
log.With(
zap.Int("len(changes)", len(rawChanges)),
zap.String("id", a.id)).
Debug("returning all changes after common snapshot")
return rawChanges, nil
}
func (a *docTree) DebugDump() (string, error) {
return a.tree.Graph(ACLDescriptionParser)
}