From 6d38f381ceee38b12e371c4ee87f1b8914c987d1 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 15 Sep 2022 13:42:51 +0200 Subject: [PATCH] Update tree sync logic --- common/commonspace/cache/treecache.go | 12 +++++-- common/commonspace/service.go | 8 +++++ common/commonspace/space.go | 32 +++++++++++++++++++ common/commonspace/storage/storage.go | 13 ++++++++ common/commonspace/syncservice/synchandler.go | 6 ++-- common/commonspace/synctree/synctree.go | 2 +- 6 files changed, 67 insertions(+), 6 deletions(-) create mode 100644 common/commonspace/storage/storage.go diff --git a/common/commonspace/cache/treecache.go b/common/commonspace/cache/treecache.go index 342c9042..9d688f50 100644 --- a/common/commonspace/cache/treecache.go +++ b/common/commonspace/cache/treecache.go @@ -2,16 +2,24 @@ package cache import ( "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" ) +const CName = "commonspace.cache" + +type TreeContainer interface { + Tree() tree.ObjectTree +} + type TreeResult struct { - Release func() - Tree tree.ObjectTree + Release func() + TreeContainer TreeContainer } type TreeCache interface { + app.ComponentRunnable GetTree(ctx context.Context, id string) (TreeResult, error) AddTree(ctx context.Context, payload storage.TreeStorageCreatePayload) error } diff --git a/common/commonspace/service.go b/common/commonspace/service.go index daeb321d..3c084562 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -4,6 +4,8 @@ import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/app/logger" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" @@ -25,11 +27,15 @@ type Service interface { type service struct { config config.Space configurationService nodeconf.Service + storage storage.Storage + cache cache.TreeCache } func (s *service) Init(a *app.App) (err error) { s.config = a.MustComponent(config.CName).(*config.Config).Space s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) + s.storage = a.MustComponent(storage.CName).(storage.Storage) + s.cache = a.MustComponent(cache.CName).(cache.TreeCache) return nil } @@ -44,6 +50,8 @@ func (s *service) CreateSpace(ctx context.Context, id string) (Space, error) { nconf: s.configurationService.GetLast(), conf: s.config, syncService: syncService, + cache: s.cache, + storage: s.storage, } if err := sp.Init(ctx); err != nil { return nil, err diff --git a/common/commonspace/space.go b/common/commonspace/space.go index 0c223a09..58b4391e 100644 --- a/common/commonspace/space.go +++ b/common/commonspace/space.go @@ -3,12 +3,17 @@ package commonspace import ( "context" "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/cache" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/remotediff" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/synctree" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf" "github.com/anytypeio/go-anytype-infrastructure-experiments/config" + treestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/tree" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ldiff" "go.uber.org/zap" "math/rand" @@ -22,6 +27,9 @@ type Space interface { SpaceSyncRpc() RpcHandler SyncService() syncservice.SyncService + CreateTree(payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) + BuildTree(id string, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) + Close() error } @@ -35,6 +43,21 @@ type space struct { rpc *rpcHandler periodicSync *periodicSync syncService syncservice.SyncService + storage storage.Storage + cache cache.TreeCache +} + +func (s *space) CreateTree(payload tree.ObjectTreeCreatePayload, listener tree.ObjectTreeUpdateListener) (tree.ObjectTree, error) { + return synctree.CreateSyncTree(payload, s.syncService, listener, nil, s.storage.CreateTreeStorage) +} + +func (s *space) BuildTree(id string, listener tree.ObjectTreeUpdateListener) (t tree.ObjectTree, err error) { + store, err := s.storage.Storage(id) + if err != nil { + return + } + + return synctree.BuildSyncTree(s.syncService, store.(treestorage.TreeStorage), listener, nil) } func (s *space) Id() string { @@ -100,10 +123,19 @@ func (s *space) syncWithPeer(ctx context.Context, p peer.Peer) (err error) { if err != nil { return nil } + s.pingTreesInCache(ctx, newIds) + s.pingTreesInCache(ctx, changedIds) + log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), zap.Int("removedIds", len(removedIds))) return } +func (s *space) pingTreesInCache(ctx context.Context, trees []string) { + for _, tId := range trees { + _, _ = s.cache.GetTree(ctx, tId) + } +} + func (s *space) getPeers(ctx context.Context) (peers []peer.Peer, err error) { if s.nconf.IsResponsible(s.id) { return s.nconf.AllPeers(ctx, s.id) diff --git a/common/commonspace/storage/storage.go b/common/commonspace/storage/storage.go new file mode 100644 index 00000000..a9694988 --- /dev/null +++ b/common/commonspace/storage/storage.go @@ -0,0 +1,13 @@ +package storage + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" +) + +type Storage interface { + storage.Provider + app.ComponentRunnable +} + +const CName = "commonspace.storage" diff --git a/common/commonspace/syncservice/synchandler.go b/common/commonspace/syncservice/synchandler.go index e53373b5..87d8d445 100644 --- a/common/commonspace/syncservice/synchandler.go +++ b/common/commonspace/syncservice/synchandler.go @@ -59,7 +59,7 @@ func (s *syncHandler) HandleHeadUpdate( } err = func() error { - objTree := res.Tree + objTree := res.TreeContainer.Tree() objTree.Lock() defer res.Release() defer objTree.Unlock() @@ -111,7 +111,7 @@ func (s *syncHandler) HandleFullSyncRequest( // TODO: check if sync request contains changes and add them (also do head update in this case) err = func() error { - objTree := res.Tree + objTree := res.TreeContainer.Tree() objTree.Lock() defer res.Release() defer objTree.Unlock() @@ -143,7 +143,7 @@ func (s *syncHandler) HandleFullSyncResponse( } err = func() error { - objTree := res.Tree + objTree := res.TreeContainer.Tree() objTree.Lock() defer res.Release() defer objTree.Unlock() diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 8a948735..3b84ee77 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -16,8 +16,8 @@ type SyncTree struct { } func CreateSyncTree( - syncService syncservice.SyncService, payload tree.ObjectTreeCreatePayload, + syncService syncservice.SyncService, listener tree.ObjectTreeUpdateListener, aclList list.ACLList, createStorage storage.TreeStorageCreatorFunc) (t tree.ObjectTree, err error) {