Change storages and add iterate

This commit is contained in:
mcrakhman 2022-09-06 20:23:57 +02:00 committed by Mikhail Iudin
parent b0f2f875aa
commit b294d13254
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
7 changed files with 118 additions and 58 deletions

View File

@ -70,11 +70,10 @@ type inMemoryTreeStorage struct {
sync.RWMutex sync.RWMutex
} }
type CreatorFunc = func(string, *aclpb.Header, []*aclpb.RawChange) (TreeStorage, error)
func NewInMemoryTreeStorage( func NewInMemoryTreeStorage(
treeId string, treeId string,
header *aclpb.Header, header *aclpb.Header,
heads []string,
changes []*aclpb.RawChange) (TreeStorage, error) { changes []*aclpb.RawChange) (TreeStorage, error) {
allChanges := make(map[string]*aclpb.RawChange) allChanges := make(map[string]*aclpb.RawChange)
for _, ch := range changes { for _, ch := range changes {
@ -84,7 +83,7 @@ func NewInMemoryTreeStorage(
return &inMemoryTreeStorage{ return &inMemoryTreeStorage{
id: treeId, id: treeId,
header: header, header: header,
heads: nil, heads: heads,
changes: allChanges, changes: allChanges,
RWMutex: sync.RWMutex{}, RWMutex: sync.RWMutex{},
}, nil }, nil
@ -161,27 +160,27 @@ func (i *inMemoryStorageProvider) Storage(id string) (Storage, error) {
return nil, ErrUnknownTreeId return nil, ErrUnknownTreeId
} }
func (i *inMemoryStorageProvider) CreateTreeStorage(treeId string, header *aclpb.Header, changes []*aclpb.RawChange) (TreeStorage, error) { func (i *inMemoryStorageProvider) CreateTreeStorage(payload TreeStorageCreatePayload) (TreeStorage, error) {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
res, err := NewInMemoryTreeStorage(treeId, header, changes) res, err := NewInMemoryTreeStorage(payload.TreeId, payload.Header, payload.Heads, payload.Changes)
if err != nil { if err != nil {
return nil, err return nil, err
} }
i.objects[treeId] = res i.objects[payload.TreeId] = res
return res, nil return res, nil
} }
func (i *inMemoryStorageProvider) CreateACLListStorage(id string, header *aclpb.Header, records []*aclpb.RawRecord) (ListStorage, error) { func (i *inMemoryStorageProvider) CreateACLListStorage(payload ACLListStorageCreatePayload) (ListStorage, error) {
i.Lock() i.Lock()
defer i.Unlock() defer i.Unlock()
res, err := NewInMemoryACLListStorage(id, header, records) res, err := NewInMemoryACLListStorage(payload.ListId, payload.Header, payload.Records)
if err != nil { if err != nil {
return nil, err return nil, err
} }
i.objects[id] = res i.objects[payload.ListId] = res
return res, nil return res, nil
} }

View File

@ -7,9 +7,22 @@ import (
var ErrUnknownTreeId = errors.New("tree does not exist") var ErrUnknownTreeId = errors.New("tree does not exist")
type TreeStorageCreatePayload struct {
TreeId string
Header *aclpb.Header
Changes []*aclpb.RawChange
Heads []string
}
type ACLListStorageCreatePayload struct {
ListId string
Header *aclpb.Header
Records []*aclpb.RawRecord
}
type Provider interface { type Provider interface {
Storage(id string) (Storage, error) Storage(id string) (Storage, error)
AddStorage(id string, st Storage) error AddStorage(id string, st Storage) error
CreateTreeStorage(treeId string, header *aclpb.Header, changes []*aclpb.RawChange) (TreeStorage, error) CreateTreeStorage(payload TreeStorageCreatePayload) (TreeStorage, error)
CreateACLListStorage(id string, header *aclpb.Header, records []*aclpb.RawRecord) (ListStorage, error) CreateACLListStorage(payload ACLListStorageCreatePayload) (ListStorage, error)
} }

View File

@ -13,3 +13,5 @@ type TreeStorage interface {
AddRawChange(change *aclpb.RawChange) error AddRawChange(change *aclpb.RawChange) error
GetRawChange(ctx context.Context, recordID string) (*aclpb.RawChange, error) GetRawChange(ctx context.Context, recordID string) (*aclpb.RawChange, error)
} }
type TreeStorageCreatorFunc = func(payload TreeStorageCreatePayload) (TreeStorage, error)

View File

@ -7,6 +7,7 @@ import (
) )
type ObjectTreeValidator interface { type ObjectTreeValidator interface {
// ValidateTree should always be entered while holding a read lock on ACLList
ValidateTree(tree *Tree, aclList list.ACLList) error ValidateTree(tree *Tree, aclList list.ACLList) error
} }
@ -17,8 +18,7 @@ func newTreeValidator() ObjectTreeValidator {
} }
func (v *objectTreeValidator) ValidateTree(tree *Tree, aclList list.ACLList) (err error) { func (v *objectTreeValidator) ValidateTree(tree *Tree, aclList list.ACLList) (err error) {
aclList.RLock()
defer aclList.RUnlock()
var ( var (
perm list.UserPermissionPair perm list.UserPermissionPair
state = aclList.ACLState() state = aclList.ACLState()

View File

@ -6,6 +6,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/keys/symmetric"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice" "github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"go.uber.org/zap" "go.uber.org/zap"
"sync" "sync"
@ -43,6 +44,9 @@ type AddResult struct {
Summary AddResultSummary Summary AddResultSummary
} }
type ChangeIterateFunc = func(change *Change) bool
type ChangeConvertFunc = func(decrypted []byte) (any, error)
type ObjectTree interface { type ObjectTree interface {
RWLocker RWLocker
@ -52,8 +56,8 @@ type ObjectTree interface {
Root() *Change Root() *Change
HasChange(string) bool HasChange(string) bool
Iterate(func(change *Change) bool) Iterate(convert ChangeConvertFunc, iterate ChangeIterateFunc) error
IterateFrom(string, func(change *Change) bool) IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) error
SnapshotPath() []string SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error) ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
@ -79,6 +83,8 @@ type objectTree struct {
header *aclpb.Header header *aclpb.Header
tree *Tree tree *Tree
keys map[uint64]*symmetric.Key
// buffers // buffers
difSnapshotBuf []*aclpb.RawChange difSnapshotBuf []*aclpb.RawChange
tmpChangesBuf []*Change tmpChangesBuf []*Change
@ -119,16 +125,18 @@ func defaultObjectTreeDeps(
func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
objTree := &objectTree{ objTree := &objectTree{
treeStorage: deps.treeStorage, treeStorage: deps.treeStorage,
updateListener: deps.updateListener, updateListener: deps.updateListener,
treeBuilder: deps.treeBuilder, treeBuilder: deps.treeBuilder,
validator: deps.validator, validator: deps.validator,
aclList: deps.aclList, aclList: deps.aclList,
changeBuilder: deps.changeBuilder, changeBuilder: deps.changeBuilder,
tree: nil, tree: nil,
tmpChangesBuf: make([]*Change, 0, 10), keys: make(map[uint64]*symmetric.Key),
difSnapshotBuf: make([]*aclpb.RawChange, 0, 10), tmpChangesBuf: make([]*Change, 0, 10),
notSeenIdxBuf: make([]int, 0, 10), difSnapshotBuf: make([]*aclpb.RawChange, 0, 10),
notSeenIdxBuf: make([]int, 0, 10),
newSnapshotsBuf: make([]*Change, 0, 10),
} }
err := objTree.rebuildFromStorage(nil) err := objTree.rebuildFromStorage(nil)
@ -185,7 +193,7 @@ func (ot *objectTree) rebuildFromStorage(newChanges []*Change) (err error) {
// but obviously they are not roots, because of the way how we construct the tree // but obviously they are not roots, because of the way how we construct the tree
ot.tree.clearPossibleRoots() ot.tree.clearPossibleRoots()
return ot.validator.ValidateTree(ot.tree, ot.aclList) return ot.validateTree()
} }
func (ot *objectTree) ID() string { func (ot *objectTree) ID() string {
@ -400,7 +408,7 @@ func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*aclpb.Ra
default: default:
// just rebuilding the state from start without reloading everything from tree storage // just rebuilding the state from start without reloading everything from tree storage
// as an optimization we could've started from current heads, but I didn't implement that // as an optimization we could've started from current heads, but I didn't implement that
err = ot.validator.ValidateTree(ot.tree, ot.aclList) err = ot.validateTree()
if err != nil { if err != nil {
rollback() rollback()
err = ErrHasInvalidChanges err = ErrHasInvalidChanges
@ -417,12 +425,42 @@ func (ot *objectTree) addRawChanges(ctx context.Context, rawChanges ...*aclpb.Ra
return return
} }
func (ot *objectTree) Iterate(f func(change *Change) bool) { func (ot *objectTree) Iterate(convert ChangeConvertFunc, iterate ChangeIterateFunc) (err error) {
ot.tree.Iterate(ot.tree.RootId(), f) return ot.IterateFrom(ot.tree.RootId(), convert, iterate)
} }
func (ot *objectTree) IterateFrom(s string, f func(change *Change) bool) { func (ot *objectTree) IterateFrom(id string, convert ChangeConvertFunc, iterate ChangeIterateFunc) (err error) {
ot.tree.Iterate(s, f) if convert == nil {
ot.tree.Iterate(id, iterate)
return
}
ot.tree.Iterate(ot.tree.RootId(), func(c *Change) (isContinue bool) {
var model any
if c.ParsedModel != nil {
return iterate(c)
}
readKey, exists := ot.keys[c.Content.CurrentReadKeyHash]
if !exists {
err = list.ErrNoReadKey
return false
}
var decrypted []byte
decrypted, err = readKey.Decrypt(c.Content.GetChangesData())
if err != nil {
return false
}
model, err = convert(decrypted)
if err != nil {
return false
}
c.ParsedModel = model
return iterate(c)
})
return
} }
func (ot *objectTree) HasChange(s string) bool { func (ot *objectTree) HasChange(s string) bool {
@ -546,6 +584,21 @@ func (ot *objectTree) snapshotPathIsActual() bool {
return len(ot.snapshotPath) != 0 && ot.snapshotPath[len(ot.snapshotPath)-1] == ot.tree.RootId() return len(ot.snapshotPath) != 0 && ot.snapshotPath[len(ot.snapshotPath)-1] == ot.tree.RootId()
} }
func (ot *objectTree) validateTree() error {
ot.aclList.RLock()
defer ot.aclList.RUnlock()
state := ot.aclList.ACLState()
// just not to take lock many times, updating the key map from aclList
if len(ot.keys) != len(state.UserReadKeys()) {
for key, value := range state.UserReadKeys() {
ot.keys[key] = value
}
}
return ot.validator.ValidateTree(ot.tree, ot.aclList)
}
func (ot *objectTree) DebugDump() (string, error) { func (ot *objectTree) DebugDump() (string, error) {
return ot.tree.Graph(NoOpDescriptionParser) return ot.tree.Graph(NoOpDescriptionParser)
} }

View File

@ -37,7 +37,7 @@ func (c *mockChangeCreator) createNewTreeStorage(treeId, aclListId, aclHeadId, f
WorkspaceId: "", WorkspaceId: "",
DocType: aclpb.Header_DocTree, DocType: aclpb.Header_DocTree,
} }
treeStorage, _ := storage.NewInMemoryTreeStorage(treeId, header, []*aclpb.RawChange{firstChange}) treeStorage, _ := storage.NewInMemoryTreeStorage(treeId, header, []string{firstChangeId}, []*aclpb.RawChange{firstChange})
return treeStorage return treeStorage
} }

View File

@ -14,7 +14,7 @@ func CreateNewTreeStorage(
acc *account.AccountData, acc *account.AccountData,
aclList list.ACLList, aclList list.ACLList,
content proto.Marshaler, content proto.Marshaler,
create storage.CreatorFunc) (storage.TreeStorage, error) { create storage.TreeStorageCreatorFunc) (thr storage.TreeStorage, err error) {
state := aclList.ACLState() state := aclList.ACLState()
change := &aclpb.Change{ change := &aclpb.Change{
@ -27,34 +27,34 @@ func CreateNewTreeStorage(
marshalledData, err := content.Marshal() marshalledData, err := content.Marshal()
if err != nil { if err != nil {
return nil, err return
} }
readKey, err := state.CurrentReadKey() readKey, err := state.CurrentReadKey()
if err != nil { if err != nil {
return nil, err return
} }
encrypted, err := readKey.Encrypt(marshalledData) encrypted, err := readKey.Encrypt(marshalledData)
if err != nil { if err != nil {
return nil, err return
} }
change.ChangesData = encrypted change.ChangesData = encrypted
fullMarshalledChange, err := proto.Marshal(change) fullMarshalledChange, err := proto.Marshal(change)
if err != nil { if err != nil {
return nil, err return
} }
signature, err := acc.SignKey.Sign(fullMarshalledChange) signature, err := acc.SignKey.Sign(fullMarshalledChange)
if err != nil { if err != nil {
return nil, err return
} }
changeId, err := cid.NewCIDFromBytes(fullMarshalledChange) changeId, err := cid.NewCIDFromBytes(fullMarshalledChange)
if err != nil { if err != nil {
return nil, err return
} }
rawChange := &aclpb.RawChange{ rawChange := &aclpb.RawChange{
@ -64,35 +64,28 @@ func CreateNewTreeStorage(
} }
header, treeId, err := createTreeHeaderAndId(rawChange, aclpb.Header_DocTree, aclList.ID()) header, treeId, err := createTreeHeaderAndId(rawChange, aclpb.Header_DocTree, aclList.ID())
if err != nil { if err != nil {
return nil, err return
} }
thr, err := create(treeId, header, []*aclpb.RawChange{rawChange}) return create(storage.TreeStorageCreatePayload{
if err != nil { TreeId: treeId,
return nil, err Header: header,
} Changes: []*aclpb.RawChange{rawChange},
Heads: []string{rawChange.Id},
err = thr.SetHeads([]string{changeId}) })
if err != nil {
return nil, err
}
return thr, nil
} }
func createTreeHeaderAndId(change *aclpb.RawChange, treeType aclpb.HeaderDocType, aclListId string) (*aclpb.Header, string, error) { func createTreeHeaderAndId(change *aclpb.RawChange, treeType aclpb.HeaderDocType, aclListId string) (header *aclpb.Header, treeId string, err error) {
header := &aclpb.Header{ header = &aclpb.Header{
FirstId: change.Id, FirstId: change.Id,
DocType: treeType, DocType: treeType,
AclListId: aclListId, AclListId: aclListId,
} }
marshalledHeader, err := proto.Marshal(header) marshalledHeader, err := proto.Marshal(header)
if err != nil { if err != nil {
return nil, "", err return
}
treeId, err := cid.NewCIDFromBytes(marshalledHeader)
if err != nil {
return nil, "", err
} }
return header, treeId, nil treeId, err = cid.NewCIDFromBytes(marshalledHeader)
return
} }