change consensus proto and client

This commit is contained in:
Sergey Cherepanov 2023-06-30 19:42:07 +02:00
parent 59cf8b46fd
commit 92cbfb1cb3
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
6 changed files with 779 additions and 140 deletions

View File

@ -30,7 +30,7 @@ func New() Service {
// Watcher watches new events by specified logId // Watcher watches new events by specified logId
type Watcher interface { type Watcher interface {
AddConsensusRecords(recs []*consensusproto.Record) AddConsensusRecords(recs []*consensusproto.RawRecordWithId)
AddConsensusError(err error) AddConsensusError(err error)
} }
@ -38,7 +38,7 @@ type Service interface {
// AddLog adds new log to consensus servers // AddLog adds new log to consensus servers
AddLog(ctx context.Context, clog *consensusproto.Log) (err error) AddLog(ctx context.Context, clog *consensusproto.Log) (err error)
// AddRecord adds new record to consensus servers // AddRecord adds new record to consensus servers
AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error) AddRecord(ctx context.Context, logId []byte, clog *consensusproto.RawRecord) (record *consensusproto.RawRecordWithId, err error)
// Watch starts watching to given logId and calls watcher when any relative event received // Watch starts watching to given logId and calls watcher when any relative event received
Watch(logId []byte, w Watcher) (err error) Watch(logId []byte, w Watcher) (err error)
// UnWatch stops watching given logId and removes watcher // UnWatch stops watching given logId and removes watcher
@ -97,9 +97,9 @@ func (s *service) AddLog(ctx context.Context, clog *consensusproto.Log) (err err
}) })
} }
func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error) { func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensusproto.RawRecord) (record *consensusproto.RawRecordWithId, err error) {
return s.doClient(ctx, func(cl consensusproto.DRPCConsensusClient) error { err = s.doClient(ctx, func(cl consensusproto.DRPCConsensusClient) error {
if _, err = cl.RecordAdd(ctx, &consensusproto.RecordAddRequest{ if record, err = cl.RecordAdd(ctx, &consensusproto.RecordAddRequest{
LogId: logId, LogId: logId,
Record: clog, Record: clog,
}); err != nil { }); err != nil {
@ -107,6 +107,7 @@ func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensuspr
} }
return nil return nil
}) })
return
} }
func (s *service) Watch(logId []byte, w Watcher) (err error) { func (s *service) Watch(logId []byte, w Watcher) (err error) {

View File

@ -11,6 +11,7 @@ import (
"github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf"
"github.com/anyproto/any-sync/nodeconf/mock_nodeconf" "github.com/anyproto/any-sync/nodeconf/mock_nodeconf"
"github.com/anyproto/any-sync/testutil/accounttest" "github.com/anyproto/any-sync/testutil/accounttest"
"github.com/anyproto/any-sync/util/cidutil"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -118,7 +119,9 @@ func TestService_AddLog(t *testing.T) {
func TestService_AddRecord(t *testing.T) { func TestService_AddRecord(t *testing.T) {
fx := newFixture(t).run(t) fx := newFixture(t).run(t)
defer fx.Finish() defer fx.Finish()
assert.NoError(t, fx.AddRecord(ctx, []byte{'1'}, &consensusproto.Record{})) rec, err := fx.AddRecord(ctx, []byte{'1'}, &consensusproto.RawRecord{})
require.NoError(t, err)
assert.NotEmpty(t, rec)
} }
var ctx = context.Background() var ctx = context.Background()
@ -186,13 +189,15 @@ func (t *testServer) LogAdd(ctx context.Context, req *consensusproto.LogAddReque
return &consensusproto.Ok{}, nil return &consensusproto.Ok{}, nil
} }
func (t *testServer) RecordAdd(ctx context.Context, req *consensusproto.RecordAddRequest) (*consensusproto.Ok, error) { func (t *testServer) RecordAdd(ctx context.Context, req *consensusproto.RecordAddRequest) (*consensusproto.RawRecordWithId, error) {
if t.addRecord != nil { if t.addRecord != nil {
if err := t.addRecord(ctx, req); err != nil { if err := t.addRecord(ctx, req); err != nil {
return nil, err return nil, err
} }
} }
return &consensusproto.Ok{}, nil data, _ := req.Record.Marshal()
id, _ := cidutil.NewCidFromBytes(data)
return &consensusproto.RawRecordWithId{Id: id, Payload: data}, nil
} }
func (t *testServer) LogWatch(stream consensusproto.DRPCConsensus_LogWatchStream) error { func (t *testServer) LogWatch(stream consensusproto.DRPCConsensus_LogWatchStream) error {
@ -215,13 +220,13 @@ func (t *testServer) waitStream(test *testing.T) consensusproto.DRPCConsensus_Lo
} }
type testWatcher struct { type testWatcher struct {
recs [][]*consensusproto.Record recs [][]*consensusproto.RawRecordWithId
err error err error
ready chan struct{} ready chan struct{}
once sync.Once once sync.Once
} }
func (t *testWatcher) AddConsensusRecords(recs []*consensusproto.Record) { func (t *testWatcher) AddConsensusRecords(recs []*consensusproto.RawRecordWithId) {
t.recs = append(t.recs, recs) t.recs = append(t.recs, recs)
t.once.Do(func() { t.once.Do(func() {
close(t.ready) close(t.ready)

View File

@ -52,11 +52,12 @@ func (mr *MockServiceMockRecorder) AddLog(arg0, arg1 interface{}) *gomock.Call {
} }
// AddRecord mocks base method. // AddRecord mocks base method.
func (m *MockService) AddRecord(arg0 context.Context, arg1 []byte, arg2 *consensusproto.Record) error { func (m *MockService) AddRecord(arg0 context.Context, arg1 []byte, arg2 *consensusproto.RawRecord) (*consensusproto.RawRecordWithId, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "AddRecord", arg0, arg1, arg2) ret := m.ctrl.Call(m, "AddRecord", arg0, arg1, arg2)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(*consensusproto.RawRecordWithId)
return ret0 ret1, _ := ret[1].(error)
return ret0, ret1
} }
// AddRecord indicates an expected call of AddRecord. // AddRecord indicates an expected call of AddRecord.

File diff suppressed because it is too large Load Diff

View File

@ -41,7 +41,7 @@ type DRPCConsensusClient interface {
DRPCConn() drpc.Conn DRPCConn() drpc.Conn
LogAdd(ctx context.Context, in *LogAddRequest) (*Ok, error) LogAdd(ctx context.Context, in *LogAddRequest) (*Ok, error)
RecordAdd(ctx context.Context, in *RecordAddRequest) (*Ok, error) RecordAdd(ctx context.Context, in *RecordAddRequest) (*RawRecordWithId, error)
LogWatch(ctx context.Context) (DRPCConsensus_LogWatchClient, error) LogWatch(ctx context.Context) (DRPCConsensus_LogWatchClient, error)
} }
@ -64,8 +64,8 @@ func (c *drpcConsensusClient) LogAdd(ctx context.Context, in *LogAddRequest) (*O
return out, nil return out, nil
} }
func (c *drpcConsensusClient) RecordAdd(ctx context.Context, in *RecordAddRequest) (*Ok, error) { func (c *drpcConsensusClient) RecordAdd(ctx context.Context, in *RecordAddRequest) (*RawRecordWithId, error) {
out := new(Ok) out := new(RawRecordWithId)
err := c.cc.Invoke(ctx, "/consensusProto.Consensus/RecordAdd", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}, in, out) err := c.cc.Invoke(ctx, "/consensusProto.Consensus/RecordAdd", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}, in, out)
if err != nil { if err != nil {
return nil, err return nil, err
@ -114,7 +114,7 @@ func (x *drpcConsensus_LogWatchClient) RecvMsg(m *LogWatchEvent) error {
type DRPCConsensusServer interface { type DRPCConsensusServer interface {
LogAdd(context.Context, *LogAddRequest) (*Ok, error) LogAdd(context.Context, *LogAddRequest) (*Ok, error)
RecordAdd(context.Context, *RecordAddRequest) (*Ok, error) RecordAdd(context.Context, *RecordAddRequest) (*RawRecordWithId, error)
LogWatch(DRPCConsensus_LogWatchStream) error LogWatch(DRPCConsensus_LogWatchStream) error
} }
@ -124,7 +124,7 @@ func (s *DRPCConsensusUnimplementedServer) LogAdd(context.Context, *LogAddReques
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
} }
func (s *DRPCConsensusUnimplementedServer) RecordAdd(context.Context, *RecordAddRequest) (*Ok, error) { func (s *DRPCConsensusUnimplementedServer) RecordAdd(context.Context, *RecordAddRequest) (*RawRecordWithId, error) {
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
} }
@ -191,14 +191,14 @@ func (x *drpcConsensus_LogAddStream) SendAndClose(m *Ok) error {
type DRPCConsensus_RecordAddStream interface { type DRPCConsensus_RecordAddStream interface {
drpc.Stream drpc.Stream
SendAndClose(*Ok) error SendAndClose(*RawRecordWithId) error
} }
type drpcConsensus_RecordAddStream struct { type drpcConsensus_RecordAddStream struct {
drpc.Stream drpc.Stream
} }
func (x *drpcConsensus_RecordAddStream) SendAndClose(m *Ok) error { func (x *drpcConsensus_RecordAddStream) SendAndClose(m *RawRecordWithId) error {
if err := x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil { if err := x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil {
return err return err
} }

View File

@ -14,21 +14,38 @@ enum ErrCodes {
message Log { message Log {
bytes id = 1; bytes id = 1;
repeated Record records = 2; bytes payload = 2;
repeated RawRecordWithId records = 3;
} }
message Record { // RawRecord is a proto message containing the payload in bytes, signature of the account who added it and signature of the acceptor
bytes id = 1; message RawRecord {
bytes prevId = 2; bytes payload = 1;
bytes payload = 3; bytes signature = 2;
uint64 createdUnix = 4; bytes acceptorIdentity = 3;
bytes acceptorSignature = 4;
} }
// RawRecordWithId is a raw record and the id for convenience
message RawRecordWithId {
bytes payload = 1;
string id = 2;
}
// Record is a record containing a data
message Record {
string prevId = 1;
bytes identity = 2;
bytes data = 3;
int64 timestamp = 4;
}
service Consensus { service Consensus {
// AddLog adds new log to consensus // AddLog adds new log to consensus
rpc LogAdd(LogAddRequest) returns (Ok); rpc LogAdd(LogAddRequest) returns (Ok);
// AddRecord adds new record to log // AddRecord adds new record to log
rpc RecordAdd(RecordAddRequest) returns (Ok); rpc RecordAdd(RecordAddRequest) returns (RawRecordWithId);
// WatchLog fetches log and subscribes for a changes // WatchLog fetches log and subscribes for a changes
rpc LogWatch(stream LogWatchRequest) returns (stream LogWatchEvent); rpc LogWatch(stream LogWatchRequest) returns (stream LogWatchEvent);
} }
@ -41,7 +58,7 @@ message LogAddRequest {
message RecordAddRequest { message RecordAddRequest {
bytes logId = 1; bytes logId = 1;
Record record = 2; RawRecord record = 2;
} }
message LogWatchRequest { message LogWatchRequest {
@ -51,7 +68,7 @@ message LogWatchRequest {
message LogWatchEvent { message LogWatchEvent {
bytes logId = 1; bytes logId = 1;
repeated Record records = 2; repeated RawRecordWithId records = 2;
Err error = 3; Err error = 3;
} }