WIP document deletion

This commit is contained in:
mcrakhman 2022-11-11 12:37:45 +01:00
parent 020b1f1d71
commit 72d83ca171
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
15 changed files with 263 additions and 93 deletions

View File

@ -31,18 +31,20 @@ func (a aclKeys) RawRecordKey(id string) []byte {
}
type treeKeys struct {
id string
spaceId string
headsKey []byte
rootKey []byte
id string
spaceId string
headsKey []byte
rootKey []byte
rawChangePrefix []byte
}
func newTreeKeys(spaceId, id string) treeKeys {
return treeKeys{
id: id,
spaceId: spaceId,
headsKey: storage.JoinStringsToBytes("space", spaceId, "t", id, "heads"),
rootKey: storage.JoinStringsToBytes("space", spaceId, "t", "rootId", id),
id: id,
spaceId: spaceId,
headsKey: storage.JoinStringsToBytes("space", spaceId, "t", id, "heads"),
rootKey: storage.JoinStringsToBytes("space", spaceId, "t", "rootId", id),
rawChangePrefix: storage.JoinStringsToBytes("space", spaceId, "t", id),
}
}
@ -58,6 +60,10 @@ func (t treeKeys) RawChangeKey(id string) []byte {
return storage.JoinStringsToBytes("space", t.spaceId, "t", t.id, id)
}
func (t treeKeys) RawChangePrefix() []byte {
return t.rawChangePrefix
}
type spaceKeys struct {
headerKey []byte
treePrefixKey []byte

View File

@ -136,3 +136,45 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha
func (t *treeStorage) HasChange(ctx context.Context, id string) (bool, error) {
return hasDB(t.db, t.keys.RawChangeKey(id)), nil
}
func (t *treeStorage) Delete() (err error) {
storedKeys, err := t.storedKeys()
if err != nil {
return
}
err = t.db.Update(func(txn *badger.Txn) error {
for _, k := range storedKeys {
err = txn.Delete(k)
if err != nil {
return err
}
}
return nil
})
return
}
func (t *treeStorage) storedKeys() (keys [][]byte, err error) {
err = t.db.View(func(txn *badger.Txn) error {
opts := badger.DefaultIteratorOptions
opts.PrefetchValues = false
opts.Prefix = t.keys.RawChangePrefix()
it := txn.NewIterator(opts)
defer it.Close()
for it.Rewind(); it.Valid(); it.Next() {
item := it.Item()
key := item.Key()
// if it is a heads key
if len(key) <= len(t.keys.HeadsKey()) {
continue
}
keyCopy := make([]byte, 0, len(key))
keyCopy = item.KeyCopy(key)
keys = append(keys, keyCopy)
}
return nil
})
return
}

View File

@ -1,28 +0,0 @@
package deletionservice
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
)
type Service interface {
}
const deletionChangeType = "space.deletionlist"
type service struct {
account account.Service
}
func New() Service {
return nil
}
func deriveDeletionTreePayload(account account.Service, spaceId string) tree.ObjectTreeCreatePayload {
return tree.ObjectTreeCreatePayload{
SignKey: account.Account().SignKey,
ChangeType: deletionChangeType,
SpaceId: spaceId,
Identity: account.Account().Identity,
}
}

View File

@ -3,7 +3,9 @@ package commonspace
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
aclrecordproto2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
aclrecordproto "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/common"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/cid"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"hash/fnv"
@ -11,6 +13,11 @@ import (
"time"
)
const (
SpaceSettingsChangeType = "reserved.spacesettings"
SpaceDerivationScheme = "derivation.standard"
)
func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload storage.SpaceStorageCreatePayload, err error) {
// unmarshalling signing and encryption keys
identity, err := payload.SigningKey.GetPublic().Raw()
@ -23,8 +30,8 @@ func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload st
}
// preparing header and space id
bytes := make([]byte, 32)
_, err = rand.Read(bytes)
spaceHeaderSeed := make([]byte, 32)
_, err = rand.Read(spaceHeaderSeed)
if err != nil {
return
}
@ -33,7 +40,7 @@ func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload st
Timestamp: time.Now().UnixNano(),
SpaceType: payload.SpaceType,
ReplicationKey: payload.ReplicationKey,
Seed: bytes,
Seed: spaceHeaderSeed,
}
marshalled, err := header.Marshal()
if err != nil {
@ -68,12 +75,11 @@ func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload st
}
// preparing acl
aclRoot := &aclrecordproto2.ACLRoot{
aclRoot := &aclrecordproto.ACLRoot{
Identity: identity,
EncryptionKey: encPubKey,
SpaceId: spaceId,
EncryptedReadKey: encReadKey,
DerivationScheme: "",
CurrentReadKeyHash: readKeyHash,
Timestamp: time.Now().UnixNano(),
}
@ -82,10 +88,31 @@ func storagePayloadForSpaceCreate(payload SpaceCreatePayload) (storagePayload st
return
}
builder := tree.NewChangeBuilder(common.NewKeychain(), nil)
spaceSettingsSeed := make([]byte, 32)
_, err = rand.Read(spaceSettingsSeed)
if err != nil {
return
}
_, settingsRoot, err := builder.BuildInitialContent(tree.InitialContent{
AclHeadId: rawWithId.Id,
Identity: aclRoot.Identity,
SigningKey: payload.SigningKey,
SpaceId: spaceId,
Seed: spaceSettingsSeed,
ChangeType: SpaceSettingsChangeType,
Timestamp: time.Now().UnixNano(),
})
if err != nil {
return
}
// creating storage
storagePayload = storage.SpaceStorageCreatePayload{
AclWithId: rawWithId,
SpaceHeaderWithId: rawHeaderWithId,
AclWithId: rawWithId,
SpaceHeaderWithId: rawHeaderWithId,
SpaceSettingsWithId: settingsRoot,
}
return
}
@ -144,7 +171,7 @@ func storagePayloadForSpaceDerive(payload SpaceDerivePayload) (storagePayload st
}
// deriving and encrypting read key
readKey, err := aclrecordproto2.ACLReadKeyDerive(signPrivKey, encPrivKey)
readKey, err := aclrecordproto.ACLReadKeyDerive(signPrivKey, encPrivKey)
if err != nil {
return
}
@ -160,29 +187,41 @@ func storagePayloadForSpaceDerive(payload SpaceDerivePayload) (storagePayload st
}
// preparing acl
aclRoot := &aclrecordproto2.ACLRoot{
aclRoot := &aclrecordproto.ACLRoot{
Identity: identity,
EncryptionKey: encPubKey,
SpaceId: spaceId,
EncryptedReadKey: encReadKey,
DerivationScheme: "",
DerivationScheme: SpaceDerivationScheme,
CurrentReadKeyHash: readKeyHash,
Timestamp: time.Now().UnixNano(),
}
rawWithId, err := marshalACLRoot(aclRoot, payload.SigningKey)
if err != nil {
return
}
builder := tree.NewChangeBuilder(common.NewKeychain(), nil)
_, settingsRoot, err := builder.BuildInitialContent(tree.InitialContent{
AclHeadId: rawWithId.Id,
Identity: aclRoot.Identity,
SigningKey: payload.SigningKey,
SpaceId: spaceId,
ChangeType: SpaceSettingsChangeType,
})
if err != nil {
return
}
// creating storage
storagePayload = storage.SpaceStorageCreatePayload{
AclWithId: rawWithId,
SpaceHeaderWithId: rawHeaderWithId,
AclWithId: rawWithId,
SpaceHeaderWithId: rawHeaderWithId,
SpaceSettingsWithId: settingsRoot,
}
return
}
func marshalACLRoot(aclRoot *aclrecordproto2.ACLRoot, key signingkey.PrivKey) (rawWithId *aclrecordproto2.RawACLRecordWithId, err error) {
func marshalACLRoot(aclRoot *aclrecordproto.ACLRoot, key signingkey.PrivKey) (rawWithId *aclrecordproto.RawACLRecordWithId, err error) {
marshalledRoot, err := aclRoot.Marshal()
if err != nil {
return
@ -191,7 +230,7 @@ func marshalACLRoot(aclRoot *aclrecordproto2.ACLRoot, key signingkey.PrivKey) (r
if err != nil {
return
}
raw := &aclrecordproto2.RawACLRecord{
raw := &aclrecordproto.RawACLRecord{
Payload: marshalledRoot,
Signature: signature,
}
@ -203,7 +242,7 @@ func marshalACLRoot(aclRoot *aclrecordproto2.ACLRoot, key signingkey.PrivKey) (r
if err != nil {
return
}
rawWithId = &aclrecordproto2.RawACLRecordWithId{
rawWithId = &aclrecordproto.RawACLRecordWithId{
Payload: marshalledRaw,
Id: aclHeadId,
}

View File

@ -0,0 +1,22 @@
package settingsservice
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
)
type Service interface {
}
const deletionChangeType = "space.deletionlist"
type service struct {
account account.Service
}
func New() Service {
return nil
}
type DeletedDocumentNotifiable interface {
NotifyDeleted(id string)
}

View File

@ -141,6 +141,7 @@ func (s *space) Init(ctx context.Context) (err error) {
if err != nil {
return
}
s.aclList = syncacl.NewSyncACL(aclList, s.syncService.StreamPool())
objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache)
s.syncService.Init(objectGetter)

View File

@ -6,6 +6,7 @@ import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/settingsservice"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler"
@ -20,15 +21,20 @@ import (
"go.uber.org/zap"
)
var ErrSyncTreeClosed = errors.New("sync tree is closed")
var (
ErrSyncTreeClosed = errors.New("sync tree is closed")
ErrSyncTreeDeleted = errors.New("sync tree is deleted")
)
// SyncTree sends head updates to sync service and also sends new changes to update listener
type SyncTree struct {
tree.ObjectTree
synchandler.SyncHandler
syncClient SyncClient
listener updatelistener.UpdateListener
isClosed bool
syncClient SyncClient
listener updatelistener.UpdateListener
deletedNotifiable settingsservice.DeletedDocumentNotifiable
isClosed bool
isDeleted bool
}
var log = logger.NewNamed("commonspace.synctree").Sugar()
@ -39,25 +45,27 @@ var buildObjectTree = tree.BuildObjectTree
var createSyncClient = newSyncClient
type CreateDeps struct {
SpaceId string
Payload tree.ObjectTreeCreatePayload
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
StreamPool syncservice.StreamPool
Listener updatelistener.UpdateListener
AclList list.ACLList
CreateStorage storage.TreeStorageCreatorFunc
SpaceId string
Payload tree.ObjectTreeCreatePayload
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
StreamPool syncservice.StreamPool
Listener updatelistener.UpdateListener
AclList list.ACLList
CreateStorage storage.TreeStorageCreatorFunc
DeletedNotifiable settingsservice.DeletedDocumentNotifiable
}
type BuildDeps struct {
SpaceId string
StreamPool syncservice.StreamPool
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
Listener updatelistener.UpdateListener
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage
SpaceId string
StreamPool syncservice.StreamPool
Configuration nodeconf.Configuration
HeadNotifiable diffservice.HeadNotifiable
Listener updatelistener.UpdateListener
AclList list.ACLList
SpaceStorage spacestorage.SpaceStorage
TreeStorage storage.TreeStorage
DeletedNotifiable settingsservice.DeletedDocumentNotifiable
}
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) {
@ -72,9 +80,10 @@ func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, er
sharedFactory,
deps.Configuration)
syncTree := &SyncTree{
ObjectTree: t,
syncClient: syncClient,
listener: deps.Listener,
ObjectTree: t,
syncClient: syncClient,
listener: deps.Listener,
deletedNotifiable: deps.DeletedNotifiable,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
@ -97,9 +106,10 @@ func CreateSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, er
GetRequestFactory(),
deps.Configuration)
syncTree := &SyncTree{
ObjectTree: t,
syncClient: syncClient,
listener: deps.Listener,
ObjectTree: t,
syncClient: syncClient,
listener: deps.Listener,
deletedNotifiable: deps.DeletedNotifiable,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
@ -182,9 +192,10 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t tr
GetRequestFactory(),
deps.Configuration)
syncTree := &SyncTree{
ObjectTree: t,
syncClient: syncClient,
listener: deps.Listener,
ObjectTree: t,
syncClient: syncClient,
listener: deps.Listener,
deletedNotifiable: deps.DeletedNotifiable,
}
syncHandler := newSyncTreeHandler(syncTree, syncClient)
syncTree.SyncHandler = syncHandler
@ -203,8 +214,7 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t tr
}
func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeContent) (res tree.AddResult, err error) {
if s.isClosed { // checkAlive err
err = ErrSyncTreeClosed
if err = s.checkAlive(); err != nil {
return
}
res, err = s.ObjectTree.AddContent(ctx, content)
@ -217,8 +227,7 @@ func (s *SyncTree) AddContent(ctx context.Context, content tree.SignableChangeCo
}
func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*treechangeproto.RawTreeChangeWithId) (res tree.AddResult, err error) {
if s.isClosed {
err = ErrSyncTreeClosed
if err = s.checkAlive(); err != nil {
return
}
res, err = s.ObjectTree.AddRawChanges(ctx, changes...)
@ -242,15 +251,44 @@ func (s *SyncTree) AddRawChanges(ctx context.Context, changes ...*treechangeprot
return
}
func (s *SyncTree) Delete() (err error) {
log.With("id", s.ID()).Debug("deleting sync tree")
s.Lock()
defer func() {
s.Unlock()
if err == nil {
s.deletedNotifiable.NotifyDeleted(s.ID())
}
}()
if err = s.checkAlive(); err != nil {
return
}
err = s.ObjectTree.Delete()
if err != nil {
return
}
s.isDeleted = true
return
}
func (s *SyncTree) Close() (err error) {
log.With("id", s.ID()).Debug("closing sync tree")
s.Lock()
defer s.Unlock()
log.With("id", s.ID()).Debug("taken lock on sync tree")
if s.isClosed {
err = ErrSyncTreeClosed
if err = s.checkAlive(); err != nil {
return
}
s.isClosed = true
return
}
func (s *SyncTree) checkAlive() (err error) {
if s.isClosed {
err = ErrSyncTreeClosed
}
if s.isDeleted {
err = ErrSyncTreeDeleted
}
return
}

View File

@ -157,6 +157,10 @@ func (t *inMemoryTreeStorage) GetRawChange(ctx context.Context, changeId string)
return nil, fmt.Errorf("could not get change with id: %s", changeId)
}
func (t *inMemoryTreeStorage) Delete() error {
return nil
}
type inMemoryStorageProvider struct {
objects map[string]TreeStorage
sync.RWMutex

View File

@ -14,6 +14,7 @@ type TreeStorage interface {
AddRawChange(change *treechangeproto.RawTreeChangeWithId) error
GetRawChange(ctx context.Context, id string) (*treechangeproto.RawTreeChangeWithId, error)
HasChange(ctx context.Context, id string) (bool, error)
Delete() error
}
type TreeStorageCreatorFunc = func(payload TreeStorageCreatePayload) (TreeStorage, error)

View File

@ -48,7 +48,7 @@ type changeBuilder struct {
keys *common.Keychain
}
func newChangeBuilder(keys *common.Keychain, rootChange *treechangeproto.RawTreeChangeWithId) ChangeBuilder {
func NewChangeBuilder(keys *common.Keychain, rootChange *treechangeproto.RawTreeChangeWithId) ChangeBuilder {
return &changeBuilder{keys: keys, rootChange: rootChange}
}

View File

@ -57,6 +57,7 @@ type ObjectTree interface {
AddContent(ctx context.Context, content SignableChangeContent) (AddResult, error)
AddRawChanges(ctx context.Context, changes ...*treechangeproto.RawTreeChangeWithId) (AddResult, error)
Delete() error
Close() error
}
@ -100,7 +101,7 @@ func defaultObjectTreeDeps(
aclList list2.ACLList) objectTreeDeps {
keychain := common.NewKeychain()
changeBuilder := newChangeBuilder(keychain, rootChange)
changeBuilder := NewChangeBuilder(keychain, rootChange)
treeBuilder := newTreeBuilder(treeStorage, changeBuilder)
return objectTreeDeps{
changeBuilder: changeBuilder,
@ -508,6 +509,10 @@ func (ot *objectTree) Close() error {
return nil
}
func (ot *objectTree) Delete() error {
return nil
}
func (ot *objectTree) SnapshotPath() []string {
// TODO: Add error as return parameter
if ot.snapshotPathIsActual() {

View File

@ -116,7 +116,7 @@ func prepareTreeContext(t *testing.T, aclList list.ACLList) testTreeContext {
treeStorage := changeCreator.createNewTreeStorage("0", aclList.Head().Id)
root, _ := treeStorage.Root()
changeBuilder := &mockChangeBuilder{
originalBuilder: newChangeBuilder(nil, root),
originalBuilder: NewChangeBuilder(nil, root),
}
deps := objectTreeDeps{
changeBuilder: changeBuilder,

View File

@ -71,7 +71,7 @@ func createObjectTree(
Seed: seed,
}
_, raw, err := newChangeBuilder(common.NewKeychain(), nil).BuildInitialContent(cnt)
_, raw, err := NewChangeBuilder(common.NewKeychain(), nil).BuildInitialContent(cnt)
if err != nil {
return
}

View File

@ -1,6 +1,7 @@
package storage
import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"strings"
)
@ -25,6 +26,7 @@ func (a aclKeys) RawRecordKey(id string) []byte {
type treeKeys struct {
id string
prefix string
headsKey []byte
}
@ -32,6 +34,7 @@ func newTreeKeys(id string) treeKeys {
return treeKeys{
id: id,
headsKey: storage.JoinStringsToBytes("t", id, "heads"),
prefix: fmt.Sprintf("t/%s", id),
}
}
@ -43,6 +46,10 @@ func (t treeKeys) RawChangeKey(id string) []byte {
return storage.JoinStringsToBytes("t", t.id, id)
}
func (t treeKeys) isTreeRecordKey(key string) bool {
return strings.HasPrefix(key, t.prefix) && !strings.HasSuffix(key, "/heads")
}
type spaceKeys struct {
headerKey []byte
}

View File

@ -135,3 +135,36 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha
func (t *treeStorage) HasChange(ctx context.Context, id string) (bool, error) {
return t.db.Has(t.keys.RawChangeKey(id))
}
func (t *treeStorage) Delete() (err error) {
storedKeys, err := t.storedKeys()
if err != nil {
return
}
for _, k := range storedKeys {
err = t.db.Delete(k)
if err != nil {
return
}
}
return
}
func (t *treeStorage) storedKeys() (keys [][]byte, err error) {
index := t.db.Items()
key, _, err := index.Next()
for err == nil {
strKey := string(key)
if t.keys.isTreeRecordKey(strKey) {
keys = append(keys, key)
}
key, _, err = index.Next()
}
if err != pogreb.ErrIterationDone {
return
}
err = nil
return
}