From 5715450aff27afe079e61c02e8dec48fc687424e Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 24 Nov 2022 20:24:59 +0100 Subject: [PATCH] Change push space and some tree deletion logic --- client/clientspace/rpchandler.go | 3 +- client/clientspace/service.go | 5 -- client/storage/keys.go | 6 ++ client/storage/spacestorage.go | 27 +++++-- common/commonspace/commonstorage.go | 2 +- common/commonspace/service.go | 81 ++++++++++--------- .../commonspace/settingsdocument/deleter.go | 12 ++- .../settingsdocument/settingsdocument.go | 30 ++----- common/commonspace/synctree/synctree.go | 9 +++ node/nodespace/rpchandler.go | 3 +- node/storage/keys.go | 6 +- node/storage/spacestorage.go | 19 +++-- node/storage/treestorage.go | 2 +- 13 files changed, 118 insertions(+), 87 deletions(-) diff --git a/client/clientspace/rpchandler.go b/client/clientspace/rpchandler.go index bef22138..223efb84 100644 --- a/client/clientspace/rpchandler.go +++ b/client/clientspace/rpchandler.go @@ -45,7 +45,8 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac SpaceSettingsPayload: req.Payload.SpaceSettingsPayload, SpaceSettingsId: req.Payload.SpaceSettingsPayloadId, } - err = r.s.AddSpace(ctx, description) + ctx = context.WithValue(ctx, commonspace.AddSpaceCtxKey, description) + _, err = r.s.GetSpace(ctx, description.SpaceHeader.GetId()) if err != nil { return } diff --git a/client/clientspace/service.go b/client/clientspace/service.go index d22ab135..fc43d7b8 100644 --- a/client/clientspace/service.go +++ b/client/clientspace/service.go @@ -23,7 +23,6 @@ func New() Service { type Service interface { GetSpace(ctx context.Context, id string) (commonspace.Space, error) - AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) CreateSpace(ctx context.Context, payload commonspace.SpaceCreatePayload) (commonspace.Space, error) DeriveSpace(ctx context.Context, payload commonspace.SpaceDerivePayload) (commonspace.Space, error) app.ComponentRunnable @@ -91,10 +90,6 @@ func (s *service) GetSpace(ctx context.Context, id string) (container commonspac return v.(commonspace.Space), nil } -func (s *service) AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) { - return s.commonSpace.AddSpace(ctx, description) -} - func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) { cc, err := s.commonSpace.NewSpace(ctx, id) if err != nil { diff --git a/client/storage/keys.go b/client/storage/keys.go index 45eb8cb0..cd19c07a 100644 --- a/client/storage/keys.go +++ b/client/storage/keys.go @@ -65,6 +65,7 @@ func (t treeKeys) RawChangePrefix() []byte { } type spaceKeys struct { + spaceId string headerKey []byte treePrefixKey []byte spaceSettingsIdKey []byte @@ -72,6 +73,7 @@ type spaceKeys struct { func newSpaceKeys(spaceId string) spaceKeys { return spaceKeys{ + spaceId: spaceId, headerKey: storage.JoinStringsToBytes("space", "header", spaceId), treePrefixKey: storage.JoinStringsToBytes("space", spaceId, "t", "rootId"), spaceSettingsIdKey: storage.JoinStringsToBytes("space", spaceId, "spaceSettingsId"), @@ -90,6 +92,10 @@ func (s spaceKeys) SpaceSettingsId() []byte { return s.spaceSettingsIdKey } +func (s spaceKeys) TreeDeletedKey(id string) []byte { + return storage.JoinStringsToBytes("space", s.spaceId, "deleted", id) +} + type storageServiceKeys struct { spacePrefix []byte } diff --git a/client/storage/spacestorage.go b/client/storage/spacestorage.go index 5c4f1866..7479808c 100644 --- a/client/storage/spacestorage.go +++ b/client/storage/spacestorage.go @@ -6,7 +6,6 @@ import ( storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/dgraph-io/badger/v3" - "sync" ) type spaceStorage struct { @@ -16,7 +15,6 @@ type spaceStorage struct { keys spaceKeys aclStorage storage.ListStorage header *spacesyncproto.RawSpaceHeaderWithId - mx sync.Mutex } var spaceValidationFunc = spacestorage.ValidateSpaceStorageCreatePayload @@ -118,10 +116,6 @@ func (s *spaceStorage) TreeStorage(id string) (storage.TreeStorage, error) { } func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) { - // we have mutex here, so we prevent overwriting the heads of a tree on concurrent creation - s.mx.Lock() - defer s.mx.Unlock() - return createTreeStorage(s.objDb, s.spaceId, payload) } @@ -156,6 +150,27 @@ func (s *spaceStorage) StoredIds() (ids []string, err error) { return } +func (s *spaceStorage) SetTreeDeletedStatus(id, status string) (err error) { + return s.objDb.Update(func(txn *badger.Txn) error { + return txn.Set(s.keys.TreeDeletedKey(id), []byte(status)) + }) +} + +func (s *spaceStorage) TreeDeletedStatus(id string) (status string, err error) { + err = s.objDb.View(func(txn *badger.Txn) error { + res, err := getTxn(txn, s.keys.TreeDeletedKey(id)) + if err != nil { + return err + } + status = string(res) + return nil + }) + if err == badger.ErrKeyNotFound { + err = nil + } + return +} + func (s *spaceStorage) Close() (err error) { return nil } diff --git a/common/commonspace/commonstorage.go b/common/commonspace/commonstorage.go index ab6d5ca6..3299f54e 100644 --- a/common/commonspace/commonstorage.go +++ b/common/commonspace/commonstorage.go @@ -21,7 +21,7 @@ func (c *commonStorage) CreateTreeStorage(payload treestorage.TreeStorageCreateP return } if status == "" { - return c.CreateTreeStorage(payload) + return c.SpaceStorage.CreateTreeStorage(payload) } err = storage.ErrTreeStorageAlreadyDeleted return diff --git a/common/commonspace/service.go b/common/commonspace/service.go index dc4d4b83..a5d45a75 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -26,11 +26,14 @@ func New() Service { return &service{} } +type ctxKey int + +const AddSpaceCtxKey ctxKey = 0 + type Service interface { DeriveSpace(ctx context.Context, payload SpaceDerivePayload) (string, error) CreateSpace(ctx context.Context, payload SpaceCreatePayload) (string, error) NewSpace(ctx context.Context, id string) (sp Space, err error) - AddSpace(ctx context.Context, spaceDescription SpaceDescription) (err error) app.Component } @@ -83,36 +86,6 @@ func (s *service) DeriveSpace(ctx context.Context, payload SpaceDerivePayload) ( return store.Id(), nil } -func (s *service) AddSpace(ctx context.Context, spaceDescription SpaceDescription) (err error) { - _, err = s.storageProvider.SpaceStorage(spaceDescription.SpaceHeader.Id) - if err == nil { - err = spacesyncproto.ErrSpaceExists - return - } - if err != storage.ErrSpaceStorageMissing { - err = spacesyncproto.ErrUnexpected - return - } - - payload := storage.SpaceStorageCreatePayload{ - AclWithId: &aclrecordproto.RawACLRecordWithId{ - Payload: spaceDescription.AclPayload, - Id: spaceDescription.AclId, - }, - SpaceHeaderWithId: spaceDescription.SpaceHeader, - } - st, err := s.storageProvider.CreateSpaceStorage(payload) - if err != nil { - err = spacesyncproto.ErrUnexpected - if err == storage.ErrSpaceStorageExists { - err = spacesyncproto.ErrSpaceExists - } - return - } - err = st.Close() - return -} - func (s *service) NewSpace(ctx context.Context, id string) (Space, error) { st, err := s.storageProvider.SpaceStorage(id) if err != nil { @@ -120,10 +93,17 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) { return nil, err } - st, err = s.getSpaceStorageFromRemote(ctx, id) - if err != nil { - err = storage.ErrSpaceStorageMissing - return nil, err + if description, ok := ctx.Value(AddSpaceCtxKey).(SpaceDescription); ok { + st, err = s.addSpaceStorage(ctx, description) + if err != nil { + return nil, err + } + } else { + st, err = s.getSpaceStorageFromRemote(ctx, id) + if err != nil { + err = storage.ErrSpaceStorageMissing + return nil, err + } } } @@ -143,10 +123,39 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) { return sp, nil } +func (s *service) addSpaceStorage(ctx context.Context, spaceDescription SpaceDescription) (st storage.SpaceStorage, err error) { + _, err = s.storageProvider.SpaceStorage(spaceDescription.SpaceHeader.Id) + if err == nil { + err = spacesyncproto.ErrSpaceExists + return + } + if err != storage.ErrSpaceStorageMissing { + err = spacesyncproto.ErrUnexpected + return + } + + payload := storage.SpaceStorageCreatePayload{ + AclWithId: &aclrecordproto.RawACLRecordWithId{ + Payload: spaceDescription.AclPayload, + Id: spaceDescription.AclId, + }, + SpaceHeaderWithId: spaceDescription.SpaceHeader, + } + st, err = s.storageProvider.CreateSpaceStorage(payload) + if err != nil { + err = spacesyncproto.ErrUnexpected + if err == storage.ErrSpaceStorageExists { + err = spacesyncproto.ErrSpaceExists + } + return + } + return +} + func (s *service) getSpaceStorageFromRemote(ctx context.Context, id string) (st storage.SpaceStorage, err error) { var p peer.Peer lastConfiguration := s.configurationService.GetLast() - // for nodes we always get remote space only if we have id in the context + // we can't connect to client if it is a node if lastConfiguration.IsResponsible(id) { err = spacesyncproto.ErrSpaceMissing return diff --git a/common/commonspace/settingsdocument/deleter.go b/common/commonspace/settingsdocument/deleter.go index 48b6c871..c259e0d3 100644 --- a/common/commonspace/settingsdocument/deleter.go +++ b/common/commonspace/settingsdocument/deleter.go @@ -21,14 +21,12 @@ func newDeleter(st storage.SpaceStorage, state *deletionstate.DeletionState, get func (d *deleter) delete() { allQueued := d.state.GetQueued() for _, id := range allQueued { - if _, err := d.st.TreeStorage(id); err == nil { - err := d.getter.DeleteTree(context.Background(), d.st.Id(), id) - if err != nil { - log.With(zap.String("id", id), zap.Error(err)).Error("failed to delete object") - continue - } + err := d.getter.DeleteTree(context.Background(), d.st.Id(), id) + if err != nil && err != storage.ErrTreeStorageAlreadyDeleted { + log.With(zap.String("id", id), zap.Error(err)).Error("failed to delete object") + continue } - err := d.state.Delete(id) + err = d.state.Delete(id) if err != nil { log.With(zap.String("id", id), zap.Error(err)).Error("failed to mark object as deleted") } diff --git a/common/commonspace/settingsdocument/settingsdocument.go b/common/commonspace/settingsdocument/settingsdocument.go index 3060cb20..0dea6b8e 100644 --- a/common/commonspace/settingsdocument/settingsdocument.go +++ b/common/commonspace/settingsdocument/settingsdocument.go @@ -18,7 +18,6 @@ var log = logger.NewNamed("commonspace.settingsdocument") type SettingsDocument interface { tree.ObjectTree Init(ctx context.Context) (err error) - Refresh() DeleteObject(id string) (err error) } @@ -75,8 +74,9 @@ func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument, err e return } -func (s *settingsDocument) Update(tr tree.ObjectTree) { - ids, lastId, err := s.prov.ProvideIds(tr, s.lastChangeId) +func (s *settingsDocument) updateIds(lastChangeId string) { + s.lastChangeId = lastChangeId + ids, lastId, err := s.prov.ProvideIds(s, s.lastChangeId) if err != nil { log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to update state") return @@ -87,16 +87,12 @@ func (s *settingsDocument) Update(tr tree.ObjectTree) { } } +func (s *settingsDocument) Update(tr tree.ObjectTree) { + s.updateIds(s.lastChangeId) +} + func (s *settingsDocument) Rebuild(tr tree.ObjectTree) { - ids, lastId, err := s.prov.ProvideIds(tr, "") - if err != nil { - log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to rebuild state") - return - } - s.lastChangeId = lastId - if err = s.deletionState.Add(ids); err != nil { - log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to queue ids to delete") - } + s.updateIds("") } func (s *settingsDocument) Init(ctx context.Context) (err error) { @@ -108,16 +104,6 @@ func (s *settingsDocument) Init(ctx context.Context) (err error) { return } -func (s *settingsDocument) Refresh() { - s.Lock() - defer s.Unlock() - if s.lastChangeId == "" { - s.Rebuild(s) - } else { - s.Update(s) - } -} - func (s *settingsDocument) Close() error { s.loop.Close() return s.ObjectTree.Close() diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 024aa1dd..2953cbc1 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -148,6 +148,15 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t return } + status, err := deps.SpaceStorage.TreeDeletedStatus(id) + if err != nil { + return + } + if status != "" { + err = spacestorage.ErrTreeStorageAlreadyDeleted + return + } + resp, err := getTreeRemote() if err != nil { return diff --git a/node/nodespace/rpchandler.go b/node/nodespace/rpchandler.go index 901f0a3e..f7b3f6a1 100644 --- a/node/nodespace/rpchandler.go +++ b/node/nodespace/rpchandler.go @@ -45,7 +45,8 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac SpaceSettingsPayload: req.Payload.SpaceSettingsPayload, SpaceSettingsId: req.Payload.SpaceSettingsPayloadId, } - err = r.s.AddSpace(ctx, description) + ctx = context.WithValue(ctx, commonspace.AddSpaceCtxKey, description) + _, err = r.s.GetSpace(ctx, description.SpaceHeader.GetId()) if err != nil { return } diff --git a/node/storage/keys.go b/node/storage/keys.go index ef728d74..650d392c 100644 --- a/node/storage/keys.go +++ b/node/storage/keys.go @@ -46,7 +46,7 @@ func (t treeKeys) RawChangeKey(id string) []byte { return storage.JoinStringsToBytes("t", t.id, id) } -func (t treeKeys) isTreeRecordKey(key string) bool { +func (t treeKeys) isTreeRelatedKey(key string) bool { return strings.HasPrefix(key, t.prefix) } @@ -75,6 +75,10 @@ func (s spaceKeys) SpaceSettingsIdKey() []byte { return spaceSettingsIdKey } +func (s spaceKeys) TreeDeletedKey(id string) []byte { + return storage.JoinStringsToBytes("del", id) +} + func isTreeHeadsKey(key string) bool { return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "/heads") } diff --git a/node/storage/spacestorage.go b/node/storage/spacestorage.go index 232dc838..b89c346b 100644 --- a/node/storage/spacestorage.go +++ b/node/storage/spacestorage.go @@ -9,7 +9,6 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "go.uber.org/zap" "path" - "sync" "time" ) @@ -26,7 +25,6 @@ type spaceStorage struct { keys spaceKeys aclStorage storage.ListStorage header *spacesyncproto.RawSpaceHeaderWithId - mx sync.Mutex } func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) { @@ -174,10 +172,6 @@ func (s *spaceStorage) TreeStorage(id string) (storage.TreeStorage, error) { } func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) { - // we have mutex here, so we prevent overwriting the heads of a tree on concurrent creation - s.mx.Lock() - defer s.mx.Unlock() - return createTreeStorage(s.objDb, payload) } @@ -189,6 +183,19 @@ func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithI return s.header, nil } +func (s *spaceStorage) SetTreeDeletedStatus(id, state string) (err error) { + return s.objDb.Put(s.keys.TreeDeletedKey(id), []byte(state)) +} + +func (s *spaceStorage) TreeDeletedStatus(id string) (status string, err error) { + res, err := s.objDb.Get(s.keys.TreeDeletedKey(id)) + if err != nil { + return + } + status = string(res) + return +} + func (s *spaceStorage) StoredIds() (ids []string, err error) { index := s.objDb.Items() diff --git a/node/storage/treestorage.go b/node/storage/treestorage.go index 9cca39eb..4877203e 100644 --- a/node/storage/treestorage.go +++ b/node/storage/treestorage.go @@ -156,7 +156,7 @@ func (t *treeStorage) storedKeys() (keys [][]byte, err error) { key, _, err := index.Next() for err == nil { strKey := string(key) - if t.keys.isTreeRecordKey(strKey) { + if t.keys.isTreeRelatedKey(strKey) { keys = append(keys, key) } key, _, err = index.Next()