Change push space and some tree deletion logic

This commit is contained in:
mcrakhman 2022-11-24 20:24:59 +01:00
parent f6d8183001
commit 55e04cd2cd
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
14 changed files with 118 additions and 92 deletions

View File

@ -45,7 +45,8 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac
SpaceSettingsPayload: req.Payload.SpaceSettingsPayload,
SpaceSettingsId: req.Payload.SpaceSettingsPayloadId,
}
err = r.s.AddSpace(ctx, description)
ctx = context.WithValue(ctx, commonspace.AddSpaceCtxKey, description)
_, err = r.s.GetSpace(ctx, description.SpaceHeader.GetId())
if err != nil {
return
}

View File

@ -23,7 +23,6 @@ func New() Service {
type Service interface {
GetSpace(ctx context.Context, id string) (commonspace.Space, error)
AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error)
CreateSpace(ctx context.Context, payload commonspace.SpaceCreatePayload) (commonspace.Space, error)
DeriveSpace(ctx context.Context, payload commonspace.SpaceDerivePayload) (commonspace.Space, error)
app.ComponentRunnable
@ -91,10 +90,6 @@ func (s *service) GetSpace(ctx context.Context, id string) (container commonspac
return v.(commonspace.Space), nil
}
func (s *service) AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) {
return s.commonSpace.AddSpace(ctx, description)
}
func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) {
cc, err := s.commonSpace.NewSpace(ctx, id)
if err != nil {

View File

@ -65,6 +65,7 @@ func (t treeKeys) RawChangePrefix() []byte {
}
type spaceKeys struct {
spaceId string
headerKey []byte
treePrefixKey []byte
spaceSettingsIdKey []byte
@ -72,6 +73,7 @@ type spaceKeys struct {
func newSpaceKeys(spaceId string) spaceKeys {
return spaceKeys{
spaceId: spaceId,
headerKey: storage.JoinStringsToBytes("space", "header", spaceId),
treePrefixKey: storage.JoinStringsToBytes("space", spaceId, "t", "rootId"),
spaceSettingsIdKey: storage.JoinStringsToBytes("space", spaceId, "spaceSettingsId"),
@ -90,6 +92,10 @@ func (s spaceKeys) SpaceSettingsId() []byte {
return s.spaceSettingsIdKey
}
func (s spaceKeys) TreeDeletedKey(id string) []byte {
return storage.JoinStringsToBytes("space", s.spaceId, "deleted", id)
}
type storageServiceKeys struct {
spacePrefix []byte
}

View File

@ -6,7 +6,6 @@ import (
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/dgraph-io/badger/v3"
"sync"
)
type spaceStorage struct {
@ -16,7 +15,6 @@ type spaceStorage struct {
keys spaceKeys
aclStorage storage.ListStorage
header *spacesyncproto.RawSpaceHeaderWithId
mx sync.Mutex
}
var spaceValidationFunc = spacestorage.ValidateSpaceStorageCreatePayload
@ -118,10 +116,6 @@ func (s *spaceStorage) TreeStorage(id string) (storage.TreeStorage, error) {
}
func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
// we have mutex here, so we prevent overwriting the heads of a tree on concurrent creation
s.mx.Lock()
defer s.mx.Unlock()
return createTreeStorage(s.objDb, s.spaceId, payload)
}
@ -156,6 +150,27 @@ func (s *spaceStorage) StoredIds() (ids []string, err error) {
return
}
func (s *spaceStorage) SetTreeDeletedStatus(id, status string) (err error) {
return s.objDb.Update(func(txn *badger.Txn) error {
return txn.Set(s.keys.TreeDeletedKey(id), []byte(status))
})
}
func (s *spaceStorage) TreeDeletedStatus(id string) (status string, err error) {
err = s.objDb.View(func(txn *badger.Txn) error {
res, err := getTxn(txn, s.keys.TreeDeletedKey(id))
if err != nil {
return err
}
status = string(res)
return nil
})
if err == badger.ErrKeyNotFound {
err = nil
}
return
}
func (s *spaceStorage) Close() (err error) {
return nil
}

View File

@ -21,7 +21,7 @@ func (c *commonStorage) CreateTreeStorage(payload treestorage.TreeStorageCreateP
return
}
if status == "" {
return c.CreateTreeStorage(payload)
return c.SpaceStorage.CreateTreeStorage(payload)
}
err = storage.ErrTreeStorageAlreadyDeleted
return

View File

@ -26,11 +26,14 @@ func New() Service {
return &service{}
}
type ctxKey int
const AddSpaceCtxKey ctxKey = 0
type Service interface {
DeriveSpace(ctx context.Context, payload SpaceDerivePayload) (string, error)
CreateSpace(ctx context.Context, payload SpaceCreatePayload) (string, error)
NewSpace(ctx context.Context, id string) (sp Space, err error)
AddSpace(ctx context.Context, spaceDescription SpaceDescription) (err error)
app.Component
}
@ -83,36 +86,6 @@ func (s *service) DeriveSpace(ctx context.Context, payload SpaceDerivePayload) (
return store.Id(), nil
}
func (s *service) AddSpace(ctx context.Context, spaceDescription SpaceDescription) (err error) {
_, err = s.storageProvider.SpaceStorage(spaceDescription.SpaceHeader.Id)
if err == nil {
err = spacesyncproto.ErrSpaceExists
return
}
if err != storage.ErrSpaceStorageMissing {
err = spacesyncproto.ErrUnexpected
return
}
payload := storage.SpaceStorageCreatePayload{
AclWithId: &aclrecordproto.RawACLRecordWithId{
Payload: spaceDescription.AclPayload,
Id: spaceDescription.AclId,
},
SpaceHeaderWithId: spaceDescription.SpaceHeader,
}
st, err := s.storageProvider.CreateSpaceStorage(payload)
if err != nil {
err = spacesyncproto.ErrUnexpected
if err == storage.ErrSpaceStorageExists {
err = spacesyncproto.ErrSpaceExists
}
return
}
err = st.Close()
return
}
func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
st, err := s.storageProvider.SpaceStorage(id)
if err != nil {
@ -120,10 +93,17 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
return nil, err
}
st, err = s.getSpaceStorageFromRemote(ctx, id)
if err != nil {
err = storage.ErrSpaceStorageMissing
return nil, err
if description, ok := ctx.Value(AddSpaceCtxKey).(SpaceDescription); ok {
st, err = s.addSpaceStorage(ctx, description)
if err != nil {
return nil, err
}
} else {
st, err = s.getSpaceStorageFromRemote(ctx, id)
if err != nil {
err = storage.ErrSpaceStorageMissing
return nil, err
}
}
}
@ -143,10 +123,39 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
return sp, nil
}
func (s *service) addSpaceStorage(ctx context.Context, spaceDescription SpaceDescription) (st storage.SpaceStorage, err error) {
_, err = s.storageProvider.SpaceStorage(spaceDescription.SpaceHeader.Id)
if err == nil {
err = spacesyncproto.ErrSpaceExists
return
}
if err != storage.ErrSpaceStorageMissing {
err = spacesyncproto.ErrUnexpected
return
}
payload := storage.SpaceStorageCreatePayload{
AclWithId: &aclrecordproto.RawACLRecordWithId{
Payload: spaceDescription.AclPayload,
Id: spaceDescription.AclId,
},
SpaceHeaderWithId: spaceDescription.SpaceHeader,
}
st, err = s.storageProvider.CreateSpaceStorage(payload)
if err != nil {
err = spacesyncproto.ErrUnexpected
if err == storage.ErrSpaceStorageExists {
err = spacesyncproto.ErrSpaceExists
}
return
}
return
}
func (s *service) getSpaceStorageFromRemote(ctx context.Context, id string) (st storage.SpaceStorage, err error) {
var p peer.Peer
lastConfiguration := s.configurationService.GetLast()
// for nodes we always get remote space only if we have id in the context
// we can't connect to client if it is a node
if lastConfiguration.IsResponsible(id) {
err = spacesyncproto.ErrSpaceMissing
return

View File

@ -21,14 +21,12 @@ func newDeleter(st storage.SpaceStorage, state *deletionstate.DeletionState, get
func (d *deleter) delete() {
allQueued := d.state.GetQueued()
for _, id := range allQueued {
if _, err := d.st.TreeStorage(id); err == nil {
err := d.getter.DeleteTree(context.Background(), d.st.Id(), id)
if err != nil {
log.With(zap.String("id", id), zap.Error(err)).Error("failed to delete object")
continue
}
err := d.getter.DeleteTree(context.Background(), d.st.Id(), id)
if err != nil && err != storage.ErrTreeStorageAlreadyDeleted {
log.With(zap.String("id", id), zap.Error(err)).Error("failed to delete object")
continue
}
err := d.state.Delete(id)
err = d.state.Delete(id)
if err != nil {
log.With(zap.String("id", id), zap.Error(err)).Error("failed to mark object as deleted")
}

View File

@ -18,7 +18,6 @@ var log = logger.NewNamed("commonspace.settingsdocument")
type SettingsDocument interface {
tree.ObjectTree
Init(ctx context.Context) (err error)
Refresh()
DeleteObject(id string) (err error)
}
@ -75,8 +74,9 @@ func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument, err e
return
}
func (s *settingsDocument) Update(tr tree.ObjectTree) {
ids, lastId, err := s.prov.ProvideIds(tr, s.lastChangeId)
func (s *settingsDocument) updateIds(lastChangeId string) {
s.lastChangeId = lastChangeId
ids, lastId, err := s.prov.ProvideIds(s, s.lastChangeId)
if err != nil {
log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to update state")
return
@ -87,16 +87,12 @@ func (s *settingsDocument) Update(tr tree.ObjectTree) {
}
}
func (s *settingsDocument) Update(tr tree.ObjectTree) {
s.updateIds(s.lastChangeId)
}
func (s *settingsDocument) Rebuild(tr tree.ObjectTree) {
ids, lastId, err := s.prov.ProvideIds(tr, "")
if err != nil {
log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to rebuild state")
return
}
s.lastChangeId = lastId
if err = s.deletionState.Add(ids); err != nil {
log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to queue ids to delete")
}
s.updateIds("")
}
func (s *settingsDocument) Init(ctx context.Context) (err error) {
@ -108,16 +104,6 @@ func (s *settingsDocument) Init(ctx context.Context) (err error) {
return
}
func (s *settingsDocument) Refresh() {
s.Lock()
defer s.Unlock()
if s.lastChangeId == "" {
s.Rebuild(s)
} else {
s.Update(s)
}
}
func (s *settingsDocument) Close() error {
s.loop.Close()
return s.ObjectTree.Close()

View File

@ -150,6 +150,15 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
return
}
status, err := deps.SpaceStorage.TreeDeletedStatus(id)
if err != nil {
return
}
if status != "" {
err = spacestorage.ErrTreeStorageAlreadyDeleted
return
}
resp, err := getTreeRemote()
if err != nil {
return

View File

@ -45,7 +45,8 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac
SpaceSettingsPayload: req.Payload.SpaceSettingsPayload,
SpaceSettingsId: req.Payload.SpaceSettingsPayloadId,
}
err = r.s.AddSpace(ctx, description)
ctx = context.WithValue(ctx, commonspace.AddSpaceCtxKey, description)
_, err = r.s.GetSpace(ctx, description.SpaceHeader.GetId())
if err != nil {
return
}

View File

@ -22,7 +22,6 @@ func New() Service {
}
type Service interface {
AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error)
GetSpace(ctx context.Context, id string) (commonspace.Space, error)
app.ComponentRunnable
}
@ -63,10 +62,6 @@ func (s *service) GetSpace(ctx context.Context, id string) (commonspace.Space, e
return v.(commonspace.Space), nil
}
func (s *service) AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) {
return s.commonSpace.AddSpace(ctx, description)
}
func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) {
cc, err := s.commonSpace.NewSpace(ctx, id)
if err != nil {

View File

@ -46,7 +46,7 @@ func (t treeKeys) RawChangeKey(id string) []byte {
return storage.JoinStringsToBytes("t", t.id, id)
}
func (t treeKeys) isTreeRecordKey(key string) bool {
func (t treeKeys) isTreeRelatedKey(key string) bool {
return strings.HasPrefix(key, t.prefix)
}
@ -75,6 +75,10 @@ func (s spaceKeys) SpaceSettingsIdKey() []byte {
return spaceSettingsIdKey
}
func (s spaceKeys) TreeDeletedKey(id string) []byte {
return storage.JoinStringsToBytes("del", id)
}
func isTreeHeadsKey(key string) bool {
return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "/heads")
}

View File

@ -9,7 +9,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"go.uber.org/zap"
"path"
"sync"
"time"
)
@ -26,7 +25,6 @@ type spaceStorage struct {
keys spaceKeys
aclStorage storage.ListStorage
header *spacesyncproto.RawSpaceHeaderWithId
mx sync.Mutex
}
func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) {
@ -174,10 +172,6 @@ func (s *spaceStorage) TreeStorage(id string) (storage.TreeStorage, error) {
}
func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
// we have mutex here, so we prevent overwriting the heads of a tree on concurrent creation
s.mx.Lock()
defer s.mx.Unlock()
return createTreeStorage(s.objDb, payload)
}
@ -189,6 +183,19 @@ func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithI
return s.header, nil
}
func (s *spaceStorage) SetTreeDeletedStatus(id, state string) (err error) {
return s.objDb.Put(s.keys.TreeDeletedKey(id), []byte(state))
}
func (s *spaceStorage) TreeDeletedStatus(id string) (status string, err error) {
res, err := s.objDb.Get(s.keys.TreeDeletedKey(id))
if err != nil {
return
}
status = string(res)
return
}
func (s *spaceStorage) StoredIds() (ids []string, err error) {
index := s.objDb.Items()

View File

@ -156,7 +156,7 @@ func (t *treeStorage) storedKeys() (keys [][]byte, err error) {
key, _, err := index.Next()
for err == nil {
strKey := string(key)
if t.keys.isTreeRecordKey(strKey) {
if t.keys.isTreeRelatedKey(strKey) {
keys = append(keys, key)
}
key, _, err = index.Next()