Merge pull request #15 from anytypeio/streampool-test

This commit is contained in:
Mikhail Rakhmanov 2022-10-27 16:58:01 +02:00 committed by GitHub
commit 675885d875
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1083 additions and 129 deletions

View File

@ -37,7 +37,7 @@ func newListStorage(spaceId string, db *badger.DB, txn *badger.Txn) (ls storage.
ls = &listStorage{
db: db,
keys: newACLKeys(spaceId),
keys: keys,
id: stringId,
root: rootWithId,
}
@ -70,15 +70,15 @@ func createListStorage(spaceId string, db *badger.DB, txn *badger.Txn, root *acl
ls = &listStorage{
db: db,
keys: newACLKeys(spaceId),
keys: keys,
id: root.Id,
root: root,
}
return
}
func (l *listStorage) ID() (string, error) {
return l.id, nil
func (l *listStorage) ID() string {
return l.id
}
func (l *listStorage) Root() (*aclrecordproto.RawACLRecordWithId, error) {

View File

@ -0,0 +1,72 @@
package storage
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/dgraph-io/badger/v3"
"github.com/stretchr/testify/require"
"testing"
)
func testList(t *testing.T, store storage.ListStorage, root *aclrecordproto.RawACLRecordWithId, head string) {
require.Equal(t, store.ID(), root.Id)
aclRoot, err := store.Root()
require.NoError(t, err)
require.Equal(t, root, aclRoot)
aclHead, err := store.Head()
require.NoError(t, err)
require.Equal(t, head, aclHead)
}
func TestListStorage(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
spaceId := "spaceId"
aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"}
fx.db.Update(func(txn *badger.Txn) error {
_, err := createListStorage(spaceId, fx.db, txn, aclRoot)
require.NoError(t, err)
return nil
})
var listStore storage.ListStorage
fx.db.View(func(txn *badger.Txn) (err error) {
listStore, err = newListStorage(spaceId, fx.db, txn)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
return nil
})
t.Run("create same storage returns no error", func(t *testing.T) {
fx.db.View(func(txn *badger.Txn) error {
// this is ok, because we only create new list storage when we create space storage
listStore, err := createListStorage(spaceId, fx.db, txn, aclRoot)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
return nil
})
})
t.Run("set head", func(t *testing.T) {
head := "newHead"
require.NoError(t, listStore.SetHead(head))
aclHead, err := listStore.Head()
require.NoError(t, err)
require.Equal(t, head, aclHead)
})
t.Run("add raw record and get raw record", func(t *testing.T) {
newRec := &aclrecordproto.RawACLRecordWithId{Payload: []byte("rec"), Id: "someRecId"}
require.NoError(t, listStore.AddRawRecord(context.Background(), newRec))
aclRec, err := listStore.GetRawRecord(context.Background(), newRec.Id)
require.NoError(t, err)
require.Equal(t, newRec, aclRec)
})
}

View File

@ -77,8 +77,8 @@ func createSpaceStorage(db *badger.DB, payload spacestorage.SpaceStorageCreatePa
return
}
func (s *spaceStorage) ID() (string, error) {
return s.spaceId, nil
func (s *spaceStorage) ID() string {
return s.spaceId
}
func (s *spaceStorage) TreeStorage(id string) (storage2.TreeStorage, error) {

View File

@ -0,0 +1,108 @@
package storage
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/stretchr/testify/require"
"strconv"
"testing"
)
func spaceTestPayload() spacestorage.SpaceStorageCreatePayload {
header := &spacesyncproto.RawSpaceHeaderWithId{
RawHeader: []byte("header"),
Id: "headerId",
}
aclRoot := &aclrecordproto.RawACLRecordWithId{
Payload: []byte("aclRoot"),
Id: "aclRootId",
}
return spacestorage.SpaceStorageCreatePayload{
RecWithId: aclRoot,
SpaceHeaderWithId: header,
}
}
func testSpace(t *testing.T, store spacestorage.SpaceStorage, payload spacestorage.SpaceStorageCreatePayload) {
header, err := store.SpaceHeader()
require.NoError(t, err)
require.Equal(t, payload.SpaceHeaderWithId, header)
aclStorage, err := store.ACLStorage()
require.NoError(t, err)
testList(t, aclStorage, payload.RecWithId, payload.RecWithId.Id)
}
func TestSpaceStorage_Create(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
payload := spaceTestPayload()
store, err := createSpaceStorage(fx.db, payload)
require.NoError(t, err)
testSpace(t, store, payload)
require.NoError(t, store.Close())
t.Run("create same storage returns error", func(t *testing.T) {
_, err := createSpaceStorage(fx.db, payload)
require.Error(t, err)
})
}
func TestSpaceStorage_NewAndCreateTree(t *testing.T) {
fx := newFixture(t)
fx.open(t)
payload := spaceTestPayload()
store, err := createSpaceStorage(fx.db, payload)
require.NoError(t, err)
require.NoError(t, store.Close())
fx.stop(t)
fx.open(t)
defer fx.stop(t)
store, err = newSpaceStorage(fx.db, payload.SpaceHeaderWithId.Id)
require.NoError(t, err)
testSpace(t, store, payload)
t.Run("create tree and get tree", func(t *testing.T) {
payload := treeTestPayload()
treeStore, err := store.CreateTreeStorage(payload)
require.NoError(t, err)
testTreePayload(t, treeStore, payload)
otherStore, err := store.TreeStorage(payload.RootRawChange.Id)
require.NoError(t, err)
testTreePayload(t, otherStore, payload)
})
}
func TestSpaceStorage_StoredIds(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
payload := spaceTestPayload()
store, err := createSpaceStorage(fx.db, payload)
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
}()
n := 5
var ids []string
for i := 0; i < n; i++ {
treePayload := treeTestPayload()
treePayload.RootRawChange.Id += strconv.Itoa(i)
ids = append(ids, treePayload.RootRawChange.Id)
_, err := store.CreateTreeStorage(treePayload)
require.NoError(t, err)
}
storedIds, err := store.StoredIds()
require.NoError(t, err)
require.Equal(t, ids, storedIds)
}

View File

@ -2,7 +2,7 @@ package storage
import (
"context"
storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/dgraph-io/badger/v3"
)
@ -14,7 +14,7 @@ type treeStorage struct {
root *treechangeproto.RawTreeChangeWithId
}
func newTreeStorage(db *badger.DB, spaceId, treeId string) (ts storage2.TreeStorage, err error) {
func newTreeStorage(db *badger.DB, spaceId, treeId string) (ts storage.TreeStorage, err error) {
keys := newTreeKeys(spaceId, treeId)
err = db.View(func(txn *badger.Txn) error {
_, err := txn.Get(keys.RootIdKey())
@ -43,14 +43,14 @@ func newTreeStorage(db *badger.DB, spaceId, treeId string) (ts storage2.TreeStor
return
}
func createTreeStorage(db *badger.DB, spaceId string, payload storage2.TreeStorageCreatePayload) (ts storage2.TreeStorage, err error) {
keys := newTreeKeys(spaceId, payload.TreeId)
func createTreeStorage(db *badger.DB, spaceId string, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
keys := newTreeKeys(spaceId, payload.RootRawChange.Id)
if hasDB(db, keys.RootIdKey()) {
err = storage2.ErrTreeExists
err = storage.ErrTreeExists
return
}
err = db.Update(func(txn *badger.Txn) error {
heads := storage2.CreateHeadsPayload(payload.Heads)
heads := storage.CreateHeadsPayload(payload.Heads)
for _, ch := range payload.Changes {
err = txn.Set(keys.RawChangeKey(ch.Id), ch.GetRawChange())
@ -85,8 +85,8 @@ func createTreeStorage(db *badger.DB, spaceId string, payload storage2.TreeStora
return
}
func (t *treeStorage) ID() (string, error) {
return t.id, nil
func (t *treeStorage) ID() string {
return t.id
}
func (t *treeStorage) Root() (raw *treechangeproto.RawTreeChangeWithId, err error) {
@ -97,16 +97,16 @@ func (t *treeStorage) Heads() (heads []string, err error) {
headsBytes, err := getDB(t.db, t.keys.HeadsKey())
if err != nil {
if err == badger.ErrKeyNotFound {
err = storage2.ErrUnknownTreeId
err = storage.ErrUnknownTreeId
}
return
}
heads = storage2.ParseHeads(headsBytes)
heads = storage.ParseHeads(headsBytes)
return
}
func (t *treeStorage) SetHeads(heads []string) (err error) {
payload := storage2.CreateHeadsPayload(heads)
payload := storage.CreateHeadsPayload(heads)
return putDB(t.db, t.keys.HeadsKey(), payload)
}
@ -118,7 +118,7 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha
res, err := getDB(t.db, t.keys.RawChangeKey(id))
if err != nil {
if err == badger.ErrKeyNotFound {
err = storage2.ErrUnknownTreeId
err = storage.ErrUnknownTreeId
}
return
}

View File

@ -0,0 +1,123 @@
package storage
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/dgraph-io/badger/v3"
"github.com/stretchr/testify/require"
"os"
"testing"
)
func treeTestPayload() storage.TreeStorageCreatePayload {
rootRawChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some"), Id: "someRootId"}
otherChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some other"), Id: "otherId"}
changes := []*treechangeproto.RawTreeChangeWithId{rootRawChange, otherChange}
return storage.TreeStorageCreatePayload{
RootRawChange: rootRawChange,
Changes: changes,
Heads: []string{rootRawChange.Id},
}
}
type fixture struct {
dir string
db *badger.DB
}
func testTreePayload(t *testing.T, store storage.TreeStorage, payload storage.TreeStorageCreatePayload) {
require.Equal(t, payload.RootRawChange.Id, store.ID())
root, err := store.Root()
require.NoError(t, err)
require.Equal(t, root, payload.RootRawChange)
heads, err := store.Heads()
require.NoError(t, err)
require.Equal(t, payload.Heads, heads)
for _, ch := range payload.Changes {
dbCh, err := store.GetRawChange(context.Background(), ch.Id)
require.NoError(t, err)
require.Equal(t, ch, dbCh)
}
return
}
func newFixture(t *testing.T) *fixture {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
return &fixture{dir: dir}
}
func (fx *fixture) open(t *testing.T) {
var err error
fx.db, err = badger.Open(badger.DefaultOptions(fx.dir))
require.NoError(t, err)
}
func (fx *fixture) stop(t *testing.T) {
require.NoError(t, fx.db.Close())
}
func TestTreeStorage_Create(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
spaceId := "spaceId"
payload := treeTestPayload()
store, err := createTreeStorage(fx.db, spaceId, payload)
require.NoError(t, err)
testTreePayload(t, store, payload)
t.Run("create same storage returns error", func(t *testing.T) {
_, err := createTreeStorage(fx.db, spaceId, payload)
require.Error(t, err)
})
}
func TestTreeStorage_Methods(t *testing.T) {
fx := newFixture(t)
fx.open(t)
payload := treeTestPayload()
spaceId := "spaceId"
_, err := createTreeStorage(fx.db, spaceId, payload)
require.NoError(t, err)
fx.stop(t)
fx.open(t)
defer fx.stop(t)
store, err := newTreeStorage(fx.db, spaceId, payload.RootRawChange.Id)
require.NoError(t, err)
testTreePayload(t, store, payload)
t.Run("update heads", func(t *testing.T) {
newHeads := []string{"a", "b"}
require.NoError(t, store.SetHeads(newHeads))
heads, err := store.Heads()
require.NoError(t, err)
require.Equal(t, newHeads, heads)
})
t.Run("add raw change, get change and has change", func(t *testing.T) {
newChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("ab"), Id: "newId"}
require.NoError(t, store.AddRawChange(newChange))
rawCh, err := store.GetRawChange(context.Background(), newChange.Id)
require.NoError(t, err)
require.Equal(t, newChange, rawCh)
has, err := store.HasChange(context.Background(), newChange.Id)
require.NoError(t, err)
require.True(t, has)
})
t.Run("get and has for unknown change", func(t *testing.T) {
incorrectId := "incorrectId"
_, err := store.GetRawChange(context.Background(), incorrectId)
require.Error(t, err)
has, err := store.HasChange(context.Background(), incorrectId)
require.NoError(t, err)
require.False(t, has)
})
}

View File

@ -64,7 +64,7 @@ func (s *service) CreateSpace(
return
}
return store.ID()
return store.ID(), nil
}
func (s *service) DeriveSpace(
@ -79,7 +79,7 @@ func (s *service) DeriveSpace(
return
}
return store.ID()
return store.ID(), nil
}
func (s *service) GetSpace(ctx context.Context, id string) (Space, error) {

View File

@ -163,12 +163,11 @@ func (mr *MockSpaceStorageMockRecorder) CreateTreeStorage(arg0 interface{}) *gom
}
// ID mocks base method.
func (m *MockSpaceStorage) ID() (string, error) {
func (m *MockSpaceStorage) ID() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ID")
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
return ret0
}
// ID indicates an expected call of ID.

View File

@ -6,7 +6,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
)
const CName = "commonspace.storage"
@ -15,9 +15,9 @@ var ErrSpaceStorageExists = errors.New("space storage exists")
var ErrSpaceStorageMissing = errors.New("space storage missing")
type SpaceStorage interface {
storage2.Storage
storage2.Provider
ACLStorage() (storage2.ListStorage, error)
storage.Provider
ID() string
ACLStorage() (storage.ListStorage, error)
SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error)
StoredIds() ([]string, error)
Close() error

View File

@ -245,7 +245,8 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
Loop:
for {
msg, err := stream.Recv()
var msg *spacesyncproto.ObjectSyncMessage
msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix())
if err != nil {
break
@ -260,7 +261,8 @@ Loop:
limiter <- struct{}{}
}()
}
return s.removePeer(peerId)
s.removePeer(peerId)
return
}
func (s *streamPool) removePeer(peerId string) (err error) {

View File

@ -0,0 +1,322 @@
package syncservice
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpctest"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"testing"
"time"
)
type testServer struct {
stream chan spacesyncproto.DRPCSpace_StreamStream
addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error
addRecord func(ctx context.Context, req *consensusproto.AddRecordRequest) error
releaseStream chan error
watchErrOnce bool
}
func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
panic("implement me")
}
func (t *testServer) PushSpace(ctx context.Context, request *spacesyncproto.PushSpaceRequest) (*spacesyncproto.PushSpaceResponse, error) {
panic("implement me")
}
func (t *testServer) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error {
t.stream <- stream
return <-t.releaseStream
}
func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpace_StreamStream {
select {
case <-time.After(time.Second * 5):
test.Fatalf("waiteStream timeout")
case st := <-t.stream:
return st
}
return nil
}
type fixture struct {
testServer *testServer
drpcTS *rpctest.TesServer
client spacesyncproto.DRPCSpaceClient
clientStream spacesyncproto.DRPCSpace_StreamStream
serverStream spacesyncproto.DRPCSpace_StreamStream
pool *streamPool
localId peer.ID
remoteId peer.ID
}
func newFixture(t *testing.T, localId, remoteId peer.ID, handler MessageHandler) *fixture {
fx := &fixture{
testServer: &testServer{},
drpcTS: rpctest.NewTestServer(),
localId: localId,
remoteId: remoteId,
}
fx.testServer.stream = make(chan spacesyncproto.DRPCSpace_StreamStream, 1)
require.NoError(t, spacesyncproto.DRPCRegisterSpace(fx.drpcTS.Mux, fx.testServer))
clientWrapper := rpctest.NewSecConnWrapper(nil, nil, localId, remoteId)
fx.client = spacesyncproto.NewDRPCSpaceClient(fx.drpcTS.DialWrapConn(nil, clientWrapper))
var err error
fx.clientStream, err = fx.client.Stream(context.Background())
require.NoError(t, err)
fx.serverStream = fx.testServer.waitStream(t)
fx.pool = newStreamPool(handler).(*streamPool)
return fx
}
func (fx *fixture) run(t *testing.T) chan error {
waitCh := make(chan error)
go func() {
err := fx.pool.AddAndReadStreamSync(fx.clientStream)
waitCh <- err
}()
time.Sleep(time.Millisecond * 10)
fx.pool.Lock()
require.Equal(t, fx.pool.peerStreams[fx.remoteId.String()], fx.clientStream)
fx.pool.Unlock()
return waitCh
}
func TestStreamPool_AddAndReadStreamAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("client close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("server close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.serverStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_Close(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("client close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
fx.run(t)
var events []string
recvChan := make(chan struct{})
go func() {
fx.pool.Close()
events = append(events, "pool_close")
recvChan <- struct{}{}
}()
time.Sleep(50 * time.Millisecond) //err = <-waitCh
events = append(events, "stream_close")
err := fx.clientStream.Close()
require.NoError(t, err)
<-recvChan
require.Equal(t, []string{"stream_close", "pool_close"}, events)
})
t.Run("server close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
fx.run(t)
var events []string
recvChan := make(chan struct{})
go func() {
fx.pool.Close()
events = append(events, "pool_close")
recvChan <- struct{}{}
}()
time.Sleep(50 * time.Millisecond) //err = <-waitCh
events = append(events, "stream_close")
err := fx.clientStream.Close()
require.NoError(t, err)
<-recvChan
require.Equal(t, []string{"stream_close", "pool_close"}, events)
})
}
func TestStreamPool_ReceiveMessage(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool receive message from server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
recvChan := make(chan struct{})
fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
require.Equal(t, msg, message)
recvChan <- struct{}{}
return nil
})
waitCh := fx.run(t)
err := fx.serverStream.Send(msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_HasActiveStream(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool has active stream", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
require.True(t, fx.pool.HasActiveStream(remId.String()))
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("pool has no active stream", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.False(t, fx.pool.HasActiveStream(remId.String()))
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_SendAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool send async to server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
recvChan := make(chan struct{})
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg, message)
recvChan <- struct{}{}
}()
waitCh := fx.run(t)
err := fx.pool.SendAsync([]string{remId.String()}, msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_SendSync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool send sync to server", func(t *testing.T) {
objectId := "objectId"
payload := []byte("payload")
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg.ObjectId, message.ObjectId)
require.NotEmpty(t, message.ReplyId)
message.Payload = payload
err = fx.serverStream.Send(message)
require.NoError(t, err)
}()
waitCh := fx.run(t)
res, err := fx.pool.SendSync(remId.String(), msg)
require.NoError(t, err)
require.Equal(t, payload, res.Payload)
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("pool send sync timeout", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
syncWaitPeriod = time.Millisecond * 30
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg.ObjectId, message.ObjectId)
require.NotEmpty(t, message.ReplyId)
}()
waitCh := fx.run(t)
_, err := fx.pool.SendSync(remId.String(), msg)
require.Equal(t, ErrSyncTimeout, err)
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_BroadcastAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool broadcast async to server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
recvChan := make(chan struct{})
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg, message)
recvChan <- struct{}{}
}()
waitCh := fx.run(t)
err := fx.pool.BroadcastAsync(msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}

View File

@ -6,7 +6,6 @@ package mock_synctree
import (
reflect "reflect"
time "time"
tree "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
treechangeproto "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
@ -122,20 +121,6 @@ func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest))
}
// LastUsage mocks base method.
func (m *MockSyncClient) LastUsage() time.Time {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LastUsage")
ret0, _ := ret[0].(time.Time)
return ret0
}
// LastUsage indicates an expected call of LastUsage.
func (mr *MockSyncClientMockRecorder) LastUsage() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastUsage", reflect.TypeOf((*MockSyncClient)(nil).LastUsage))
}
// SendAsync mocks base method.
func (m *MockSyncClient) SendAsync(arg0 string, arg1 *treechangeproto.TreeSyncMessage, arg2 string) error {
m.ctrl.T.Helper()

View File

@ -59,9 +59,7 @@ type BuildDeps struct {
TreeStorage storage.TreeStorage
}
func DeriveSyncTree(
ctx context.Context,
deps CreateDeps) (t tree.ObjectTree, err error) {
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) {
t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage)
if err != nil {
return
@ -86,9 +84,7 @@ func DeriveSyncTree(
return
}
func CreateSyncTree(
ctx context.Context,
deps CreateDeps) (t tree.ObjectTree, err error) {
func CreateSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) {
t, err = createObjectTree(deps.Payload, deps.AclList, deps.CreateStorage)
if err != nil {
return
@ -155,7 +151,6 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := storage.TreeStorageCreatePayload{
TreeId: id,
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
@ -175,10 +170,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
return buildSyncTree(ctx, true, deps)
}
func buildSyncTree(
ctx context.Context,
isFirstBuild bool,
deps BuildDeps) (t tree.ObjectTree, err error) {
func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t tree.ObjectTree, err error) {
t, err = buildObjectTree(deps.TreeStorage, deps.AclList)
if err != nil {

View File

@ -2,6 +2,8 @@ package rpctest
import (
"context"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
@ -9,6 +11,48 @@ import (
"storj.io/drpc/drpcserver"
)
type SecConnMock struct {
net.Conn
localPrivKey crypto.PrivKey
remotePubKey crypto.PubKey
localId peer.ID
remoteId peer.ID
}
func (s *SecConnMock) LocalPeer() peer.ID {
return s.localId
}
func (s *SecConnMock) LocalPrivateKey() crypto.PrivKey {
return s.localPrivKey
}
func (s *SecConnMock) RemotePeer() peer.ID {
return s.remoteId
}
func (s *SecConnMock) RemotePublicKey() crypto.PubKey {
return s.remotePubKey
}
type ConnWrapper func(conn net.Conn) net.Conn
func NewSecConnWrapper(
localPrivKey crypto.PrivKey,
remotePubKey crypto.PubKey,
localId peer.ID,
remoteId peer.ID) ConnWrapper {
return func(conn net.Conn) net.Conn {
return &SecConnMock{
Conn: conn,
localPrivKey: localPrivKey,
remotePubKey: remotePubKey,
localId: localId,
remoteId: remoteId,
}
}
}
func NewTestServer() *TesServer {
ts := &TesServer{
Mux: drpcmux.New(),
@ -23,7 +67,17 @@ type TesServer struct {
}
func (ts *TesServer) Dial() drpc.Conn {
return ts.DialWrapConn(nil, nil)
}
func (ts *TesServer) DialWrapConn(serverWrapper ConnWrapper, clientWrapper ConnWrapper) drpc.Conn {
sc, cc := net.Pipe()
if serverWrapper != nil {
sc = serverWrapper(sc)
}
if clientWrapper != nil {
cc = clientWrapper(cc)
}
go ts.Server.ServeOne(context.Background(), sc)
return drpcconn.New(cc)
}

View File

@ -50,19 +50,13 @@ type aclList struct {
}
func BuildACLListWithIdentity(acc *account.AccountData, storage storage.ListStorage) (ACLList, error) {
id, err := storage.ID()
if err != nil {
return nil, err
}
id := storage.ID()
builder := newACLStateBuilderWithIdentity(acc)
return build(id, builder, newACLRecordBuilder(id, common.NewKeychain()), storage)
}
func BuildACLList(storage storage.ListStorage) (ACLList, error) {
id, err := storage.ID()
if err != nil {
return nil, err
}
id := storage.ID()
return build(id, newACLStateBuilder(), newACLRecordBuilder(id, common.NewKeychain()), storage)
}

View File

@ -56,10 +56,10 @@ func (i *inMemoryACLListStorage) AddRawRecord(ctx context.Context, rec *aclrecor
panic("implement me")
}
func (i *inMemoryACLListStorage) ID() (string, error) {
func (i *inMemoryACLListStorage) ID() string {
i.RLock()
defer i.RUnlock()
return i.id, nil
return i.id
}
type inMemoryTreeStorage struct {
@ -72,7 +72,6 @@ type inMemoryTreeStorage struct {
}
func NewInMemoryTreeStorage(
treeId string,
root *treechangeproto.RawTreeChangeWithId,
heads []string,
changes []*treechangeproto.RawTreeChangeWithId) (TreeStorage, error) {
@ -80,10 +79,10 @@ func NewInMemoryTreeStorage(
for _, ch := range changes {
allChanges[ch.Id] = ch
}
allChanges[treeId] = root
allChanges[root.Id] = root
return &inMemoryTreeStorage{
id: treeId,
id: root.Id,
root: root,
heads: heads,
changes: allChanges,
@ -96,10 +95,10 @@ func (t *inMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, e
return exists, nil
}
func (t *inMemoryTreeStorage) ID() (string, error) {
func (t *inMemoryTreeStorage) ID() string {
t.RLock()
defer t.RUnlock()
return t.id, nil
return t.id
}
func (t *inMemoryTreeStorage) Root() (*treechangeproto.RawTreeChangeWithId, error) {
@ -159,12 +158,12 @@ func (i *inMemoryStorageProvider) TreeStorage(id string) (TreeStorage, error) {
func (i *inMemoryStorageProvider) CreateTreeStorage(payload TreeStorageCreatePayload) (TreeStorage, error) {
i.Lock()
defer i.Unlock()
res, err := NewInMemoryTreeStorage(payload.TreeId, payload.RootRawChange, payload.Heads, payload.Changes)
res, err := NewInMemoryTreeStorage(payload.RootRawChange, payload.Heads, payload.Changes)
if err != nil {
return nil, err
}
i.objects[payload.TreeId] = res
i.objects[payload.RootRawChange.Id] = res
return res, nil
}

View File

@ -12,7 +12,7 @@ var ErrACLExists = errors.New("acl already exists")
var ErrUnknownRecord = errors.New("record doesn't exist")
type ListStorage interface {
Storage
ID() string
Root() (*aclrecordproto.RawACLRecordWithId, error)
Head() (string, error)
SetHead(headId string) error

View File

@ -81,12 +81,11 @@ func (mr *MockListStorageMockRecorder) Head() *gomock.Call {
}
// ID mocks base method.
func (m *MockListStorage) ID() (string, error) {
func (m *MockListStorage) ID() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ID")
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
return ret0
}
// ID indicates an expected call of ID.
@ -207,12 +206,11 @@ func (mr *MockTreeStorageMockRecorder) Heads() *gomock.Call {
}
// ID mocks base method.
func (m *MockTreeStorage) ID() (string, error) {
func (m *MockTreeStorage) ID() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ID")
ret0, _ := ret[0].(string)
ret1, _ := ret[1].(error)
return ret0, ret1
return ret0
}
// ID indicates an expected call of ID.

View File

@ -10,7 +10,6 @@ var ErrTreeExists = errors.New("tree already exists")
var ErrUnkownChange = errors.New("change doesn't exist")
type TreeStorageCreatePayload struct {
TreeId string
RootRawChange *treechangeproto.RawTreeChangeWithId
Changes []*treechangeproto.RawTreeChangeWithId
Heads []string

View File

@ -1,5 +0,0 @@
package storage
type Storage interface {
ID() (string, error)
}

View File

@ -6,7 +6,7 @@ import (
)
type TreeStorage interface {
Storage
ID() string
Root() (*treechangeproto.RawTreeChangeWithId, error)
Heads() ([]string, error)
SetHeads(heads []string) error

View File

@ -120,8 +120,8 @@ func (t *ACLListStorageBuilder) AddRawRecord(ctx context.Context, rec *aclrecord
panic("implement me")
}
func (t *ACLListStorageBuilder) ID() (string, error) {
return t.id, nil
func (t *ACLListStorageBuilder) ID() string {
return t.id
}
func (t *ACLListStorageBuilder) GetRawRecords() []*aclrecordproto2.RawACLRecordWithId {

View File

@ -3,7 +3,7 @@ package tree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/testutils/acllistbuilder"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/stretchr/testify/assert"
@ -53,9 +53,9 @@ func (c *mockChangeCreator) createRaw(id, aclId, snapshotId string, isSnapshot b
}
}
func (c *mockChangeCreator) createNewTreeStorage(treeId, aclHeadId string) storage2.TreeStorage {
func (c *mockChangeCreator) createNewTreeStorage(treeId, aclHeadId string) storage.TreeStorage {
root := c.createRoot(treeId, aclHeadId)
treeStorage, _ := storage2.NewInMemoryTreeStorage(treeId, root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root})
treeStorage, _ := storage.NewInMemoryTreeStorage(root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root})
return treeStorage
}
@ -95,7 +95,7 @@ func (m *mockChangeValidator) ValidateFullTree(tree *Tree, aclList list.ACLList)
type testTreeContext struct {
aclList list.ACLList
treeStorage storage2.TreeStorage
treeStorage storage.TreeStorage
changeBuilder *mockChangeBuilder
changeCreator *mockChangeCreator
objTree ObjectTree

View File

@ -3,7 +3,7 @@ package tree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/common"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric"
@ -20,7 +20,7 @@ type ObjectTreeCreatePayload struct {
Identity []byte
}
func BuildObjectTree(treeStorage storage2.TreeStorage, aclList list.ACLList) (ObjectTree, error) {
func BuildObjectTree(treeStorage storage.TreeStorage, aclList list.ACLList) (ObjectTree, error) {
rootChange, err := treeStorage.Root()
if err != nil {
return nil, err
@ -32,14 +32,14 @@ func BuildObjectTree(treeStorage storage2.TreeStorage, aclList list.ACLList) (Ob
func CreateDerivedObjectTree(
payload ObjectTreeCreatePayload,
aclList list.ACLList,
createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
return createObjectTree(payload, 0, nil, aclList, createStorage)
}
func CreateObjectTree(
payload ObjectTreeCreatePayload,
aclList list.ACLList,
createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
bytes := make([]byte, 32)
_, err = rand.Read(bytes)
if err != nil {
@ -53,7 +53,7 @@ func createObjectTree(
timestamp int64,
seed []byte,
aclList list.ACLList,
createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
aclList.RLock()
aclHeadId := aclList.Head().Id
aclList.RUnlock()
@ -77,8 +77,7 @@ func createObjectTree(
}
// create storage
st, err := createStorage(storage2.TreeStorageCreatePayload{
TreeId: raw.Id,
st, err := createStorage(storage.TreeStorageCreatePayload{
RootRawChange: raw,
Changes: []*treechangeproto.RawTreeChangeWithId{raw},
Heads: []string{raw.Id},
@ -127,11 +126,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
}
}
objTree.id, err = objTree.treeStorage.ID()
if err != nil {
return nil, err
}
objTree.id = objTree.treeStorage.ID()
objTree.root, err = objTree.treeStorage.Root()
if err != nil {
return nil, err

View File

@ -409,9 +409,13 @@ func (c *oCache) Close() (err error) {
}
c.closed = true
close(c.closeCh)
var toClose []*entry
var toClose, alreadyClosing []*entry
for _, e := range c.data {
toClose = append(toClose, e)
if e.isClosing {
alreadyClosing = append(alreadyClosing, e)
} else {
toClose = append(toClose, e)
}
}
c.mu.Unlock()
for _, e := range toClose {
@ -422,5 +426,8 @@ func (c *oCache) Close() (err error) {
}
}
}
for _, e := range alreadyClosing {
<-e.close
}
return nil
}

View File

@ -62,5 +62,17 @@ func (s spaceKeys) HeaderKey() []byte {
}
func isRootIdKey(key string) bool {
return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "heads")
return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "/heads")
}
func getRootId(key string) string {
prefixLen := 2 // len("t/")
suffixLen := 6 // len("/heads")
rootLen := len(key) - suffixLen - prefixLen
sBuf := strings.Builder{}
sBuf.Grow(rootLen)
for i := prefixLen; i < prefixLen+rootLen; i++ {
sBuf.WriteByte(key[i])
}
return sBuf.String()
}

View File

@ -82,8 +82,8 @@ func createListStorage(db *pogreb.DB, root *aclrecordproto.RawACLRecordWithId) (
return
}
func (l *listStorage) ID() (string, error) {
return l.id, nil
func (l *listStorage) ID() string {
return l.id
}
func (l *listStorage) Root() (*aclrecordproto.RawACLRecordWithId, error) {

View File

@ -0,0 +1,70 @@
package storage
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/stretchr/testify/require"
"testing"
)
func testList(t *testing.T, store storage.ListStorage, root *aclrecordproto.RawACLRecordWithId, head string) {
require.Equal(t, store.ID(), root.Id)
aclRoot, err := store.Root()
require.NoError(t, err)
require.Equal(t, root, aclRoot)
aclHead, err := store.Head()
require.NoError(t, err)
require.Equal(t, head, aclHead)
}
func TestListStorage_Create(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"}
listStore, err := createListStorage(fx.db, aclRoot)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
t.Run("create same list storage returns nil", func(t *testing.T) {
// this is ok, because we only create new list storage when we create space storage
listStore, err := createListStorage(fx.db, aclRoot)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
})
}
func TestListStorage_Methods(t *testing.T) {
fx := newFixture(t)
fx.open(t)
aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"}
_, err := createListStorage(fx.db, aclRoot)
require.NoError(t, err)
fx.stop(t)
fx.open(t)
defer fx.stop(t)
listStore, err := newListStorage(fx.db)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
t.Run("set head", func(t *testing.T) {
head := "newHead"
require.NoError(t, listStore.SetHead(head))
aclHead, err := listStore.Head()
require.NoError(t, err)
require.Equal(t, head, aclHead)
})
t.Run("add raw record and get raw record", func(t *testing.T) {
newRec := &aclrecordproto.RawACLRecordWithId{Payload: []byte("rec"), Id: "someRecId"}
require.NoError(t, listStore.AddRawRecord(context.Background(), newRec))
aclRec, err := listStore.GetRawRecord(context.Background(), newRec.Id)
require.NoError(t, err)
require.Equal(t, newRec, aclRec)
})
}

View File

@ -88,8 +88,8 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate
}
defer func() {
log.With(zap.String("id", payload.SpaceHeaderWithId.Id), zap.Error(err)).Warn("failed to create storage")
if err != nil {
log.With(zap.String("id", payload.SpaceHeaderWithId.Id), zap.Error(err)).Warn("failed to create storage")
db.Close()
}
}()
@ -129,8 +129,8 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate
return
}
func (s *spaceStorage) ID() (string, error) {
return s.spaceId, nil
func (s *spaceStorage) ID() string {
return s.spaceId
}
func (s *spaceStorage) TreeStorage(id string) (storage2.TreeStorage, error) {
@ -156,13 +156,13 @@ func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithI
func (s *spaceStorage) StoredIds() (ids []string, err error) {
index := s.objDb.Items()
key, val, err := index.Next()
key, _, err := index.Next()
for err == nil {
strKey := string(key)
if isRootIdKey(strKey) {
ids = append(ids, string(val))
ids = append(ids, getRootId(strKey))
}
key, val, err = index.Next()
key, _, err = index.Next()
}
if err != pogreb.ErrIterationDone {

View File

@ -0,0 +1,107 @@
package storage
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/stretchr/testify/require"
"os"
"strconv"
"testing"
)
func spaceTestPayload() spacestorage.SpaceStorageCreatePayload {
header := &spacesyncproto.RawSpaceHeaderWithId{
RawHeader: []byte("header"),
Id: "headerId",
}
aclRoot := &aclrecordproto.RawACLRecordWithId{
Payload: []byte("aclRoot"),
Id: "aclRootId",
}
return spacestorage.SpaceStorageCreatePayload{
RecWithId: aclRoot,
SpaceHeaderWithId: header,
}
}
func testSpace(t *testing.T, store spacestorage.SpaceStorage, payload spacestorage.SpaceStorageCreatePayload) {
header, err := store.SpaceHeader()
require.NoError(t, err)
require.Equal(t, payload.SpaceHeaderWithId, header)
aclStorage, err := store.ACLStorage()
require.NoError(t, err)
testList(t, aclStorage, payload.RecWithId, payload.RecWithId.Id)
}
func TestSpaceStorage_Create(t *testing.T) {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
payload := spaceTestPayload()
store, err := createSpaceStorage(dir, payload)
require.NoError(t, err)
testSpace(t, store, payload)
require.NoError(t, store.Close())
t.Run("create same storage returns error", func(t *testing.T) {
_, err := createSpaceStorage(dir, payload)
require.Error(t, err)
})
}
func TestSpaceStorage_NewAndCreateTree(t *testing.T) {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
payload := spaceTestPayload()
store, err := createSpaceStorage(dir, payload)
require.NoError(t, err)
require.NoError(t, store.Close())
store, err = newSpaceStorage(dir, payload.SpaceHeaderWithId.Id)
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
}()
testSpace(t, store, payload)
t.Run("create tree and get tree", func(t *testing.T) {
payload := treeTestPayload()
treeStore, err := store.CreateTreeStorage(payload)
require.NoError(t, err)
testTreePayload(t, treeStore, payload)
otherStore, err := store.TreeStorage(payload.RootRawChange.Id)
require.NoError(t, err)
testTreePayload(t, otherStore, payload)
})
}
func TestSpaceStorage_StoredIds(t *testing.T) {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
payload := spaceTestPayload()
store, err := createSpaceStorage(dir, payload)
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
}()
n := 5
var ids []string
for i := 0; i < n; i++ {
treePayload := treeTestPayload()
treePayload.RootRawChange.Id += strconv.Itoa(i)
ids = append(ids, treePayload.RootRawChange.Id)
_, err := store.CreateTreeStorage(treePayload)
require.NoError(t, err)
}
storedIds, err := store.StoredIds()
require.NoError(t, err)
require.Equal(t, ids, storedIds)
}

View File

@ -3,7 +3,7 @@ package storage
import (
"context"
"github.com/akrylysov/pogreb"
storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
)
@ -14,14 +14,14 @@ type treeStorage struct {
root *treechangeproto.RawTreeChangeWithId
}
func newTreeStorage(db *pogreb.DB, treeId string) (ts storage2.TreeStorage, err error) {
func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err error) {
keys := newTreeKeys(treeId)
heads, err := db.Get(keys.HeadsKey())
if err != nil {
return
}
if heads == nil {
err = storage2.ErrUnknownTreeId
err = storage.ErrUnknownTreeId
return
}
@ -30,7 +30,7 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage2.TreeStorage, err
return
}
if root == nil {
err = storage2.ErrUnknownTreeId
err = storage.ErrUnknownTreeId
return
}
@ -47,18 +47,18 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage2.TreeStorage, err
return
}
func createTreeStorage(db *pogreb.DB, payload storage2.TreeStorageCreatePayload) (ts storage2.TreeStorage, err error) {
keys := newTreeKeys(payload.TreeId)
func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
keys := newTreeKeys(payload.RootRawChange.Id)
has, err := db.Has(keys.HeadsKey())
if err != nil {
return
}
if has {
err = storage2.ErrTreeExists
err = storage.ErrTreeExists
return
}
heads := storage2.CreateHeadsPayload(payload.Heads)
heads := storage.CreateHeadsPayload(payload.Heads)
for _, ch := range payload.Changes {
err = db.Put(keys.RawChangeKey(ch.Id), ch.GetRawChange())
@ -86,8 +86,8 @@ func createTreeStorage(db *pogreb.DB, payload storage2.TreeStorageCreatePayload)
return
}
func (t *treeStorage) ID() (string, error) {
return t.id, nil
func (t *treeStorage) ID() string {
return t.id
}
func (t *treeStorage) Root() (raw *treechangeproto.RawTreeChangeWithId, err error) {
@ -100,15 +100,15 @@ func (t *treeStorage) Heads() (heads []string, err error) {
return
}
if headsBytes == nil {
err = storage2.ErrUnknownTreeId
err = storage.ErrUnknownTreeId
return
}
heads = storage2.ParseHeads(headsBytes)
heads = storage.ParseHeads(headsBytes)
return
}
func (t *treeStorage) SetHeads(heads []string) (err error) {
payload := storage2.CreateHeadsPayload(heads)
payload := storage.CreateHeadsPayload(heads)
return t.db.Put(t.keys.HeadsKey(), payload)
}
@ -122,7 +122,7 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha
return
}
if res == nil {
err = storage2.ErrUnkownChange
err = storage.ErrUnkownChange
}
raw = &treechangeproto.RawTreeChangeWithId{

View File

@ -0,0 +1,121 @@
package storage
import (
"context"
"github.com/akrylysov/pogreb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/stretchr/testify/require"
"os"
"testing"
)
func treeTestPayload() storage.TreeStorageCreatePayload {
rootRawChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some"), Id: "rootId"}
otherChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some other"), Id: "otherId"}
changes := []*treechangeproto.RawTreeChangeWithId{rootRawChange, otherChange}
return storage.TreeStorageCreatePayload{
RootRawChange: rootRawChange,
Changes: changes,
Heads: []string{rootRawChange.Id},
}
}
type fixture struct {
dir string
db *pogreb.DB
}
func newFixture(t *testing.T) *fixture {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
return &fixture{dir: dir}
}
func (fx *fixture) open(t *testing.T) {
var err error
fx.db, err = pogreb.Open(fx.dir, nil)
require.NoError(t, err)
}
func (fx *fixture) stop(t *testing.T) {
require.NoError(t, fx.db.Close())
}
func testTreePayload(t *testing.T, store storage.TreeStorage, payload storage.TreeStorageCreatePayload) {
require.Equal(t, payload.RootRawChange.Id, store.ID())
root, err := store.Root()
require.NoError(t, err)
require.Equal(t, root, payload.RootRawChange)
heads, err := store.Heads()
require.NoError(t, err)
require.Equal(t, payload.Heads, heads)
for _, ch := range payload.Changes {
dbCh, err := store.GetRawChange(context.Background(), ch.Id)
require.NoError(t, err)
require.Equal(t, ch, dbCh)
}
return
}
func TestTreeStorage_Create(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
payload := treeTestPayload()
store, err := createTreeStorage(fx.db, payload)
require.NoError(t, err)
testTreePayload(t, store, payload)
t.Run("create same storage returns error", func(t *testing.T) {
_, err := createTreeStorage(fx.db, payload)
require.Error(t, err)
})
}
func TestTreeStorage_Methods(t *testing.T) {
fx := newFixture(t)
fx.open(t)
payload := treeTestPayload()
_, err := createTreeStorage(fx.db, payload)
require.NoError(t, err)
fx.stop(t)
fx.open(t)
defer fx.stop(t)
store, err := newTreeStorage(fx.db, payload.RootRawChange.Id)
require.NoError(t, err)
testTreePayload(t, store, payload)
t.Run("update heads", func(t *testing.T) {
newHeads := []string{"a", "b"}
require.NoError(t, store.SetHeads(newHeads))
heads, err := store.Heads()
require.NoError(t, err)
require.Equal(t, newHeads, heads)
})
t.Run("add raw change, get change and has change", func(t *testing.T) {
newChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("ab"), Id: "newId"}
require.NoError(t, store.AddRawChange(newChange))
rawCh, err := store.GetRawChange(context.Background(), newChange.Id)
require.NoError(t, err)
require.Equal(t, newChange, rawCh)
has, err := store.HasChange(context.Background(), newChange.Id)
require.NoError(t, err)
require.True(t, has)
})
t.Run("get and has for unknown change", func(t *testing.T) {
incorrectId := "incorrectId"
_, err := store.GetRawChange(context.Background(), incorrectId)
require.Error(t, err)
has, err := store.HasChange(context.Background(), incorrectId)
require.NoError(t, err)
require.False(t, has)
})
}