diff --git a/pkg/acl/acltree/acltree.go b/pkg/acl/acltree/acltree.go index 49e530db..5cd85957 100644 --- a/pkg/acl/acltree/acltree.go +++ b/pkg/acl/acltree/acltree.go @@ -2,6 +2,7 @@ package acltree import ( "context" + "errors" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" @@ -35,7 +36,16 @@ func (n NoOpListener) Update(tree ACLTree) {} func (n NoOpListener) Rebuild(tree ACLTree) {} +type RWLocker interface { + sync.Locker + RLock() + RUnlock() +} + +var ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot") + type ACLTree interface { + RWLocker ACLState() *ACLState AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*Change, error) AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error) @@ -45,6 +55,7 @@ type ACLTree interface { IterateFrom(string, func(change *Change) bool) HasChange(string) bool SnapshotPath() []string + ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error) Close() error } @@ -204,17 +215,12 @@ func (a *aclTree) rebuildFromStorage(fromStart bool) error { } func (a *aclTree) ACLState() *ACLState { - // TODO: probably locks should be happening outside because we are using object cache - a.RLock() - defer a.RUnlock() return a.aclState } func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuilder) error) (*Change, error) { // TODO: add snapshot creation logic - a.Lock() defer func() { - a.Unlock() // TODO: should this be called in a separate goroutine to prevent accidental cycles (tree->updater->tree) a.updateListener.Update(a) }() @@ -248,7 +254,6 @@ func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuild } func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawChange) (AddResult, error) { - a.Lock() // TODO: make proper error handling, because there are a lot of corner cases where this will break var err error var mode Mode @@ -278,7 +283,6 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha return } - a.Unlock() switch mode { case Append: a.updateListener.Update(a) @@ -350,20 +354,14 @@ func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawCha } func (a *aclTree) Iterate(f func(change *Change) bool) { - a.RLock() - defer a.RUnlock() a.fullTree.Iterate(a.fullTree.RootId(), f) } func (a *aclTree) IterateFrom(s string, f func(change *Change) bool) { - a.RLock() - defer a.RUnlock() a.fullTree.Iterate(s, f) } func (a *aclTree) HasChange(s string) bool { - a.RLock() - defer a.RUnlock() _, attachedExists := a.fullTree.attached[s] _, unattachedExists := a.fullTree.unAttached[s] _, invalidExists := a.fullTree.invalidChanges[s] @@ -371,14 +369,10 @@ func (a *aclTree) HasChange(s string) bool { } func (a *aclTree) Heads() []string { - a.RLock() - defer a.RUnlock() return a.fullTree.Heads() } func (a *aclTree) Root() *Change { - a.RLock() - defer a.RUnlock() return a.fullTree.Root() } @@ -387,8 +381,7 @@ func (a *aclTree) Close() error { } func (a *aclTree) SnapshotPath() []string { - a.RLock() - defer a.RUnlock() + // TODO: think about caching this var path []string // TODO: think that the user may have not all of the snapshots locally @@ -403,3 +396,63 @@ func (a *aclTree) SnapshotPath() []string { } return path } + +func (a *aclTree) ChangesAfterCommonSnapshot(theirPath []string) ([]*aclpb.RawChange, error) { + // TODO: think about when the clients will have their full acl tree and thus full snapshots + // but no changes after some of the snapshots + commonSnapshot, err := a.commonSnapshotForTwoPaths(a.SnapshotPath(), theirPath) + if err != nil { + return nil, err + } + // we presume that we have everything after the common snapshot, though this may not be the case in case of clients and only ACL tree changes + changes, err := a.treeBuilder.dfs(a.fullTree.Heads(), commonSnapshot, func(id string) (*Change, error) { + // using custom load function to skip verification step and save raw changes + raw, err := a.treeStorage.GetChange(context.Background(), id) + if err != nil { + return nil, err + } + + aclChange, err := a.treeBuilder.makeUnverifiedACLChange(raw) + if err != nil { + return nil, err + } + + ch := NewChange(id, aclChange) + ch.Raw = raw + return ch, nil + }) + if err != nil { + return nil, err + } + var rawChanges []*aclpb.RawChange + for _, ch := range changes { + rawChanges = append(rawChanges, ch.Raw) + } + return rawChanges, nil +} + +func (a *aclTree) commonSnapshotForTwoPaths(ourPath []string, theirPath []string) (string, error) { + var i int + var j int +OuterLoop: + // find starting point from the right + for i = len(ourPath) - 1; i >= 0; i-- { + for j = len(theirPath) - 1; j >= 0; j-- { + // most likely there would be only one comparison, because mostly the snapshot path will start from the root for nodes + if ourPath[i] == theirPath[j] { + break OuterLoop + } + } + } + if i < 0 || j < 0 { + return "", ErrNoCommonSnapshot + } + // find last common element of the sequence moving from right to left + for i >= 0 && j >= 0 { + if ourPath[i] == theirPath[j] { + i-- + j-- + } + } + return ourPath[i+1], nil +} diff --git a/pkg/acl/acltree/change.go b/pkg/acl/acltree/change.go index c768ef23..67eedf00 100644 --- a/pkg/acl/acltree/change.go +++ b/pkg/acl/acltree/change.go @@ -23,6 +23,7 @@ type Change struct { SnapshotId string IsSnapshot bool DecryptedDocumentChange []byte + Raw *aclpb.RawChange // this will not be present on all changes, we only need it sometimes Content *aclpb.ACLChange Sign []byte diff --git a/pkg/acl/acltree/changeloader.go b/pkg/acl/acltree/changeloader.go index d3b57a32..b308397a 100644 --- a/pkg/acl/acltree/changeloader.go +++ b/pkg/acl/acltree/changeloader.go @@ -91,3 +91,9 @@ func (c *changeLoader) makeVerifiedACLChange(change *aclpb.RawChange) (aclChange } return } + +func (c *changeLoader) makeUnverifiedACLChange(change *aclpb.RawChange) (aclChange *aclpb.ACLChange, err error) { + aclChange = new(aclpb.ACLChange) + err = proto.Unmarshal(change.Payload, aclChange) + return +} diff --git a/pkg/acl/acltree/treebuilder.go b/pkg/acl/acltree/treebuilder.go index c8aa9535..261a49f9 100644 --- a/pkg/acl/acltree/treebuilder.go +++ b/pkg/acl/acltree/treebuilder.go @@ -135,13 +135,16 @@ func (tb *treeBuilder) buildTree(heads []string, breakpoint string) (err error) return } tb.tree.AddFast(ch) - changes, err := tb.dfs(heads, breakpoint) + changes, err := tb.dfs(heads, breakpoint, tb.loadChange) tb.tree.AddFast(changes...) return } -func (tb *treeBuilder) dfs(heads []string, breakpoint string) (buf []*Change, err error) { +func (tb *treeBuilder) dfs( + heads []string, + breakpoint string, + load func(string) (*Change, error)) (buf []*Change, err error) { stack := make([]string, len(heads), len(heads)*2) copy(stack, heads) @@ -154,7 +157,7 @@ func (tb *treeBuilder) dfs(heads []string, breakpoint string) (buf []*Change, er continue } - ch, err := tb.loadChange(id) + ch, err := load(id) if err != nil { continue } diff --git a/pkg/acl/example/plaintextdocument/document.go b/pkg/acl/example/plaintextdocument/document.go index 80e760b0..b25b2f8e 100644 --- a/pkg/acl/example/plaintextdocument/document.go +++ b/pkg/acl/example/plaintextdocument/document.go @@ -17,6 +17,7 @@ type PlainTextDocument interface { AddText(ctx context.Context, text string) error } +// TODO: this struct is not thread-safe, so use it wisely :-) type plainTextDocument struct { heads []string aclTree acltree.ACLTree diff --git a/pkg/acl/treestorage/inmemory.go b/pkg/acl/treestorage/inmemory.go index 8a2b7865..b9015116 100644 --- a/pkg/acl/treestorage/inmemory.go +++ b/pkg/acl/treestorage/inmemory.go @@ -143,7 +143,7 @@ func (i *inMemoryTreeStorageProvider) TreeStorage(treeId string) (TreeStorage, e if tree, exists := i.trees[treeId]; exists { return tree, nil } - return nil, UnknownTreeId + return nil, ErrUnknownTreeId } func (i *inMemoryTreeStorageProvider) InsertTree(tree TreeStorage) error { diff --git a/pkg/acl/treestorage/provider.go b/pkg/acl/treestorage/provider.go index c62096db..fd442655 100644 --- a/pkg/acl/treestorage/provider.go +++ b/pkg/acl/treestorage/provider.go @@ -2,7 +2,7 @@ package treestorage import "errors" -var UnknownTreeId = errors.New("tree does not exist") +var ErrUnknownTreeId = errors.New("tree does not exist") type Provider interface { TreeStorage(treeId string) (TreeStorage, error) diff --git a/service/sync/pubsub.go b/service/sync/pubsub.go deleted file mode 100644 index b85c8e65..00000000 --- a/service/sync/pubsub.go +++ /dev/null @@ -1,13 +0,0 @@ -package sync - -type PubSubPayload struct { -} - -type PubSub interface { - Send(msg *PubSubPayload) error - Listen(chan *PubSubPayload) error -} - -func NewPubSub(topic string) PubSub { - return nil -} diff --git a/service/sync/requesthandler.go b/service/sync/requesthandler.go index b4d0758b..349f59d3 100644 --- a/service/sync/requesthandler.go +++ b/service/sync/requesthandler.go @@ -20,7 +20,7 @@ func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, u snapshotPath []string result acltree.AddResult ) - + err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { // TODO: check if we already have those changes result, err = tree.AddRawChanges(ctx, update.Changes...) @@ -30,7 +30,7 @@ func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, u shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads()) snapshotPath = tree.SnapshotPath() if shouldFullSync { - fullRequest, err = r.prepareFullSyncRequest(tree) + fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.SnapshotPath, tree) if err != nil { return err } @@ -38,7 +38,7 @@ func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, u return nil }) // if there are no such tree - if err == treestorage.UnknownTreeId { + if err == treestorage.ErrUnknownTreeId { fullRequest = &syncpb.SyncFullRequest{ TreeId: update.TreeId, } @@ -62,12 +62,25 @@ func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, u return } -func (r *requestHander) HandleFullSync(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error { +func (r *requestHander) HandleFullSyncRequest(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error { // TODO: add case of new tree return nil } -func (r *requestHander) prepareFullSyncRequest(tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) { - - return nil, nil +func (r *requestHander) HandleFullSyncResponse(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error { + // TODO: add case of new tree + return nil +} + +func (r *requestHander) prepareFullSyncRequest(treeId string, theirPath []string, tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) { + ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) + if err != nil { + return nil, err + } + return &syncpb.SyncFullRequest{ + Heads: tree.Heads(), + Changes: ourChanges, + TreeId: treeId, + SnapshotPath: tree.SnapshotPath(), + }, nil } diff --git a/service/sync/service.go b/service/sync/service.go index e26d54eb..fd6839bb 100644 --- a/service/sync/service.go +++ b/service/sync/service.go @@ -6,7 +6,6 @@ import ( ) type service struct { - pubSub PubSub } const CName = "SyncService" diff --git a/service/treecache/service.go b/service/treecache/service.go index 0aef289c..654d12ca 100644 --- a/service/treecache/service.go +++ b/service/treecache/service.go @@ -31,6 +31,9 @@ func (s *service) Do(ctx context.Context, treeId string, f func(tree acltree.ACL if err != nil { return err } + aclTree := tree.(acltree.ACLTree) + aclTree.Lock() + defer aclTree.Unlock() return f(tree.(acltree.ACLTree)) }