From 5b821c0d2a29ee9f57debd7be91785dc3fef3bfa Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 15 Jul 2022 16:27:10 +0200 Subject: [PATCH] Start using raw changes + wip request handler --- pkg/acl/acltree/acltree.go | 63 ++++++++++++++++++++++++++------- pkg/acl/acltree/acltree_test.go | 22 ++---------- service/sync/requesthandler.go | 36 ++++++++++++++++--- service/sync/syncpb/pbutils.go | 22 ++++++++++++ 4 files changed, 106 insertions(+), 37 deletions(-) create mode 100644 service/sync/syncpb/pbutils.go diff --git a/pkg/acl/acltree/acltree.go b/pkg/acl/acltree/acltree.go index 90bf337a..53690850 100644 --- a/pkg/acl/acltree/acltree.go +++ b/pkg/acl/acltree/acltree.go @@ -19,10 +19,16 @@ const ( type AddResult struct { OldHeads []string Heads []string + Added []*aclpb.RawChange // TODO: add summary for changes Summary AddResultSummary } +type HeadWithPathToRoot struct { + Id string + Path []string +} + type TreeUpdateListener interface { Update(tree ACLTree) Rebuild(tree ACLTree) @@ -37,13 +43,13 @@ func (n NoOpListener) Rebuild(tree ACLTree) {} type ACLTree interface { ACLState() *ACLState AddContent(ctx context.Context, f func(builder ChangeBuilder) error) (*Change, error) - AddChanges(ctx context.Context, changes ...*Change) (AddResult, error) AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error) Heads() []string Root() *Change Iterate(func(change *Change) bool) IterateFrom(string, func(change *Change) bool) HasChange(string) bool + HeadsPathToRoot() []HeadWithPathToRoot Close() error } @@ -247,25 +253,21 @@ func (a *aclTree) AddContent(ctx context.Context, build func(builder ChangeBuild } func (a *aclTree) AddRawChanges(ctx context.Context, rawChanges ...*aclpb.RawChange) (AddResult, error) { - var aclChanges []*Change + a.Lock() + // TODO: make proper error handling, because there are a lot of corner cases where this will break + var err error + var mode Mode + + var changes []*Change for _, ch := range rawChanges { change, err := NewFromRawChange(ch) // TODO: think what if we will have incorrect signatures on rawChanges, how everything will work if err != nil { continue } - aclChanges = append(aclChanges, change) + changes = append(changes, change) } - return a.AddChanges(ctx, aclChanges...) -} - -func (a *aclTree) AddChanges(ctx context.Context, changes ...*Change) (AddResult, error) { - a.Lock() - // TODO: make proper error handling, because there are a lot of corner cases where this will break - var err error - var mode Mode - defer func() { if err != nil { return @@ -292,6 +294,16 @@ func (a *aclTree) AddChanges(ctx context.Context, changes ...*Change) (AddResult } }() + getAddedChanges := func() []*aclpb.RawChange { + var added []*aclpb.RawChange + for _, ch := range rawChanges { + if _, exists := a.fullTree.attached[ch.Id]; exists { + added = append(added, ch) + } + } + return added + } + for _, ch := range changes { err = a.treeStorage.AddChange(ch) if err != nil { @@ -322,6 +334,7 @@ func (a *aclTree) AddChanges(ctx context.Context, changes ...*Change) (AddResult return AddResult{ OldHeads: prevHeads, Heads: a.fullTree.Heads(), + Added: getAddedChanges(), Summary: AddResultSummaryRebuild, }, nil default: @@ -335,6 +348,7 @@ func (a *aclTree) AddChanges(ctx context.Context, changes ...*Change) (AddResult return AddResult{ OldHeads: prevHeads, Heads: a.fullTree.Heads(), + Added: getAddedChanges(), Summary: AddResultSummaryAppend, }, nil } @@ -376,3 +390,28 @@ func (a *aclTree) Root() *Change { func (a *aclTree) Close() error { return nil } + +func (a *aclTree) HeadsPathToRoot() []HeadWithPathToRoot { + a.RLock() + defer a.RUnlock() + var headsWithPath []HeadWithPathToRoot + for _, h := range a.fullTree.Heads() { + headWithPath := HeadWithPathToRoot{ + Id: h, + } + var path []string + // TODO: think that the user may have not all of the snapshots locally + currentSnapshotId := a.fullTree.attached[h].SnapshotId + for currentSnapshotId != "" { + sn, err := a.treeBuilder.loadChange(currentSnapshotId) + if err != nil { + break + } + path = append(path, currentSnapshotId) + currentSnapshotId = sn.SnapshotId + } + headWithPath.Path = path + headsWithPath = append(headsWithPath, headWithPath) + } + return headsWithPath +} diff --git a/pkg/acl/acltree/acltree_test.go b/pkg/acl/acltree/acltree_test.go index e9a72226..3a0d8e5b 100644 --- a/pkg/acl/acltree/acltree_test.go +++ b/pkg/acl/acltree/acltree_test.go @@ -85,16 +85,7 @@ func TestACLTree_UserJoinUpdate_Append(t *testing.T) { t.Fatalf("should Build acl ACLState without err: %v", err) } rawChanges := thr.GetUpdates("append") - var changes []*Change - for _, ch := range rawChanges { - newCh, err := NewFromRawChange(ch) - if err != nil { - t.Fatalf("should be able to create change from raw: %v", err) - } - changes = append(changes, newCh) - } - - res, err := tree.AddChanges(context.Background(), changes...) + res, err := tree.AddRawChanges(context.Background(), rawChanges...) assert.Equal(t, res.Summary, AddResultSummaryAppend) aclState := tree.ACLState() @@ -135,16 +126,7 @@ func TestACLTree_UserJoinUpdate_Rebuild(t *testing.T) { t.Fatalf("should Build acl ACLState without err: %v", err) } rawChanges := thr.GetUpdates("rebuild") - var changes []*Change - for _, ch := range rawChanges { - newCh, err := NewFromRawChange(ch) - if err != nil { - t.Fatalf("should be able to create change from raw: %v", err) - } - changes = append(changes, newCh) - } - - res, err := tree.AddChanges(context.Background(), changes...) + res, err := tree.AddRawChanges(context.Background(), rawChanges...) assert.Equal(t, res.Summary, AddResultSummaryRebuild) aclState := tree.ACLState() diff --git a/service/sync/requesthandler.go b/service/sync/requesthandler.go index c935885a..b425af6f 100644 --- a/service/sync/requesthandler.go +++ b/service/sync/requesthandler.go @@ -2,6 +2,7 @@ package sync import ( "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/sync/syncpb" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/treecache" @@ -13,19 +14,44 @@ type requestHander struct { client SyncClient } -func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) error { - err := r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { - _, err := tree.AddRawChanges(ctx, update.Changes...) +func (r *requestHander) HandleHeadUpdate(ctx context.Context, senderId string, update *syncpb.SyncHeadUpdate) (err error) { + var fullRequest *syncpb.SyncFullRequest + var addedChanges []*aclpb.RawChange + var headsWithPath []acltree.HeadWithPathToRoot + defer func() { + if err != nil || fullRequest != nil { + return + } + newUpdate := syncpb.NewHeadsUpdate(update.TreeId, headsWithPath, addedChanges) + err = r.client.NotifyHeadsChanged(newUpdate) + }() + err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { + // TODO: check if we already have those changes + res, err := tree.AddRawChanges(ctx, update.Changes...) if err != nil { return err } + addedChanges = res.Added shouldFullSync := !r.compareHeads(update.Heads, tree.Heads()) - + if shouldFullSync { + fullRequest, err = r.prepareFullSyncRequest(tree) + if err != nil { + return err + } + } + headsWithPath = tree.HeadsPathToRoot() return nil }) if err != nil { return err } + if fullRequest != nil { + return r.client.RequestFullSync(senderId, fullRequest) + } + return +} + +func (r *requestHander) HandleFullSync(ctx context.Context, senderId string, request *syncpb.SyncFullRequest) error { return nil } @@ -39,5 +65,5 @@ func (r *requestHander) compareHeads(syncHeads []*syncpb.SyncHead, heads []strin } func (r *requestHander) prepareFullSyncRequest(tree acltree.ACLTree) (*syncpb.SyncFullRequest, error) { - + return nil, nil } diff --git a/service/sync/syncpb/pbutils.go b/service/sync/syncpb/pbutils.go new file mode 100644 index 00000000..fddc86ec --- /dev/null +++ b/service/sync/syncpb/pbutils.go @@ -0,0 +1,22 @@ +package syncpb + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" +) + +func NewHeadsUpdate(treeId string, headsWithPath []acltree.HeadWithPathToRoot, changes []*aclpb.RawChange) *SyncHeadUpdate { + var heads []*SyncHead + for _, headWithPath := range headsWithPath { + syncHead := &SyncHead{ + Id: headWithPath.Id, + SnapshotPath: headWithPath.Path, + } + heads = append(heads, syncHead) + } + return &SyncHeadUpdate{ + Heads: heads, + Changes: changes, + TreeId: treeId, + } +}