diff --git a/client/storage/liststorage.go b/client/storage/liststorage.go index 3d784c31..40d46e38 100644 --- a/client/storage/liststorage.go +++ b/client/storage/liststorage.go @@ -37,7 +37,7 @@ func newListStorage(spaceId string, db *badger.DB, txn *badger.Txn) (ls storage. ls = &listStorage{ db: db, - keys: newACLKeys(spaceId), + keys: keys, id: stringId, root: rootWithId, } @@ -70,15 +70,15 @@ func createListStorage(spaceId string, db *badger.DB, txn *badger.Txn, root *acl ls = &listStorage{ db: db, - keys: newACLKeys(spaceId), + keys: keys, id: root.Id, root: root, } return } -func (l *listStorage) ID() (string, error) { - return l.id, nil +func (l *listStorage) ID() string { + return l.id } func (l *listStorage) Root() (*aclrecordproto.RawACLRecordWithId, error) { diff --git a/client/storage/liststorage_test.go b/client/storage/liststorage_test.go new file mode 100644 index 00000000..6afa676c --- /dev/null +++ b/client/storage/liststorage_test.go @@ -0,0 +1,72 @@ +package storage + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/require" + "testing" +) + +func testList(t *testing.T, store storage.ListStorage, root *aclrecordproto.RawACLRecordWithId, head string) { + require.Equal(t, store.ID(), root.Id) + + aclRoot, err := store.Root() + require.NoError(t, err) + require.Equal(t, root, aclRoot) + + aclHead, err := store.Head() + require.NoError(t, err) + require.Equal(t, head, aclHead) +} + +func TestListStorage(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + spaceId := "spaceId" + aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"} + + fx.db.Update(func(txn *badger.Txn) error { + _, err := createListStorage(spaceId, fx.db, txn, aclRoot) + require.NoError(t, err) + return nil + }) + + var listStore storage.ListStorage + fx.db.View(func(txn *badger.Txn) (err error) { + listStore, err = newListStorage(spaceId, fx.db, txn) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + + return nil + }) + + t.Run("create same storage returns no error", func(t *testing.T) { + fx.db.View(func(txn *badger.Txn) error { + // this is ok, because we only create new list storage when we create space storage + listStore, err := createListStorage(spaceId, fx.db, txn, aclRoot) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + + return nil + }) + }) + + t.Run("set head", func(t *testing.T) { + head := "newHead" + require.NoError(t, listStore.SetHead(head)) + aclHead, err := listStore.Head() + require.NoError(t, err) + require.Equal(t, head, aclHead) + }) + + t.Run("add raw record and get raw record", func(t *testing.T) { + newRec := &aclrecordproto.RawACLRecordWithId{Payload: []byte("rec"), Id: "someRecId"} + require.NoError(t, listStore.AddRawRecord(context.Background(), newRec)) + aclRec, err := listStore.GetRawRecord(context.Background(), newRec.Id) + require.NoError(t, err) + require.Equal(t, newRec, aclRec) + }) +} diff --git a/client/storage/spacestorage.go b/client/storage/spacestorage.go index f0b65045..58f43482 100644 --- a/client/storage/spacestorage.go +++ b/client/storage/spacestorage.go @@ -77,8 +77,8 @@ func createSpaceStorage(db *badger.DB, payload spacestorage.SpaceStorageCreatePa return } -func (s *spaceStorage) ID() (string, error) { - return s.spaceId, nil +func (s *spaceStorage) ID() string { + return s.spaceId } func (s *spaceStorage) TreeStorage(id string) (storage2.TreeStorage, error) { diff --git a/client/storage/spacestorage_test.go b/client/storage/spacestorage_test.go new file mode 100644 index 00000000..8540d338 --- /dev/null +++ b/client/storage/spacestorage_test.go @@ -0,0 +1,108 @@ +package storage + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/stretchr/testify/require" + "strconv" + "testing" +) + +func spaceTestPayload() spacestorage.SpaceStorageCreatePayload { + header := &spacesyncproto.RawSpaceHeaderWithId{ + RawHeader: []byte("header"), + Id: "headerId", + } + aclRoot := &aclrecordproto.RawACLRecordWithId{ + Payload: []byte("aclRoot"), + Id: "aclRootId", + } + return spacestorage.SpaceStorageCreatePayload{ + RecWithId: aclRoot, + SpaceHeaderWithId: header, + } +} + +func testSpace(t *testing.T, store spacestorage.SpaceStorage, payload spacestorage.SpaceStorageCreatePayload) { + header, err := store.SpaceHeader() + require.NoError(t, err) + require.Equal(t, payload.SpaceHeaderWithId, header) + + aclStorage, err := store.ACLStorage() + require.NoError(t, err) + testList(t, aclStorage, payload.RecWithId, payload.RecWithId.Id) +} + +func TestSpaceStorage_Create(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + payload := spaceTestPayload() + store, err := createSpaceStorage(fx.db, payload) + require.NoError(t, err) + + testSpace(t, store, payload) + require.NoError(t, store.Close()) + + t.Run("create same storage returns error", func(t *testing.T) { + _, err := createSpaceStorage(fx.db, payload) + require.Error(t, err) + }) +} + +func TestSpaceStorage_NewAndCreateTree(t *testing.T) { + fx := newFixture(t) + fx.open(t) + + payload := spaceTestPayload() + store, err := createSpaceStorage(fx.db, payload) + require.NoError(t, err) + require.NoError(t, store.Close()) + fx.stop(t) + + fx.open(t) + defer fx.stop(t) + store, err = newSpaceStorage(fx.db, payload.SpaceHeaderWithId.Id) + require.NoError(t, err) + testSpace(t, store, payload) + + t.Run("create tree and get tree", func(t *testing.T) { + payload := treeTestPayload() + treeStore, err := store.CreateTreeStorage(payload) + require.NoError(t, err) + testTreePayload(t, treeStore, payload) + + otherStore, err := store.TreeStorage(payload.RootRawChange.Id) + require.NoError(t, err) + testTreePayload(t, otherStore, payload) + }) +} + +func TestSpaceStorage_StoredIds(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + payload := spaceTestPayload() + store, err := createSpaceStorage(fx.db, payload) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + + n := 5 + var ids []string + for i := 0; i < n; i++ { + treePayload := treeTestPayload() + treePayload.RootRawChange.Id += strconv.Itoa(i) + ids = append(ids, treePayload.RootRawChange.Id) + _, err := store.CreateTreeStorage(treePayload) + require.NoError(t, err) + } + + storedIds, err := store.StoredIds() + require.NoError(t, err) + require.Equal(t, ids, storedIds) +} diff --git a/client/storage/treestorage.go b/client/storage/treestorage.go index bede0bc5..60c74c9f 100644 --- a/client/storage/treestorage.go +++ b/client/storage/treestorage.go @@ -2,7 +2,7 @@ package storage import ( "context" - storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/dgraph-io/badger/v3" ) @@ -14,7 +14,7 @@ type treeStorage struct { root *treechangeproto.RawTreeChangeWithId } -func newTreeStorage(db *badger.DB, spaceId, treeId string) (ts storage2.TreeStorage, err error) { +func newTreeStorage(db *badger.DB, spaceId, treeId string) (ts storage.TreeStorage, err error) { keys := newTreeKeys(spaceId, treeId) err = db.View(func(txn *badger.Txn) error { _, err := txn.Get(keys.RootIdKey()) @@ -43,14 +43,14 @@ func newTreeStorage(db *badger.DB, spaceId, treeId string) (ts storage2.TreeStor return } -func createTreeStorage(db *badger.DB, spaceId string, payload storage2.TreeStorageCreatePayload) (ts storage2.TreeStorage, err error) { - keys := newTreeKeys(spaceId, payload.TreeId) +func createTreeStorage(db *badger.DB, spaceId string, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) { + keys := newTreeKeys(spaceId, payload.RootRawChange.Id) if hasDB(db, keys.RootIdKey()) { - err = storage2.ErrTreeExists + err = storage.ErrTreeExists return } err = db.Update(func(txn *badger.Txn) error { - heads := storage2.CreateHeadsPayload(payload.Heads) + heads := storage.CreateHeadsPayload(payload.Heads) for _, ch := range payload.Changes { err = txn.Set(keys.RawChangeKey(ch.Id), ch.GetRawChange()) @@ -85,8 +85,8 @@ func createTreeStorage(db *badger.DB, spaceId string, payload storage2.TreeStora return } -func (t *treeStorage) ID() (string, error) { - return t.id, nil +func (t *treeStorage) ID() string { + return t.id } func (t *treeStorage) Root() (raw *treechangeproto.RawTreeChangeWithId, err error) { @@ -97,16 +97,16 @@ func (t *treeStorage) Heads() (heads []string, err error) { headsBytes, err := getDB(t.db, t.keys.HeadsKey()) if err != nil { if err == badger.ErrKeyNotFound { - err = storage2.ErrUnknownTreeId + err = storage.ErrUnknownTreeId } return } - heads = storage2.ParseHeads(headsBytes) + heads = storage.ParseHeads(headsBytes) return } func (t *treeStorage) SetHeads(heads []string) (err error) { - payload := storage2.CreateHeadsPayload(heads) + payload := storage.CreateHeadsPayload(heads) return putDB(t.db, t.keys.HeadsKey(), payload) } @@ -118,7 +118,7 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha res, err := getDB(t.db, t.keys.RawChangeKey(id)) if err != nil { if err == badger.ErrKeyNotFound { - err = storage2.ErrUnknownTreeId + err = storage.ErrUnknownTreeId } return } diff --git a/client/storage/treestorage_test.go b/client/storage/treestorage_test.go new file mode 100644 index 00000000..e2a3bcad --- /dev/null +++ b/client/storage/treestorage_test.go @@ -0,0 +1,123 @@ +package storage + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" + "github.com/dgraph-io/badger/v3" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func treeTestPayload() storage.TreeStorageCreatePayload { + rootRawChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some"), Id: "someRootId"} + otherChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some other"), Id: "otherId"} + changes := []*treechangeproto.RawTreeChangeWithId{rootRawChange, otherChange} + return storage.TreeStorageCreatePayload{ + RootRawChange: rootRawChange, + Changes: changes, + Heads: []string{rootRawChange.Id}, + } +} + +type fixture struct { + dir string + db *badger.DB +} + +func testTreePayload(t *testing.T, store storage.TreeStorage, payload storage.TreeStorageCreatePayload) { + require.Equal(t, payload.RootRawChange.Id, store.ID()) + + root, err := store.Root() + require.NoError(t, err) + require.Equal(t, root, payload.RootRawChange) + + heads, err := store.Heads() + require.NoError(t, err) + require.Equal(t, payload.Heads, heads) + + for _, ch := range payload.Changes { + dbCh, err := store.GetRawChange(context.Background(), ch.Id) + require.NoError(t, err) + require.Equal(t, ch, dbCh) + } + return +} + +func newFixture(t *testing.T) *fixture { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + return &fixture{dir: dir} +} + +func (fx *fixture) open(t *testing.T) { + var err error + fx.db, err = badger.Open(badger.DefaultOptions(fx.dir)) + require.NoError(t, err) +} + +func (fx *fixture) stop(t *testing.T) { + require.NoError(t, fx.db.Close()) +} + +func TestTreeStorage_Create(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + spaceId := "spaceId" + payload := treeTestPayload() + store, err := createTreeStorage(fx.db, spaceId, payload) + require.NoError(t, err) + testTreePayload(t, store, payload) + + t.Run("create same storage returns error", func(t *testing.T) { + _, err := createTreeStorage(fx.db, spaceId, payload) + require.Error(t, err) + }) +} + +func TestTreeStorage_Methods(t *testing.T) { + fx := newFixture(t) + fx.open(t) + payload := treeTestPayload() + spaceId := "spaceId" + _, err := createTreeStorage(fx.db, spaceId, payload) + require.NoError(t, err) + fx.stop(t) + + fx.open(t) + defer fx.stop(t) + store, err := newTreeStorage(fx.db, spaceId, payload.RootRawChange.Id) + require.NoError(t, err) + testTreePayload(t, store, payload) + + t.Run("update heads", func(t *testing.T) { + newHeads := []string{"a", "b"} + require.NoError(t, store.SetHeads(newHeads)) + heads, err := store.Heads() + require.NoError(t, err) + require.Equal(t, newHeads, heads) + }) + + t.Run("add raw change, get change and has change", func(t *testing.T) { + newChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("ab"), Id: "newId"} + require.NoError(t, store.AddRawChange(newChange)) + rawCh, err := store.GetRawChange(context.Background(), newChange.Id) + require.NoError(t, err) + require.Equal(t, newChange, rawCh) + has, err := store.HasChange(context.Background(), newChange.Id) + require.NoError(t, err) + require.True(t, has) + }) + + t.Run("get and has for unknown change", func(t *testing.T) { + incorrectId := "incorrectId" + _, err := store.GetRawChange(context.Background(), incorrectId) + require.Error(t, err) + has, err := store.HasChange(context.Background(), incorrectId) + require.NoError(t, err) + require.False(t, has) + }) +} diff --git a/common/commonspace/service.go b/common/commonspace/service.go index 53e22aee..8ab4d9f5 100644 --- a/common/commonspace/service.go +++ b/common/commonspace/service.go @@ -64,7 +64,7 @@ func (s *service) CreateSpace( return } - return store.ID() + return store.ID(), nil } func (s *service) DeriveSpace( @@ -79,7 +79,7 @@ func (s *service) DeriveSpace( return } - return store.ID() + return store.ID(), nil } func (s *service) GetSpace(ctx context.Context, id string) (Space, error) { diff --git a/common/commonspace/storage/mock_storage/mock_storage.go b/common/commonspace/storage/mock_storage/mock_storage.go index 422af081..fb76ad05 100644 --- a/common/commonspace/storage/mock_storage/mock_storage.go +++ b/common/commonspace/storage/mock_storage/mock_storage.go @@ -163,12 +163,11 @@ func (mr *MockSpaceStorageMockRecorder) CreateTreeStorage(arg0 interface{}) *gom } // ID mocks base method. -func (m *MockSpaceStorage) ID() (string, error) { +func (m *MockSpaceStorage) ID() string { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ID") ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 + return ret0 } // ID indicates an expected call of ID. diff --git a/common/commonspace/storage/storage.go b/common/commonspace/storage/storage.go index eeee74a2..a469a4b1 100644 --- a/common/commonspace/storage/storage.go +++ b/common/commonspace/storage/storage.go @@ -6,7 +6,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" - storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" ) const CName = "commonspace.storage" @@ -15,9 +15,9 @@ var ErrSpaceStorageExists = errors.New("space storage exists") var ErrSpaceStorageMissing = errors.New("space storage missing") type SpaceStorage interface { - storage2.Storage - storage2.Provider - ACLStorage() (storage2.ListStorage, error) + storage.Provider + ID() string + ACLStorage() (storage.ListStorage, error) SpaceHeader() (*spacesyncproto.RawSpaceHeaderWithId, error) StoredIds() ([]string, error) Close() error diff --git a/common/commonspace/syncservice/streampool.go b/common/commonspace/syncservice/streampool.go index 489d561f..e807ae14 100644 --- a/common/commonspace/syncservice/streampool.go +++ b/common/commonspace/syncservice/streampool.go @@ -245,7 +245,8 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre Loop: for { - msg, err := stream.Recv() + var msg *spacesyncproto.ObjectSyncMessage + msg, err = stream.Recv() s.lastUsage.Store(time.Now().Unix()) if err != nil { break @@ -260,7 +261,8 @@ Loop: limiter <- struct{}{} }() } - return s.removePeer(peerId) + s.removePeer(peerId) + return } func (s *streamPool) removePeer(peerId string) (err error) { diff --git a/common/commonspace/syncservice/streampool_test.go b/common/commonspace/syncservice/streampool_test.go new file mode 100644 index 00000000..47ed4ee0 --- /dev/null +++ b/common/commonspace/syncservice/streampool_test.go @@ -0,0 +1,322 @@ +package syncservice + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpctest" + "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +type testServer struct { + stream chan spacesyncproto.DRPCSpace_StreamStream + addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error + addRecord func(ctx context.Context, req *consensusproto.AddRecordRequest) error + releaseStream chan error + watchErrOnce bool +} + +func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { + panic("implement me") +} + +func (t *testServer) PushSpace(ctx context.Context, request *spacesyncproto.PushSpaceRequest) (*spacesyncproto.PushSpaceResponse, error) { + panic("implement me") +} + +func (t *testServer) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error { + t.stream <- stream + return <-t.releaseStream +} + +func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpace_StreamStream { + select { + case <-time.After(time.Second * 5): + test.Fatalf("waiteStream timeout") + case st := <-t.stream: + return st + } + return nil +} + +type fixture struct { + testServer *testServer + drpcTS *rpctest.TesServer + client spacesyncproto.DRPCSpaceClient + clientStream spacesyncproto.DRPCSpace_StreamStream + serverStream spacesyncproto.DRPCSpace_StreamStream + pool *streamPool + localId peer.ID + remoteId peer.ID +} + +func newFixture(t *testing.T, localId, remoteId peer.ID, handler MessageHandler) *fixture { + fx := &fixture{ + testServer: &testServer{}, + drpcTS: rpctest.NewTestServer(), + localId: localId, + remoteId: remoteId, + } + fx.testServer.stream = make(chan spacesyncproto.DRPCSpace_StreamStream, 1) + require.NoError(t, spacesyncproto.DRPCRegisterSpace(fx.drpcTS.Mux, fx.testServer)) + clientWrapper := rpctest.NewSecConnWrapper(nil, nil, localId, remoteId) + fx.client = spacesyncproto.NewDRPCSpaceClient(fx.drpcTS.DialWrapConn(nil, clientWrapper)) + + var err error + fx.clientStream, err = fx.client.Stream(context.Background()) + require.NoError(t, err) + fx.serverStream = fx.testServer.waitStream(t) + fx.pool = newStreamPool(handler).(*streamPool) + + return fx +} + +func (fx *fixture) run(t *testing.T) chan error { + waitCh := make(chan error) + go func() { + err := fx.pool.AddAndReadStreamSync(fx.clientStream) + waitCh <- err + }() + + time.Sleep(time.Millisecond * 10) + fx.pool.Lock() + require.Equal(t, fx.pool.peerStreams[fx.remoteId.String()], fx.clientStream) + fx.pool.Unlock() + + return waitCh +} + +func TestStreamPool_AddAndReadStreamAsync(t *testing.T) { + remId := peer.ID("remoteId") + + t.Run("client close", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + waitCh := fx.run(t) + + err := fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) + t.Run("server close", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + waitCh := fx.run(t) + + err := fx.serverStream.Close() + require.NoError(t, err) + + err = <-waitCh + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_Close(t *testing.T) { + remId := peer.ID("remoteId") + + t.Run("client close", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + fx.run(t) + var events []string + recvChan := make(chan struct{}) + go func() { + fx.pool.Close() + events = append(events, "pool_close") + recvChan <- struct{}{} + }() + time.Sleep(50 * time.Millisecond) //err = <-waitCh + events = append(events, "stream_close") + err := fx.clientStream.Close() + require.NoError(t, err) + <-recvChan + require.Equal(t, []string{"stream_close", "pool_close"}, events) + }) + t.Run("server close", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + fx.run(t) + var events []string + recvChan := make(chan struct{}) + go func() { + fx.pool.Close() + events = append(events, "pool_close") + recvChan <- struct{}{} + }() + time.Sleep(50 * time.Millisecond) //err = <-waitCh + events = append(events, "stream_close") + err := fx.clientStream.Close() + require.NoError(t, err) + <-recvChan + require.Equal(t, []string{"stream_close", "pool_close"}, events) + }) +} + +func TestStreamPool_ReceiveMessage(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool receive message from server", func(t *testing.T) { + objectId := "objectId" + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + recvChan := make(chan struct{}) + fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { + require.Equal(t, msg, message) + recvChan <- struct{}{} + return nil + }) + waitCh := fx.run(t) + + err := fx.serverStream.Send(msg) + require.NoError(t, err) + <-recvChan + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_HasActiveStream(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool has active stream", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + waitCh := fx.run(t) + require.True(t, fx.pool.HasActiveStream(remId.String())) + + err := fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) + t.Run("pool has no active stream", func(t *testing.T) { + fx := newFixture(t, "", remId, nil) + waitCh := fx.run(t) + err := fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + require.Error(t, err) + require.False(t, fx.pool.HasActiveStream(remId.String())) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_SendAsync(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool send async to server", func(t *testing.T) { + objectId := "objectId" + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + fx := newFixture(t, "", remId, nil) + recvChan := make(chan struct{}) + go func() { + message, err := fx.serverStream.Recv() + require.NoError(t, err) + require.Equal(t, msg, message) + recvChan <- struct{}{} + }() + waitCh := fx.run(t) + + err := fx.pool.SendAsync([]string{remId.String()}, msg) + require.NoError(t, err) + <-recvChan + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_SendSync(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool send sync to server", func(t *testing.T) { + objectId := "objectId" + payload := []byte("payload") + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + fx := newFixture(t, "", remId, nil) + go func() { + message, err := fx.serverStream.Recv() + require.NoError(t, err) + require.Equal(t, msg.ObjectId, message.ObjectId) + require.NotEmpty(t, message.ReplyId) + message.Payload = payload + err = fx.serverStream.Send(message) + require.NoError(t, err) + }() + waitCh := fx.run(t) + res, err := fx.pool.SendSync(remId.String(), msg) + require.NoError(t, err) + require.Equal(t, payload, res.Payload) + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) + + t.Run("pool send sync timeout", func(t *testing.T) { + objectId := "objectId" + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + fx := newFixture(t, "", remId, nil) + syncWaitPeriod = time.Millisecond * 30 + go func() { + message, err := fx.serverStream.Recv() + require.NoError(t, err) + require.Equal(t, msg.ObjectId, message.ObjectId) + require.NotEmpty(t, message.ReplyId) + }() + waitCh := fx.run(t) + _, err := fx.pool.SendSync(remId.String(), msg) + require.Equal(t, ErrSyncTimeout, err) + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} + +func TestStreamPool_BroadcastAsync(t *testing.T) { + remId := peer.ID("remoteId") + t.Run("pool broadcast async to server", func(t *testing.T) { + objectId := "objectId" + msg := &spacesyncproto.ObjectSyncMessage{ + ObjectId: objectId, + } + fx := newFixture(t, "", remId, nil) + recvChan := make(chan struct{}) + go func() { + message, err := fx.serverStream.Recv() + require.NoError(t, err) + require.Equal(t, msg, message) + recvChan <- struct{}{} + }() + waitCh := fx.run(t) + + err := fx.pool.BroadcastAsync(msg) + require.NoError(t, err) + <-recvChan + err = fx.clientStream.Close() + require.NoError(t, err) + err = <-waitCh + + require.Error(t, err) + require.Nil(t, fx.pool.peerStreams[remId.String()]) + }) +} diff --git a/common/commonspace/synctree/mock_synctree/mock_synctree.go b/common/commonspace/synctree/mock_synctree/mock_synctree.go index 61cf4401..914a6ec1 100644 --- a/common/commonspace/synctree/mock_synctree/mock_synctree.go +++ b/common/commonspace/synctree/mock_synctree/mock_synctree.go @@ -6,7 +6,6 @@ package mock_synctree import ( reflect "reflect" - time "time" tree "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree" treechangeproto "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" @@ -122,20 +121,6 @@ func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest)) } -// LastUsage mocks base method. -func (m *MockSyncClient) LastUsage() time.Time { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LastUsage") - ret0, _ := ret[0].(time.Time) - return ret0 -} - -// LastUsage indicates an expected call of LastUsage. -func (mr *MockSyncClientMockRecorder) LastUsage() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastUsage", reflect.TypeOf((*MockSyncClient)(nil).LastUsage)) -} - // SendAsync mocks base method. func (m *MockSyncClient) SendAsync(arg0 string, arg1 *treechangeproto.TreeSyncMessage, arg2 string) error { m.ctrl.T.Helper() diff --git a/common/commonspace/synctree/synctree.go b/common/commonspace/synctree/synctree.go index 17bbcb84..ac7b5aac 100644 --- a/common/commonspace/synctree/synctree.go +++ b/common/commonspace/synctree/synctree.go @@ -59,9 +59,7 @@ type BuildDeps struct { TreeStorage storage.TreeStorage } -func DeriveSyncTree( - ctx context.Context, - deps CreateDeps) (t tree.ObjectTree, err error) { +func DeriveSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) { t, err = createDerivedObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) if err != nil { return @@ -86,9 +84,7 @@ func DeriveSyncTree( return } -func CreateSyncTree( - ctx context.Context, - deps CreateDeps) (t tree.ObjectTree, err error) { +func CreateSyncTree(ctx context.Context, deps CreateDeps) (t tree.ObjectTree, err error) { t, err = createObjectTree(deps.Payload, deps.AclList, deps.CreateStorage) if err != nil { return @@ -155,7 +151,6 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t fullSyncResp := resp.GetContent().GetFullSyncResponse() payload := storage.TreeStorageCreatePayload{ - TreeId: id, RootRawChange: resp.RootChange, Changes: fullSyncResp.Changes, Heads: fullSyncResp.Heads, @@ -175,10 +170,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t return buildSyncTree(ctx, true, deps) } -func buildSyncTree( - ctx context.Context, - isFirstBuild bool, - deps BuildDeps) (t tree.ObjectTree, err error) { +func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t tree.ObjectTree, err error) { t, err = buildObjectTree(deps.TreeStorage, deps.AclList) if err != nil { diff --git a/common/net/rpc/rpctest/server.go b/common/net/rpc/rpctest/server.go index 270067e4..134ce6cb 100644 --- a/common/net/rpc/rpctest/server.go +++ b/common/net/rpc/rpctest/server.go @@ -2,6 +2,8 @@ package rpctest import ( "context" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" "net" "storj.io/drpc" "storj.io/drpc/drpcconn" @@ -9,6 +11,48 @@ import ( "storj.io/drpc/drpcserver" ) +type SecConnMock struct { + net.Conn + localPrivKey crypto.PrivKey + remotePubKey crypto.PubKey + localId peer.ID + remoteId peer.ID +} + +func (s *SecConnMock) LocalPeer() peer.ID { + return s.localId +} + +func (s *SecConnMock) LocalPrivateKey() crypto.PrivKey { + return s.localPrivKey +} + +func (s *SecConnMock) RemotePeer() peer.ID { + return s.remoteId +} + +func (s *SecConnMock) RemotePublicKey() crypto.PubKey { + return s.remotePubKey +} + +type ConnWrapper func(conn net.Conn) net.Conn + +func NewSecConnWrapper( + localPrivKey crypto.PrivKey, + remotePubKey crypto.PubKey, + localId peer.ID, + remoteId peer.ID) ConnWrapper { + return func(conn net.Conn) net.Conn { + return &SecConnMock{ + Conn: conn, + localPrivKey: localPrivKey, + remotePubKey: remotePubKey, + localId: localId, + remoteId: remoteId, + } + } +} + func NewTestServer() *TesServer { ts := &TesServer{ Mux: drpcmux.New(), @@ -23,7 +67,17 @@ type TesServer struct { } func (ts *TesServer) Dial() drpc.Conn { + return ts.DialWrapConn(nil, nil) +} + +func (ts *TesServer) DialWrapConn(serverWrapper ConnWrapper, clientWrapper ConnWrapper) drpc.Conn { sc, cc := net.Pipe() + if serverWrapper != nil { + sc = serverWrapper(sc) + } + if clientWrapper != nil { + cc = clientWrapper(cc) + } go ts.Server.ServeOne(context.Background(), sc) return drpcconn.New(cc) } diff --git a/common/pkg/acl/list/list.go b/common/pkg/acl/list/list.go index a86b2bc8..6e8d259e 100644 --- a/common/pkg/acl/list/list.go +++ b/common/pkg/acl/list/list.go @@ -50,19 +50,13 @@ type aclList struct { } func BuildACLListWithIdentity(acc *account.AccountData, storage storage.ListStorage) (ACLList, error) { - id, err := storage.ID() - if err != nil { - return nil, err - } + id := storage.ID() builder := newACLStateBuilderWithIdentity(acc) return build(id, builder, newACLRecordBuilder(id, common.NewKeychain()), storage) } func BuildACLList(storage storage.ListStorage) (ACLList, error) { - id, err := storage.ID() - if err != nil { - return nil, err - } + id := storage.ID() return build(id, newACLStateBuilder(), newACLRecordBuilder(id, common.NewKeychain()), storage) } diff --git a/common/pkg/acl/storage/inmemory.go b/common/pkg/acl/storage/inmemory.go index 11843de7..5c39e94d 100644 --- a/common/pkg/acl/storage/inmemory.go +++ b/common/pkg/acl/storage/inmemory.go @@ -56,10 +56,10 @@ func (i *inMemoryACLListStorage) AddRawRecord(ctx context.Context, rec *aclrecor panic("implement me") } -func (i *inMemoryACLListStorage) ID() (string, error) { +func (i *inMemoryACLListStorage) ID() string { i.RLock() defer i.RUnlock() - return i.id, nil + return i.id } type inMemoryTreeStorage struct { @@ -72,7 +72,6 @@ type inMemoryTreeStorage struct { } func NewInMemoryTreeStorage( - treeId string, root *treechangeproto.RawTreeChangeWithId, heads []string, changes []*treechangeproto.RawTreeChangeWithId) (TreeStorage, error) { @@ -80,10 +79,10 @@ func NewInMemoryTreeStorage( for _, ch := range changes { allChanges[ch.Id] = ch } - allChanges[treeId] = root + allChanges[root.Id] = root return &inMemoryTreeStorage{ - id: treeId, + id: root.Id, root: root, heads: heads, changes: allChanges, @@ -96,10 +95,10 @@ func (t *inMemoryTreeStorage) HasChange(ctx context.Context, id string) (bool, e return exists, nil } -func (t *inMemoryTreeStorage) ID() (string, error) { +func (t *inMemoryTreeStorage) ID() string { t.RLock() defer t.RUnlock() - return t.id, nil + return t.id } func (t *inMemoryTreeStorage) Root() (*treechangeproto.RawTreeChangeWithId, error) { @@ -159,12 +158,12 @@ func (i *inMemoryStorageProvider) TreeStorage(id string) (TreeStorage, error) { func (i *inMemoryStorageProvider) CreateTreeStorage(payload TreeStorageCreatePayload) (TreeStorage, error) { i.Lock() defer i.Unlock() - res, err := NewInMemoryTreeStorage(payload.TreeId, payload.RootRawChange, payload.Heads, payload.Changes) + res, err := NewInMemoryTreeStorage(payload.RootRawChange, payload.Heads, payload.Changes) if err != nil { return nil, err } - i.objects[payload.TreeId] = res + i.objects[payload.RootRawChange.Id] = res return res, nil } diff --git a/common/pkg/acl/storage/liststorage.go b/common/pkg/acl/storage/liststorage.go index 42f80110..81bb43cc 100644 --- a/common/pkg/acl/storage/liststorage.go +++ b/common/pkg/acl/storage/liststorage.go @@ -12,7 +12,7 @@ var ErrACLExists = errors.New("acl already exists") var ErrUnknownRecord = errors.New("record doesn't exist") type ListStorage interface { - Storage + ID() string Root() (*aclrecordproto.RawACLRecordWithId, error) Head() (string, error) SetHead(headId string) error diff --git a/common/pkg/acl/storage/mock_storage/mock_storage.go b/common/pkg/acl/storage/mock_storage/mock_storage.go index bd562720..38f8a2b2 100644 --- a/common/pkg/acl/storage/mock_storage/mock_storage.go +++ b/common/pkg/acl/storage/mock_storage/mock_storage.go @@ -81,12 +81,11 @@ func (mr *MockListStorageMockRecorder) Head() *gomock.Call { } // ID mocks base method. -func (m *MockListStorage) ID() (string, error) { +func (m *MockListStorage) ID() string { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ID") ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 + return ret0 } // ID indicates an expected call of ID. @@ -207,12 +206,11 @@ func (mr *MockTreeStorageMockRecorder) Heads() *gomock.Call { } // ID mocks base method. -func (m *MockTreeStorage) ID() (string, error) { +func (m *MockTreeStorage) ID() string { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ID") ret0, _ := ret[0].(string) - ret1, _ := ret[1].(error) - return ret0, ret1 + return ret0 } // ID indicates an expected call of ID. diff --git a/common/pkg/acl/storage/provider.go b/common/pkg/acl/storage/provider.go index da46ab41..51b53b4f 100644 --- a/common/pkg/acl/storage/provider.go +++ b/common/pkg/acl/storage/provider.go @@ -10,7 +10,6 @@ var ErrTreeExists = errors.New("tree already exists") var ErrUnkownChange = errors.New("change doesn't exist") type TreeStorageCreatePayload struct { - TreeId string RootRawChange *treechangeproto.RawTreeChangeWithId Changes []*treechangeproto.RawTreeChangeWithId Heads []string diff --git a/common/pkg/acl/storage/storage.go b/common/pkg/acl/storage/storage.go deleted file mode 100644 index 42af1815..00000000 --- a/common/pkg/acl/storage/storage.go +++ /dev/null @@ -1,5 +0,0 @@ -package storage - -type Storage interface { - ID() (string, error) -} diff --git a/common/pkg/acl/storage/treestorage.go b/common/pkg/acl/storage/treestorage.go index 76332399..751dd1d9 100644 --- a/common/pkg/acl/storage/treestorage.go +++ b/common/pkg/acl/storage/treestorage.go @@ -6,7 +6,7 @@ import ( ) type TreeStorage interface { - Storage + ID() string Root() (*treechangeproto.RawTreeChangeWithId, error) Heads() ([]string, error) SetHeads(heads []string) error diff --git a/common/pkg/acl/testutils/acllistbuilder/liststoragebuilder.go b/common/pkg/acl/testutils/acllistbuilder/liststoragebuilder.go index cf515bfd..6777777c 100644 --- a/common/pkg/acl/testutils/acllistbuilder/liststoragebuilder.go +++ b/common/pkg/acl/testutils/acllistbuilder/liststoragebuilder.go @@ -120,8 +120,8 @@ func (t *ACLListStorageBuilder) AddRawRecord(ctx context.Context, rec *aclrecord panic("implement me") } -func (t *ACLListStorageBuilder) ID() (string, error) { - return t.id, nil +func (t *ACLListStorageBuilder) ID() string { + return t.id } func (t *ACLListStorageBuilder) GetRawRecords() []*aclrecordproto2.RawACLRecordWithId { diff --git a/common/pkg/acl/tree/objecttree_test.go b/common/pkg/acl/tree/objecttree_test.go index e2a2b028..da9f2364 100644 --- a/common/pkg/acl/tree/objecttree_test.go +++ b/common/pkg/acl/tree/objecttree_test.go @@ -3,7 +3,7 @@ package tree import ( "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" - storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/testutils/acllistbuilder" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/stretchr/testify/assert" @@ -53,9 +53,9 @@ func (c *mockChangeCreator) createRaw(id, aclId, snapshotId string, isSnapshot b } } -func (c *mockChangeCreator) createNewTreeStorage(treeId, aclHeadId string) storage2.TreeStorage { +func (c *mockChangeCreator) createNewTreeStorage(treeId, aclHeadId string) storage.TreeStorage { root := c.createRoot(treeId, aclHeadId) - treeStorage, _ := storage2.NewInMemoryTreeStorage(treeId, root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root}) + treeStorage, _ := storage.NewInMemoryTreeStorage(root, []string{root.Id}, []*treechangeproto.RawTreeChangeWithId{root}) return treeStorage } @@ -95,7 +95,7 @@ func (m *mockChangeValidator) ValidateFullTree(tree *Tree, aclList list.ACLList) type testTreeContext struct { aclList list.ACLList - treeStorage storage2.TreeStorage + treeStorage storage.TreeStorage changeBuilder *mockChangeBuilder changeCreator *mockChangeCreator objTree ObjectTree diff --git a/common/pkg/acl/tree/objecttreefactory.go b/common/pkg/acl/tree/objecttreefactory.go index 94459020..ca097505 100644 --- a/common/pkg/acl/tree/objecttreefactory.go +++ b/common/pkg/acl/tree/objecttreefactory.go @@ -3,7 +3,7 @@ package tree import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/common" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/list" - storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/symmetric" @@ -20,7 +20,7 @@ type ObjectTreeCreatePayload struct { Identity []byte } -func BuildObjectTree(treeStorage storage2.TreeStorage, aclList list.ACLList) (ObjectTree, error) { +func BuildObjectTree(treeStorage storage.TreeStorage, aclList list.ACLList) (ObjectTree, error) { rootChange, err := treeStorage.Root() if err != nil { return nil, err @@ -32,14 +32,14 @@ func BuildObjectTree(treeStorage storage2.TreeStorage, aclList list.ACLList) (Ob func CreateDerivedObjectTree( payload ObjectTreeCreatePayload, aclList list.ACLList, - createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { + createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { return createObjectTree(payload, 0, nil, aclList, createStorage) } func CreateObjectTree( payload ObjectTreeCreatePayload, aclList list.ACLList, - createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { + createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { bytes := make([]byte, 32) _, err = rand.Read(bytes) if err != nil { @@ -53,7 +53,7 @@ func createObjectTree( timestamp int64, seed []byte, aclList list.ACLList, - createStorage storage2.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { + createStorage storage.TreeStorageCreatorFunc) (objTree ObjectTree, err error) { aclList.RLock() aclHeadId := aclList.Head().Id aclList.RUnlock() @@ -77,8 +77,7 @@ func createObjectTree( } // create storage - st, err := createStorage(storage2.TreeStorageCreatePayload{ - TreeId: raw.Id, + st, err := createStorage(storage.TreeStorageCreatePayload{ RootRawChange: raw, Changes: []*treechangeproto.RawTreeChangeWithId{raw}, Heads: []string{raw.Id}, @@ -127,11 +126,7 @@ func buildObjectTree(deps objectTreeDeps) (ObjectTree, error) { } } - objTree.id, err = objTree.treeStorage.ID() - if err != nil { - return nil, err - } - + objTree.id = objTree.treeStorage.ID() objTree.root, err = objTree.treeStorage.Root() if err != nil { return nil, err diff --git a/common/pkg/ocache/ocache.go b/common/pkg/ocache/ocache.go index 41555e42..629a5864 100644 --- a/common/pkg/ocache/ocache.go +++ b/common/pkg/ocache/ocache.go @@ -409,9 +409,13 @@ func (c *oCache) Close() (err error) { } c.closed = true close(c.closeCh) - var toClose []*entry + var toClose, alreadyClosing []*entry for _, e := range c.data { - toClose = append(toClose, e) + if e.isClosing { + alreadyClosing = append(alreadyClosing, e) + } else { + toClose = append(toClose, e) + } } c.mu.Unlock() for _, e := range toClose { @@ -422,5 +426,8 @@ func (c *oCache) Close() (err error) { } } } + for _, e := range alreadyClosing { + <-e.close + } return nil } diff --git a/node/storage/keys.go b/node/storage/keys.go index 85dd46aa..e21a7140 100644 --- a/node/storage/keys.go +++ b/node/storage/keys.go @@ -62,5 +62,17 @@ func (s spaceKeys) HeaderKey() []byte { } func isRootIdKey(key string) bool { - return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "heads") + return strings.HasPrefix(key, "t/") && strings.HasSuffix(key, "/heads") +} + +func getRootId(key string) string { + prefixLen := 2 // len("t/") + suffixLen := 6 // len("/heads") + rootLen := len(key) - suffixLen - prefixLen + sBuf := strings.Builder{} + sBuf.Grow(rootLen) + for i := prefixLen; i < prefixLen+rootLen; i++ { + sBuf.WriteByte(key[i]) + } + return sBuf.String() } diff --git a/node/storage/liststorage.go b/node/storage/liststorage.go index c3ba4f92..1247b2e8 100644 --- a/node/storage/liststorage.go +++ b/node/storage/liststorage.go @@ -82,8 +82,8 @@ func createListStorage(db *pogreb.DB, root *aclrecordproto.RawACLRecordWithId) ( return } -func (l *listStorage) ID() (string, error) { - return l.id, nil +func (l *listStorage) ID() string { + return l.id } func (l *listStorage) Root() (*aclrecordproto.RawACLRecordWithId, error) { diff --git a/node/storage/liststorage_test.go b/node/storage/liststorage_test.go new file mode 100644 index 00000000..ae04e3af --- /dev/null +++ b/node/storage/liststorage_test.go @@ -0,0 +1,70 @@ +package storage + +import ( + "context" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/stretchr/testify/require" + "testing" +) + +func testList(t *testing.T, store storage.ListStorage, root *aclrecordproto.RawACLRecordWithId, head string) { + require.Equal(t, store.ID(), root.Id) + + aclRoot, err := store.Root() + require.NoError(t, err) + require.Equal(t, root, aclRoot) + + aclHead, err := store.Head() + require.NoError(t, err) + require.Equal(t, head, aclHead) +} + +func TestListStorage_Create(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"} + listStore, err := createListStorage(fx.db, aclRoot) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + + t.Run("create same list storage returns nil", func(t *testing.T) { + // this is ok, because we only create new list storage when we create space storage + listStore, err := createListStorage(fx.db, aclRoot) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + }) +} + +func TestListStorage_Methods(t *testing.T) { + fx := newFixture(t) + fx.open(t) + aclRoot := &aclrecordproto.RawACLRecordWithId{Payload: []byte("root"), Id: "someRootId"} + _, err := createListStorage(fx.db, aclRoot) + require.NoError(t, err) + fx.stop(t) + + fx.open(t) + defer fx.stop(t) + listStore, err := newListStorage(fx.db) + require.NoError(t, err) + testList(t, listStore, aclRoot, aclRoot.Id) + + t.Run("set head", func(t *testing.T) { + head := "newHead" + require.NoError(t, listStore.SetHead(head)) + aclHead, err := listStore.Head() + require.NoError(t, err) + require.Equal(t, head, aclHead) + }) + + t.Run("add raw record and get raw record", func(t *testing.T) { + newRec := &aclrecordproto.RawACLRecordWithId{Payload: []byte("rec"), Id: "someRecId"} + require.NoError(t, listStore.AddRawRecord(context.Background(), newRec)) + aclRec, err := listStore.GetRawRecord(context.Background(), newRec.Id) + require.NoError(t, err) + require.Equal(t, newRec, aclRec) + }) +} diff --git a/node/storage/spacestorage.go b/node/storage/spacestorage.go index f82d85ba..eed05609 100644 --- a/node/storage/spacestorage.go +++ b/node/storage/spacestorage.go @@ -88,8 +88,8 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate } defer func() { - log.With(zap.String("id", payload.SpaceHeaderWithId.Id), zap.Error(err)).Warn("failed to create storage") if err != nil { + log.With(zap.String("id", payload.SpaceHeaderWithId.Id), zap.Error(err)).Warn("failed to create storage") db.Close() } }() @@ -129,8 +129,8 @@ func createSpaceStorage(rootPath string, payload spacestorage.SpaceStorageCreate return } -func (s *spaceStorage) ID() (string, error) { - return s.spaceId, nil +func (s *spaceStorage) ID() string { + return s.spaceId } func (s *spaceStorage) TreeStorage(id string) (storage2.TreeStorage, error) { @@ -156,13 +156,13 @@ func (s *spaceStorage) SpaceHeader() (header *spacesyncproto.RawSpaceHeaderWithI func (s *spaceStorage) StoredIds() (ids []string, err error) { index := s.objDb.Items() - key, val, err := index.Next() + key, _, err := index.Next() for err == nil { strKey := string(key) if isRootIdKey(strKey) { - ids = append(ids, string(val)) + ids = append(ids, getRootId(strKey)) } - key, val, err = index.Next() + key, _, err = index.Next() } if err != pogreb.ErrIterationDone { diff --git a/node/storage/spacestorage_test.go b/node/storage/spacestorage_test.go new file mode 100644 index 00000000..33243e0a --- /dev/null +++ b/node/storage/spacestorage_test.go @@ -0,0 +1,107 @@ +package storage + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/stretchr/testify/require" + "os" + "strconv" + "testing" +) + +func spaceTestPayload() spacestorage.SpaceStorageCreatePayload { + header := &spacesyncproto.RawSpaceHeaderWithId{ + RawHeader: []byte("header"), + Id: "headerId", + } + aclRoot := &aclrecordproto.RawACLRecordWithId{ + Payload: []byte("aclRoot"), + Id: "aclRootId", + } + return spacestorage.SpaceStorageCreatePayload{ + RecWithId: aclRoot, + SpaceHeaderWithId: header, + } +} + +func testSpace(t *testing.T, store spacestorage.SpaceStorage, payload spacestorage.SpaceStorageCreatePayload) { + header, err := store.SpaceHeader() + require.NoError(t, err) + require.Equal(t, payload.SpaceHeaderWithId, header) + + aclStorage, err := store.ACLStorage() + require.NoError(t, err) + testList(t, aclStorage, payload.RecWithId, payload.RecWithId.Id) +} + +func TestSpaceStorage_Create(t *testing.T) { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + + payload := spaceTestPayload() + store, err := createSpaceStorage(dir, payload) + require.NoError(t, err) + + testSpace(t, store, payload) + require.NoError(t, store.Close()) + + t.Run("create same storage returns error", func(t *testing.T) { + _, err := createSpaceStorage(dir, payload) + require.Error(t, err) + }) +} + +func TestSpaceStorage_NewAndCreateTree(t *testing.T) { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + + payload := spaceTestPayload() + store, err := createSpaceStorage(dir, payload) + require.NoError(t, err) + require.NoError(t, store.Close()) + + store, err = newSpaceStorage(dir, payload.SpaceHeaderWithId.Id) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + testSpace(t, store, payload) + + t.Run("create tree and get tree", func(t *testing.T) { + payload := treeTestPayload() + treeStore, err := store.CreateTreeStorage(payload) + require.NoError(t, err) + testTreePayload(t, treeStore, payload) + + otherStore, err := store.TreeStorage(payload.RootRawChange.Id) + require.NoError(t, err) + testTreePayload(t, otherStore, payload) + }) +} + +func TestSpaceStorage_StoredIds(t *testing.T) { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + + payload := spaceTestPayload() + store, err := createSpaceStorage(dir, payload) + require.NoError(t, err) + defer func() { + require.NoError(t, store.Close()) + }() + + n := 5 + var ids []string + for i := 0; i < n; i++ { + treePayload := treeTestPayload() + treePayload.RootRawChange.Id += strconv.Itoa(i) + ids = append(ids, treePayload.RootRawChange.Id) + _, err := store.CreateTreeStorage(treePayload) + require.NoError(t, err) + } + + storedIds, err := store.StoredIds() + require.NoError(t, err) + require.Equal(t, ids, storedIds) +} diff --git a/node/storage/treestorage.go b/node/storage/treestorage.go index 7ad33aa1..9b92379d 100644 --- a/node/storage/treestorage.go +++ b/node/storage/treestorage.go @@ -3,7 +3,7 @@ package storage import ( "context" "github.com/akrylysov/pogreb" - storage2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + storage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" ) @@ -14,14 +14,14 @@ type treeStorage struct { root *treechangeproto.RawTreeChangeWithId } -func newTreeStorage(db *pogreb.DB, treeId string) (ts storage2.TreeStorage, err error) { +func newTreeStorage(db *pogreb.DB, treeId string) (ts storage.TreeStorage, err error) { keys := newTreeKeys(treeId) heads, err := db.Get(keys.HeadsKey()) if err != nil { return } if heads == nil { - err = storage2.ErrUnknownTreeId + err = storage.ErrUnknownTreeId return } @@ -30,7 +30,7 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage2.TreeStorage, err return } if root == nil { - err = storage2.ErrUnknownTreeId + err = storage.ErrUnknownTreeId return } @@ -47,18 +47,18 @@ func newTreeStorage(db *pogreb.DB, treeId string) (ts storage2.TreeStorage, err return } -func createTreeStorage(db *pogreb.DB, payload storage2.TreeStorageCreatePayload) (ts storage2.TreeStorage, err error) { - keys := newTreeKeys(payload.TreeId) +func createTreeStorage(db *pogreb.DB, payload storage.TreeStorageCreatePayload) (ts storage.TreeStorage, err error) { + keys := newTreeKeys(payload.RootRawChange.Id) has, err := db.Has(keys.HeadsKey()) if err != nil { return } if has { - err = storage2.ErrTreeExists + err = storage.ErrTreeExists return } - heads := storage2.CreateHeadsPayload(payload.Heads) + heads := storage.CreateHeadsPayload(payload.Heads) for _, ch := range payload.Changes { err = db.Put(keys.RawChangeKey(ch.Id), ch.GetRawChange()) @@ -86,8 +86,8 @@ func createTreeStorage(db *pogreb.DB, payload storage2.TreeStorageCreatePayload) return } -func (t *treeStorage) ID() (string, error) { - return t.id, nil +func (t *treeStorage) ID() string { + return t.id } func (t *treeStorage) Root() (raw *treechangeproto.RawTreeChangeWithId, err error) { @@ -100,15 +100,15 @@ func (t *treeStorage) Heads() (heads []string, err error) { return } if headsBytes == nil { - err = storage2.ErrUnknownTreeId + err = storage.ErrUnknownTreeId return } - heads = storage2.ParseHeads(headsBytes) + heads = storage.ParseHeads(headsBytes) return } func (t *treeStorage) SetHeads(heads []string) (err error) { - payload := storage2.CreateHeadsPayload(heads) + payload := storage.CreateHeadsPayload(heads) return t.db.Put(t.keys.HeadsKey(), payload) } @@ -122,7 +122,7 @@ func (t *treeStorage) GetRawChange(ctx context.Context, id string) (raw *treecha return } if res == nil { - err = storage2.ErrUnkownChange + err = storage.ErrUnkownChange } raw = &treechangeproto.RawTreeChangeWithId{ diff --git a/node/storage/treestorage_test.go b/node/storage/treestorage_test.go new file mode 100644 index 00000000..df83c118 --- /dev/null +++ b/node/storage/treestorage_test.go @@ -0,0 +1,121 @@ +package storage + +import ( + "context" + "github.com/akrylysov/pogreb" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/storage" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto" + "github.com/stretchr/testify/require" + "os" + "testing" +) + +func treeTestPayload() storage.TreeStorageCreatePayload { + rootRawChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some"), Id: "rootId"} + otherChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("some other"), Id: "otherId"} + changes := []*treechangeproto.RawTreeChangeWithId{rootRawChange, otherChange} + return storage.TreeStorageCreatePayload{ + RootRawChange: rootRawChange, + Changes: changes, + Heads: []string{rootRawChange.Id}, + } +} + +type fixture struct { + dir string + db *pogreb.DB +} + +func newFixture(t *testing.T) *fixture { + dir, err := os.MkdirTemp("", "") + require.NoError(t, err) + return &fixture{dir: dir} +} + +func (fx *fixture) open(t *testing.T) { + var err error + fx.db, err = pogreb.Open(fx.dir, nil) + require.NoError(t, err) +} + +func (fx *fixture) stop(t *testing.T) { + require.NoError(t, fx.db.Close()) +} + +func testTreePayload(t *testing.T, store storage.TreeStorage, payload storage.TreeStorageCreatePayload) { + require.Equal(t, payload.RootRawChange.Id, store.ID()) + + root, err := store.Root() + require.NoError(t, err) + require.Equal(t, root, payload.RootRawChange) + + heads, err := store.Heads() + require.NoError(t, err) + require.Equal(t, payload.Heads, heads) + + for _, ch := range payload.Changes { + dbCh, err := store.GetRawChange(context.Background(), ch.Id) + require.NoError(t, err) + require.Equal(t, ch, dbCh) + } + return +} + +func TestTreeStorage_Create(t *testing.T) { + fx := newFixture(t) + fx.open(t) + defer fx.stop(t) + + payload := treeTestPayload() + store, err := createTreeStorage(fx.db, payload) + require.NoError(t, err) + testTreePayload(t, store, payload) + + t.Run("create same storage returns error", func(t *testing.T) { + _, err := createTreeStorage(fx.db, payload) + require.Error(t, err) + }) +} + +func TestTreeStorage_Methods(t *testing.T) { + fx := newFixture(t) + fx.open(t) + payload := treeTestPayload() + _, err := createTreeStorage(fx.db, payload) + require.NoError(t, err) + fx.stop(t) + + fx.open(t) + defer fx.stop(t) + store, err := newTreeStorage(fx.db, payload.RootRawChange.Id) + require.NoError(t, err) + testTreePayload(t, store, payload) + + t.Run("update heads", func(t *testing.T) { + newHeads := []string{"a", "b"} + require.NoError(t, store.SetHeads(newHeads)) + heads, err := store.Heads() + require.NoError(t, err) + require.Equal(t, newHeads, heads) + }) + + t.Run("add raw change, get change and has change", func(t *testing.T) { + newChange := &treechangeproto.RawTreeChangeWithId{RawChange: []byte("ab"), Id: "newId"} + require.NoError(t, store.AddRawChange(newChange)) + rawCh, err := store.GetRawChange(context.Background(), newChange.Id) + require.NoError(t, err) + require.Equal(t, newChange, rawCh) + has, err := store.HasChange(context.Background(), newChange.Id) + require.NoError(t, err) + require.True(t, has) + }) + + t.Run("get and has for unknown change", func(t *testing.T) { + incorrectId := "incorrectId" + _, err := store.GetRawChange(context.Background(), incorrectId) + require.Error(t, err) + has, err := store.HasChange(context.Background(), incorrectId) + require.NoError(t, err) + require.False(t, has) + }) +}