Change pogreb implementation

This commit is contained in:
mcrakhman 2022-10-15 16:26:03 +02:00
parent a1aacbffed
commit 3facea9a06
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
11 changed files with 289 additions and 100 deletions

View File

@ -133,6 +133,20 @@ func (mr *MockSpaceStorageMockRecorder) ACLStorage() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ACLStorage", reflect.TypeOf((*MockSpaceStorage)(nil).ACLStorage)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ACLStorage", reflect.TypeOf((*MockSpaceStorage)(nil).ACLStorage))
} }
// Close mocks base method.
func (m *MockSpaceStorage) Close() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Close")
ret0, _ := ret[0].(error)
return ret0
}
// Close indicates an expected call of Close.
func (mr *MockSpaceStorageMockRecorder) Close() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSpaceStorage)(nil).Close))
}
// CreateTreeStorage mocks base method. // CreateTreeStorage mocks base method.
func (m *MockSpaceStorage) CreateTreeStorage(arg0 storage0.TreeStorageCreatePayload) (storage0.TreeStorage, error) { func (m *MockSpaceStorage) CreateTreeStorage(arg0 storage0.TreeStorageCreatePayload) (storage0.TreeStorage, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -5,6 +5,24 @@ import (
"strings" "strings"
) )
type aclKeys struct {
}
var aclHeadIdKey = []byte("a/headId")
var aclRootIdKey = []byte("a/rootId")
func (a aclKeys) HeadIdKey() []byte {
return aclHeadIdKey
}
func (a aclKeys) RootIdKey() []byte {
return aclRootIdKey
}
func (a aclKeys) RawRecordKey(id string) []byte {
return joinStringsToBytes("a", id)
}
type treeKeys struct { type treeKeys struct {
id string id string
} }
@ -13,8 +31,8 @@ func (t treeKeys) HeadsKey() []byte {
return joinStringsToBytes("t", t.id, "heads") return joinStringsToBytes("t", t.id, "heads")
} }
func (t treeKeys) RootKey() []byte { func (t treeKeys) RootIdKey() []byte {
return joinStringsToBytes("t", t.id) return joinStringsToBytes("t", t.id, "rootId")
} }
func (t treeKeys) RawChangeKey(id string) []byte { func (t treeKeys) RawChangeKey(id string) []byte {
@ -22,21 +40,25 @@ func (t treeKeys) RawChangeKey(id string) []byte {
} }
type spaceKeys struct { type spaceKeys struct {
headerKey []byte
} }
var headerKey = []byte("header") func newSpaceKeys(spaceId string) spaceKeys {
var aclKey = []byte("acl") return spaceKeys{headerKey: joinStringsToBytes("s", spaceId)}
}
var spaceIdKey = []byte("spaceId")
func (s spaceKeys) SpaceIdKey() []byte {
return spaceIdKey
}
func (s spaceKeys) HeaderKey() []byte { func (s spaceKeys) HeaderKey() []byte {
return headerKey return s.headerKey
} }
func (s spaceKeys) ACLKey() []byte { func isRootIdKey(key string) bool {
return aclKey return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "rootId")
}
func isRootKey(key string) bool {
return strings.HasPrefix(key, "t/") && strings.Count(key, "/") == 2
} }
func joinStringsToBytes(strs ...string) []byte { func joinStringsToBytes(strs ...string) []byte {

135
node/storage/liststorage.go Normal file
View File

@ -0,0 +1,135 @@
package storage
import (
"context"
"github.com/akrylysov/pogreb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
)
type listStorage struct {
db *pogreb.DB
keys aclKeys
id string
root *aclrecordproto.RawACLRecordWithId
}
func newListStorage(db *pogreb.DB) (ls storage.ListStorage, err error) {
keys := aclKeys{}
head, err := db.Get(keys.HeadIdKey())
if err != nil {
return
}
if head == nil {
err = storage.ErrUnknownACLId
return
}
rootId, err := db.Get(keys.RootIdKey())
if err != nil {
return
}
if rootId == nil {
err = storage.ErrUnknownACLId
return
}
root, err := db.Get(keys.RawRecordKey(string(rootId)))
if err != nil {
return
}
if root == nil {
err = storage.ErrUnknownACLId
return
}
rootWithId := &aclrecordproto.RawACLRecordWithId{
Payload: root,
Id: string(rootId),
}
ls = &listStorage{
db: db,
keys: aclKeys{},
id: string(rootId),
root: rootWithId,
}
return
}
func createListStorage(db *pogreb.DB, root *aclrecordproto.RawACLRecordWithId) (ls storage.ListStorage, err error) {
keys := aclKeys{}
has, err := db.Has(keys.RootIdKey())
if err != nil {
return
}
if has {
err = storage.ErrACLExists
return
}
err = db.Put(keys.HeadIdKey(), []byte(root.Id))
if err != nil {
return
}
err = db.Put(keys.RawRecordKey(root.Id), root.Payload)
if err != nil {
return
}
err = db.Put(keys.RootIdKey(), []byte(root.Id))
if err != nil {
return
}
ls = &listStorage{
db: db,
keys: aclKeys{},
id: root.Id,
root: root,
}
return
}
func (l *listStorage) ID() (string, error) {
return l.id, nil
}
func (l *listStorage) Root() (*aclrecordproto.RawACLRecordWithId, error) {
return l.root, nil
}
func (l *listStorage) Head() (head string, err error) {
bytes, err := l.db.Get(l.keys.HeadIdKey())
if err != nil {
return
}
if bytes == nil {
err = storage.ErrUnknownACLId
return
}
head = string(bytes)
return
}
func (l *listStorage) GetRawRecord(ctx context.Context, id string) (raw *aclrecordproto.RawACLRecordWithId, err error) {
res, err := l.db.Get(l.keys.RawRecordKey(id))
if err != nil {
return
}
if res == nil {
err = storage.ErrUnknownRecord
return
}
raw = &aclrecordproto.RawACLRecordWithId{
Payload: res,
Id: id,
}
return
}
func (l *listStorage) AddRawRecord(ctx context.Context, rec *aclrecordproto.RawACLRecordWithId) error {
return l.db.Put([]byte(rec.Id), rec.Payload)
}

View File

@ -5,7 +5,6 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/gogo/protobuf/proto"
"path" "path"
"sync" "sync"
"time" "time"
@ -18,6 +17,8 @@ var defPogrebOptions = &pogreb.Options{
type spaceStorage struct { type spaceStorage struct {
objDb *pogreb.DB objDb *pogreb.DB
keys spaceKeys keys spaceKeys
aclStorage storage.ListStorage
header *spacesyncproto.RawSpaceHeaderWithId
mx sync.Mutex mx sync.Mutex
} }
@ -27,8 +28,15 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS
if err != nil { if err != nil {
return return
} }
keys := spaceKeys{}
has, err := objDb.Has(keys.HeaderKey()) defer func() {
if err != nil {
objDb.Close()
}
}()
keys := newSpaceKeys(spaceId)
has, err := objDb.Has(keys.SpaceIdKey())
if err != nil { if err != nil {
return return
} }
@ -37,18 +45,28 @@ func newSpaceStorage(rootPath string, spaceId string) (store spacestorage.SpaceS
return return
} }
has, err = objDb.Has(keys.ACLKey()) header, err := objDb.Get(keys.HeaderKey())
if err != nil { if err != nil {
return return
} }
if !has { if header == nil {
err = spacestorage.ErrSpaceStorageMissing err = spacestorage.ErrSpaceStorageMissing
return return
} }
aclStorage, err := newListStorage(objDb)
if err != nil {
return
}
store = &spaceStorage{ store = &spaceStorage{
objDb: objDb, objDb: objDb,
keys: keys, keys: keys,
header: &spacesyncproto.RawSpaceHeaderWithId{
RawHeader: header,
Id: spaceId,
},
aclStorage: aclStorage,
} }
return return
} }
@ -66,30 +84,27 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate
} }
}() }()
keys := spaceKeys{} keys := newSpaceKeys(payload.SpaceHeaderWithId.Id)
has, err := db.Has(keys.HeaderKey()) has, err := db.Has(keys.SpaceIdKey())
if err != nil { if err != nil {
return return
} }
if has { if has {
err = spacestorage.ErrSpaceStorageExists err = spacesyncproto.ErrSpaceExists
return return
} }
marshalledRec, err := payload.RecWithId.Marshal() aclStorage, err := createListStorage(db, payload.RecWithId)
if err != nil {
return
}
err = db.Put(keys.ACLKey(), marshalledRec)
if err != nil { if err != nil {
return return
} }
marshalledHeader, err := payload.SpaceHeaderWithId.Marshal() err = db.Put(keys.HeaderKey(), payload.SpaceHeaderWithId.RawHeader)
if err != nil { if err != nil {
return return
} }
err = db.Put(keys.HeaderKey(), marshalledHeader)
err = db.Put(keys.SpaceIdKey(), []byte(payload.SpaceHeaderWithId.Id))
if err != nil { if err != nil {
return return
} }
@ -97,6 +112,8 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate
store = &spaceStorage{ store = &spaceStorage{
objDb: db, objDb: db,
keys: keys, keys: keys,
aclStorage: aclStorage,
header: payload.SpaceHeaderWithId,
} }
return return
} }
@ -106,47 +123,31 @@ func (s *spaceStorage) TreeStorage(id string) (storage.TreeStorage, error) {
} }
func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) { func (s *spaceStorage) CreateTreeStorage(payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
// we have mutex here, so we prevent overwriting the heads of a tree on concurrent creation
s.mx.Lock() s.mx.Lock()
defer s.mx.Unlock() defer s.mx.Unlock()
treeKeys := treeKeys{payload.TreeId}
has, err := s.objDb.Has(treeKeys.RootKey())
if err != nil {
return
}
if has {
err = spacestorage.ErrSpaceStorageExists
return
}
return createTreeStorage(s.objDb, payload) return createTreeStorage(s.objDb, payload)
} }
func (s *spaceStorage) ACLStorage() (storage.ListStorage, error) { func (s *spaceStorage) ACLStorage() (storage.ListStorage, error) {
return nil, nil return s.aclStorage, nil
} }
func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithId, err error) { func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithId, err error) {
res, err := s.objDb.Get(s.keys.HeaderKey()) return s.header, nil
if err != nil {
return
}
header = &spacesyncproto.RawSpaceHeaderWithId{}
err = proto.Unmarshal(res, header)
return
} }
func (s *spaceStorage) StoredIds() (ids []string, err error) { func (s *spaceStorage) StoredIds() (ids []string, err error) {
index := s.objDb.Items() index := s.objDb.Items()
_, value, err := index.Next() key, val, err := index.Next()
for err == nil { for err == nil {
strVal := string(value) strKey := string(key)
if isRootKey(strVal) { if isRootIdKey(strKey) {
ids = append(ids, string(value)) ids = append(ids, string(val))
} }
_, value, err = index.Next() key, val, err = index.Next()
} }
if err != pogreb.ErrIterationDone { if err != pogreb.ErrIterationDone {

View File

@ -6,24 +6,28 @@ import (
"github.com/akrylysov/pogreb" "github.com/akrylysov/pogreb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/treechangeproto"
"github.com/gogo/protobuf/proto"
"strings" "strings"
"sync"
) )
type treeStorage struct { type treeStorage struct {
db *pogreb.DB db *pogreb.DB
keys treeKeys keys treeKeys
id string id string
rootKey []byte
headsKey []byte headsKey []byte
heads []string
root *treechangeproto.RawTreeChangeWithId root *treechangeproto.RawTreeChangeWithId
headsMx sync.Mutex
} }
func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err error) { func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err error) {
keys := treeKeys{treeId} keys := treeKeys{treeId}
has, err := db.Has(keys.RootIdKey())
if err != nil {
return
}
if !has {
err = storage.ErrUnknownTreeId
return
}
heads, err := db.Get(keys.HeadsKey()) heads, err := db.Get(keys.HeadsKey())
if err != nil { if err != nil {
return return
@ -33,17 +37,19 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err e
return return
} }
res, err := db.Get(keys.RootKey()) root, err := db.Get(keys.RawChangeKey(treeId))
if err != nil { if err != nil {
return return
} }
if res == nil { if root == nil {
err = storage.ErrUnknownTreeId err = storage.ErrUnknownTreeId
return return
} }
root := &treechangeproto.RawTreeChangeWithId{} rootWithId := &treechangeproto.RawTreeChangeWithId{
err = proto.Unmarshal(res, root) RawChange: root,
Id: treeId,
}
if err != nil { if err != nil {
return return
} }
@ -51,23 +57,21 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err e
ts = &treeStorage{ ts = &treeStorage{
db: db, db: db,
keys: keys, keys: keys,
rootKey: keys.RootKey(),
headsKey: keys.HeadsKey(), headsKey: keys.HeadsKey(),
id: treeId, id: treeId,
heads: parseHeads(heads), root: rootWithId,
root: root,
} }
return return
} }
func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) { func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
keys := treeKeys{id: payload.TreeId} keys := treeKeys{id: payload.TreeId}
has, err := db.Has(keys.RootKey()) has, err := db.Has(keys.RootIdKey())
if err != nil { if err != nil {
return return
} }
if !has { if has {
err = storage.ErrUnknownTreeId err = storage.ErrTreeExists
return return
} }
@ -80,18 +84,17 @@ func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload)
} }
} }
err = db.Put(keys.RawChangeKey(payload.RootRawChange.Id), payload.RootRawChange.GetRawChange())
if err != nil {
return
}
err = db.Put(keys.HeadsKey(), heads) err = db.Put(keys.HeadsKey(), heads)
if err != nil { if err != nil {
return return
} }
// duplicating same change in raw changes err = db.Put(keys.RootIdKey(), []byte(payload.RootRawChange.Id))
err = db.Put(keys.RawChangeKey(payload.TreeId), payload.RootRawChange.GetRawChange())
if err != nil {
return
}
err = db.Put(keys.RootKey(), payload.RootRawChange.GetRawChange())
if err != nil { if err != nil {
return return
} }
@ -99,10 +102,8 @@ func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload)
ts = &treeStorage{ ts = &treeStorage{
db: db, db: db,
keys: keys, keys: keys,
rootKey: keys.RootKey(),
headsKey: keys.HeadsKey(), headsKey: keys.HeadsKey(),
id: payload.TreeId, id: payload.RootRawChange.Id,
heads: payload.Heads,
root: payload.RootRawChange, root: payload.RootRawChange,
} }
return return
@ -116,20 +117,20 @@ func (t *treeStorage) Root() (raw *treechangeproto.RawTreeChangeWithId, err erro
return t.root, nil return t.root, nil
} }
func (t *treeStorage) Heads() ([]string, error) { func (t *treeStorage) Heads() (heads []string, err error) {
t.headsMx.Lock() headsBytes, err := t.db.Get(t.keys.HeadsKey())
defer t.headsMx.Unlock() if err != nil {
return t.heads, nil return
}
if heads == nil {
err = storage.ErrUnknownTreeId
return
}
heads = parseHeads(headsBytes)
return
} }
func (t *treeStorage) SetHeads(heads []string) (err error) { func (t *treeStorage) SetHeads(heads []string) (err error) {
t.headsMx.Lock()
defer t.headsMx.Unlock()
defer func() {
if err == nil {
t.heads = heads
}
}()
payload := createHeadsPayload(heads) payload := createHeadsPayload(heads)
return t.db.Put(t.headsKey, payload) return t.db.Put(t.headsKey, payload)
} }
@ -143,6 +144,9 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha
if err != nil { if err != nil {
return return
} }
if res == nil {
err = storage.ErrUnkownChange
}
raw = &treechangeproto.RawTreeChangeWithId{ raw = &treechangeproto.RawTreeChangeWithId{
RawChange: res, RawChange: res,

View File

@ -67,6 +67,7 @@ func BuildACLList(storage storage.ListStorage) (ACLList, error) {
} }
func build(id string, stateBuilder *aclStateBuilder, recBuilder ACLRecordBuilder, storage storage.ListStorage) (list ACLList, err error) { func build(id string, stateBuilder *aclStateBuilder, recBuilder ACLRecordBuilder, storage storage.ListStorage) (list ACLList, err error) {
// TODO: need to add context here
rootWithId, err := storage.Root() rootWithId, err := storage.Root()
if err != nil { if err != nil {
return return
@ -76,7 +77,12 @@ func build(id string, stateBuilder *aclStateBuilder, recBuilder ACLRecordBuilder
return return
} }
rawRecordWithId, err := storage.Head() head, err := storage.Head()
if err != nil {
return
}
rawRecordWithId, err := storage.GetRawRecord(context.Background(), head)
if err != nil { if err != nil {
return return
} }

View File

@ -31,10 +31,10 @@ func (i *inMemoryACLListStorage) Root() (*aclrecordproto.RawACLRecordWithId, err
return i.records[0], nil return i.records[0], nil
} }
func (i *inMemoryACLListStorage) Head() (*aclrecordproto.RawACLRecordWithId, error) { func (i *inMemoryACLListStorage) Head() (string, error) {
i.RLock() i.RLock()
defer i.RUnlock() defer i.RUnlock()
return i.records[len(i.records)-1], nil return i.records[len(i.records)-1].Id, nil
} }
func (i *inMemoryACLListStorage) GetRawRecord(ctx context.Context, id string) (*aclrecordproto.RawACLRecordWithId, error) { func (i *inMemoryACLListStorage) GetRawRecord(ctx context.Context, id string) (*aclrecordproto.RawACLRecordWithId, error) {

View File

@ -3,13 +3,18 @@ package storage
import ( import (
"context" "context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/acl/aclrecordproto"
) )
var ErrUnknownACLId = errors.New("acl does not exist")
var ErrACLExists = errors.New("acl already exists")
var ErrUnknownRecord = errors.New("record doesn't exist")
type ListStorage interface { type ListStorage interface {
Storage Storage
Root() (*aclrecordproto.RawACLRecordWithId, error) Root() (*aclrecordproto.RawACLRecordWithId, error)
Head() (*aclrecordproto.RawACLRecordWithId, error) Head() (string, error)
GetRawRecord(ctx context.Context, id string) (*aclrecordproto.RawACLRecordWithId, error) GetRawRecord(ctx context.Context, id string) (*aclrecordproto.RawACLRecordWithId, error)
AddRawRecord(ctx context.Context, rec *aclrecordproto.RawACLRecordWithId) error AddRawRecord(ctx context.Context, rec *aclrecordproto.RawACLRecordWithId) error

View File

@ -66,10 +66,10 @@ func (mr *MockListStorageMockRecorder) GetRawRecord(arg0, arg1 interface{}) *gom
} }
// Head mocks base method. // Head mocks base method.
func (m *MockListStorage) Head() (*aclrecordproto.RawACLRecordWithId, error) { func (m *MockListStorage) Head() (string, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Head") ret := m.ctrl.Call(m, "Head")
ret0, _ := ret[0].(*aclrecordproto.RawACLRecordWithId) ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }

View File

@ -6,6 +6,8 @@ import (
) )
var ErrUnknownTreeId = errors.New("tree does not exist") var ErrUnknownTreeId = errors.New("tree does not exist")
var ErrTreeExists = errors.New("tree already exists")
var ErrUnkownChange = errors.New("change doesn't exist")
type TreeStorageCreatePayload struct { type TreeStorageCreatePayload struct {
TreeId string TreeId string

View File

@ -89,12 +89,12 @@ func (t *ACLListStorageBuilder) createRaw(rec proto.Marshaler, identity []byte)
} }
} }
func (t *ACLListStorageBuilder) Head() (*aclrecordproto.RawACLRecordWithId, error) { func (t *ACLListStorageBuilder) Head() (string, error) {
l := len(t.records) l := len(t.records)
if l > 0 { if l > 0 {
return t.rawRecords[l-1], nil return t.rawRecords[l-1].Id, nil
} }
return t.rawRoot, nil return t.rawRoot.Id, nil
} }
func (t *ACLListStorageBuilder) Root() (*aclrecordproto.RawACLRecordWithId, error) { func (t *ACLListStorageBuilder) Root() (*aclrecordproto.RawACLRecordWithId, error) {