Add getting common snapshot logic

This commit is contained in:
mcrakhman 2022-07-16 12:24:49 +02:00 committed by Mikhail Iudin
parent 24c59e40ee
commit 46cee8c9ed
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
11 changed files with 111 additions and 45 deletions

View File

@ -2,6 +2,7 @@ package acltree
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage"
@ -35,7 +36,16 @@ func (n NoOpListener) Update(tree ACLTree) {}
func (n NoOpListener) Rebuild(tree ACLTree) {}
type RWLocker interface {
sync.Locker
RLock()
RUnlock()
}
var ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot")
type ACLTree interface {
RWLocker
ACLState() *ACLState
AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*Change, error)
AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error)
@ -45,6 +55,7 @@ type ACLTree interface {
IterateFrom(string, func(change *Change) bool)
HasChange(string) bool
SnapshotPath() []string
ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error)
Close() error
}
@ -204,17 +215,12 @@ func (a *aclTree) rebuildFromStorage(fromStart bool) error {
}
func (a *aclTree) ACLState() *ACLState {
// TODO: probably locks should be happening outside because we are using object cache
a.RLock()
defer a.RUnlock()
return a.aclState
}
func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuilder) error) (*Change, error) {
// TODO: add snapshot creation logic
a.Lock()
defer func() {
a.Unlock()
// TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree)
a.updateListener.Update(a)
}()
@ -248,7 +254,6 @@ func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuild
}
func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawChange) (AddResult, error) {
a.Lock()
// TODO: make proper error handling, because there are a lot of corner cases where this will break
var err error
var mode Mode
@ -278,7 +283,6 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha
return
}
a.Unlock()
switch mode {
case Append:
a.updateListener.Update(a)
@ -350,20 +354,14 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha
}
func (a *aclTree) Iterate(f func(change *Change) bool) {
a.RLock()
defer a.RUnlock()
a.fullTree.Iterate(a.fullTree.RootId(), f)
}
func (a *aclTree) IterateFrom(s string, f func(change *Change) bool) {
a.RLock()
defer a.RUnlock()
a.fullTree.Iterate(s, f)
}
func (a *aclTree) HasChange(s string) bool {
a.RLock()
defer a.RUnlock()
_, attachedExists := a.fullTree.attached[s]
_, unattachedExists := a.fullTree.unAttached[s]
_, invalidExists := a.fullTree.invalidChanges[s]
@ -371,14 +369,10 @@ func (a *aclTree) HasChange(s string) bool {
}
func (a *aclTree) Heads() []string {
a.RLock()
defer a.RUnlock()
return a.fullTree.Heads()
}
func (a *aclTree) Root() *Change {
a.RLock()
defer a.RUnlock()
return a.fullTree.Root()
}
@ -387,8 +381,7 @@ func (a *aclTree) Close() error {
}
func (a *aclTree) SnapshotPath() []string {
a.RLock()
defer a.RUnlock()
// TODO: think about caching this
var path []string
// TODO: think that the user may have not all of the snapshots locally
@ -403,3 +396,63 @@ func (a *aclTree) SnapshotPath() []string {
}
return path
}
func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) {
// TODO: think about when the clients will have their full acl tree and thus full snapshots
// but no changes after some of the snapshots
commonSnapshot, err := a.commonSnapshotForTwoPaths(a.SnapshotPath(), theirPath)
if err != nil {
return nil, err
}
// we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes
changes, err := a.treeBuilder.dfs(a.fullTree.Heads(), commonSnapshot, func(id string) (*Change, error) {
// using custom load function to skip verification step and save raw changes
raw, err := a.treeStorage.GetChange(context.Background(), id)
if err != nil {
return nil, err
}
aclChange, err := a.treeBuilder.makeUnverifiedACLChange(raw)
if err != nil {
return nil, err
}
ch := NewChange(id, aclChange)
ch.Raw = raw
return ch, nil
})
if err != nil {
return nil, err
}
var rawChanges []*aclpb.RawChange
for _, ch := range changes {
rawChanges = append(rawChanges, ch.Raw)
}
return rawChanges, nil
}
func (a *aclTree) commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) {
var i int
var j int
OuterLoop:
// find starting point from the right
for i = len(ourPath) - 1; i >= 0; i-- {
for j = len(theirPath) - 1; j >= 0; j-- {
// most likely there would be only one comparison, because mostly the snapshot path will start from the root for nodes
if ourPath[i] == theirPath[j] {
break OuterLoop
}
}
}
if i < 0 || j < 0 {
return "", ErrNoCommonSnapshot
}
// find last common element of the sequence moving from right to left
for i >= 0 && j >= 0 {
if ourPath[i] == theirPath[j] {
i--
j--
}
}
return ourPath[i+1], nil
}

View File

@ -23,6 +23,7 @@ type Change struct {
SnapshotId string
IsSnapshot bool
DecryptedDocumentChange []byte
Raw *aclpb.RawChange // this will not be present on all changes, we only need it sometimes
Content *aclpb.ACLChange
Sign []byte

View File

@ -91,3 +91,9 @@ func (c *changeLoader) makeVerifiedACLChange(change *aclpb.RawChange) (aclChange
}
return
}
func (c *changeLoader) makeUnverifiedACLChange(change *aclpb.RawChange) (aclChange *aclpb.ACLChange, err error) {
aclChange = new(aclpb.ACLChange)
err = proto.Unmarshal(change.Payload, aclChange)
return
}

View File

@ -135,13 +135,16 @@ func (tb *treeBuilder) buildTree(heads []string, breakpoint string) (err error)
return
}
tb.tree.AddFast(ch)
changes, err := tb.dfs(heads, breakpoint)
changes, err := tb.dfs(heads, breakpoint, tb.loadChange)
tb.tree.AddFast(changes...)
return
}
func (tb *treeBuilder) dfs(heads []string, breakpoint string) (buf []*Change, err error) {
func (tb *treeBuilder) dfs(
heads []string,
breakpoint string,
load func(string) (*Change, error)) (buf []*Change, err error) {
stack := make([]string, len(heads), len(heads)*2)
copy(stack, heads)
@ -154,7 +157,7 @@ func (tb *treeBuilder) dfs(heads []string, breakpoint string) (buf []*Change, er
continue
}
ch, err := tb.loadChange(id)
ch, err := load(id)
if err != nil {
continue
}

View File

@ -17,6 +17,7 @@ type PlainTextDocument interface {
AddText(ctx context.Context, text string) error
}
// TODO: this struct is not thread-safe, so use it wisely :-)
type plainTextDocument struct {
heads []string
aclTree acltree.ACLTree

View File

@ -143,7 +143,7 @@ func (i *inMemoryTreeStorageProvider) TreeStorage(treeId string) (TreeStorage, e
if tree, exists := i.trees[treeId]; exists {
return tree, nil
}
return nil, UnknownTreeId
return nil, ErrUnknownTreeId
}
func (i *inMemoryTreeStorageProvider) InsertTree(tree TreeStorage) error {

View File

@ -2,7 +2,7 @@ package treestorage
import "errors"
var UnknownTreeId = errors.New("tree does not exist")
var ErrUnknownTreeId = errors.New("tree does not exist")
type Provider interface {
TreeStorage(treeId string) (TreeStorage, error)

View File

@ -1,13 +0,0 @@
package sync
type PubSubPayload struct {
}
type PubSub interface {
Send(msg *PubSubPayload) error
Listen(chan *PubSubPayload) error
}
func NewPubSub(topic string) PubSub {
return nil
}

View File

@ -30,7 +30,7 @@ func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, u
shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads())
snapshotPath = tree.SnapshotPath()
if shouldFullSync {
fullRequest, err = r.prepareFullSyncRequest(tree)
fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.SnapshotPath, tree)
if err != nil {
return err
}
@ -38,7 +38,7 @@ func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, u
return nil
})
// if there are no such tree
if err == treestorage.UnknownTreeId {
if err == treestorage.ErrUnknownTreeId {
fullRequest = &syncpb.SyncFullRequest{
TreeId: update.TreeId,
}
@ -62,12 +62,25 @@ func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, u
return
}
func (r *requestHander) HandleFullSync(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error {
func (r *requestHander) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error {
// TODO: add case of new tree
return nil
}
func (r *requestHander) prepareFullSyncRequest(tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) {
return nil, nil
func (r *requestHander) HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error {
// TODO: add case of new tree
return nil
}
func (r *requestHander) prepareFullSyncRequest(treeId string, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) {
ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath)
if err != nil {
return nil, err
}
return &syncpb.SyncFullRequest{
Heads: tree.Heads(),
Changes: ourChanges,
TreeId: treeId,
SnapshotPath: tree.SnapshotPath(),
}, nil
}

View File

@ -6,7 +6,6 @@ import (
)
type service struct {
pubSub PubSub
}
const CName = "SyncService"

View File

@ -31,6 +31,9 @@ func (s *service) Do(ctx context.Context, treeId string, f func(tree acltree.ACL
if err != nil {
return err
}
aclTree := tree.(acltree.ACLTree)
aclTree.Lock()
defer aclTree.Unlock()
return f(tree.(acltree.ACLTree))
}