Change push space and some tree deletion logic
This commit is contained in:
parent
bba4efaa50
commit
5715450aff
@ -45,7 +45,8 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac
|
||||
SpaceSettingsPayload: req.Payload.SpaceSettingsPayload,
|
||||
SpaceSettingsId: req.Payload.SpaceSettingsPayloadId,
|
||||
}
|
||||
err = r.s.AddSpace(ctx, description)
|
||||
ctx = context.WithValue(ctx, commonspace.AddSpaceCtxKey, description)
|
||||
_, err = r.s.GetSpace(ctx, description.SpaceHeader.GetId())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -23,7 +23,6 @@ func New() Service {
|
||||
|
||||
type Service interface {
|
||||
GetSpace(ctx context.Context, id string) (commonspace.Space, error)
|
||||
AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error)
|
||||
CreateSpace(ctx context.Context, payload commonspace.SpaceCreatePayload) (commonspace.Space, error)
|
||||
DeriveSpace(ctx context.Context, payload commonspace.SpaceDerivePayload) (commonspace.Space, error)
|
||||
app.ComponentRunnable
|
||||
@ -91,10 +90,6 @@ func (s *service) GetSpace(ctx context.Context, id string) (container commonspac
|
||||
return v.(commonspace.Space), nil
|
||||
}
|
||||
|
||||
func (s *service) AddSpace(ctx context.Context, description commonspace.SpaceDescription) (err error) {
|
||||
return s.commonSpace.AddSpace(ctx, description)
|
||||
}
|
||||
|
||||
func (s *service) loadSpace(ctx context.Context, id string) (value ocache.Object, err error) {
|
||||
cc, err := s.commonSpace.NewSpace(ctx, id)
|
||||
if err != nil {
|
||||
|
||||
@ -65,6 +65,7 @@ func (t treeKeys) RawChangePrefix() []byte {
|
||||
}
|
||||
|
||||
type spaceKeys struct {
|
||||
spaceId string
|
||||
headerKey []byte
|
||||
treePrefixKey []byte
|
||||
spaceSettingsIdKey []byte
|
||||
@ -72,6 +73,7 @@ type spaceKeys struct {
|
||||
|
||||
func newSpaceKeys(spaceId string) spaceKeys {
|
||||
return spaceKeys{
|
||||
spaceId: spaceId,
|
||||
headerKey: storage.JoinStringsToBytes("space", "header", spaceId),
|
||||
treePrefixKey: storage.JoinStringsToBytes("space", spaceId, "t", "rootId"),
|
||||
spaceSettingsIdKey: storage.JoinStringsToBytes("space", spaceId, "spaceSettingsId"),
|
||||
@ -90,6 +92,10 @@ func (s spaceKeys) SpaceSettingsId() []byte {
|
||||
return s.spaceSettingsIdKey
|
||||
}
|
||||
|
||||
func (s spaceKeys) TreeDeletedKey(id string) []byte {
|
||||
return storage.JoinStringsToBytes("space", s.spaceId, "deleted", id)
|
||||
}
|
||||
|
||||
type storageServiceKeys struct {
|
||||
spacePrefix []byte
|
||||
}
|
||||
|
||||
@ -6,7 +6,6 @@ import (
|
||||
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||
"github.com/dgraph-io/badger/v3"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type spaceStorage struct {
|
||||
@ -16,7 +15,6 @@ type spaceStorage struct {
|
||||
keys spaceKeys
|
||||
aclStorage storage.ListStorage
|
||||
header *spacesyncproto.RawSpaceHeaderWithId
|
||||
mx sync.Mutex
|
||||
}
|
||||
|
||||
var spaceValidationFunc = spacestorage.ValidateSpaceStorageCreatePayload
|
||||
@ -118,10 +116,6 @@ func (s *spaceStorage) TreeStorage(id string) (storage.TreeStorage, error) {
|
||||
}
|
||||
|
||||
func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
|
||||
// we have mutex here, so we prevent overwriting the heads of a tree on concurrent creation
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
|
||||
return createTreeStorage(s.objDb, s.spaceId, payload)
|
||||
}
|
||||
|
||||
@ -156,6 +150,27 @@ func (s *spaceStorage) StoredIds() (ids []string, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *spaceStorage) SetTreeDeletedStatus(id, status string) (err error) {
|
||||
return s.objDb.Update(func(txn *badger.Txn) error {
|
||||
return txn.Set(s.keys.TreeDeletedKey(id), []byte(status))
|
||||
})
|
||||
}
|
||||
|
||||
func (s *spaceStorage) TreeDeletedStatus(id string) (status string, err error) {
|
||||
err = s.objDb.View(func(txn *badger.Txn) error {
|
||||
res, err := getTxn(txn, s.keys.TreeDeletedKey(id))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
status = string(res)
|
||||
return nil
|
||||
})
|
||||
if err == badger.ErrKeyNotFound {
|
||||
err = nil
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *spaceStorage) Close() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -21,7 +21,7 @@ func (c *commonStorage) CreateTreeStorage(payload treestorage.TreeStorageCreateP
|
||||
return
|
||||
}
|
||||
if status == "" {
|
||||
return c.CreateTreeStorage(payload)
|
||||
return c.SpaceStorage.CreateTreeStorage(payload)
|
||||
}
|
||||
err = storage.ErrTreeStorageAlreadyDeleted
|
||||
return
|
||||
|
||||
@ -26,11 +26,14 @@ func New() Service {
|
||||
return &service{}
|
||||
}
|
||||
|
||||
type ctxKey int
|
||||
|
||||
const AddSpaceCtxKey ctxKey = 0
|
||||
|
||||
type Service interface {
|
||||
DeriveSpace(ctx context.Context, payload SpaceDerivePayload) (string, error)
|
||||
CreateSpace(ctx context.Context, payload SpaceCreatePayload) (string, error)
|
||||
NewSpace(ctx context.Context, id string) (sp Space, err error)
|
||||
AddSpace(ctx context.Context, spaceDescription SpaceDescription) (err error)
|
||||
app.Component
|
||||
}
|
||||
|
||||
@ -83,36 +86,6 @@ func (s *service) DeriveSpace(ctx context.Context, payload SpaceDerivePayload) (
|
||||
return store.Id(), nil
|
||||
}
|
||||
|
||||
func (s *service) AddSpace(ctx context.Context, spaceDescription SpaceDescription) (err error) {
|
||||
_, err = s.storageProvider.SpaceStorage(spaceDescription.SpaceHeader.Id)
|
||||
if err == nil {
|
||||
err = spacesyncproto.ErrSpaceExists
|
||||
return
|
||||
}
|
||||
if err != storage.ErrSpaceStorageMissing {
|
||||
err = spacesyncproto.ErrUnexpected
|
||||
return
|
||||
}
|
||||
|
||||
payload := storage.SpaceStorageCreatePayload{
|
||||
AclWithId: &aclrecordproto.RawACLRecordWithId{
|
||||
Payload: spaceDescription.AclPayload,
|
||||
Id: spaceDescription.AclId,
|
||||
},
|
||||
SpaceHeaderWithId: spaceDescription.SpaceHeader,
|
||||
}
|
||||
st, err := s.storageProvider.CreateSpaceStorage(payload)
|
||||
if err != nil {
|
||||
err = spacesyncproto.ErrUnexpected
|
||||
if err == storage.ErrSpaceStorageExists {
|
||||
err = spacesyncproto.ErrSpaceExists
|
||||
}
|
||||
return
|
||||
}
|
||||
err = st.Close()
|
||||
return
|
||||
}
|
||||
|
||||
func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
|
||||
st, err := s.storageProvider.SpaceStorage(id)
|
||||
if err != nil {
|
||||
@ -120,12 +93,19 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if description, ok := ctx.Value(AddSpaceCtxKey).(SpaceDescription); ok {
|
||||
st, err = s.addSpaceStorage(ctx, description)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
st, err = s.getSpaceStorageFromRemote(ctx, id)
|
||||
if err != nil {
|
||||
err = storage.ErrSpaceStorageMissing
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
lastConfiguration := s.configurationService.GetLast()
|
||||
confConnector := nodeconf.NewConfConnector(lastConfiguration, s.pool)
|
||||
@ -143,10 +123,39 @@ func (s *service) NewSpace(ctx context.Context, id string) (Space, error) {
|
||||
return sp, nil
|
||||
}
|
||||
|
||||
func (s *service) addSpaceStorage(ctx context.Context, spaceDescription SpaceDescription) (st storage.SpaceStorage, err error) {
|
||||
_, err = s.storageProvider.SpaceStorage(spaceDescription.SpaceHeader.Id)
|
||||
if err == nil {
|
||||
err = spacesyncproto.ErrSpaceExists
|
||||
return
|
||||
}
|
||||
if err != storage.ErrSpaceStorageMissing {
|
||||
err = spacesyncproto.ErrUnexpected
|
||||
return
|
||||
}
|
||||
|
||||
payload := storage.SpaceStorageCreatePayload{
|
||||
AclWithId: &aclrecordproto.RawACLRecordWithId{
|
||||
Payload: spaceDescription.AclPayload,
|
||||
Id: spaceDescription.AclId,
|
||||
},
|
||||
SpaceHeaderWithId: spaceDescription.SpaceHeader,
|
||||
}
|
||||
st, err = s.storageProvider.CreateSpaceStorage(payload)
|
||||
if err != nil {
|
||||
err = spacesyncproto.ErrUnexpected
|
||||
if err == storage.ErrSpaceStorageExists {
|
||||
err = spacesyncproto.ErrSpaceExists
|
||||
}
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *service) getSpaceStorageFromRemote(ctx context.Context, id string) (st storage.SpaceStorage, err error) {
|
||||
var p peer.Peer
|
||||
lastConfiguration := s.configurationService.GetLast()
|
||||
// for nodes we always get remote space only if we have id in the context
|
||||
// we can't connect to client if it is a node
|
||||
if lastConfiguration.IsResponsible(id) {
|
||||
err = spacesyncproto.ErrSpaceMissing
|
||||
return
|
||||
|
||||
@ -21,14 +21,12 @@ func newDeleter(st storage.SpaceStorage, state *deletionstate.DeletionState, get
|
||||
func (d *deleter) delete() {
|
||||
allQueued := d.state.GetQueued()
|
||||
for _, id := range allQueued {
|
||||
if _, err := d.st.TreeStorage(id); err == nil {
|
||||
err := d.getter.DeleteTree(context.Background(), d.st.Id(), id)
|
||||
if err != nil {
|
||||
if err != nil && err != storage.ErrTreeStorageAlreadyDeleted {
|
||||
log.With(zap.String("id", id), zap.Error(err)).Error("failed to delete object")
|
||||
continue
|
||||
}
|
||||
}
|
||||
err := d.state.Delete(id)
|
||||
err = d.state.Delete(id)
|
||||
if err != nil {
|
||||
log.With(zap.String("id", id), zap.Error(err)).Error("failed to mark object as deleted")
|
||||
}
|
||||
|
||||
@ -18,7 +18,6 @@ var log = logger.NewNamed("commonspace.settingsdocument")
|
||||
type SettingsDocument interface {
|
||||
tree.ObjectTree
|
||||
Init(ctx context.Context) (err error)
|
||||
Refresh()
|
||||
DeleteObject(id string) (err error)
|
||||
}
|
||||
|
||||
@ -75,8 +74,9 @@ func NewSettingsDocument(deps Deps, spaceId string) (doc SettingsDocument, err e
|
||||
return
|
||||
}
|
||||
|
||||
func (s *settingsDocument) Update(tr tree.ObjectTree) {
|
||||
ids, lastId, err := s.prov.ProvideIds(tr, s.lastChangeId)
|
||||
func (s *settingsDocument) updateIds(lastChangeId string) {
|
||||
s.lastChangeId = lastChangeId
|
||||
ids, lastId, err := s.prov.ProvideIds(s, s.lastChangeId)
|
||||
if err != nil {
|
||||
log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to update state")
|
||||
return
|
||||
@ -87,16 +87,12 @@ func (s *settingsDocument) Update(tr tree.ObjectTree) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *settingsDocument) Update(tr tree.ObjectTree) {
|
||||
s.updateIds(s.lastChangeId)
|
||||
}
|
||||
|
||||
func (s *settingsDocument) Rebuild(tr tree.ObjectTree) {
|
||||
ids, lastId, err := s.prov.ProvideIds(tr, "")
|
||||
if err != nil {
|
||||
log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to rebuild state")
|
||||
return
|
||||
}
|
||||
s.lastChangeId = lastId
|
||||
if err = s.deletionState.Add(ids); err != nil {
|
||||
log.With(zap.Strings("ids", ids), zap.Error(err)).Error("failed to queue ids to delete")
|
||||
}
|
||||
s.updateIds("")
|
||||
}
|
||||
|
||||
func (s *settingsDocument) Init(ctx context.Context) (err error) {
|
||||
@ -108,16 +104,6 @@ func (s *settingsDocument) Init(ctx context.Context) (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *settingsDocument) Refresh() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
if s.lastChangeId == "" {
|
||||
s.Rebuild(s)
|
||||
} else {
|
||||
s.Update(s)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *settingsDocument) Close() error {
|
||||
s.loop.Close()
|
||||
return s.ObjectTree.Close()
|
||||
|
||||
@ -148,6 +148,15 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
|
||||
return
|
||||
}
|
||||
|
||||
status, err := deps.SpaceStorage.TreeDeletedStatus(id)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if status != "" {
|
||||
err = spacestorage.ErrTreeStorageAlreadyDeleted
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := getTreeRemote()
|
||||
if err != nil {
|
||||
return
|
||||
|
||||
@ -45,7 +45,8 @@ func (r *rpcHandler) PushSpace(ctx context.Context, req *spacesyncproto.PushSpac
|
||||
SpaceSettingsPayload: req.Payload.SpaceSettingsPayload,
|
||||
SpaceSettingsId: req.Payload.SpaceSettingsPayloadId,
|
||||
}
|
||||
err = r.s.AddSpace(ctx, description)
|
||||
ctx = context.WithValue(ctx, commonspace.AddSpaceCtxKey, description)
|
||||
_, err = r.s.GetSpace(ctx, description.SpaceHeader.GetId())
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@ -46,7 +46,7 @@ func (t treeKeys) RawChangeKey(id string) []byte {
|
||||
return storage.JoinStringsToBytes("t", t.id, id)
|
||||
}
|
||||
|
||||
func (t treeKeys) isTreeRecordKey(key string) bool {
|
||||
func (t treeKeys) isTreeRelatedKey(key string) bool {
|
||||
return strings.HasPrefix(key, t.prefix)
|
||||
}
|
||||
|
||||
@ -75,6 +75,10 @@ func (s spaceKeys) SpaceSettingsIdKey() []byte {
|
||||
return spaceSettingsIdKey
|
||||
}
|
||||
|
||||
func (s spaceKeys) TreeDeletedKey(id string) []byte {
|
||||
return storage.JoinStringsToBytes("del", id)
|
||||
}
|
||||
|
||||
func isTreeHeadsKey(key string) bool {
|
||||
return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "/heads")
|
||||
}
|
||||
|
||||
@ -9,7 +9,6 @@ import (
|
||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
|
||||
"go.uber.org/zap"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -26,7 +25,6 @@ type spaceStorage struct {
|
||||
keys spaceKeys
|
||||
aclStorage storage.ListStorage
|
||||
header *spacesyncproto.RawSpaceHeaderWithId
|
||||
mx sync.Mutex
|
||||
}
|
||||
|
||||
func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) {
|
||||
@ -174,10 +172,6 @@ func (s *spaceStorage) TreeStorage(id string) (storage.TreeStorage, error) {
|
||||
}
|
||||
|
||||
func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
|
||||
// we have mutex here, so we prevent overwriting the heads of a tree on concurrent creation
|
||||
s.mx.Lock()
|
||||
defer s.mx.Unlock()
|
||||
|
||||
return createTreeStorage(s.objDb, payload)
|
||||
}
|
||||
|
||||
@ -189,6 +183,19 @@ func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithI
|
||||
return s.header, nil
|
||||
}
|
||||
|
||||
func (s *spaceStorage) SetTreeDeletedStatus(id, state string) (err error) {
|
||||
return s.objDb.Put(s.keys.TreeDeletedKey(id), []byte(state))
|
||||
}
|
||||
|
||||
func (s *spaceStorage) TreeDeletedStatus(id string) (status string, err error) {
|
||||
res, err := s.objDb.Get(s.keys.TreeDeletedKey(id))
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
status = string(res)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *spaceStorage) StoredIds() (ids []string, err error) {
|
||||
index := s.objDb.Items()
|
||||
|
||||
|
||||
@ -156,7 +156,7 @@ func (t *treeStorage) storedKeys() (keys [][]byte, err error) {
|
||||
key, _, err := index.Next()
|
||||
for err == nil {
|
||||
strKey := string(key)
|
||||
if t.keys.isTreeRecordKey(strKey) {
|
||||
if t.keys.isTreeRelatedKey(strKey) {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
key, _, err = index.Next()
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user