From 02ae77645d84e4691744f17c286bb159ba7ddbf4 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 11 Aug 2022 15:33:39 +0200 Subject: [PATCH] Update sync layer to work with different trees --- pkg/acl/tree/acltree.go | 19 +- pkg/acl/tree/commontree.go | 21 ++ pkg/acl/tree/doctree.go | 18 +- service/sync/document/service.go | 2 + service/sync/requesthandler/requesthandler.go | 232 ++++++++++++++---- 5 files changed, 206 insertions(+), 86 deletions(-) create mode 100644 pkg/acl/tree/commontree.go diff --git a/pkg/acl/tree/acltree.go b/pkg/acl/tree/acltree.go index 2b5563b2..fdfc48eb 100644 --- a/pkg/acl/tree/acltree.go +++ b/pkg/acl/tree/acltree.go @@ -51,22 +51,11 @@ var ErrNoCommonSnapshot = errors.New("trees doesn't have a common snapshot") type ACLTree interface { RWLocker - ID() string - Header() *treepb.TreeHeader + CommonTree + ACLState() *ACLState AddContent(ctx context.Context, f func(builder ACLChangeBuilder) error) (*aclpb.RawChange, error) AddRawChanges(ctx context.Context, changes ...*aclpb.RawChange) (AddResult, error) - Heads() []string - Root() *Change - Iterate(func(change *Change) bool) - IterateFrom(string, func(change *Change) bool) - HasChange(string) bool - SnapshotPath() []string - ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error) - Storage() treestorage.TreeStorage - DebugDump() (string, error) - - Close() error } type aclTree struct { @@ -389,10 +378,6 @@ func (a *aclTree) Root() *Change { return a.tree.Root() } -func (a *aclTree) Close() error { - return nil -} - func (a *aclTree) SnapshotPath() []string { // TODO: think about caching this diff --git a/pkg/acl/tree/commontree.go b/pkg/acl/tree/commontree.go new file mode 100644 index 00000000..063f21c5 --- /dev/null +++ b/pkg/acl/tree/commontree.go @@ -0,0 +1,21 @@ +package tree + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" +) + +type CommonTree interface { + ID() string + Header() *treepb.TreeHeader + Heads() []string + Root() *Change + Iterate(func(change *Change) bool) + IterateFrom(string, func(change *Change) bool) + HasChange(string) bool + SnapshotPath() []string + ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error) + Storage() treestorage.TreeStorage + DebugDump() (string, error) +} diff --git a/pkg/acl/tree/doctree.go b/pkg/acl/tree/doctree.go index 8118fedb..ff4c6d4e 100644 --- a/pkg/acl/tree/doctree.go +++ b/pkg/acl/tree/doctree.go @@ -21,21 +21,9 @@ type TreeUpdateListener interface { type DocTree interface { RWLocker - ID() string - Header() *treepb.TreeHeader + CommonTree AddContent(ctx context.Context, aclTree ACLTree, content proto.Marshaler, isSnapshot bool) (*aclpb.RawChange, error) AddRawChanges(ctx context.Context, aclTree ACLTree, changes ...*aclpb.RawChange) (AddResult, error) - Heads() []string - Root() *Change - Iterate(func(change *Change) bool) - IterateFrom(string, func(change *Change) bool) - HasChange(string) bool - SnapshotPath() []string - ChangesAfterCommonSnapshot(snapshotPath []string) ([]*aclpb.RawChange, error) - Storage() treestorage.TreeStorage - DebugDump() (string, error) - - Close() error } type docTree struct { @@ -372,10 +360,6 @@ func (d *docTree) Root() *Change { return d.tree.Root() } -func (d *docTree) Close() error { - return nil -} - func (d *docTree) SnapshotPath() []string { // TODO: think about caching this diff --git a/service/sync/document/service.go b/service/sync/document/service.go index 7d5d2b18..cea7a3d8 100644 --- a/service/sync/document/service.go +++ b/service/sync/document/service.go @@ -77,6 +77,8 @@ func (s *service) UpdateDocumentTree(ctx context.Context, id, text string) (err err = s.treeCache.Do(ctx, id, func(obj interface{}) error { docTree := obj.(tree.DocTree) + docTree.Lock() + defer docTree.Unlock() err = s.treeCache.Do(ctx, docTree.Header().AclTreeId, func(obj interface{}) error { aclTree := obj.(tree.ACLTree) aclTree.RLock() diff --git a/service/sync/requesthandler/requesthandler.go b/service/sync/requesthandler/requesthandler.go index 2eb70551..4892ef14 100644 --- a/service/sync/requesthandler/requesthandler.go +++ b/service/sync/requesthandler/requesthandler.go @@ -2,10 +2,11 @@ package requesthandler import ( "context" + "errors" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclchanges/aclpb" - "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/acltree" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treestorage/treepb" "github.com/anytypeio/go-anytype-infrastructure-experiments/service/account" @@ -23,6 +24,8 @@ type requestHandler struct { var log = logger.NewNamed("requesthandler") +var ErrIncorrectDocType = errors.New("incorrec doc type") + func New() app.Component { return &requestHandler{} } @@ -74,29 +77,74 @@ func (r *requestHandler) HandleHeadUpdate(ctx context.Context, senderId string, var ( fullRequest *syncproto.SyncFullRequest snapshotPath []string - result acltree.AddResult + result tree.AddResult ) log.With(zap.String("peerId", senderId), zap.String("treeId", update.TreeId)). Debug("processing head update") - err = r.treeCache.Do(ctx, update.TreeId, func(tree acltree.ACLTree) error { - // TODO: check if we already have those changes - result, err = tree.AddRawChanges(ctx, update.Changes...) - if err != nil { - return err - } - log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", tree.Heads())). - Debug("comparing heads after head update") - shouldFullSync := !slice.UnsortedEquals(update.Heads, tree.Heads()) - snapshotPath = tree.SnapshotPath() - if shouldFullSync { - fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, tree) + updateACLTree := func() { + err = r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error { + t := obj.(tree.ACLTree) + t.Lock() + defer t.Unlock() + // TODO: check if we already have those changes + result, err = t.AddRawChanges(ctx, update.Changes...) if err != nil { return err } - } - return nil - }) + log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", t.Heads())). + Debug("comparing heads after head update") + shouldFullSync := !slice.UnsortedEquals(update.Heads, t.Heads()) + snapshotPath = t.SnapshotPath() + if shouldFullSync { + fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, t) + if err != nil { + return err + } + } + return nil + }) + } + + updateDocTree := func() { + err = r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error { + docTree := obj.(tree.DocTree) + docTree.Lock() + defer docTree.Unlock() + + return r.treeCache.Do(ctx, update.TreeId, func(obj interface{}) error { + aclTree := obj.(tree.ACLTree) + aclTree.RLock() + defer aclTree.RUnlock() + // TODO: check if we already have those changes + result, err = docTree.AddRawChanges(ctx, aclTree, update.Changes...) + if err != nil { + return err + } + log.With(zap.Strings("update heads", update.Heads), zap.Strings("tree heads", docTree.Heads())). + Debug("comparing heads after head update") + shouldFullSync := !slice.UnsortedEquals(update.Heads, docTree.Heads()) + snapshotPath = docTree.SnapshotPath() + if shouldFullSync { + fullRequest, err = r.prepareFullSyncRequest(update.TreeId, update.TreeHeader, update.SnapshotPath, docTree) + if err != nil { + return err + } + } + return nil + }) + }) + } + + switch update.TreeHeader.Type { + case treepb.TreeHeader_ACLTree: + updateACLTree() + case treepb.TreeHeader_DocTree: + updateDocTree() + default: + return ErrIncorrectDocType + } + // if there are no such tree if err == treestorage.ErrUnknownTreeId { // TODO: maybe we can optimize this by sending the header and stuff right away, so when the tree is created we are able to add it on first request @@ -128,27 +176,71 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str var ( fullResponse *syncproto.SyncFullResponse snapshotPath []string - result acltree.AddResult + result tree.AddResult ) log.With(zap.String("peerId", senderId), zap.String("treeId", request.TreeId)). Debug("processing full sync request") - err = r.treeCache.Do(ctx, request.TreeId, func(tree acltree.ACLTree) error { - // TODO: check if we already have those changes - // if we have non-empty request - if len(request.Heads) != 0 { - result, err = tree.AddRawChanges(ctx, request.Changes...) + requestACLTree := func() { + err = r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error { + t := obj.(tree.ACLTree) + t.Lock() + defer t.Unlock() + + // TODO: check if we already have those changes + // if we have non-empty request + if len(request.Heads) != 0 { + result, err = t.AddRawChanges(ctx, request.Changes...) + if err != nil { + return err + } + } + snapshotPath = t.SnapshotPath() + fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, t) if err != nil { return err } - } - snapshotPath = tree.SnapshotPath() - fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, tree) - if err != nil { - return err - } - return nil - }) + return nil + }) + } + + requestDocTree := func() { + err = r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error { + docTree := obj.(tree.DocTree) + docTree.Lock() + defer docTree.Unlock() + + return r.treeCache.Do(ctx, request.TreeId, func(obj interface{}) error { + aclTree := obj.(tree.ACLTree) + aclTree.RLock() + defer aclTree.RUnlock() + // TODO: check if we already have those changes + // if we have non-empty request + if len(request.Heads) != 0 { + result, err = docTree.AddRawChanges(ctx, aclTree, request.Changes...) + if err != nil { + return err + } + } + snapshotPath = docTree.SnapshotPath() + fullResponse, err = r.prepareFullSyncResponse(request.TreeId, request.SnapshotPath, request.Changes, docTree) + if err != nil { + return err + } + return nil + }) + }) + } + + switch request.TreeHeader.Type { + case treepb.TreeHeader_ACLTree: + requestACLTree() + case treepb.TreeHeader_DocTree: + requestDocTree() + default: + return ErrIncorrectDocType + } + if err != nil { return err } @@ -172,20 +264,56 @@ func (r *requestHandler) HandleFullSyncRequest(ctx context.Context, senderId str func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId string, response *syncproto.SyncFullResponse) (err error) { var ( snapshotPath []string - result acltree.AddResult + result tree.AddResult ) log.With(zap.String("peerId", senderId), zap.String("treeId", response.TreeId)). Debug("processing full sync response") - err = r.treeCache.Do(ctx, response.TreeId, func(tree acltree.ACLTree) error { - // TODO: check if we already have those changes - result, err = tree.AddRawChanges(ctx, response.Changes...) - if err != nil { - return err - } - snapshotPath = tree.SnapshotPath() - return nil - }) + responseACLTree := func() { + err = r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error { + t := obj.(tree.ACLTree) + t.Lock() + defer t.Unlock() + // TODO: check if we already have those changes + result, err = t.AddRawChanges(ctx, response.Changes...) + if err != nil { + return err + } + snapshotPath = t.SnapshotPath() + return nil + }) + } + + responseDocTree := func() { + err = r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error { + docTree := obj.(tree.DocTree) + docTree.Lock() + defer docTree.Unlock() + + return r.treeCache.Do(ctx, response.TreeId, func(obj interface{}) error { + aclTree := obj.(tree.ACLTree) + aclTree.RLock() + defer aclTree.RUnlock() + // TODO: check if we already have those changes + result, err = docTree.AddRawChanges(ctx, aclTree, response.Changes...) + if err != nil { + return err + } + snapshotPath = docTree.SnapshotPath() + return nil + }) + }) + } + + switch response.TreeHeader.Type { + case treepb.TreeHeader_ACLTree: + responseACLTree() + case treepb.TreeHeader_DocTree: + responseDocTree() + default: + return ErrIncorrectDocType + } + // if error or nothing has changed if (err != nil || len(result.Added) == 0) && err != treestorage.ErrUnknownTreeId { return err @@ -196,7 +324,7 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st if err != nil { return err } - result = acltree.AddResult{ + result = tree.AddResult{ OldHeads: []string{}, Heads: response.Heads, Added: response.Changes, @@ -212,16 +340,16 @@ func (r *requestHandler) HandleFullSyncResponse(ctx context.Context, senderId st return r.messageService.SendToSpaceAsync("", syncproto.WrapHeadUpdate(newUpdate)) } -func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, tree acltree.ACLTree) (*syncproto.SyncFullRequest, error) { - ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) +func (r *requestHandler) prepareFullSyncRequest(treeId string, header *treepb.TreeHeader, theirPath []string, t tree.CommonTree) (*syncproto.SyncFullRequest, error) { + ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath) if err != nil { return nil, err } return &syncproto.SyncFullRequest{ - Heads: tree.Heads(), + Heads: t.Heads(), Changes: ourChanges, TreeId: treeId, - SnapshotPath: tree.SnapshotPath(), + SnapshotPath: t.SnapshotPath(), TreeHeader: header, }, nil } @@ -230,9 +358,9 @@ func (r *requestHandler) prepareFullSyncResponse( treeId string, theirPath []string, theirChanges []*aclpb.RawChange, - tree acltree.ACLTree) (*syncproto.SyncFullResponse, error) { + t tree.CommonTree) (*syncproto.SyncFullResponse, error) { // TODO: we can probably use the common snapshot calculated on the request step from previous peer - ourChanges, err := tree.ChangesAfterCommonSnapshot(theirPath) + ourChanges, err := t.ChangesAfterCommonSnapshot(theirPath) if err != nil { return nil, err } @@ -252,11 +380,11 @@ func (r *requestHandler) prepareFullSyncResponse( Debug("preparing changes for tree") return &syncproto.SyncFullResponse{ - Heads: tree.Heads(), + Heads: t.Heads(), Changes: final, TreeId: treeId, - SnapshotPath: tree.SnapshotPath(), - TreeHeader: tree.Header(), + SnapshotPath: t.SnapshotPath(), + TreeHeader: t.Header(), }, nil } @@ -266,7 +394,7 @@ func (r *requestHandler) createTree(ctx context.Context, response *syncproto.Syn response.TreeId, response.TreeHeader, response.Changes, - func(tree acltree.ACLTree) error { + func(obj interface{}) error { return nil }) }