diff --git a/client/storage/liststorage.go b/client/storage/liststorage.go index ea6fb886..fbe10d35 100644 --- a/client/storage/liststorage.go +++ b/client/storage/liststorage.go @@ -37,7 +37,7 @@ func newListStorage(spaceId string, db *badger.DB, txn *badger.Txn) (ls storage. ls = &listStorage{ db: db, - keys: newACLKeys(spaceId), + keys: keys, id: stringId, root: rootWithId, } @@ -70,7 +70,7 @@ func createListStorage(spaceId string, db *badger.DB, txn *badger.Txn, root *acl ls = &listStorage{ db: db, - keys: newACLKeys(spaceId), + keys: keys, id: root.Id, root: root, } diff --git a/client/storage/liststorage_test.go b/client/storage/liststorage_test.go new file mode 100644 index 00000000..4c4df686 --- /dev/null +++ b/client/storage/liststorage_test.go @@ -0,0 +1,72 @@ +package storage + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/require" + "testing" +) + +func testList(t *testing.T, store storage.ListStorage, root *aclrecordproto.RawACLRecordWithId, head string) { + require.Equal(t, store.Id(), root.Id) + + aclRoot, err := store.Root() + require.NoError(t, err) + require.Equal(t, root, aclRoot) + + aclHead, err := store.Head() + require.NoError(t, err) + require.Equal(t, head, aclHead) +} + +func TestListStorage(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + spaceId := "spaceId" + aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"} + + fx.db.Update(func(txn *badger.Txn) error { + _, err := createListStorage(spaceId, fx.db, txn, aclRoot) + require.NoError(t, err) + return nil + }) + + var listStore storage.ListStorage + fx.db.View(func(txn *badger.Txn) (err error) { + listStore, err = newListStorage(spaceId, fx.db, txn) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + + return nil + }) + + t.Run("create same storage returns no error", func(t *testing.T) { + fx.db.View(func(txn *badger.Txn) error { + // this is ok, because we only create new list storage when we create space storage + listStore, err := createListStorage(spaceId, fx.db, txn, aclRoot) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + + return nil + }) + }) + + t.Run("set head", func(t *testing.T) { + head := "newHead" + require.NoError(t, listStore.SetHead(head)) + aclHead, err := listStore.Head() + require.NoError(t, err) + require.Equal(t, head, aclHead) + }) + + t.Run("add raw record and get raw record", func(t *testing.T) { + newRec := &aclrecordproto.RawACLRecordWithId{Payload: []byte("rec"), Id: "someRecId"} + require.NoError(t, listStore.AddRawRecord(context.Background(), newRec)) + aclRec, err := listStore.GetRawRecord(context.Background(), newRec.Id) + require.NoError(t, err) + require.Equal(t, newRec, aclRec) + }) +} diff --git a/client/storage/spacestorage_test.go b/client/storage/spacestorage_test.go new file mode 100644 index 00000000..8540d338 --- /dev/null +++ b/client/storage/spacestorage_test.go @@ -0,0 +1,108 @@ +package storage + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/stretchr/testify/require" + "strconv" + "testing" +) + +func spaceTestPayload() spacestorage.SpaceStorageCreatePayload { + header := &spacesyncproto.RawSpaceHeaderWithId{ + RawHeader: []byte("header"), + Id: "headerId", + } + aclRoot := &aclrecordproto.RawACLRecordWithId{ + Payload: []byte("aclRoot"), + Id: "aclRootId", + } + return spacestorage.SpaceStorageCreatePayload{ + RecWithId: aclRoot, + SpaceHeaderWithId: header, + } +} + +func testSpace(t *testing.T, store spacestorage.SpaceStorage, payload spacestorage.SpaceStorageCreatePayload) { + header, err := store.SpaceHeader() + require.NoError(t, err) + require.Equal(t, payload.SpaceHeaderWithId, header) + + aclStorage, err := store.ACLStorage() + require.NoError(t, err) + testList(t, aclStorage, payload.RecWithId, payload.RecWithId.Id) +} + +func TestSpaceStorage_Create(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + payload := spaceTestPayload() + store, err := createSpaceStorage(fx.db, payload) + require.NoError(t, err) + + testSpace(t, store, payload) + require.NoError(t, store.Close()) + + t.Run("create same storage returns error", func(t *testing.T) { + _, err := createSpaceStorage(fx.db, payload) + require.Error(t, err) + }) +} + +func TestSpaceStorage_NewAndCreateTree(t *testing.T) { + fx := newFixture(t) + fx.open(t) + + payload := spaceTestPayload() + store, err := createSpaceStorage(fx.db, payload) + require.NoError(t, err) + require.NoError(t, store.Close()) + fx.stop(t) + + fx.open(t) + defer fx.stop(t) + store, err = newSpaceStorage(fx.db, payload.SpaceHeaderWithId.Id) + require.NoError(t, err) + testSpace(t, store, payload) + + t.Run("create tree and get tree", func(t *testing.T) { + payload := treeTestPayload() + treeStore, err := store.CreateTreeStorage(payload) + require.NoError(t, err) + testTreePayload(t, treeStore, payload) + + otherStore, err := store.TreeStorage(payload.RootRawChange.Id) + require.NoError(t, err) + testTreePayload(t, otherStore, payload) + }) +} + +func TestSpaceStorage_StoredIds(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + payload := spaceTestPayload() + store, err := createSpaceStorage(fx.db, payload) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + + n := 5 + var ids []string + for i := 0; i < n; i++ { + treePayload := treeTestPayload() + treePayload.RootRawChange.Id += strconv.Itoa(i) + ids = append(ids, treePayload.RootRawChange.Id) + _, err := store.CreateTreeStorage(treePayload) + require.NoError(t, err) + } + + storedIds, err := store.StoredIds() + require.NoError(t, err) + require.Equal(t, ids, storedIds) +} diff --git a/client/storage/treestorage.go b/client/storage/treestorage.go index 8c11f003..a8f9abf8 100644 --- a/client/storage/treestorage.go +++ b/client/storage/treestorage.go @@ -44,7 +44,7 @@ func newTreeStorage(db *badger.DB, spaceId, treeId string) (ts storage.TreeStora } func createTreeStorage(db *badger.DB, spaceId string, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) { - keys := newTreeKeys(spaceId, payload.TreeId) + keys := newTreeKeys(spaceId, payload.RootRawChange.Id) if hasDB(db, keys.RootIdKey()) { err = storage.ErrTreeExists return diff --git a/client/storage/treestorage_test.go b/client/storage/treestorage_test.go new file mode 100644 index 00000000..84b422d5 --- /dev/null +++ b/client/storage/treestorage_test.go @@ -0,0 +1,123 @@ +package storage + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func treeTestPayload() storage.TreeStorageCreatePayload { + rootRawChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some"), Id: "someRootId"} + otherChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some other"), Id: "otherId"} + changes := []*treechangeproto.RawTreeChangeWithId{rootRawChange, otherChange} + return storage.TreeStorageCreatePayload{ + RootRawChange: rootRawChange, + Changes: changes, + Heads: []string{rootRawChange.Id}, + } +} + +type fixture struct { + dir string + db *badger.DB +} + +func testTreePayload(t *testing.T, store storage.TreeStorage, payload storage.TreeStorageCreatePayload) { + require.Equal(t, payload.RootRawChange.Id, store.Id()) + + root, err := store.Root() + require.NoError(t, err) + require.Equal(t, root, payload.RootRawChange) + + heads, err := store.Heads() + require.NoError(t, err) + require.Equal(t, payload.Heads, heads) + + for _, ch := range payload.Changes { + dbCh, err := store.GetRawChange(context.Background(), ch.Id) + require.NoError(t, err) + require.Equal(t, ch, dbCh) + } + return +} + +func newFixture(t *testing.T) *fixture { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + return &fixture{dir: dir} +} + +func (fx *fixture) open(t *testing.T) { + var err error + fx.db, err = badger.Open(badger.DefaultOptions(fx.dir)) + require.NoError(t, err) +} + +func (fx *fixture) stop(t *testing.T) { + require.NoError(t, fx.db.Close()) +} + +func TestTreeStorage_Create(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + spaceId := "spaceId" + payload := treeTestPayload() + store, err := createTreeStorage(fx.db, spaceId, payload) + require.NoError(t, err) + testTreePayload(t, store, payload) + + t.Run("create same storage returns error", func(t *testing.T) { + _, err := createTreeStorage(fx.db, spaceId, payload) + require.Error(t, err) + }) +} + +func TestTreeStorage_Methods(t *testing.T) { + fx := newFixture(t) + fx.open(t) + payload := treeTestPayload() + spaceId := "spaceId" + _, err := createTreeStorage(fx.db, spaceId, payload) + require.NoError(t, err) + fx.stop(t) + + fx.open(t) + defer fx.stop(t) + store, err := newTreeStorage(fx.db, spaceId, payload.RootRawChange.Id) + require.NoError(t, err) + testTreePayload(t, store, payload) + + t.Run("update heads", func(t *testing.T) { + newHeads := []string{"a", "b"} + require.NoError(t, store.SetHeads(newHeads)) + heads, err := store.Heads() + require.NoError(t, err) + require.Equal(t, newHeads, heads) + }) + + t.Run("add raw change, get change and has change", func(t *testing.T) { + newChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("ab"), Id: "newId"} + require.NoError(t, store.AddRawChange(newChange)) + rawCh, err := store.GetRawChange(context.Background(), newChange.Id) + require.NoError(t, err) + require.Equal(t, newChange, rawCh) + has, err := store.HasChange(context.Background(), newChange.Id) + require.NoError(t, err) + require.True(t, has) + }) + + t.Run("get and has for unknown change", func(t *testing.T) { + incorrectId := "incorrectId" + _, err := store.GetRawChange(context.Background(), incorrectId) + require.Error(t, err) + has, err := store.HasChange(context.Background(), incorrectId) + require.NoError(t, err) + require.False(t, has) + }) +} diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 489d561f..e807ae14 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -245,7 +245,8 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre Loop: for { - msg, err := stream.Recv() + var msg *spacesyncproto.ObjectSyncMessage + msg, err = stream.Recv() s.lastUsage.Store(time.Now().Unix()) if err != nil { break @@ -260,7 +261,8 @@ Loop: limiter <- struct{}{} }() } - return s.removePeer(peerId) + s.removePeer(peerId) + return } func (s *streamPool) removePeer(peerId string) (err error) { diff --git a/common/commonspace/syncservice/streampool_test.go b/common/commonspace/syncservice/streampool_test.go new file mode 100644 index 00000000..47ed4ee0 --- /dev/null +++ b/common/commonspace/syncservice/streampool_test.go @@ -0,0 +1,322 @@ +package syncservice + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpctest" + "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +type testServer struct { + stream chan spacesyncproto.DRPCSpace_StreamStream + addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error + addRecord func(ctx context.Context, req *consensusproto.AddRecordRequest) error + releaseStream chan error + watchErrOnce bool +} + +func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { + panic("implement me") +} + +func (t *testServer) PushSpace(ctx context.Context, request *spacesyncproto.PushSpaceRequest) (*spacesyncproto.PushSpaceResponse, error) { + panic("implement me") +} + +func (t *testServer) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { + t.stream <- stream + return <-t.releaseStream +} + +func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpace_StreamStream { + select { + case <-time.After(time.Second * 5): + test.Fatalf("waiteStream timeout") + case st := <-t.stream: + return st + } + return nil +} + +type fixture struct { + testServer *testServer + drpcTS *rpctest.TesServer + client spacesyncproto.DRPCSpaceClient + clientStream spacesyncproto.DRPCSpace_StreamStream + serverStream spacesyncproto.DRPCSpace_StreamStream + pool *streamPool + localId peer.ID + remoteId peer.ID +} + +func newFixture(t *testing.T, localId, remoteId peer.ID, handler MessageHandler) *fixture { + fx := &fixture{ + testServer: &testServer{}, + drpcTS: rpctest.NewTestServer(), + localId: localId, + remoteId: remoteId, + } + fx.testServer.stream = make(chan spacesyncproto.DRPCSpace_StreamStream, 1) + require.NoError(t, spacesyncproto.DRPCRegisterSpace(fx.drpcTS.Mux, fx.testServer)) + clientWrapper := rpctest.NewSecConnWrapper(nil, nil, localId, remoteId) + fx.client = spacesyncproto.NewDRPCSpaceClient(fx.drpcTS.DialWrapConn(nil, clientWrapper)) + + var err error + fx.clientStream, err = fx.client.Stream(context.Background()) + require.NoError(t, err) + fx.serverStream = fx.testServer.waitStream(t) + fx.pool = newStreamPool(handler).(*streamPool) + + return fx +} + +func (fx *fixture) run(t *testing.T) chan error { + waitCh := make(chan error) + go func() { + err := fx.pool.AddAndReadStreamSync(fx.clientStream) + waitCh <- err + }() + + time.Sleep(time.Millisecond * 10) + fx.pool.Lock() + require.Equal(t, fx.pool.peerStreams[fx.remoteId.String()], fx.clientStream) + fx.pool.Unlock() + + return waitCh +} + +func TestStreamPool_AddAndReadStreamAsync(t *testing.T) { + remId := peer.ID("remoteId") + + t.Run("client close", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + waitCh := fx.run(t) + + err := fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) + t.Run("server close", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + waitCh := fx.run(t) + + err := fx.serverStream.Close() + require.NoError(t, err) + + err = <-waitCh + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_Close(t *testing.T) { + remId := peer.ID("remoteId") + + t.Run("client close", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + fx.run(t) + var events []string + recvChan := make(chan struct{}) + go func() { + fx.pool.Close() + events = append(events, "pool_close") + recvChan <- struct{}{} + }() + time.Sleep(50 * time.Millisecond) //err = <-waitCh + events = append(events, "stream_close") + err := fx.clientStream.Close() + require.NoError(t, err) + <-recvChan + require.Equal(t, []string{"stream_close", "pool_close"}, events) + }) + t.Run("server close", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + fx.run(t) + var events []string + recvChan := make(chan struct{}) + go func() { + fx.pool.Close() + events = append(events, "pool_close") + recvChan <- struct{}{} + }() + time.Sleep(50 * time.Millisecond) //err = <-waitCh + events = append(events, "stream_close") + err := fx.clientStream.Close() + require.NoError(t, err) + <-recvChan + require.Equal(t, []string{"stream_close", "pool_close"}, events) + }) +} + +func TestStreamPool_ReceiveMessage(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool receive message from server", func(t *testing.T) { + objectId := "objectId" + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + recvChan := make(chan struct{}) + fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { + require.Equal(t, msg, message) + recvChan <- struct{}{} + return nil + }) + waitCh := fx.run(t) + + err := fx.serverStream.Send(msg) + require.NoError(t, err) + <-recvChan + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_HasActiveStream(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool has active stream", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + waitCh := fx.run(t) + require.True(t, fx.pool.HasActiveStream(remId.String())) + + err := fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) + t.Run("pool has no active stream", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + waitCh := fx.run(t) + err := fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + require.Error(t, err) + require.False(t, fx.pool.HasActiveStream(remId.String())) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_SendAsync(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool send async to server", func(t *testing.T) { + objectId := "objectId" + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + fx := newFixture(t, "", remId, nil) + recvChan := make(chan struct{}) + go func() { + message, err := fx.serverStream.Recv() + require.NoError(t, err) + require.Equal(t, msg, message) + recvChan <- struct{}{} + }() + waitCh := fx.run(t) + + err := fx.pool.SendAsync([]string{remId.String()}, msg) + require.NoError(t, err) + <-recvChan + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_SendSync(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool send sync to server", func(t *testing.T) { + objectId := "objectId" + payload := []byte("payload") + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + fx := newFixture(t, "", remId, nil) + go func() { + message, err := fx.serverStream.Recv() + require.NoError(t, err) + require.Equal(t, msg.ObjectId, message.ObjectId) + require.NotEmpty(t, message.ReplyId) + message.Payload = payload + err = fx.serverStream.Send(message) + require.NoError(t, err) + }() + waitCh := fx.run(t) + res, err := fx.pool.SendSync(remId.String(), msg) + require.NoError(t, err) + require.Equal(t, payload, res.Payload) + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) + + t.Run("pool send sync timeout", func(t *testing.T) { + objectId := "objectId" + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + fx := newFixture(t, "", remId, nil) + syncWaitPeriod = time.Millisecond * 30 + go func() { + message, err := fx.serverStream.Recv() + require.NoError(t, err) + require.Equal(t, msg.ObjectId, message.ObjectId) + require.NotEmpty(t, message.ReplyId) + }() + waitCh := fx.run(t) + _, err := fx.pool.SendSync(remId.String(), msg) + require.Equal(t, ErrSyncTimeout, err) + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_BroadcastAsync(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool broadcast async to server", func(t *testing.T) { + objectId := "objectId" + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + fx := newFixture(t, "", remId, nil) + recvChan := make(chan struct{}) + go func() { + message, err := fx.serverStream.Recv() + require.NoError(t, err) + require.Equal(t, msg, message) + recvChan <- struct{}{} + }() + waitCh := fx.run(t) + + err := fx.pool.BroadcastAsync(msg) + require.NoError(t, err) + <-recvChan + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index a3aba77c..ac7b5aac 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -59,9 +59,7 @@ type BuildDeps struct { TreeStorage storage.TreeStorage } -func DeriveSyncTree( - ctx context.Context, - deps CreateDeps) (t tree.ObjectTree, err error) { +func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) { t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) if err != nil { return @@ -153,7 +151,6 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t fullSyncResp := resp.GetContent().GetFullSyncResponse() payload := storage.TreeStorageCreatePayload{ - TreeId: id, RootRawChange: resp.RootChange, Changes: fullSyncResp.Changes, Heads: fullSyncResp.Heads, @@ -173,10 +170,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t return buildSyncTree(ctx, true, deps) } -func buildSyncTree( - ctx context.Context, - isFirstBuild bool, - deps BuildDeps) (t tree.ObjectTree, err error) { +func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t tree.ObjectTree, err error) { t, err = buildObjectTree(deps.TreeStorage, deps.AclList) if err != nil { diff --git a/common/net/rpc/rpctest/server.go b/common/net/rpc/rpctest/server.go index 270067e4..134ce6cb 100644 --- a/common/net/rpc/rpctest/server.go +++ b/common/net/rpc/rpctest/server.go @@ -2,6 +2,8 @@ package rpctest import ( "context" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" "net" "storj.io/drpc" "storj.io/drpc/drpcconn" @@ -9,6 +11,48 @@ import ( "storj.io/drpc/drpcserver" ) +type SecConnMock struct { + net.Conn + localPrivKey crypto.PrivKey + remotePubKey crypto.PubKey + localId peer.ID + remoteId peer.ID +} + +func (s *SecConnMock) LocalPeer() peer.ID { + return s.localId +} + +func (s *SecConnMock) LocalPrivateKey() crypto.PrivKey { + return s.localPrivKey +} + +func (s *SecConnMock) RemotePeer() peer.ID { + return s.remoteId +} + +func (s *SecConnMock) RemotePublicKey() crypto.PubKey { + return s.remotePubKey +} + +type ConnWrapper func(conn net.Conn) net.Conn + +func NewSecConnWrapper( + localPrivKey crypto.PrivKey, + remotePubKey crypto.PubKey, + localId peer.ID, + remoteId peer.ID) ConnWrapper { + return func(conn net.Conn) net.Conn { + return &SecConnMock{ + Conn: conn, + localPrivKey: localPrivKey, + remotePubKey: remotePubKey, + localId: localId, + remoteId: remoteId, + } + } +} + func NewTestServer() *TesServer { ts := &TesServer{ Mux: drpcmux.New(), @@ -23,7 +67,17 @@ type TesServer struct { } func (ts *TesServer) Dial() drpc.Conn { + return ts.DialWrapConn(nil, nil) +} + +func (ts *TesServer) DialWrapConn(serverWrapper ConnWrapper, clientWrapper ConnWrapper) drpc.Conn { sc, cc := net.Pipe() + if serverWrapper != nil { + sc = serverWrapper(sc) + } + if clientWrapper != nil { + cc = clientWrapper(cc) + } go ts.Server.ServeOne(context.Background(), sc) return drpcconn.New(cc) } diff --git a/common/pkg/acl/storage/inmemory.go b/common/pkg/acl/storage/inmemory.go index e9834f9a..318e183a 100644 --- a/common/pkg/acl/storage/inmemory.go +++ b/common/pkg/acl/storage/inmemory.go @@ -72,7 +72,6 @@ type inMemoryTreeStorage struct { } func NewInMemoryTreeStorage( - treeId string, root *treechangeproto.RawTreeChangeWithId, heads []string, changes []*treechangeproto.RawTreeChangeWithId) (TreeStorage, error) { @@ -80,10 +79,10 @@ func NewInMemoryTreeStorage( for _, ch := range changes { allChanges[ch.Id] = ch } - allChanges[treeId] = root + allChanges[root.Id] = root return &inMemoryTreeStorage{ - id: treeId, + id: root.Id, root: root, heads: heads, changes: allChanges, @@ -159,12 +158,12 @@ func (i *inMemoryStorageProvider) TreeStorage(id string) (TreeStorage, error) { func (i *inMemoryStorageProvider) CreateTreeStorage(payload TreeStorageCreatePayload) (TreeStorage, error) { i.Lock() defer i.Unlock() - res, err := NewInMemoryTreeStorage(payload.TreeId, payload.RootRawChange, payload.Heads, payload.Changes) + res, err := NewInMemoryTreeStorage(payload.RootRawChange, payload.Heads, payload.Changes) if err != nil { return nil, err } - i.objects[payload.TreeId] = res + i.objects[payload.RootRawChange.Id] = res return res, nil } diff --git a/common/pkg/acl/storage/provider.go b/common/pkg/acl/storage/provider.go index da46ab41..51b53b4f 100644 --- a/common/pkg/acl/storage/provider.go +++ b/common/pkg/acl/storage/provider.go @@ -10,7 +10,6 @@ var ErrTreeExists = errors.New("tree already exists") var ErrUnkownChange = errors.New("change doesn't exist") type TreeStorageCreatePayload struct { - TreeId string RootRawChange *treechangeproto.RawTreeChangeWithId Changes []*treechangeproto.RawTreeChangeWithId Heads []string diff --git a/common/pkg/acl/tree/objecttree_test.go b/common/pkg/acl/tree/objecttree_test.go index e2a2b028..da9f2364 100644 --- a/common/pkg/acl/tree/objecttree_test.go +++ b/common/pkg/acl/tree/objecttree_test.go @@ -3,7 +3,7 @@ package tree import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" - storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/testutils/acllistbuilder" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/stretchr/testify/assert" @@ -53,9 +53,9 @@ func (c *mockChangeCreator) createRaw(id, aclId, snapshotId string, isSnapshot b } } -func (c *mockChangeCreator) createNewTreeStorage(treeId, aclHeadId string) storage2.TreeStorage { +func (c *mockChangeCreator) createNewTreeStorage(treeId, aclHeadId string) storage.TreeStorage { root := c.createRoot(treeId, aclHeadId) - treeStorage, _ := storage2.NewInMemoryTreeStorage(treeId, root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root}) + treeStorage, _ := storage.NewInMemoryTreeStorage(root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root}) return treeStorage } @@ -95,7 +95,7 @@ func (m *mockChangeValidator) ValidateFullTree(tree *Tree, aclList list.ACLList) type testTreeContext struct { aclList list.ACLList - treeStorage storage2.TreeStorage + treeStorage storage.TreeStorage changeBuilder *mockChangeBuilder changeCreator *mockChangeCreator objTree ObjectTree diff --git a/common/pkg/acl/tree/objecttreefactory.go b/common/pkg/acl/tree/objecttreefactory.go index c148d930..26912377 100644 --- a/common/pkg/acl/tree/objecttreefactory.go +++ b/common/pkg/acl/tree/objecttreefactory.go @@ -3,7 +3,7 @@ package tree import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/common" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" - storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric" @@ -20,7 +20,7 @@ type ObjectTreeCreatePayload struct { Identity []byte } -func BuildObjectTree(treeStorage storage2.TreeStorage, aclList list.ACLList) (ObjectTree, error) { +func BuildObjectTree(treeStorage storage.TreeStorage, aclList list.ACLList) (ObjectTree, error) { rootChange, err := treeStorage.Root() if err != nil { return nil, err @@ -32,14 +32,14 @@ func BuildObjectTree(treeStorage storage2.TreeStorage, aclList list.ACLList) (Ob func CreateDerivedObjectTree( payload ObjectTreeCreatePayload, aclList list.ACLList, - createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { + createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { return createObjectTree(payload, 0, nil, aclList, createStorage) } func CreateObjectTree( payload ObjectTreeCreatePayload, aclList list.ACLList, - createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { + createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { bytes := make([]byte, 32) _, err = rand.Read(bytes) if err != nil { @@ -53,7 +53,7 @@ func createObjectTree( timestamp int64, seed []byte, aclList list.ACLList, - createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { + createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { aclList.RLock() aclHeadId := aclList.Head().Id aclList.RUnlock() @@ -77,8 +77,7 @@ func createObjectTree( } // create storage - st, err := createStorage(storage2.TreeStorageCreatePayload{ - TreeId: raw.Id, + st, err := createStorage(storage.TreeStorageCreatePayload{ RootRawChange: raw, Changes: []*treechangeproto.RawTreeChangeWithId{raw}, Heads: []string{raw.Id}, @@ -129,6 +128,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { objTree.id = objTree.treeStorage.Id() + objTree.id = objTree.treeStorage.Id() objTree.root, err = objTree.treeStorage.Root() if err != nil { return nil, err diff --git a/common/pkg/ocache/ocache.go b/common/pkg/ocache/ocache.go index 41555e42..629a5864 100644 --- a/common/pkg/ocache/ocache.go +++ b/common/pkg/ocache/ocache.go @@ -409,9 +409,13 @@ func (c *oCache) Close() (err error) { } c.closed = true close(c.closeCh) - var toClose []*entry + var toClose, alreadyClosing []*entry for _, e := range c.data { - toClose = append(toClose, e) + if e.isClosing { + alreadyClosing = append(alreadyClosing, e) + } else { + toClose = append(toClose, e) + } } c.mu.Unlock() for _, e := range toClose { @@ -422,5 +426,8 @@ func (c *oCache) Close() (err error) { } } } + for _, e := range alreadyClosing { + <-e.close + } return nil } diff --git a/node/storage/keys.go b/node/storage/keys.go index 85dd46aa..e21a7140 100644 --- a/node/storage/keys.go +++ b/node/storage/keys.go @@ -62,5 +62,17 @@ func (s spaceKeys) HeaderKey() []byte { } func isRootIdKey(key string) bool { - return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "heads") + return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "/heads") +} + +func getRootId(key string) string { + prefixLen := 2 // len("t/") + suffixLen := 6 // len("/heads") + rootLen := len(key) - suffixLen - prefixLen + sBuf := strings.Builder{} + sBuf.Grow(rootLen) + for i := prefixLen; i < prefixLen+rootLen; i++ { + sBuf.WriteByte(key[i]) + } + return sBuf.String() } diff --git a/node/storage/liststorage_test.go b/node/storage/liststorage_test.go new file mode 100644 index 00000000..4df253d2 --- /dev/null +++ b/node/storage/liststorage_test.go @@ -0,0 +1,70 @@ +package storage + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/stretchr/testify/require" + "testing" +) + +func testList(t *testing.T, store storage.ListStorage, root *aclrecordproto.RawACLRecordWithId, head string) { + require.Equal(t, store.Id(), root.Id) + + aclRoot, err := store.Root() + require.NoError(t, err) + require.Equal(t, root, aclRoot) + + aclHead, err := store.Head() + require.NoError(t, err) + require.Equal(t, head, aclHead) +} + +func TestListStorage_Create(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"} + listStore, err := createListStorage(fx.db, aclRoot) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + + t.Run("create same list storage returns nil", func(t *testing.T) { + // this is ok, because we only create new list storage when we create space storage + listStore, err := createListStorage(fx.db, aclRoot) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + }) +} + +func TestListStorage_Methods(t *testing.T) { + fx := newFixture(t) + fx.open(t) + aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"} + _, err := createListStorage(fx.db, aclRoot) + require.NoError(t, err) + fx.stop(t) + + fx.open(t) + defer fx.stop(t) + listStore, err := newListStorage(fx.db) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + + t.Run("set head", func(t *testing.T) { + head := "newHead" + require.NoError(t, listStore.SetHead(head)) + aclHead, err := listStore.Head() + require.NoError(t, err) + require.Equal(t, head, aclHead) + }) + + t.Run("add raw record and get raw record", func(t *testing.T) { + newRec := &aclrecordproto.RawACLRecordWithId{Payload: []byte("rec"), Id: "someRecId"} + require.NoError(t, listStore.AddRawRecord(context.Background(), newRec)) + aclRec, err := listStore.GetRawRecord(context.Background(), newRec.Id) + require.NoError(t, err) + require.Equal(t, newRec, aclRec) + }) +} diff --git a/node/storage/spacestorage.go b/node/storage/spacestorage.go index b15e1a4d..018c6b04 100644 --- a/node/storage/spacestorage.go +++ b/node/storage/spacestorage.go @@ -5,7 +5,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" - storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "go.uber.org/zap" "path" "sync" @@ -88,8 +88,8 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate } defer func() { - log.With(zap.String("id", payload.SpaceHeaderWithId.Id), zap.Error(err)).Warn("failed to create storage") if err != nil { + log.With(zap.String("id", payload.SpaceHeaderWithId.Id), zap.Error(err)).Warn("failed to create storage") db.Close() } }() @@ -156,13 +156,13 @@ func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithI func (s *spaceStorage) StoredIds() (ids []string, err error) { index := s.objDb.Items() - key, val, err := index.Next() + key, _, err := index.Next() for err == nil { strKey := string(key) if isRootIdKey(strKey) { - ids = append(ids, string(val)) + ids = append(ids, getRootId(strKey)) } - key, val, err = index.Next() + key, _, err = index.Next() } if err != pogreb.ErrIterationDone { diff --git a/node/storage/spacestorage_test.go b/node/storage/spacestorage_test.go new file mode 100644 index 00000000..33243e0a --- /dev/null +++ b/node/storage/spacestorage_test.go @@ -0,0 +1,107 @@ +package storage + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/stretchr/testify/require" + "os" + "strconv" + "testing" +) + +func spaceTestPayload() spacestorage.SpaceStorageCreatePayload { + header := &spacesyncproto.RawSpaceHeaderWithId{ + RawHeader: []byte("header"), + Id: "headerId", + } + aclRoot := &aclrecordproto.RawACLRecordWithId{ + Payload: []byte("aclRoot"), + Id: "aclRootId", + } + return spacestorage.SpaceStorageCreatePayload{ + RecWithId: aclRoot, + SpaceHeaderWithId: header, + } +} + +func testSpace(t *testing.T, store spacestorage.SpaceStorage, payload spacestorage.SpaceStorageCreatePayload) { + header, err := store.SpaceHeader() + require.NoError(t, err) + require.Equal(t, payload.SpaceHeaderWithId, header) + + aclStorage, err := store.ACLStorage() + require.NoError(t, err) + testList(t, aclStorage, payload.RecWithId, payload.RecWithId.Id) +} + +func TestSpaceStorage_Create(t *testing.T) { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + + payload := spaceTestPayload() + store, err := createSpaceStorage(dir, payload) + require.NoError(t, err) + + testSpace(t, store, payload) + require.NoError(t, store.Close()) + + t.Run("create same storage returns error", func(t *testing.T) { + _, err := createSpaceStorage(dir, payload) + require.Error(t, err) + }) +} + +func TestSpaceStorage_NewAndCreateTree(t *testing.T) { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + + payload := spaceTestPayload() + store, err := createSpaceStorage(dir, payload) + require.NoError(t, err) + require.NoError(t, store.Close()) + + store, err = newSpaceStorage(dir, payload.SpaceHeaderWithId.Id) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + testSpace(t, store, payload) + + t.Run("create tree and get tree", func(t *testing.T) { + payload := treeTestPayload() + treeStore, err := store.CreateTreeStorage(payload) + require.NoError(t, err) + testTreePayload(t, treeStore, payload) + + otherStore, err := store.TreeStorage(payload.RootRawChange.Id) + require.NoError(t, err) + testTreePayload(t, otherStore, payload) + }) +} + +func TestSpaceStorage_StoredIds(t *testing.T) { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + + payload := spaceTestPayload() + store, err := createSpaceStorage(dir, payload) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + + n := 5 + var ids []string + for i := 0; i < n; i++ { + treePayload := treeTestPayload() + treePayload.RootRawChange.Id += strconv.Itoa(i) + ids = append(ids, treePayload.RootRawChange.Id) + _, err := store.CreateTreeStorage(treePayload) + require.NoError(t, err) + } + + storedIds, err := store.StoredIds() + require.NoError(t, err) + require.Equal(t, ids, storedIds) +} diff --git a/node/storage/treestorage.go b/node/storage/treestorage.go index f35872ec..77234d2f 100644 --- a/node/storage/treestorage.go +++ b/node/storage/treestorage.go @@ -48,7 +48,7 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err e } func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) { - keys := newTreeKeys(payload.TreeId) + keys := newTreeKeys(payload.RootRawChange.Id) has, err := db.Has(keys.HeadsKey()) if err != nil { return diff --git a/node/storage/treestorage_test.go b/node/storage/treestorage_test.go new file mode 100644 index 00000000..00cce688 --- /dev/null +++ b/node/storage/treestorage_test.go @@ -0,0 +1,121 @@ +package storage + +import ( + "context" + "github.com/akrylysov/pogreb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func treeTestPayload() storage.TreeStorageCreatePayload { + rootRawChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some"), Id: "rootId"} + otherChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some other"), Id: "otherId"} + changes := []*treechangeproto.RawTreeChangeWithId{rootRawChange, otherChange} + return storage.TreeStorageCreatePayload{ + RootRawChange: rootRawChange, + Changes: changes, + Heads: []string{rootRawChange.Id}, + } +} + +type fixture struct { + dir string + db *pogreb.DB +} + +func newFixture(t *testing.T) *fixture { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + return &fixture{dir: dir} +} + +func (fx *fixture) open(t *testing.T) { + var err error + fx.db, err = pogreb.Open(fx.dir, nil) + require.NoError(t, err) +} + +func (fx *fixture) stop(t *testing.T) { + require.NoError(t, fx.db.Close()) +} + +func testTreePayload(t *testing.T, store storage.TreeStorage, payload storage.TreeStorageCreatePayload) { + require.Equal(t, payload.RootRawChange.Id, store.Id()) + + root, err := store.Root() + require.NoError(t, err) + require.Equal(t, root, payload.RootRawChange) + + heads, err := store.Heads() + require.NoError(t, err) + require.Equal(t, payload.Heads, heads) + + for _, ch := range payload.Changes { + dbCh, err := store.GetRawChange(context.Background(), ch.Id) + require.NoError(t, err) + require.Equal(t, ch, dbCh) + } + return +} + +func TestTreeStorage_Create(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + payload := treeTestPayload() + store, err := createTreeStorage(fx.db, payload) + require.NoError(t, err) + testTreePayload(t, store, payload) + + t.Run("create same storage returns error", func(t *testing.T) { + _, err := createTreeStorage(fx.db, payload) + require.Error(t, err) + }) +} + +func TestTreeStorage_Methods(t *testing.T) { + fx := newFixture(t) + fx.open(t) + payload := treeTestPayload() + _, err := createTreeStorage(fx.db, payload) + require.NoError(t, err) + fx.stop(t) + + fx.open(t) + defer fx.stop(t) + store, err := newTreeStorage(fx.db, payload.RootRawChange.Id) + require.NoError(t, err) + testTreePayload(t, store, payload) + + t.Run("update heads", func(t *testing.T) { + newHeads := []string{"a", "b"} + require.NoError(t, store.SetHeads(newHeads)) + heads, err := store.Heads() + require.NoError(t, err) + require.Equal(t, newHeads, heads) + }) + + t.Run("add raw change, get change and has change", func(t *testing.T) { + newChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("ab"), Id: "newId"} + require.NoError(t, store.AddRawChange(newChange)) + rawCh, err := store.GetRawChange(context.Background(), newChange.Id) + require.NoError(t, err) + require.Equal(t, newChange, rawCh) + has, err := store.HasChange(context.Background(), newChange.Id) + require.NoError(t, err) + require.True(t, has) + }) + + t.Run("get and has for unknown change", func(t *testing.T) { + incorrectId := "incorrectId" + _, err := store.GetRawChange(context.Background(), incorrectId) + require.Error(t, err) + has, err := store.HasChange(context.Background(), incorrectId) + require.NoError(t, err) + require.False(t, has) + }) +}