Change pogreb implementation

This commit is contained in:
mcrakhman 2022-10-15 16:26:03 +02:00 committed by Mikhail Iudin
parent ff2baaeb79
commit ba7beb4d5a
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
11 changed files with 289 additions and 100 deletions

View File

@ -133,6 +133,20 @@ func (mr *MockSpaceStorageMockRecorder) ACLStorage() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ACLStorage", reflect.TypeOf((*MockSpaceStorage)(nil).ACLStorage))
}
// Close mocks base method.
func (m *MockSpaceStorage) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockSpaceStorageMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSpaceStorage)(nil).Close))
}
// CreateTreeStorage mocks base method.
func (m *MockSpaceStorage) CreateTreeStorage(arg0 storage0.TreeStorageCreatePayload) (storage0.TreeStorage, error) {
m.ctrl.T.Helper()

View File

@ -5,6 +5,24 @@ import (
"strings"
)
type aclKeys struct {
}
var aclHeadIdKey = []byte("a/headId")
var aclRootIdKey = []byte("a/rootId")
func (a aclKeys) HeadIdKey() []byte {
return aclHeadIdKey
}
func (a aclKeys) RootIdKey() []byte {
return aclRootIdKey
}
func (a aclKeys) RawRecordKey(id string) []byte {
return joinStringsToBytes("a", id)
}
type treeKeys struct {
id string
}
@ -13,8 +31,8 @@ func (t treeKeys) HeadsKey() []byte {
return joinStringsToBytes("t", t.id, "heads")
}
func (t treeKeys) RootKey() []byte {
return joinStringsToBytes("t", t.id)
func (t treeKeys) RootIdKey() []byte {
return joinStringsToBytes("t", t.id, "rootId")
}
func (t treeKeys) RawChangeKey(id string) []byte {
@ -22,21 +40,25 @@ func (t treeKeys) RawChangeKey(id string) []byte {
}
type spaceKeys struct {
headerKey []byte
}
var headerKey = []byte("header")
var aclKey = []byte("acl")
func newSpaceKeys(spaceId string) spaceKeys {
return spaceKeys{headerKey: joinStringsToBytes("s", spaceId)}
}
var spaceIdKey = []byte("spaceId")
func (s spaceKeys) SpaceIdKey() []byte {
return spaceIdKey
}
func (s spaceKeys) HeaderKey() []byte {
return headerKey
return s.headerKey
}
func (s spaceKeys) ACLKey() []byte {
return aclKey
}
func isRootKey(key string) bool {
return strings.HasPrefix(key, "t/") && strings.Count(key, "/") == 2
func isRootIdKey(key string) bool {
return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "rootId")
}
func joinStringsToBytes(strs ...string) []byte {

135
node/storage/liststorage.go Normal file
View File

@ -0,0 +1,135 @@
package storage
import (
"context"
"github.com/akrylysov/pogreb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
)
type listStorage struct {
db *pogreb.DB
keys aclKeys
id string
root *aclrecordproto.RawACLRecordWithId
}
func newListStorage(db *pogreb.DB) (ls storage.ListStorage, err error) {
keys := aclKeys{}
head, err := db.Get(keys.HeadIdKey())
if err != nil {
return
}
if head == nil {
err = storage.ErrUnknownACLId
return
}
rootId, err := db.Get(keys.RootIdKey())
if err != nil {
return
}
if rootId == nil {
err = storage.ErrUnknownACLId
return
}
root, err := db.Get(keys.RawRecordKey(string(rootId)))
if err != nil {
return
}
if root == nil {
err = storage.ErrUnknownACLId
return
}
rootWithId := &aclrecordproto.RawACLRecordWithId{
Payload: root,
Id: string(rootId),
}
ls = &listStorage{
db: db,
keys: aclKeys{},
id: string(rootId),
root: rootWithId,
}
return
}
func createListStorage(db *pogreb.DB, root *aclrecordproto.RawACLRecordWithId) (ls storage.ListStorage, err error) {
keys := aclKeys{}
has, err := db.Has(keys.RootIdKey())
if err != nil {
return
}
if has {
err = storage.ErrACLExists
return
}
err = db.Put(keys.HeadIdKey(), []byte(root.Id))
if err != nil {
return
}
err = db.Put(keys.RawRecordKey(root.Id), root.Payload)
if err != nil {
return
}
err = db.Put(keys.RootIdKey(), []byte(root.Id))
if err != nil {
return
}
ls = &listStorage{
db: db,
keys: aclKeys{},
id: root.Id,
root: root,
}
return
}
func (l *listStorage) ID() (string, error) {
return l.id, nil
}
func (l *listStorage) Root() (*aclrecordproto.RawACLRecordWithId, error) {
return l.root, nil
}
func (l *listStorage) Head() (head string, err error) {
bytes, err := l.db.Get(l.keys.HeadIdKey())
if err != nil {
return
}
if bytes == nil {
err = storage.ErrUnknownACLId
return
}
head = string(bytes)
return
}
func (l *listStorage) GetRawRecord(ctx context.Context, id string) (raw *aclrecordproto.RawACLRecordWithId, err error) {
res, err := l.db.Get(l.keys.RawRecordKey(id))
if err != nil {
return
}
if res == nil {
err = storage.ErrUnknownRecord
return
}
raw = &aclrecordproto.RawACLRecordWithId{
Payload: res,
Id: id,
}
return
}
func (l *listStorage) AddRawRecord(ctx context.Context, rec *aclrecordproto.RawACLRecordWithId) error {
return l.db.Put([]byte(rec.Id), rec.Payload)
}

View File

@ -5,7 +5,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/gogo/protobuf/proto"
"path"
"sync"
"time"
@ -16,9 +15,11 @@ var defPogrebOptions = &pogreb.Options{
}
type spaceStorage struct {
objDb *pogreb.DB
keys spaceKeys
mx sync.Mutex
objDb *pogreb.DB
keys spaceKeys
aclStorage storage.ListStorage
header *spacesyncproto.RawSpaceHeaderWithId
mx sync.Mutex
}
func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceStorage, err error) {
@ -27,8 +28,15 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS
if err != nil {
return
}
keys := spaceKeys{}
has, err := objDb.Has(keys.HeaderKey())
defer func() {
if err != nil {
objDb.Close()
}
}()
keys := newSpaceKeys(spaceId)
has, err := objDb.Has(keys.SpaceIdKey())
if err != nil {
return
}
@ -37,18 +45,28 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS
return
}
has, err = objDb.Has(keys.ACLKey())
header, err := objDb.Get(keys.HeaderKey())
if err != nil {
return
}
if !has {
if header == nil {
err = spacestorage.ErrSpaceStorageMissing
return
}
aclStorage, err := newListStorage(objDb)
if err != nil {
return
}
store = &spaceStorage{
objDb: objDb,
keys: keys,
header: &spacesyncproto.RawSpaceHeaderWithId{
RawHeader: header,
Id: spaceId,
},
aclStorage: aclStorage,
}
return
}
@ -66,37 +84,36 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate
}
}()
keys := spaceKeys{}
has, err := db.Has(keys.HeaderKey())
keys := newSpaceKeys(payload.SpaceHeaderWithId.Id)
has, err := db.Has(keys.SpaceIdKey())
if err != nil {
return
}
if has {
err = spacestorage.ErrSpaceStorageExists
err = spacesyncproto.ErrSpaceExists
return
}
marshalledRec, err := payload.RecWithId.Marshal()
if err != nil {
return
}
err = db.Put(keys.ACLKey(), marshalledRec)
aclStorage, err := createListStorage(db, payload.RecWithId)
if err != nil {
return
}
marshalledHeader, err := payload.SpaceHeaderWithId.Marshal()
err = db.Put(keys.HeaderKey(), payload.SpaceHeaderWithId.RawHeader)
if err != nil {
return
}
err = db.Put(keys.HeaderKey(), marshalledHeader)
err = db.Put(keys.SpaceIdKey(), []byte(payload.SpaceHeaderWithId.Id))
if err != nil {
return
}
store = &spaceStorage{
objDb: db,
keys: keys,
objDb: db,
keys: keys,
aclStorage: aclStorage,
header: payload.SpaceHeaderWithId,
}
return
}
@ -106,47 +123,31 @@ func (s *spaceStorage) TreeStorage(id string) (storage.TreeStorage, error) {
}
func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
// we have mutex here, so we prevent overwriting the heads of a tree on concurrent creation
s.mx.Lock()
defer s.mx.Unlock()
treeKeys := treeKeys{payload.TreeId}
has, err := s.objDb.Has(treeKeys.RootKey())
if err != nil {
return
}
if has {
err = spacestorage.ErrSpaceStorageExists
return
}
return createTreeStorage(s.objDb, payload)
}
func (s *spaceStorage) ACLStorage() (storage.ListStorage, error) {
return nil, nil
return s.aclStorage, nil
}
func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithId, err error) {
res, err := s.objDb.Get(s.keys.HeaderKey())
if err != nil {
return
}
header = &spacesyncproto.RawSpaceHeaderWithId{}
err = proto.Unmarshal(res, header)
return
return s.header, nil
}
func (s *spaceStorage) StoredIds() (ids []string, err error) {
index := s.objDb.Items()
_, value, err := index.Next()
key, val, err := index.Next()
for err == nil {
strVal := string(value)
if isRootKey(strVal) {
ids = append(ids, string(value))
strKey := string(key)
if isRootIdKey(strKey) {
ids = append(ids, string(val))
}
_, value, err = index.Next()
key, val, err = index.Next()
}
if err != pogreb.ErrIterationDone {

View File

@ -6,24 +6,28 @@ import (
"github.com/akrylysov/pogreb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
"github.com/gogo/protobuf/proto"
"strings"
"sync"
)
type treeStorage struct {
db *pogreb.DB
keys treeKeys
id string
rootKey []byte
headsKey []byte
heads []string
root *treechangeproto.RawTreeChangeWithId
headsMx sync.Mutex
}
func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err error) {
keys := treeKeys{treeId}
has, err := db.Has(keys.RootIdKey())
if err != nil {
return
}
if !has {
err = storage.ErrUnknownTreeId
return
}
heads, err := db.Get(keys.HeadsKey())
if err != nil {
return
@ -33,17 +37,19 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err e
return
}
res, err := db.Get(keys.RootKey())
root, err := db.Get(keys.RawChangeKey(treeId))
if err != nil {
return
}
if res == nil {
if root == nil {
err = storage.ErrUnknownTreeId
return
}
root := &treechangeproto.RawTreeChangeWithId{}
err = proto.Unmarshal(res, root)
rootWithId := &treechangeproto.RawTreeChangeWithId{
RawChange: root,
Id: treeId,
}
if err != nil {
return
}
@ -51,23 +57,21 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err e
ts = &treeStorage{
db: db,
keys: keys,
rootKey: keys.RootKey(),
headsKey: keys.HeadsKey(),
id: treeId,
heads: parseHeads(heads),
root: root,
root: rootWithId,
}
return
}
func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
keys := treeKeys{id: payload.TreeId}
has, err := db.Has(keys.RootKey())
has, err := db.Has(keys.RootIdKey())
if err != nil {
return
}
if !has {
err = storage.ErrUnknownTreeId
if has {
err = storage.ErrTreeExists
return
}
@ -80,18 +84,17 @@ func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload)
}
}
err = db.Put(keys.RawChangeKey(payload.RootRawChange.Id), payload.RootRawChange.GetRawChange())
if err != nil {
return
}
err = db.Put(keys.HeadsKey(), heads)
if err != nil {
return
}
// duplicating same change in raw changes
err = db.Put(keys.RawChangeKey(payload.TreeId), payload.RootRawChange.GetRawChange())
if err != nil {
return
}
err = db.Put(keys.RootKey(), payload.RootRawChange.GetRawChange())
err = db.Put(keys.RootIdKey(), []byte(payload.RootRawChange.Id))
if err != nil {
return
}
@ -99,10 +102,8 @@ func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload)
ts = &treeStorage{
db: db,
keys: keys,
rootKey: keys.RootKey(),
headsKey: keys.HeadsKey(),
id: payload.TreeId,
heads: payload.Heads,
id: payload.RootRawChange.Id,
root: payload.RootRawChange,
}
return
@ -116,20 +117,20 @@ func (t *treeStorage) Root() (raw *treechangeproto.RawTreeChangeWithId, err erro
return t.root, nil
}
func (t *treeStorage) Heads() ([]string, error) {
t.headsMx.Lock()
defer t.headsMx.Unlock()
return t.heads, nil
func (t *treeStorage) Heads() (heads []string, err error) {
headsBytes, err := t.db.Get(t.keys.HeadsKey())
if err != nil {
return
}
if heads == nil {
err = storage.ErrUnknownTreeId
return
}
heads = parseHeads(headsBytes)
return
}
func (t *treeStorage) SetHeads(heads []string) (err error) {
t.headsMx.Lock()
defer t.headsMx.Unlock()
defer func() {
if err == nil {
t.heads = heads
}
}()
payload := createHeadsPayload(heads)
return t.db.Put(t.headsKey, payload)
}
@ -143,6 +144,9 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha
if err != nil {
return
}
if res == nil {
err = storage.ErrUnkownChange
}
raw = &treechangeproto.RawTreeChangeWithId{
RawChange: res,

View File

@ -67,6 +67,7 @@ func BuildACLList(storage storage.ListStorage) (ACLList, error) {
}
func build(id string, stateBuilder *aclStateBuilder, recBuilder ACLRecordBuilder, storage storage.ListStorage) (list ACLList, err error) {
// TODO: need to add context here
rootWithId, err := storage.Root()
if err != nil {
return
@ -76,7 +77,12 @@ func build(id string, stateBuilder *aclStateBuilder, recBuilder ACLRecordBuilder
return
}
rawRecordWithId, err := storage.Head()
head, err := storage.Head()
if err != nil {
return
}
rawRecordWithId, err := storage.GetRawRecord(context.Background(), head)
if err != nil {
return
}

View File

@ -31,10 +31,10 @@ func (i *inMemoryACLListStorage) Root() (*aclrecordproto.RawACLRecordWithId, err
return i.records[0], nil
}
func (i *inMemoryACLListStorage) Head() (*aclrecordproto.RawACLRecordWithId, error) {
func (i *inMemoryACLListStorage) Head() (string, error) {
i.RLock()
defer i.RUnlock()
return i.records[len(i.records)-1], nil
return i.records[len(i.records)-1].Id, nil
}
func (i *inMemoryACLListStorage) GetRawRecord(ctx context.Context, id string) (*aclrecordproto.RawACLRecordWithId, error) {

View File

@ -3,13 +3,18 @@ package storage
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto"
)
var ErrUnknownACLId = errors.New("acl does not exist")
var ErrACLExists = errors.New("acl already exists")
var ErrUnknownRecord = errors.New("record doesn't exist")
type ListStorage interface {
Storage
Root() (*aclrecordproto.RawACLRecordWithId, error)
Head() (*aclrecordproto.RawACLRecordWithId, error)
Head() (string, error)
GetRawRecord(ctx context.Context, id string) (*aclrecordproto.RawACLRecordWithId, error)
AddRawRecord(ctx context.Context, rec *aclrecordproto.RawACLRecordWithId) error

View File

@ -66,10 +66,10 @@ func (mr *MockListStorageMockRecorder) GetRawRecord(arg0, arg1 interface{}) *gom
}
// Head mocks base method.
func (m *MockListStorage) Head() (*aclrecordproto.RawACLRecordWithId, error) {
func (m *MockListStorage) Head() (string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Head")
ret0, _ := ret[0].(*aclrecordproto.RawACLRecordWithId)
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
}

View File

@ -6,6 +6,8 @@ import (
)
var ErrUnknownTreeId = errors.New("tree does not exist")
var ErrTreeExists = errors.New("tree already exists")
var ErrUnkownChange = errors.New("change doesn't exist")
type TreeStorageCreatePayload struct {
TreeId string

View File

@ -89,12 +89,12 @@ func (t *ACLListStorageBuilder) createRaw(rec proto.Marshaler, identity []byte)
}
}
func (t *ACLListStorageBuilder) Head() (*aclrecordproto.RawACLRecordWithId, error) {
func (t *ACLListStorageBuilder) Head() (string, error) {
l := len(t.records)
if l > 0 {
return t.rawRecords[l-1], nil
return t.rawRecords[l-1].Id, nil
}
return t.rawRoot, nil
return t.rawRoot.Id, nil
}
func (t *ACLListStorageBuilder) Root() (*aclrecordproto.RawACLRecordWithId, error) {