diff --git a/common/testutil/testaccount/service.go b/common/testutil/testaccount/service.go new file mode 100644 index 00000000..7995621d --- /dev/null +++ b/common/testutil/testaccount/service.go @@ -0,0 +1,44 @@ +package testaccount + +import ( + accountService "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/account" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey" +) + +// AccountTestService provides service for test purposes, generates new random account every Init +type AccountTestService struct { + acc *account.AccountData +} + +func (s *AccountTestService) Init(a *app.App) (err error) { + encKey, _, err := encryptionkey.GenerateRandomRSAKeyPair(2048) + if err != nil { + return + } + + signKey, _, err := signingkey.GenerateRandomEd25519KeyPair() + if err != nil { + return + } + ident, err := signKey.GetPublic().Raw() + if err != nil { + return + } + s.acc = &account.AccountData{ + Identity: ident, + SignKey: signKey, + EncKey: encKey, + } + return nil +} + +func (s *AccountTestService) Name() (name string) { + return accountService.CName +} + +func (s *AccountTestService) Account() *account.AccountData { + return s.acc +} diff --git a/consensus/consensusclient/client.go b/consensus/consensusclient/client.go index 9d08617b..48576dd4 100644 --- a/consensus/consensusclient/client.go +++ b/consensus/consensusclient/client.go @@ -1,3 +1,4 @@ +//go:generate mockgen -destination mock_consensusclient/mock_consensusclient.go github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient Service package consensusclient import ( diff --git a/consensus/consensusclient/client_test.go b/consensus/consensusclient/client_test.go index ef89d8d5..e7557d0d 100644 --- a/consensus/consensusclient/client_test.go +++ b/consensus/consensusclient/client_test.go @@ -13,6 +13,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "sync" "testing" "time" ) @@ -22,7 +23,7 @@ func TestService_Watch(t *testing.T) { fx := newFixture(t).run(t) defer fx.Finish() var logId = []byte{'1'} - w := &testWatcher{} + w := &testWatcher{ready: make(chan struct{})} require.NoError(t, fx.Watch(logId, w)) st := fx.testServer.waitStream(t) req, err := st.Recv() @@ -34,6 +35,7 @@ func TestService_Watch(t *testing.T) { Error: consensusproto.ErrCodes_ErrorOffset + consensusproto.ErrCodes_LogNotFound, }, })) + <-w.ready assert.Equal(t, consensuserr.ErrLogNotFound, w.err) fx.testServer.releaseStream <- nil }) @@ -207,14 +209,22 @@ func (t *testServer) waitStream(test *testing.T) consensusproto.DRPCConsensus_Wa } type testWatcher struct { - recs [][]*consensusproto.Record - err error + recs [][]*consensusproto.Record + err error + ready chan struct{} + once sync.Once } func (t *testWatcher) AddConsensusRecords(recs []*consensusproto.Record) { t.recs = append(t.recs, recs) + t.once.Do(func() { + close(t.ready) + }) } func (t *testWatcher) AddConsensusError(err error) { t.err = err + t.once.Do(func() { + close(t.ready) + }) } diff --git a/consensus/consensusclient/mock_consensusclient/mock_consensusclient.go b/consensus/consensusclient/mock_consensusclient/mock_consensusclient.go new file mode 100644 index 00000000..356cb927 --- /dev/null +++ b/consensus/consensusclient/mock_consensusclient/mock_consensusclient.go @@ -0,0 +1,150 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient (interfaces: Service) + +// Package mock_consensusclient is a generated GoMock package. +package mock_consensusclient + +import ( + context "context" + reflect "reflect" + + app "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + consensusclient "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient" + consensusproto "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" + gomock "github.com/golang/mock/gomock" +) + +// MockService is a mock of Service interface. +type MockService struct { + ctrl *gomock.Controller + recorder *MockServiceMockRecorder +} + +// MockServiceMockRecorder is the mock recorder for MockService. +type MockServiceMockRecorder struct { + mock *MockService +} + +// NewMockService creates a new mock instance. +func NewMockService(ctrl *gomock.Controller) *MockService { + mock := &MockService{ctrl: ctrl} + mock.recorder = &MockServiceMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockService) EXPECT() *MockServiceMockRecorder { + return m.recorder +} + +// AddLog mocks base method. +func (m *MockService) AddLog(arg0 context.Context, arg1 *consensusproto.Log) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddLog", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddLog indicates an expected call of AddLog. +func (mr *MockServiceMockRecorder) AddLog(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddLog", reflect.TypeOf((*MockService)(nil).AddLog), arg0, arg1) +} + +// AddRecord mocks base method. +func (m *MockService) AddRecord(arg0 context.Context, arg1 []byte, arg2 *consensusproto.Record) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "AddRecord", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// AddRecord indicates an expected call of AddRecord. +func (mr *MockServiceMockRecorder) AddRecord(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddRecord", reflect.TypeOf((*MockService)(nil).AddRecord), arg0, arg1, arg2) +} + +// Close mocks base method. +func (m *MockService) Close(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockServiceMockRecorder) Close(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockService)(nil).Close), arg0) +} + +// Init mocks base method. +func (m *MockService) Init(arg0 *app.App) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Init", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Init indicates an expected call of Init. +func (mr *MockServiceMockRecorder) Init(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockService)(nil).Init), arg0) +} + +// Name mocks base method. +func (m *MockService) Name() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Name") + ret0, _ := ret[0].(string) + return ret0 +} + +// Name indicates an expected call of Name. +func (mr *MockServiceMockRecorder) Name() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockService)(nil).Name)) +} + +// Run mocks base method. +func (m *MockService) Run(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Run", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Run indicates an expected call of Run. +func (mr *MockServiceMockRecorder) Run(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockService)(nil).Run), arg0) +} + +// UnWatch mocks base method. +func (m *MockService) UnWatch(arg0 []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UnWatch", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UnWatch indicates an expected call of UnWatch. +func (mr *MockServiceMockRecorder) UnWatch(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnWatch", reflect.TypeOf((*MockService)(nil).UnWatch), arg0) +} + +// Watch mocks base method. +func (m *MockService) Watch(arg0 []byte, arg1 consensusclient.Watcher) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Watch", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// Watch indicates an expected call of Watch. +func (mr *MockServiceMockRecorder) Watch(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockService)(nil).Watch), arg0, arg1) +} diff --git a/consensus/consensusclient/stream.go b/consensus/consensusclient/stream.go index d3383629..d2865326 100644 --- a/consensus/consensusclient/stream.go +++ b/consensus/consensusclient/stream.go @@ -3,6 +3,7 @@ package consensusclient import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" "github.com/cheggaaa/mb/v2" + "sync" ) func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) *stream { @@ -17,6 +18,7 @@ func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) *stream { type stream struct { rpcStream consensusproto.DRPCConsensus_WatchLogClient mb *mb.MB[*consensusproto.WatchLogEvent] + mu sync.Mutex err error } @@ -37,6 +39,8 @@ func (s *stream) WaitLogs() []*consensusproto.WatchLogEvent { } func (s *stream) Err() error { + s.mu.Lock() + defer s.mu.Unlock() return s.err } @@ -45,7 +49,9 @@ func (s *stream) readStream() { for { event, err := s.rpcStream.Recv() if err != nil { + s.mu.Lock() s.err = err + s.mu.Unlock() return } if err = s.mb.Add(event); err != nil { diff --git a/node/acl/service.go b/node/acl/service.go index 5227a8ab..a417778c 100644 --- a/node/acl/service.go +++ b/node/acl/service.go @@ -7,6 +7,7 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/syncservice/synchandler" "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/cid" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" "time" @@ -16,7 +17,15 @@ const CName = "node.acl" var log = logger.NewNamed(CName) +func New() Service { + return &service{} +} + type Service interface { + CreateLog(ctx context.Context, aclId string, rawRec *aclrecordproto.RawACLRecord) (firstRecId string, err error) + AddRecord(ctx context.Context, aclId string, rawRec *aclrecordproto.RawACLRecord) (id string, err error) + Watch(ctx context.Context, spaceId, aclId string, h synchandler.SyncHandler) (err error) + UnWatch(aclId string) (err error) app.Component } @@ -35,51 +44,49 @@ func (s *service) Name() (name string) { return CName } -func (s *service) CreateLog(ctx context.Context, aclId string, rec *aclrecordproto.RawACLRecordWithId) (err error) { +func (s *service) CreateLog(ctx context.Context, aclId string, rawRec *aclrecordproto.RawACLRecord) (firstRecId string, err error) { logId, err := cidToByte(aclId) if err != nil { return } - recId, err := cidToByte(rec.Id) + recId, _, payload, err := s.signAndMarshal(rawRec) if err != nil { return } - recPayload, err := rec.Marshal() - if err != nil { - return - } - return s.consService.AddLog(ctx, &consensusproto.Log{ + if err = s.consService.AddLog(ctx, &consensusproto.Log{ Id: logId, Records: []*consensusproto.Record{ { Id: recId, - Payload: recPayload, + Payload: payload, CreatedUnix: uint64(time.Now().Unix()), }, }, - }) + }); err != nil { + return + } + return cidToString(recId) } -func (s *service) AddRecord(ctx context.Context, aclId string, rec *aclrecordproto.RawACLRecordWithId) (err error) { +func (s *service) AddRecord(ctx context.Context, aclId string, rawRec *aclrecordproto.RawACLRecord) (id string, err error) { logId, err := cidToByte(aclId) if err != nil { return } - recId, err := cidToByte(rec.Id) - if err != nil { - return - } - recPayload, err := rec.Marshal() + recId, prevId, payload, err := s.signAndMarshal(rawRec) if err != nil { return } - return s.consService.AddRecord(ctx, logId, &consensusproto.Record{ + if err = s.consService.AddRecord(ctx, logId, &consensusproto.Record{ Id: recId, - PrevId: nil, //TODO: - Payload: recPayload, + PrevId: prevId, + Payload: payload, CreatedUnix: uint64(time.Now().Unix()), - }) + }); err != nil { + return + } + return cidToString(recId) } func (s *service) Watch(ctx context.Context, spaceId, aclId string, h synchandler.SyncHandler) (err error) { @@ -100,3 +107,28 @@ func (s *service) UnWatch(aclId string) (err error) { } return s.consService.UnWatch(logId) } + +func (s *service) signAndMarshal(rawRec *aclrecordproto.RawACLRecord) (recId, prevId, payload []byte, err error) { + var rec = &aclrecordproto.ACLRecord{} + if err = rec.Unmarshal(rawRec.Payload); err != nil { + return + } + if rec.PrevId != "" { + if prevId, err = cidToByte(rec.PrevId); err != nil { + return + } + } + rawRec.AcceptorIdentity = s.account.Account().Identity + if rawRec.AcceptorSignature, err = s.account.Account().SignKey.Sign(rawRec.Payload); err != nil { + return + } + if payload, err = rawRec.Marshal(); err != nil { + return + } + recCid, err := cid.NewCIDFromBytes(payload) + if err != nil { + return + } + recId, err = cidToByte(recCid) + return +} diff --git a/node/acl/service_test.go b/node/acl/service_test.go new file mode 100644 index 00000000..eb720d64 --- /dev/null +++ b/node/acl/service_test.go @@ -0,0 +1,195 @@ +package acl + +import ( + "context" + "fmt" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/aclrecordproto" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/testutil/testaccount" + "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/cid" + "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient" + "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient/mock_consensusclient" + "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +var ctx = context.Background() + +func TestService_CreateLog(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + var clog *consensusproto.Log + fx.mockClient.EXPECT().AddLog(ctx, gomock.Any()).Do(func(ctx context.Context, l *consensusproto.Log) { + clog = l + }) + + aclId, _ := cid.NewCIDFromBytes([]byte("aclId")) + + rec := &aclrecordproto.ACLRecord{ + PrevId: "", + Identity: fx.account.Account().Identity, + Data: []byte{'1', '2', '3'}, + Timestamp: time.Now().Unix(), + } + pl, _ := rec.Marshal() + + firstRecId, err := fx.CreateLog(ctx, aclId, &aclrecordproto.RawACLRecord{ + Payload: pl, + }) + require.NoError(t, err) + aclIdBytes, _ := cidToByte(aclId) + firstRecIdBytes, _ := cidToByte(firstRecId) + assert.Equal(t, aclIdBytes, clog.Id) + assert.NotEmpty(t, firstRecIdBytes) + require.Len(t, clog.Records, 1) + + var resultRawAcl = &aclrecordproto.RawACLRecord{} + require.NoError(t, resultRawAcl.Unmarshal(clog.Records[0].Payload)) + valid, err := fx.account.Account().SignKey.GetPublic().Verify(resultRawAcl.Payload, resultRawAcl.AcceptorSignature) + require.NoError(t, err) + require.True(t, valid) +} + +func TestService_AddRecord(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + var clog *consensusproto.Log + fx.mockClient.EXPECT().AddLog(ctx, gomock.Any()).Do(func(ctx context.Context, l *consensusproto.Log) { + clog = l + }) + + aclId, _ := cid.NewCIDFromBytes([]byte("aclId")) + + rec := &aclrecordproto.ACLRecord{ + PrevId: "", + Identity: fx.account.Account().Identity, + Data: []byte{'1', '2', '3'}, + Timestamp: time.Now().Unix(), + } + pl, _ := rec.Marshal() + + firstRecId, err := fx.CreateLog(ctx, aclId, &aclrecordproto.RawACLRecord{ + Payload: pl, + }) + require.NoError(t, err) + aclIdBytes, _ := cidToByte(aclId) + firstRecIdBytes, _ := cidToByte(firstRecId) + assert.Equal(t, aclIdBytes, clog.Id) + assert.NotEmpty(t, firstRecIdBytes) + var addRec *consensusproto.Record + fx.mockClient.EXPECT().AddRecord(ctx, aclIdBytes, gomock.Any()).Do(func(ctx context.Context, logId []byte, rec *consensusproto.Record) { + addRec = rec + }) + rec = &aclrecordproto.ACLRecord{ + PrevId: firstRecId, + Identity: fx.account.Account().Identity, + Data: []byte{'1', '2', '3', '4'}, + Timestamp: time.Now().Unix(), + } + pl, _ = rec.Marshal() + + newRecId, err := fx.AddRecord(ctx, aclId, &aclrecordproto.RawACLRecord{ + Payload: pl, + }) + require.NoError(t, err) + assert.NotEmpty(t, newRecId) + + assert.Equal(t, firstRecIdBytes, addRec.PrevId) + +} + +func TestService_Watch(t *testing.T) { + t.Run("remote error", func(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + var expErr = fmt.Errorf("error") + aclId, _ := cid.NewCIDFromBytes([]byte("aclId")) + aclIdBytes, _ := cidToByte(aclId) + fx.mockClient.EXPECT().Watch(aclIdBytes, gomock.Any()).Do(func(aid []byte, w consensusclient.Watcher) { + assert.Equal(t, aclIdBytes, aid) + go func() { + time.Sleep(time.Millisecond * 10) + w.AddConsensusError(expErr) + }() + }) + + th := &testHandler{} + err := fx.Watch(ctx, "123", aclId, th) + assert.Equal(t, expErr, err) + }) + t.Run("success", func(t *testing.T) { + fx := newFixture(t) + defer fx.Finish(t) + aclId, _ := cid.NewCIDFromBytes([]byte("aclId")) + aclIdBytes, _ := cidToByte(aclId) + fx.mockClient.EXPECT().Watch(aclIdBytes, gomock.Any()).Do(func(aid []byte, w consensusclient.Watcher) { + assert.Equal(t, aclIdBytes, aid) + go func() { + time.Sleep(time.Millisecond * 10) + r1cid, _ := cid.NewCIDFromBytes([]byte("r1")) + r2cid, _ := cid.NewCIDFromBytes([]byte("r2")) + r1cidB, _ := cidToByte(r1cid) + r2cidB, _ := cidToByte(r2cid) + w.AddConsensusRecords([]*consensusproto.Record{ + { + Id: r2cidB, + PrevId: r1cidB, + Payload: []byte("p1"), + }, + { + Id: r1cidB, + Payload: []byte("p1"), + }, + }) + }() + }) + + th := &testHandler{} + err := fx.Watch(ctx, "123", aclId, th) + require.NoError(t, err) + }) +} + +func newFixture(t *testing.T) *fixture { + fx := &fixture{ + a: new(app.App), + ctrl: gomock.NewController(t), + account: &testaccount.AccountTestService{}, + } + fx.mockClient = mock_consensusclient.NewMockService(fx.ctrl) + fx.mockClient.EXPECT().Name().Return(consensusclient.CName).AnyTimes() + fx.mockClient.EXPECT().Init(gomock.Any()).AnyTimes() + fx.mockClient.EXPECT().Run(gomock.Any()).AnyTimes() + fx.mockClient.EXPECT().Close(gomock.Any()).AnyTimes() + fx.Service = New() + fx.a.Register(fx.account).Register(fx.mockClient).Register(fx.Service) + require.NoError(t, fx.a.Start(ctx)) + return fx +} + +type fixture struct { + Service + mockClient *mock_consensusclient.MockService + ctrl *gomock.Controller + a *app.App + account *testaccount.AccountTestService +} + +func (fx *fixture) Finish(t *testing.T) { + require.NoError(t, fx.a.Close(ctx)) + fx.ctrl.Finish() +} + +type testHandler struct { + req *spacesyncproto.ObjectSyncMessage +} + +func (t *testHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { + t.req = request + return +}