This commit is contained in:
Sergey Cherepanov 2022-10-31 11:33:42 +03:00
commit 434f149e04
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
20 changed files with 1029 additions and 39 deletions

View File

@ -37,7 +37,7 @@ func newListStorage(spaceId string, db *badger.DB, txn *badger.Txn) (ls storage.
ls = &listStorage{
db: db,
keys: newACLKeys(spaceId),
keys: keys,
id: stringId,
root: rootWithId,
}
@ -70,7 +70,7 @@ func createListStorage(spaceId string, db *badger.DB, txn *badger.Txn, root *acl
ls = &listStorage{
db: db,
keys: newACLKeys(spaceId),
keys: keys,
id: root.Id,
root: root,
}

View File

@ -0,0 +1,72 @@
package storage
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/dgraph-io/badger/v3"
"github.com/stretchr/testify/require"
"testing"
)
func testList(t *testing.T, store storage.ListStorage, root *aclrecordproto.RawACLRecordWithId, head string) {
require.Equal(t, store.Id(), root.Id)
aclRoot, err := store.Root()
require.NoError(t, err)
require.Equal(t, root, aclRoot)
aclHead, err := store.Head()
require.NoError(t, err)
require.Equal(t, head, aclHead)
}
func TestListStorage(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
spaceId := "spaceId"
aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"}
fx.db.Update(func(txn *badger.Txn) error {
_, err := createListStorage(spaceId, fx.db, txn, aclRoot)
require.NoError(t, err)
return nil
})
var listStore storage.ListStorage
fx.db.View(func(txn *badger.Txn) (err error) {
listStore, err = newListStorage(spaceId, fx.db, txn)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
return nil
})
t.Run("create same storage returns no error", func(t *testing.T) {
fx.db.View(func(txn *badger.Txn) error {
// this is ok, because we only create new list storage when we create space storage
listStore, err := createListStorage(spaceId, fx.db, txn, aclRoot)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
return nil
})
})
t.Run("set head", func(t *testing.T) {
head := "newHead"
require.NoError(t, listStore.SetHead(head))
aclHead, err := listStore.Head()
require.NoError(t, err)
require.Equal(t, head, aclHead)
})
t.Run("add raw record and get raw record", func(t *testing.T) {
newRec := &aclrecordproto.RawACLRecordWithId{Payload: []byte("rec"), Id: "someRecId"}
require.NoError(t, listStore.AddRawRecord(context.Background(), newRec))
aclRec, err := listStore.GetRawRecord(context.Background(), newRec.Id)
require.NoError(t, err)
require.Equal(t, newRec, aclRec)
})
}

View File

@ -0,0 +1,108 @@
package storage
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/stretchr/testify/require"
"strconv"
"testing"
)
func spaceTestPayload() spacestorage.SpaceStorageCreatePayload {
header := &spacesyncproto.RawSpaceHeaderWithId{
RawHeader: []byte("header"),
Id: "headerId",
}
aclRoot := &aclrecordproto.RawACLRecordWithId{
Payload: []byte("aclRoot"),
Id: "aclRootId",
}
return spacestorage.SpaceStorageCreatePayload{
RecWithId: aclRoot,
SpaceHeaderWithId: header,
}
}
func testSpace(t *testing.T, store spacestorage.SpaceStorage, payload spacestorage.SpaceStorageCreatePayload) {
header, err := store.SpaceHeader()
require.NoError(t, err)
require.Equal(t, payload.SpaceHeaderWithId, header)
aclStorage, err := store.ACLStorage()
require.NoError(t, err)
testList(t, aclStorage, payload.RecWithId, payload.RecWithId.Id)
}
func TestSpaceStorage_Create(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
payload := spaceTestPayload()
store, err := createSpaceStorage(fx.db, payload)
require.NoError(t, err)
testSpace(t, store, payload)
require.NoError(t, store.Close())
t.Run("create same storage returns error", func(t *testing.T) {
_, err := createSpaceStorage(fx.db, payload)
require.Error(t, err)
})
}
func TestSpaceStorage_NewAndCreateTree(t *testing.T) {
fx := newFixture(t)
fx.open(t)
payload := spaceTestPayload()
store, err := createSpaceStorage(fx.db, payload)
require.NoError(t, err)
require.NoError(t, store.Close())
fx.stop(t)
fx.open(t)
defer fx.stop(t)
store, err = newSpaceStorage(fx.db, payload.SpaceHeaderWithId.Id)
require.NoError(t, err)
testSpace(t, store, payload)
t.Run("create tree and get tree", func(t *testing.T) {
payload := treeTestPayload()
treeStore, err := store.CreateTreeStorage(payload)
require.NoError(t, err)
testTreePayload(t, treeStore, payload)
otherStore, err := store.TreeStorage(payload.RootRawChange.Id)
require.NoError(t, err)
testTreePayload(t, otherStore, payload)
})
}
func TestSpaceStorage_StoredIds(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
payload := spaceTestPayload()
store, err := createSpaceStorage(fx.db, payload)
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
}()
n := 5
var ids []string
for i := 0; i < n; i++ {
treePayload := treeTestPayload()
treePayload.RootRawChange.Id += strconv.Itoa(i)
ids = append(ids, treePayload.RootRawChange.Id)
_, err := store.CreateTreeStorage(treePayload)
require.NoError(t, err)
}
storedIds, err := store.StoredIds()
require.NoError(t, err)
require.Equal(t, ids, storedIds)
}

View File

@ -44,7 +44,7 @@ func newTreeStorage(db *badger.DB, spaceId, treeId string) (ts storage.TreeStora
}
func createTreeStorage(db *badger.DB, spaceId string, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
keys := newTreeKeys(spaceId, payload.TreeId)
keys := newTreeKeys(spaceId, payload.RootRawChange.Id)
if hasDB(db, keys.RootIdKey()) {
err = storage.ErrTreeExists
return

View File

@ -0,0 +1,123 @@
package storage
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/dgraph-io/badger/v3"
"github.com/stretchr/testify/require"
"os"
"testing"
)
func treeTestPayload() storage.TreeStorageCreatePayload {
rootRawChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some"), Id: "someRootId"}
otherChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some other"), Id: "otherId"}
changes := []*treechangeproto.RawTreeChangeWithId{rootRawChange, otherChange}
return storage.TreeStorageCreatePayload{
RootRawChange: rootRawChange,
Changes: changes,
Heads: []string{rootRawChange.Id},
}
}
type fixture struct {
dir string
db *badger.DB
}
func testTreePayload(t *testing.T, store storage.TreeStorage, payload storage.TreeStorageCreatePayload) {
require.Equal(t, payload.RootRawChange.Id, store.Id())
root, err := store.Root()
require.NoError(t, err)
require.Equal(t, root, payload.RootRawChange)
heads, err := store.Heads()
require.NoError(t, err)
require.Equal(t, payload.Heads, heads)
for _, ch := range payload.Changes {
dbCh, err := store.GetRawChange(context.Background(), ch.Id)
require.NoError(t, err)
require.Equal(t, ch, dbCh)
}
return
}
func newFixture(t *testing.T) *fixture {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
return &fixture{dir: dir}
}
func (fx *fixture) open(t *testing.T) {
var err error
fx.db, err = badger.Open(badger.DefaultOptions(fx.dir))
require.NoError(t, err)
}
func (fx *fixture) stop(t *testing.T) {
require.NoError(t, fx.db.Close())
}
func TestTreeStorage_Create(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
spaceId := "spaceId"
payload := treeTestPayload()
store, err := createTreeStorage(fx.db, spaceId, payload)
require.NoError(t, err)
testTreePayload(t, store, payload)
t.Run("create same storage returns error", func(t *testing.T) {
_, err := createTreeStorage(fx.db, spaceId, payload)
require.Error(t, err)
})
}
func TestTreeStorage_Methods(t *testing.T) {
fx := newFixture(t)
fx.open(t)
payload := treeTestPayload()
spaceId := "spaceId"
_, err := createTreeStorage(fx.db, spaceId, payload)
require.NoError(t, err)
fx.stop(t)
fx.open(t)
defer fx.stop(t)
store, err := newTreeStorage(fx.db, spaceId, payload.RootRawChange.Id)
require.NoError(t, err)
testTreePayload(t, store, payload)
t.Run("update heads", func(t *testing.T) {
newHeads := []string{"a", "b"}
require.NoError(t, store.SetHeads(newHeads))
heads, err := store.Heads()
require.NoError(t, err)
require.Equal(t, newHeads, heads)
})
t.Run("add raw change, get change and has change", func(t *testing.T) {
newChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("ab"), Id: "newId"}
require.NoError(t, store.AddRawChange(newChange))
rawCh, err := store.GetRawChange(context.Background(), newChange.Id)
require.NoError(t, err)
require.Equal(t, newChange, rawCh)
has, err := store.HasChange(context.Background(), newChange.Id)
require.NoError(t, err)
require.True(t, has)
})
t.Run("get and has for unknown change", func(t *testing.T) {
incorrectId := "incorrectId"
_, err := store.GetRawChange(context.Background(), incorrectId)
require.Error(t, err)
has, err := store.HasChange(context.Background(), incorrectId)
require.NoError(t, err)
require.False(t, has)
})
}

View File

@ -245,7 +245,8 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
Loop:
for {
msg, err := stream.Recv()
var msg *spacesyncproto.ObjectSyncMessage
msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix())
if err != nil {
break
@ -260,7 +261,8 @@ Loop:
limiter <- struct{}{}
}()
}
return s.removePeer(peerId)
s.removePeer(peerId)
return
}
func (s *streamPool) removePeer(peerId string) (err error) {

View File

@ -0,0 +1,322 @@
package syncservice
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpctest"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"testing"
"time"
)
type testServer struct {
stream chan spacesyncproto.DRPCSpace_StreamStream
addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error
addRecord func(ctx context.Context, req *consensusproto.AddRecordRequest) error
releaseStream chan error
watchErrOnce bool
}
func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
panic("implement me")
}
func (t *testServer) PushSpace(ctx context.Context, request *spacesyncproto.PushSpaceRequest) (*spacesyncproto.PushSpaceResponse, error) {
panic("implement me")
}
func (t *testServer) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error {
t.stream <- stream
return <-t.releaseStream
}
func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpace_StreamStream {
select {
case <-time.After(time.Second * 5):
test.Fatalf("waiteStream timeout")
case st := <-t.stream:
return st
}
return nil
}
type fixture struct {
testServer *testServer
drpcTS *rpctest.TesServer
client spacesyncproto.DRPCSpaceClient
clientStream spacesyncproto.DRPCSpace_StreamStream
serverStream spacesyncproto.DRPCSpace_StreamStream
pool *streamPool
localId peer.ID
remoteId peer.ID
}
func newFixture(t *testing.T, localId, remoteId peer.ID, handler MessageHandler) *fixture {
fx := &fixture{
testServer: &testServer{},
drpcTS: rpctest.NewTestServer(),
localId: localId,
remoteId: remoteId,
}
fx.testServer.stream = make(chan spacesyncproto.DRPCSpace_StreamStream, 1)
require.NoError(t, spacesyncproto.DRPCRegisterSpace(fx.drpcTS.Mux, fx.testServer))
clientWrapper := rpctest.NewSecConnWrapper(nil, nil, localId, remoteId)
fx.client = spacesyncproto.NewDRPCSpaceClient(fx.drpcTS.DialWrapConn(nil, clientWrapper))
var err error
fx.clientStream, err = fx.client.Stream(context.Background())
require.NoError(t, err)
fx.serverStream = fx.testServer.waitStream(t)
fx.pool = newStreamPool(handler).(*streamPool)
return fx
}
func (fx *fixture) run(t *testing.T) chan error {
waitCh := make(chan error)
go func() {
err := fx.pool.AddAndReadStreamSync(fx.clientStream)
waitCh <- err
}()
time.Sleep(time.Millisecond * 10)
fx.pool.Lock()
require.Equal(t, fx.pool.peerStreams[fx.remoteId.String()], fx.clientStream)
fx.pool.Unlock()
return waitCh
}
func TestStreamPool_AddAndReadStreamAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("client close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("server close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.serverStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_Close(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("client close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
fx.run(t)
var events []string
recvChan := make(chan struct{})
go func() {
fx.pool.Close()
events = append(events, "pool_close")
recvChan <- struct{}{}
}()
time.Sleep(50 * time.Millisecond) //err = <-waitCh
events = append(events, "stream_close")
err := fx.clientStream.Close()
require.NoError(t, err)
<-recvChan
require.Equal(t, []string{"stream_close", "pool_close"}, events)
})
t.Run("server close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
fx.run(t)
var events []string
recvChan := make(chan struct{})
go func() {
fx.pool.Close()
events = append(events, "pool_close")
recvChan <- struct{}{}
}()
time.Sleep(50 * time.Millisecond) //err = <-waitCh
events = append(events, "stream_close")
err := fx.clientStream.Close()
require.NoError(t, err)
<-recvChan
require.Equal(t, []string{"stream_close", "pool_close"}, events)
})
}
func TestStreamPool_ReceiveMessage(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool receive message from server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
recvChan := make(chan struct{})
fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
require.Equal(t, msg, message)
recvChan <- struct{}{}
return nil
})
waitCh := fx.run(t)
err := fx.serverStream.Send(msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_HasActiveStream(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool has active stream", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
require.True(t, fx.pool.HasActiveStream(remId.String()))
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("pool has no active stream", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.False(t, fx.pool.HasActiveStream(remId.String()))
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_SendAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool send async to server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
recvChan := make(chan struct{})
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg, message)
recvChan <- struct{}{}
}()
waitCh := fx.run(t)
err := fx.pool.SendAsync([]string{remId.String()}, msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_SendSync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool send sync to server", func(t *testing.T) {
objectId := "objectId"
payload := []byte("payload")
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg.ObjectId, message.ObjectId)
require.NotEmpty(t, message.ReplyId)
message.Payload = payload
err = fx.serverStream.Send(message)
require.NoError(t, err)
}()
waitCh := fx.run(t)
res, err := fx.pool.SendSync(remId.String(), msg)
require.NoError(t, err)
require.Equal(t, payload, res.Payload)
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("pool send sync timeout", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
syncWaitPeriod = time.Millisecond * 30
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg.ObjectId, message.ObjectId)
require.NotEmpty(t, message.ReplyId)
}()
waitCh := fx.run(t)
_, err := fx.pool.SendSync(remId.String(), msg)
require.Equal(t, ErrSyncTimeout, err)
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
func TestStreamPool_BroadcastAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool broadcast async to server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
recvChan := make(chan struct{})
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg, message)
recvChan <- struct{}{}
}()
waitCh := fx.run(t)
err := fx.pool.BroadcastAsync(msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}

View File

@ -59,9 +59,7 @@ type BuildDeps struct {
TreeStorage storage.TreeStorage
}
func DeriveSyncTree(
ctx context.Context,
deps CreateDeps) (t tree.ObjectTree, err error) {
func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) {
t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage)
if err != nil {
return
@ -153,7 +151,6 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := storage.TreeStorageCreatePayload{
TreeId: id,
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
@ -173,10 +170,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
return buildSyncTree(ctx, true, deps)
}
func buildSyncTree(
ctx context.Context,
isFirstBuild bool,
deps BuildDeps) (t tree.ObjectTree, err error) {
func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t tree.ObjectTree, err error) {
t, err = buildObjectTree(deps.TreeStorage, deps.AclList)
if err != nil {

View File

@ -2,6 +2,8 @@ package rpctest
import (
"context"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
@ -9,6 +11,48 @@ import (
"storj.io/drpc/drpcserver"
)
type SecConnMock struct {
net.Conn
localPrivKey crypto.PrivKey
remotePubKey crypto.PubKey
localId peer.ID
remoteId peer.ID
}
func (s *SecConnMock) LocalPeer() peer.ID {
return s.localId
}
func (s *SecConnMock) LocalPrivateKey() crypto.PrivKey {
return s.localPrivKey
}
func (s *SecConnMock) RemotePeer() peer.ID {
return s.remoteId
}
func (s *SecConnMock) RemotePublicKey() crypto.PubKey {
return s.remotePubKey
}
type ConnWrapper func(conn net.Conn) net.Conn
func NewSecConnWrapper(
localPrivKey crypto.PrivKey,
remotePubKey crypto.PubKey,
localId peer.ID,
remoteId peer.ID) ConnWrapper {
return func(conn net.Conn) net.Conn {
return &SecConnMock{
Conn: conn,
localPrivKey: localPrivKey,
remotePubKey: remotePubKey,
localId: localId,
remoteId: remoteId,
}
}
}
func NewTestServer() *TesServer {
ts := &TesServer{
Mux: drpcmux.New(),
@ -23,7 +67,17 @@ type TesServer struct {
}
func (ts *TesServer) Dial() drpc.Conn {
return ts.DialWrapConn(nil, nil)
}
func (ts *TesServer) DialWrapConn(serverWrapper ConnWrapper, clientWrapper ConnWrapper) drpc.Conn {
sc, cc := net.Pipe()
if serverWrapper != nil {
sc = serverWrapper(sc)
}
if clientWrapper != nil {
cc = clientWrapper(cc)
}
go ts.Server.ServeOne(context.Background(), sc)
return drpcconn.New(cc)
}

View File

@ -72,7 +72,6 @@ type inMemoryTreeStorage struct {
}
func NewInMemoryTreeStorage(
treeId string,
root *treechangeproto.RawTreeChangeWithId,
heads []string,
changes []*treechangeproto.RawTreeChangeWithId) (TreeStorage, error) {
@ -80,10 +79,10 @@ func NewInMemoryTreeStorage(
for _, ch := range changes {
allChanges[ch.Id] = ch
}
allChanges[treeId] = root
allChanges[root.Id] = root
return &inMemoryTreeStorage{
id: treeId,
id: root.Id,
root: root,
heads: heads,
changes: allChanges,
@ -159,12 +158,12 @@ func (i *inMemoryStorageProvider) TreeStorage(id string) (TreeStorage, error) {
func (i *inMemoryStorageProvider) CreateTreeStorage(payload TreeStorageCreatePayload) (TreeStorage, error) {
i.Lock()
defer i.Unlock()
res, err := NewInMemoryTreeStorage(payload.TreeId, payload.RootRawChange, payload.Heads, payload.Changes)
res, err := NewInMemoryTreeStorage(payload.RootRawChange, payload.Heads, payload.Changes)
if err != nil {
return nil, err
}
i.objects[payload.TreeId] = res
i.objects[payload.RootRawChange.Id] = res
return res, nil
}

View File

@ -10,7 +10,6 @@ var ErrTreeExists = errors.New("tree already exists")
var ErrUnkownChange = errors.New("change doesn't exist")
type TreeStorageCreatePayload struct {
TreeId string
RootRawChange *treechangeproto.RawTreeChangeWithId
Changes []*treechangeproto.RawTreeChangeWithId
Heads []string

View File

@ -3,7 +3,7 @@ package tree
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/testutils/acllistbuilder"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/stretchr/testify/assert"
@ -53,9 +53,9 @@ func (c *mockChangeCreator) createRaw(id, aclId, snapshotId string, isSnapshot b
}
}
func (c *mockChangeCreator) createNewTreeStorage(treeId, aclHeadId string) storage2.TreeStorage {
func (c *mockChangeCreator) createNewTreeStorage(treeId, aclHeadId string) storage.TreeStorage {
root := c.createRoot(treeId, aclHeadId)
treeStorage, _ := storage2.NewInMemoryTreeStorage(treeId, root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root})
treeStorage, _ := storage.NewInMemoryTreeStorage(root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root})
return treeStorage
}
@ -95,7 +95,7 @@ func (m *mockChangeValidator) ValidateFullTree(tree *Tree, aclList list.ACLList)
type testTreeContext struct {
aclList list.ACLList
treeStorage storage2.TreeStorage
treeStorage storage.TreeStorage
changeBuilder *mockChangeBuilder
changeCreator *mockChangeCreator
objTree ObjectTree

View File

@ -3,7 +3,7 @@ package tree
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/common"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list"
storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric"
@ -20,7 +20,7 @@ type ObjectTreeCreatePayload struct {
Identity []byte
}
func BuildObjectTree(treeStorage storage2.TreeStorage, aclList list.ACLList) (ObjectTree, error) {
func BuildObjectTree(treeStorage storage.TreeStorage, aclList list.ACLList) (ObjectTree, error) {
rootChange, err := treeStorage.Root()
if err != nil {
return nil, err
@ -32,14 +32,14 @@ func BuildObjectTree(treeStorage storage2.TreeStorage, aclList list.ACLList) (Ob
func CreateDerivedObjectTree(
payload ObjectTreeCreatePayload,
aclList list.ACLList,
createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
return createObjectTree(payload, 0, nil, aclList, createStorage)
}
func CreateObjectTree(
payload ObjectTreeCreatePayload,
aclList list.ACLList,
createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
bytes := make([]byte, 32)
_, err = rand.Read(bytes)
if err != nil {
@ -53,7 +53,7 @@ func createObjectTree(
timestamp int64,
seed []byte,
aclList list.ACLList,
createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) {
aclList.RLock()
aclHeadId := aclList.Head().Id
aclList.RUnlock()
@ -77,8 +77,7 @@ func createObjectTree(
}
// create storage
st, err := createStorage(storage2.TreeStorageCreatePayload{
TreeId: raw.Id,
st, err := createStorage(storage.TreeStorageCreatePayload{
RootRawChange: raw,
Changes: []*treechangeproto.RawTreeChangeWithId{raw},
Heads: []string{raw.Id},
@ -129,6 +128,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) {
objTree.id = objTree.treeStorage.Id()
objTree.id = objTree.treeStorage.Id()
objTree.root, err = objTree.treeStorage.Root()
if err != nil {
return nil, err

View File

@ -409,9 +409,13 @@ func (c *oCache) Close() (err error) {
}
c.closed = true
close(c.closeCh)
var toClose []*entry
var toClose, alreadyClosing []*entry
for _, e := range c.data {
toClose = append(toClose, e)
if e.isClosing {
alreadyClosing = append(alreadyClosing, e)
} else {
toClose = append(toClose, e)
}
}
c.mu.Unlock()
for _, e := range toClose {
@ -422,5 +426,8 @@ func (c *oCache) Close() (err error) {
}
}
}
for _, e := range alreadyClosing {
<-e.close
}
return nil
}

View File

@ -62,5 +62,17 @@ func (s spaceKeys) HeaderKey() []byte {
}
func isRootIdKey(key string) bool {
return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "heads")
return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "/heads")
}
func getRootId(key string) string {
prefixLen := 2 // len("t/")
suffixLen := 6 // len("/heads")
rootLen := len(key) - suffixLen - prefixLen
sBuf := strings.Builder{}
sBuf.Grow(rootLen)
for i := prefixLen; i < prefixLen+rootLen; i++ {
sBuf.WriteByte(key[i])
}
return sBuf.String()
}

View File

@ -0,0 +1,70 @@
package storage
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/stretchr/testify/require"
"testing"
)
func testList(t *testing.T, store storage.ListStorage, root *aclrecordproto.RawACLRecordWithId, head string) {
require.Equal(t, store.Id(), root.Id)
aclRoot, err := store.Root()
require.NoError(t, err)
require.Equal(t, root, aclRoot)
aclHead, err := store.Head()
require.NoError(t, err)
require.Equal(t, head, aclHead)
}
func TestListStorage_Create(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"}
listStore, err := createListStorage(fx.db, aclRoot)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
t.Run("create same list storage returns nil", func(t *testing.T) {
// this is ok, because we only create new list storage when we create space storage
listStore, err := createListStorage(fx.db, aclRoot)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
})
}
func TestListStorage_Methods(t *testing.T) {
fx := newFixture(t)
fx.open(t)
aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"}
_, err := createListStorage(fx.db, aclRoot)
require.NoError(t, err)
fx.stop(t)
fx.open(t)
defer fx.stop(t)
listStore, err := newListStorage(fx.db)
require.NoError(t, err)
testList(t, listStore, aclRoot, aclRoot.Id)
t.Run("set head", func(t *testing.T) {
head := "newHead"
require.NoError(t, listStore.SetHead(head))
aclHead, err := listStore.Head()
require.NoError(t, err)
require.Equal(t, head, aclHead)
})
t.Run("add raw record and get raw record", func(t *testing.T) {
newRec := &aclrecordproto.RawACLRecordWithId{Payload: []byte("rec"), Id: "someRecId"}
require.NoError(t, listStore.AddRawRecord(context.Background(), newRec))
aclRec, err := listStore.GetRawRecord(context.Background(), newRec.Id)
require.NoError(t, err)
require.Equal(t, newRec, aclRec)
})
}

View File

@ -5,7 +5,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"go.uber.org/zap"
"path"
"sync"
@ -88,8 +88,8 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate
}
defer func() {
log.With(zap.String("id", payload.SpaceHeaderWithId.Id), zap.Error(err)).Warn("failed to create storage")
if err != nil {
log.With(zap.String("id", payload.SpaceHeaderWithId.Id), zap.Error(err)).Warn("failed to create storage")
db.Close()
}
}()
@ -156,13 +156,13 @@ func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithI
func (s *spaceStorage) StoredIds() (ids []string, err error) {
index := s.objDb.Items()
key, val, err := index.Next()
key, _, err := index.Next()
for err == nil {
strKey := string(key)
if isRootIdKey(strKey) {
ids = append(ids, string(val))
ids = append(ids, getRootId(strKey))
}
key, val, err = index.Next()
key, _, err = index.Next()
}
if err != pogreb.ErrIterationDone {

View File

@ -0,0 +1,107 @@
package storage
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto"
"github.com/stretchr/testify/require"
"os"
"strconv"
"testing"
)
func spaceTestPayload() spacestorage.SpaceStorageCreatePayload {
header := &spacesyncproto.RawSpaceHeaderWithId{
RawHeader: []byte("header"),
Id: "headerId",
}
aclRoot := &aclrecordproto.RawACLRecordWithId{
Payload: []byte("aclRoot"),
Id: "aclRootId",
}
return spacestorage.SpaceStorageCreatePayload{
RecWithId: aclRoot,
SpaceHeaderWithId: header,
}
}
func testSpace(t *testing.T, store spacestorage.SpaceStorage, payload spacestorage.SpaceStorageCreatePayload) {
header, err := store.SpaceHeader()
require.NoError(t, err)
require.Equal(t, payload.SpaceHeaderWithId, header)
aclStorage, err := store.ACLStorage()
require.NoError(t, err)
testList(t, aclStorage, payload.RecWithId, payload.RecWithId.Id)
}
func TestSpaceStorage_Create(t *testing.T) {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
payload := spaceTestPayload()
store, err := createSpaceStorage(dir, payload)
require.NoError(t, err)
testSpace(t, store, payload)
require.NoError(t, store.Close())
t.Run("create same storage returns error", func(t *testing.T) {
_, err := createSpaceStorage(dir, payload)
require.Error(t, err)
})
}
func TestSpaceStorage_NewAndCreateTree(t *testing.T) {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
payload := spaceTestPayload()
store, err := createSpaceStorage(dir, payload)
require.NoError(t, err)
require.NoError(t, store.Close())
store, err = newSpaceStorage(dir, payload.SpaceHeaderWithId.Id)
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
}()
testSpace(t, store, payload)
t.Run("create tree and get tree", func(t *testing.T) {
payload := treeTestPayload()
treeStore, err := store.CreateTreeStorage(payload)
require.NoError(t, err)
testTreePayload(t, treeStore, payload)
otherStore, err := store.TreeStorage(payload.RootRawChange.Id)
require.NoError(t, err)
testTreePayload(t, otherStore, payload)
})
}
func TestSpaceStorage_StoredIds(t *testing.T) {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
payload := spaceTestPayload()
store, err := createSpaceStorage(dir, payload)
require.NoError(t, err)
defer func() {
require.NoError(t, store.Close())
}()
n := 5
var ids []string
for i := 0; i < n; i++ {
treePayload := treeTestPayload()
treePayload.RootRawChange.Id += strconv.Itoa(i)
ids = append(ids, treePayload.RootRawChange.Id)
_, err := store.CreateTreeStorage(treePayload)
require.NoError(t, err)
}
storedIds, err := store.StoredIds()
require.NoError(t, err)
require.Equal(t, ids, storedIds)
}

View File

@ -48,7 +48,7 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err e
}
func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) {
keys := newTreeKeys(payload.TreeId)
keys := newTreeKeys(payload.RootRawChange.Id)
has, err := db.Has(keys.HeadsKey())
if err != nil {
return

View File

@ -0,0 +1,121 @@
package storage
import (
"context"
"github.com/akrylysov/pogreb"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/stretchr/testify/require"
"os"
"testing"
)
func treeTestPayload() storage.TreeStorageCreatePayload {
rootRawChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some"), Id: "rootId"}
otherChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some other"), Id: "otherId"}
changes := []*treechangeproto.RawTreeChangeWithId{rootRawChange, otherChange}
return storage.TreeStorageCreatePayload{
RootRawChange: rootRawChange,
Changes: changes,
Heads: []string{rootRawChange.Id},
}
}
type fixture struct {
dir string
db *pogreb.DB
}
func newFixture(t *testing.T) *fixture {
dir, err := os.MkdirTemp("", "")
require.NoError(t, err)
return &fixture{dir: dir}
}
func (fx *fixture) open(t *testing.T) {
var err error
fx.db, err = pogreb.Open(fx.dir, nil)
require.NoError(t, err)
}
func (fx *fixture) stop(t *testing.T) {
require.NoError(t, fx.db.Close())
}
func testTreePayload(t *testing.T, store storage.TreeStorage, payload storage.TreeStorageCreatePayload) {
require.Equal(t, payload.RootRawChange.Id, store.Id())
root, err := store.Root()
require.NoError(t, err)
require.Equal(t, root, payload.RootRawChange)
heads, err := store.Heads()
require.NoError(t, err)
require.Equal(t, payload.Heads, heads)
for _, ch := range payload.Changes {
dbCh, err := store.GetRawChange(context.Background(), ch.Id)
require.NoError(t, err)
require.Equal(t, ch, dbCh)
}
return
}
func TestTreeStorage_Create(t *testing.T) {
fx := newFixture(t)
fx.open(t)
defer fx.stop(t)
payload := treeTestPayload()
store, err := createTreeStorage(fx.db, payload)
require.NoError(t, err)
testTreePayload(t, store, payload)
t.Run("create same storage returns error", func(t *testing.T) {
_, err := createTreeStorage(fx.db, payload)
require.Error(t, err)
})
}
func TestTreeStorage_Methods(t *testing.T) {
fx := newFixture(t)
fx.open(t)
payload := treeTestPayload()
_, err := createTreeStorage(fx.db, payload)
require.NoError(t, err)
fx.stop(t)
fx.open(t)
defer fx.stop(t)
store, err := newTreeStorage(fx.db, payload.RootRawChange.Id)
require.NoError(t, err)
testTreePayload(t, store, payload)
t.Run("update heads", func(t *testing.T) {
newHeads := []string{"a", "b"}
require.NoError(t, store.SetHeads(newHeads))
heads, err := store.Heads()
require.NoError(t, err)
require.Equal(t, newHeads, heads)
})
t.Run("add raw change, get change and has change", func(t *testing.T) {
newChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("ab"), Id: "newId"}
require.NoError(t, store.AddRawChange(newChange))
rawCh, err := store.GetRawChange(context.Background(), newChange.Id)
require.NoError(t, err)
require.Equal(t, newChange, rawCh)
has, err := store.HasChange(context.Background(), newChange.Id)
require.NoError(t, err)
require.True(t, has)
})
t.Run("get and has for unknown change", func(t *testing.T) {
incorrectId := "incorrectId"
_, err := store.GetRawChange(context.Background(), incorrectId)
require.Error(t, err)
has, err := store.HasChange(context.Background(), incorrectId)
require.NoError(t, err)
require.False(t, has)
})
}