any-sync/pkg/acl/tree/doctree.go

535 lines
12 KiB
Go

package tree
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/list"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/cid"
"github.com/anytypeio/go-anytype-infrastructure-experiments/util/slice"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"sync"
"time"
)
type TreeUpdateListener interface {
Update(tree DocTree)
Rebuild(tree DocTree)
}
type RWLocker interface {
sync.Locker
RLock()
RUnlock()
}
var (
ErrHasInvalidChanges = errors.New("the change is invalid")
ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot")
)
type AddResultSummary int
const (
AddResultSummaryNothing AddResultSummary = iota
AddResultSummaryAppend
AddResultSummaryRebuild
)
type AddResult struct {
OldHeads []string
Heads []string
Added []*aclpb.RawChange
Summary AddResultSummary
}
type DocTree interface {
RWLocker
CommonTree
AddContent(ctx context.Context, aclList list.ACLList, content SignableChangeContent) (*aclpb.RawChange, error)
AddRawChanges(ctx context.Context, aclList list.ACLList, changes ...*aclpb.RawChange) (AddResult, error)
}
type docTree struct {
treeStorage storage.TreeStorage
updateListener TreeUpdateListener
id string
header *aclpb.Header
tree *Tree
treeBuilder *treeBuilder
validator DocTreeValidator
kch *keychain
// buffers
difSnapshotBuf []*aclpb.RawChange
tmpChangesBuf []*Change
newSnapshotsBuf []*Change
notSeenIdxBuf []int
snapshotPath []string
sync.RWMutex
}
func BuildDocTree(t storage.TreeStorage, listener TreeUpdateListener, aclList list.ACLList) (DocTree, error) {
treeBuilder := newTreeBuilder(t)
validator := newTreeValidator()
docTree := &docTree{
treeStorage: t,
tree: nil,
treeBuilder: treeBuilder,
validator: validator,
updateListener: listener,
tmpChangesBuf: make([]*Change, 0, 10),
difSnapshotBuf: make([]*aclpb.RawChange, 0, 10),
notSeenIdxBuf: make([]int, 0, 10),
kch: newKeychain(),
}
err := docTree.rebuildFromStorage(aclList, nil)
if err != nil {
return nil, err
}
storageHeads, err := t.Heads()
if err != nil {
return nil, err
}
// comparing rebuilt heads with heads in storage
// in theory it can happen that we didn't set heads because the process has crashed
// therefore we want to set them later
if !slice.UnsortedEquals(storageHeads, docTree.tree.Heads()) {
log.With(zap.Strings("storage", storageHeads), zap.Strings("rebuilt", docTree.tree.Heads())).
Errorf("the heads in storage and tree are different")
err = t.SetHeads(docTree.tree.Heads())
if err != nil {
return nil, err
}
}
docTree.id, err = t.ID()
if err != nil {
return nil, err
}
docTree.header, err = t.Header()
if err != nil {
return nil, err
}
if listener != nil {
listener.Rebuild(docTree)
}
return docTree, nil
}
func (d *docTree) rebuildFromStorage(aclList list.ACLList, newChanges []*Change) (err error) {
d.treeBuilder.Init(d.kch)
d.tree, err = d.treeBuilder.Build(newChanges)
if err != nil {
return
}
// during building the tree we may have marked some changes as possible roots,
// but obviously they are not roots, because of the way how we construct the tree
d.tree.clearPossibleRoots()
return d.validator.ValidateTree(d.tree, aclList)
}
func (d *docTree) ID() string {
return d.id
}
func (d *docTree) Header() *aclpb.Header {
return d.header
}
func (d *docTree) Storage() storage.TreeStorage {
return d.treeStorage
}
func (d *docTree) AddContent(ctx context.Context, aclList list.ACLList, content SignableChangeContent) (rawChange *aclpb.RawChange, err error) {
defer func() {
if d.updateListener != nil {
d.updateListener.Update(d)
}
}()
state := aclList.ACLState() // special method for own keys
aclChange := &aclpb.Change{
TreeHeadIds: d.tree.Heads(),
AclHeadId: aclList.Head().Id,
SnapshotBaseId: d.tree.RootId(),
CurrentReadKeyHash: state.CurrentReadKeyHash(),
Timestamp: int64(time.Now().Nanosecond()),
Identity: content.Identity,
IsSnapshot: content.IsSnapshot,
}
marshalledData, err := content.Proto.Marshal()
if err != nil {
return nil, err
}
readKey, err := state.CurrentReadKey()
if err != nil {
return nil, err
}
encrypted, err := readKey.Encrypt(marshalledData)
if err != nil {
return nil, err
}
aclChange.ChangesData = encrypted
fullMarshalledChange, err := proto.Marshal(aclChange)
if err != nil {
return nil, err
}
signature, err := content.Key.Sign(fullMarshalledChange)
if err != nil {
return nil, err
}
id, err := cid.NewCIDFromBytes(fullMarshalledChange)
if err != nil {
return nil, err
}
docChange := NewChange(id, aclChange, signature)
docChange.ParsedModel = content
if content.IsSnapshot {
// clearing tree, because we already fixed everything in the last snapshot
d.tree = &Tree{}
}
err = d.tree.AddMergedHead(docChange)
if err != nil {
panic(err)
}
rawChange = &aclpb.RawChange{
Payload: fullMarshalledChange,
Signature: docChange.Signature(),
Id: docChange.Id,
}
err = d.treeStorage.AddRawChange(rawChange)
if err != nil {
return
}
err = d.treeStorage.SetHeads([]string{docChange.Id})
return
}
func (d *docTree) AddRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (addResult AddResult, err error) {
var mode Mode
mode, addResult, err = d.addRawChanges(ctx, aclList, rawChanges...)
if err != nil {
return
}
// reducing tree if we have new roots
d.tree.reduceTree()
// adding to database all the added changes only after they are good
for _, ch := range addResult.Added {
err = d.treeStorage.AddRawChange(ch)
if err != nil {
return
}
}
// setting heads
err = d.treeStorage.SetHeads(d.tree.Heads())
if err != nil {
return
}
if d.updateListener == nil {
return
}
switch mode {
case Append:
d.updateListener.Update(d)
case Rebuild:
d.updateListener.Rebuild(d)
default:
break
}
return
}
func (d *docTree) addRawChanges(ctx context.Context, aclList list.ACLList, rawChanges ...*aclpb.RawChange) (mode Mode, addResult AddResult, err error) {
// resetting buffers
d.tmpChangesBuf = d.tmpChangesBuf[:0]
d.notSeenIdxBuf = d.notSeenIdxBuf[:0]
d.difSnapshotBuf = d.difSnapshotBuf[:0]
d.newSnapshotsBuf = d.newSnapshotsBuf[:0]
// this will be returned to client so we shouldn't use buffer here
prevHeadsCopy := make([]string, 0, len(d.tree.Heads()))
copy(prevHeadsCopy, d.tree.Heads())
// filtering changes, verifying and unmarshalling them
for idx, ch := range rawChanges {
if d.HasChange(ch.Id) {
continue
}
var change *Change
change, err = NewVerifiedChangeFromRaw(ch, d.kch)
if err != nil {
return
}
if change.IsSnapshot {
d.newSnapshotsBuf = append(d.newSnapshotsBuf, change)
}
d.tmpChangesBuf = append(d.tmpChangesBuf, change)
d.notSeenIdxBuf = append(d.notSeenIdxBuf, idx)
}
// if no new changes, then returning
if len(d.notSeenIdxBuf) == 0 {
addResult = AddResult{
OldHeads: prevHeadsCopy,
Heads: prevHeadsCopy,
Summary: AddResultSummaryNothing,
}
return
}
headsCopy := func() []string {
newHeads := make([]string, 0, len(d.tree.Heads()))
copy(newHeads, d.tree.Heads())
return newHeads
}
// returns changes that we added to the tree
getAddedChanges := func() []*aclpb.RawChange {
var added []*aclpb.RawChange
for _, idx := range d.notSeenIdxBuf {
rawChange := rawChanges[idx]
if _, exists := d.tree.attached[rawChange.Id]; exists {
added = append(added, rawChange)
}
}
return added
}
rollback := func() {
for _, ch := range d.tmpChangesBuf {
if _, exists := d.tree.attached[ch.Id]; exists {
delete(d.tree.attached, ch.Id)
} else if _, exists := d.tree.unAttached[ch.Id]; exists {
delete(d.tree.unAttached, ch.Id)
}
}
}
// checks if we need to go to database
isOldSnapshot := func(ch *Change) bool {
if ch.SnapshotId == d.tree.RootId() {
return false
}
for _, sn := range d.newSnapshotsBuf {
// if change refers to newly received snapshot
if ch.SnapshotId == sn.Id {
return false
}
}
return true
}
// checking if we have some changes with different snapshot and then rebuilding
for _, ch := range d.tmpChangesBuf {
if isOldSnapshot(ch) {
err = d.rebuildFromStorage(aclList, d.tmpChangesBuf)
if err != nil {
// rebuilding without new changes
d.rebuildFromStorage(aclList, nil)
return
}
addResult = AddResult{
OldHeads: prevHeadsCopy,
Heads: headsCopy(),
Added: getAddedChanges(),
Summary: AddResultSummaryRebuild,
}
return
}
}
// normal mode of operation, where we don't need to rebuild from database
mode = d.tree.Add(d.tmpChangesBuf...)
switch mode {
case Nothing:
addResult = AddResult{
OldHeads: prevHeadsCopy,
Heads: prevHeadsCopy,
Summary: AddResultSummaryNothing,
}
return
default:
// just rebuilding the state from start without reloading everything from tree storage
// as an optimization we could've started from current heads, but I didn't implement that
err = d.validator.ValidateTree(d.tree, aclList)
if err != nil {
rollback()
err = ErrHasInvalidChanges
return
}
addResult = AddResult{
OldHeads: prevHeadsCopy,
Heads: headsCopy(),
Added: getAddedChanges(),
Summary: AddResultSummaryAppend,
}
}
return
}
func (d *docTree) Iterate(f func(change *Change) bool) {
d.tree.Iterate(d.tree.RootId(), f)
}
func (d *docTree) IterateFrom(s string, f func(change *Change) bool) {
d.tree.Iterate(s, f)
}
func (d *docTree) HasChange(s string) bool {
_, attachedExists := d.tree.attached[s]
_, unattachedExists := d.tree.unAttached[s]
return attachedExists || unattachedExists
}
func (d *docTree) Heads() []string {
return d.tree.Heads()
}
func (d *docTree) Root() *Change {
return d.tree.Root()
}
func (d *docTree) Close() error {
return nil
}
func (d *docTree) SnapshotPath() []string {
if d.snapshotPathIsActual() {
return d.snapshotPath
}
var path []string
// TODO: think that the user may have not all of the snapshots locally
currentSnapshotId := d.tree.RootId()
for currentSnapshotId != "" {
sn, err := d.treeBuilder.loadChange(currentSnapshotId)
if err != nil {
break
}
path = append(path, currentSnapshotId)
currentSnapshotId = sn.SnapshotId
}
d.snapshotPath = path
return path
}
func (d *docTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) {
var (
needFullDocument = len(theirPath) == 0
ourPath = d.SnapshotPath()
// by default returning everything we have
commonSnapshot = ourPath[len(ourPath)-1]
err error
)
// if this is non-empty request
if !needFullDocument {
commonSnapshot, err = commonSnapshotForTwoPaths(ourPath, theirPath)
if err != nil {
return nil, err
}
}
log.With(
zap.Strings("heads", d.tree.Heads()),
zap.String("breakpoint", commonSnapshot),
zap.String("id", d.id)).
Debug("getting all changes from common snapshot")
if commonSnapshot == d.tree.RootId() {
return d.getChangesFromTree()
} else {
return d.getChangesFromDB(commonSnapshot, needFullDocument)
}
}
func (d *docTree) getChangesFromTree() (rawChanges []*aclpb.RawChange, err error) {
d.tree.dfsPrev(d.tree.HeadsChanges(), func(ch *Change) bool {
var marshalled []byte
marshalled, err = ch.Content.Marshal()
if err != nil {
return false
}
raw := &aclpb.RawChange{
Payload: marshalled,
Signature: ch.Signature(),
Id: ch.Id,
}
rawChanges = append(rawChanges, raw)
return true
}, func(changes []*Change) {})
return
}
func (d *docTree) getChangesFromDB(commonSnapshot string, needStartSnapshot bool) (rawChanges []*aclpb.RawChange, err error) {
load := func(id string) (*Change, error) {
raw, err := d.treeStorage.GetRawChange(context.Background(), id)
if err != nil {
return nil, err
}
ch, err := NewChangeFromRaw(raw)
if err != nil {
return nil, err
}
rawChanges = append(rawChanges, raw)
return ch, nil
}
_, err = d.treeBuilder.dfs(d.tree.Heads(), commonSnapshot, load)
if err != nil {
return
}
if needStartSnapshot {
// adding snapshot to raw changes
_, err = load(commonSnapshot)
}
return
}
func (d *docTree) snapshotPathIsActual() bool {
return len(d.snapshotPath) != 0 && d.snapshotPath[len(d.snapshotPath)-1] == d.tree.RootId()
}
func (d *docTree) DebugDump() (string, error) {
return d.tree.Graph(NoOpDescriptionParser)
}