diff --git a/cmd/consensusnode/testclient/consensustestclient.go b/cmd/consensusnode/testclient/consensustestclient.go index c0301b8b..d362f03c 100644 --- a/cmd/consensusnode/testclient/consensustestclient.go +++ b/cmd/consensusnode/testclient/consensustestclient.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "context" "flag" "fmt" @@ -17,10 +18,12 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/node/account" "go.uber.org/zap" "gopkg.in/mgo.v2/bson" + "math/rand" "net/http" _ "net/http/pprof" "os" "os/signal" + "sync" "syscall" "time" ) @@ -77,6 +80,9 @@ func main() { log.Info("test success!") } + b := &bench{service: a.MustComponent(consensusclient.CName).(consensusclient.Service)} + b.run() + // wait exit signal exit := make(chan os.Signal, 1) signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT) @@ -206,11 +212,18 @@ func testStream(service consensusclient.Service) (err error) { } log.Info("log created", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st))) - stream, err := service.WatchLog(ctx, newLogId) + stream, err := service.WatchLog(ctx) if err != nil { return err } defer stream.Close() + + st = time.Now() + if err = stream.WatchIds([][]byte{newLogId}); err != nil { + return err + } + log.Info("watch", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st))) + sr := readStream(stream) for i := 0; i < 10; i++ { st = time.Now() @@ -226,35 +239,129 @@ func testStream(service consensusclient.Service) (err error) { lastRecId = recId log.Info("record created", zap.String("id", bson.ObjectId(lastRecId).Hex()), zap.Duration("dur", time.Since(st))) } - fmt.Println(sr.log.Records) + sr.validate() return nil } -func readStream(stream consensusproto.DRPCConsensus_WatchLogClient) *streamReader { - sr := &streamReader{stream: stream} +func readStream(stream consensusclient.Stream) *streamReader { + sr := &streamReader{stream: stream, logs: map[string]*consensusproto.Log{}} go sr.read() return sr } type streamReader struct { - stream consensusproto.DRPCConsensus_WatchLogClient - log *consensusproto.Log + stream consensusclient.Stream + logs map[string]*consensusproto.Log } func (sr *streamReader) read() { for { - event, err := sr.stream.Recv() - if err != nil { + recs := sr.stream.WaitLogs() + if len(recs) == 0 { return } - fmt.Println("received event", event) - if sr.log == nil { - sr.log = &consensusproto.Log{ - Id: event.LogId, - Records: event.Records, + for _, rec := range recs { + if el, ok := sr.logs[string(rec.Id)]; !ok { + sr.logs[string(rec.Id)] = &consensusproto.Log{ + Id: rec.Id, + Records: rec.Records, + } + } else { + el.Records = append(rec.Records, el.Records...) + sr.logs[string(rec.Id)] = el } - } else { - sr.log.Records = append(event.Records, sr.log.Records...) } } } + +func (sr *streamReader) validate() { + var lc, rc int + for _, log := range sr.logs { + lc++ + rc += len(log.Records) + validateLog(log) + } + fmt.Println("logs valid; log count:", lc, "records:", rc) +} + +func validateLog(log *consensusproto.Log) { + var prevId []byte + for _, rec := range log.Records { + if len(prevId) != 0 { + if !bytes.Equal(prevId, rec.Id) { + panic(fmt.Sprintf("invalid log: %+v", log)) + } + } + prevId = rec.PrevId + } +} + +type bench struct { + service consensusclient.Service + stream consensusclient.Stream +} + +func (b *bench) run() { + var err error + b.stream, err = b.service.WatchLog(context.Background()) + if err != nil { + panic(err) + } + defer b.stream.Close() + sr := readStream(b.stream) + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + time.Sleep(time.Second / 100) + wg.Add(1) + go func() { + defer wg.Done() + b.client() + }() + fmt.Println("total streams:", i+1) + } + wg.Wait() + sr.validate() +} + +func (b *bench) client() { + ctx := context.Background() + + // create log + newLogId := []byte(bson.NewObjectId()) + lastRecId := []byte(bson.NewObjectId()) + err := b.service.AddLog(ctx, &consensusproto.Log{ + Id: newLogId, + Records: []*consensusproto.Record{ + { + Id: lastRecId, + Payload: []byte("test"), + CreatedUnix: uint64(time.Now().Unix()), + }, + }, + }) + for i := 0; i < 5; i++ { + fmt.Println("watch", bson.ObjectId(newLogId).Hex()) + if err = b.stream.WatchIds([][]byte{newLogId}); err != nil { + panic(err) + } + for i := 0; i < rand.Intn(20); i++ { + time.Sleep(time.Millisecond * time.Duration(rand.Intn(10000))) + recId := []byte(bson.NewObjectId()) + err = b.service.AddRecord(ctx, newLogId, &consensusproto.Record{ + Id: recId, + PrevId: lastRecId, + Payload: []byte("some payload 1 2 3 4 5 6 6 7 oijoj"), + CreatedUnix: uint64(time.Now().Unix()), + }) + if err != nil { + panic(err) + } + lastRecId = recId + } + if err = b.stream.UnwatchIds([][]byte{newLogId}); err != nil { + panic(err) + } + fmt.Println("unwatch", bson.ObjectId(newLogId).Hex()) + time.Sleep(time.Minute * 1) + } +} diff --git a/consensus/consensus.go b/consensus/consensus.go index a041ae26..c0b99f4b 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -13,10 +13,3 @@ type Record struct { Payload []byte `bson:"payload"` Created time.Time `bson:"created"'` } - -func (l Log) CopyRecords() Log { - l2 := l - l2.Records = make([]Record, len(l.Records)) - copy(l2.Records, l.Records) - return l2 -} diff --git a/consensus/consensusclient/client.go b/consensus/consensusclient/client.go index 0344734b..39a1bcf4 100644 --- a/consensus/consensusclient/client.go +++ b/consensus/consensusclient/client.go @@ -21,7 +21,7 @@ func New() Service { type Service interface { AddLog(ctx context.Context, clog *consensusproto.Log) (err error) AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error) - WatchLog(ctx context.Context, logId []byte) (stream consensusproto.DRPCConsensus_WatchLogClient, err error) + WatchLog(ctx context.Context) (stream Stream, err error) app.Component } @@ -83,15 +83,14 @@ func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensuspr return } -func (s *service) WatchLog(ctx context.Context, logId []byte) (stream consensusproto.DRPCConsensus_WatchLogClient, err error) { +func (s *service) WatchLog(ctx context.Context) (st Stream, err error) { cl, err := s.dialClient(ctx) if err != nil { return } - if stream, err = cl.WatchLog(ctx, &consensusproto.WatchLogRequest{ - LogId: logId, - }); err != nil { + rpcStream, err := cl.WatchLog(ctx) + if err != nil { return nil, rpcerr.Unwrap(err) } - return + return runStream(rpcStream), nil } diff --git a/consensus/consensusclient/stream.go b/consensus/consensusclient/stream.go new file mode 100644 index 00000000..9c18d07a --- /dev/null +++ b/consensus/consensusclient/stream.go @@ -0,0 +1,66 @@ +package consensusclient + +import ( + "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto" + "github.com/cheggaaa/mb/v2" +) + +type Stream interface { + WatchIds(logIds [][]byte) (err error) + UnwatchIds(logIds [][]byte) (err error) + WaitLogs() []*consensusproto.Log + Close() error +} + +func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) Stream { + st := &stream{ + rpcStream: rpcStream, + mb: mb.New((*consensusproto.Log)(nil), 100), + } + go st.readStream() + return st +} + +type stream struct { + rpcStream consensusproto.DRPCConsensus_WatchLogClient + mb *mb.MB[*consensusproto.Log] +} + +func (s *stream) WatchIds(logIds [][]byte) (err error) { + return s.rpcStream.Send(&consensusproto.WatchLogRequest{ + WatchIds: logIds, + }) +} + +func (s *stream) UnwatchIds(logIds [][]byte) (err error) { + return s.rpcStream.Send(&consensusproto.WatchLogRequest{ + UnwatchIds: logIds, + }) +} + +func (s *stream) WaitLogs() []*consensusproto.Log { + return s.mb.Wait() +} + +func (s *stream) readStream() { + defer s.Close() + for { + event, err := s.rpcStream.Recv() + if err != nil { + return + } + if err = s.mb.Add(&consensusproto.Log{ + Id: event.LogId, + Records: event.Records, + }); err != nil { + return + } + } +} + +func (s *stream) Close() error { + if err := s.mb.Close(); err == nil { + return s.rpcStream.Close() + } + return nil +} diff --git a/consensus/consensusproto/consensus.pb.go b/consensus/consensusproto/consensus.pb.go index 1b3e20d2..a2327271 100644 --- a/consensus/consensusproto/consensus.pb.go +++ b/consensus/consensusproto/consensus.pb.go @@ -317,7 +317,8 @@ func (m *AddRecordRequest) GetRecord() *Record { } type WatchLogRequest struct { - LogId []byte `protobuf:"bytes,1,opt,name=logId,proto3" json:"logId,omitempty"` + WatchIds [][]byte `protobuf:"bytes,1,rep,name=watchIds,proto3" json:"watchIds,omitempty"` + UnwatchIds [][]byte `protobuf:"bytes,2,rep,name=unwatchIds,proto3" json:"unwatchIds,omitempty"` } func (m *WatchLogRequest) Reset() { *m = WatchLogRequest{} } @@ -353,9 +354,16 @@ func (m *WatchLogRequest) XXX_DiscardUnknown() { var xxx_messageInfo_WatchLogRequest proto.InternalMessageInfo -func (m *WatchLogRequest) GetLogId() []byte { +func (m *WatchLogRequest) GetWatchIds() [][]byte { if m != nil { - return m.LogId + return m.WatchIds + } + return nil +} + +func (m *WatchLogRequest) GetUnwatchIds() [][]byte { + if m != nil { + return m.UnwatchIds } return nil } @@ -428,36 +436,37 @@ func init() { } var fileDescriptor_b8d7f1c16b400059 = []byte{ - // 458 bytes of a gzipped FileDescriptorProto + // 480 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x41, 0x8b, 0xd3, 0x40, - 0x14, 0xee, 0x24, 0x35, 0xbb, 0x7d, 0x69, 0xbb, 0xf1, 0xb1, 0x48, 0xd8, 0xc5, 0x50, 0xe2, 0xc1, - 0x22, 0xd2, 0x95, 0x2a, 0x78, 0xf2, 0xb0, 0x96, 0x8a, 0x0b, 0xc5, 0x42, 0xa0, 0x0a, 0x9e, 0x8c, - 0x99, 0x69, 0x0c, 0x1b, 0x32, 0x75, 0x66, 0xba, 0x74, 0xff, 0x85, 0x3f, 0xc4, 0x1f, 0xe2, 0x45, - 0xd8, 0xa3, 0x47, 0x69, 0xff, 0x88, 0x74, 0xd2, 0xac, 0xc9, 0xb6, 0x3d, 0xec, 0x25, 0xc9, 0x7c, - 0xdf, 0xfb, 0xde, 0xf7, 0xe6, 0x7b, 0x04, 0xce, 0x22, 0x9e, 0x49, 0x96, 0xc9, 0xb9, 0xfc, 0xff, - 0x35, 0x13, 0x5c, 0xf1, 0x33, 0xfd, 0x2c, 0xa1, 0x3d, 0x0d, 0x60, 0x33, 0xcc, 0xae, 0x07, 0x05, - 0xe6, 0xc7, 0x60, 0x8e, 0x78, 0x8c, 0x6d, 0x30, 0x12, 0xea, 0x92, 0x0e, 0xe9, 0x36, 0x03, 0x23, - 0xa1, 0xd8, 0x83, 0x03, 0xc1, 0x22, 0x2e, 0xa8, 0x74, 0x8d, 0x8e, 0xd9, 0xb5, 0xfb, 0xc7, 0xbd, - 0xb2, 0xac, 0x17, 0x68, 0x32, 0x28, 0x8a, 0xb0, 0x03, 0x76, 0x24, 0x58, 0xa8, 0x18, 0x9d, 0x64, - 0xc9, 0xc2, 0x35, 0x3b, 0xa4, 0x5b, 0x0f, 0xca, 0x90, 0x9f, 0x82, 0x95, 0x8b, 0xb6, 0xbc, 0x1e, - 0x81, 0x35, 0x13, 0xec, 0xea, 0x82, 0xba, 0x86, 0xc6, 0x36, 0x27, 0x74, 0xe1, 0x60, 0x16, 0x5e, - 0xa7, 0x3c, 0xa4, 0xba, 0x5f, 0x33, 0x28, 0x8e, 0x77, 0xdd, 0xea, 0xdb, 0x6e, 0x75, 0x30, 0xc6, - 0x97, 0xfe, 0x2b, 0x68, 0x9d, 0x53, 0x3a, 0xe2, 0x71, 0xc0, 0xbe, 0xcf, 0x99, 0x54, 0xf8, 0x04, - 0xcc, 0x94, 0xc7, 0xda, 0xdb, 0xee, 0x3f, 0xac, 0x5e, 0x69, 0x5d, 0xb6, 0x66, 0xfd, 0x8f, 0xe0, - 0x9c, 0x53, 0xba, 0xb9, 0xe1, 0x46, 0x78, 0x0c, 0x0f, 0x52, 0x1e, 0x5f, 0x14, 0x63, 0xe7, 0x07, - 0x7c, 0x0e, 0x56, 0x1e, 0x80, 0x9e, 0x7c, 0x5f, 0x48, 0x9b, 0x1a, 0xff, 0x29, 0x1c, 0x7d, 0x0a, - 0x55, 0xf4, 0xad, 0x34, 0xcf, 0xce, 0xb6, 0xfe, 0x04, 0x5a, 0x45, 0xe1, 0xf0, 0x8a, 0x65, 0xfb, - 0xdc, 0xef, 0xb9, 0xa3, 0x67, 0x5f, 0xe0, 0x70, 0x28, 0xc4, 0x80, 0x53, 0x26, 0xb1, 0x0d, 0x30, - 0xc9, 0xd8, 0x62, 0xc6, 0x22, 0xc5, 0xa8, 0x53, 0xc3, 0x16, 0x34, 0xd6, 0x6e, 0x8b, 0x44, 0x2a, - 0xe9, 0x10, 0x3c, 0x02, 0x7b, 0xc4, 0xe3, 0x0f, 0x5c, 0xbd, 0xe3, 0xf3, 0x8c, 0x3a, 0x06, 0x22, - 0xb4, 0xf3, 0x76, 0x03, 0x9e, 0x4d, 0xd3, 0x24, 0x52, 0x8e, 0x89, 0x0e, 0xd8, 0x43, 0x21, 0xb8, - 0x18, 0x4f, 0xa7, 0x92, 0x29, 0xe7, 0xa7, 0xd1, 0xff, 0x4d, 0xa0, 0x71, 0xeb, 0x8f, 0xaf, 0xc1, - 0xca, 0xd3, 0xc7, 0xd3, 0xea, 0x60, 0x95, 0x9d, 0x9c, 0x38, 0x55, 0x72, 0x7c, 0x89, 0x6f, 0xa0, - 0x71, 0xbb, 0x00, 0xf4, 0xb6, 0xb4, 0x95, 0xcd, 0xec, 0x90, 0xbf, 0x87, 0xc3, 0x22, 0x3e, 0x7c, - 0x5c, 0x65, 0xef, 0xe4, 0x7f, 0x72, 0xba, 0x9b, 0xd6, 0xa9, 0xbf, 0x20, 0x6f, 0xfb, 0xbf, 0x96, - 0x1e, 0xb9, 0x59, 0x7a, 0xe4, 0xef, 0xd2, 0x23, 0x3f, 0x56, 0x5e, 0xed, 0x66, 0xe5, 0xd5, 0xfe, - 0xac, 0xbc, 0xda, 0x67, 0x77, 0xdf, 0x5f, 0xf7, 0xd5, 0xd2, 0xaf, 0x97, 0xff, 0x02, 0x00, 0x00, - 0xff, 0xff, 0x2f, 0xe9, 0x67, 0xad, 0x98, 0x03, 0x00, 0x00, + 0x14, 0xee, 0x24, 0x6b, 0xb7, 0x7d, 0x69, 0xbb, 0xf1, 0xb1, 0x48, 0xe8, 0x62, 0x08, 0xf1, 0x12, + 0x44, 0xba, 0x52, 0x05, 0x4f, 0x1e, 0xd6, 0x52, 0xa1, 0x52, 0x2d, 0x04, 0xaa, 0xe0, 0xc9, 0x98, + 0x99, 0xc6, 0xb0, 0x21, 0x53, 0x67, 0xd2, 0xb5, 0xfb, 0x2f, 0xfc, 0x21, 0xfe, 0x10, 0x6f, 0xee, + 0xd1, 0xa3, 0xb4, 0x7f, 0x44, 0x3a, 0x69, 0x6a, 0xb2, 0xdd, 0x1e, 0xbc, 0x24, 0x79, 0xdf, 0x37, + 0xef, 0x7d, 0xdf, 0xbc, 0x8f, 0xc0, 0x79, 0xc8, 0x53, 0xc9, 0x52, 0xb9, 0x90, 0xff, 0xbe, 0xe6, + 0x82, 0x67, 0xfc, 0x5c, 0x3d, 0x4b, 0x68, 0x4f, 0x01, 0xd8, 0x0a, 0xd2, 0xeb, 0x41, 0x81, 0xb9, + 0x11, 0xe8, 0x63, 0x1e, 0x61, 0x07, 0xb4, 0x98, 0x5a, 0xc4, 0x21, 0x5e, 0xcb, 0xd7, 0x62, 0x8a, + 0x3d, 0x38, 0x16, 0x2c, 0xe4, 0x82, 0x4a, 0x4b, 0x73, 0x74, 0xcf, 0xe8, 0x9f, 0xf6, 0xca, 0x6d, + 0x3d, 0x5f, 0x91, 0x7e, 0x71, 0x08, 0x1d, 0x30, 0x42, 0xc1, 0x82, 0x8c, 0xd1, 0x69, 0x1a, 0x2f, + 0x2d, 0xdd, 0x21, 0xde, 0x91, 0x5f, 0x86, 0xdc, 0x04, 0xea, 0x79, 0xd3, 0x9e, 0xd6, 0x03, 0xa8, + 0xcf, 0x05, 0xbb, 0x1a, 0x51, 0x4b, 0x53, 0xd8, 0xb6, 0x42, 0x0b, 0x8e, 0xe7, 0xc1, 0x75, 0xc2, + 0x03, 0xaa, 0xe6, 0xb5, 0xfc, 0xa2, 0xbc, 0xad, 0x76, 0xb4, 0xaf, 0x76, 0x04, 0xda, 0xe4, 0xd2, + 0x7d, 0x0e, 0xed, 0x0b, 0x4a, 0xc7, 0x3c, 0xf2, 0xd9, 0xd7, 0x05, 0x93, 0x19, 0x3e, 0x02, 0x3d, + 0xe1, 0x91, 0xd2, 0x36, 0xfa, 0xf7, 0xab, 0x57, 0xda, 0x1c, 0xdb, 0xb0, 0xee, 0x7b, 0x30, 0x2f, + 0x28, 0xdd, 0xde, 0x70, 0xdb, 0x78, 0x0a, 0xf7, 0x12, 0x1e, 0x8d, 0x0a, 0xdb, 0x79, 0x81, 0x4f, + 0xa0, 0x9e, 0x2f, 0x40, 0x39, 0x3f, 0xb4, 0xa4, 0xed, 0x19, 0xf7, 0x2d, 0x9c, 0x7c, 0x08, 0xb2, + 0xf0, 0x4b, 0xc9, 0x4f, 0x17, 0x1a, 0xdf, 0x36, 0xd0, 0x88, 0x4a, 0x8b, 0x38, 0xba, 0xd7, 0xf2, + 0x77, 0x35, 0xda, 0x00, 0x8b, 0x74, 0xc7, 0x6a, 0x8a, 0x2d, 0x21, 0xee, 0x14, 0xda, 0xc5, 0xb8, + 0xe1, 0x15, 0x4b, 0x0f, 0x79, 0xfc, 0xcf, 0x24, 0x1f, 0x7f, 0x82, 0xc6, 0x50, 0x88, 0x01, 0xa7, + 0x4c, 0x62, 0x07, 0x60, 0x9a, 0xb2, 0xe5, 0x9c, 0x85, 0x19, 0xa3, 0x66, 0x0d, 0xdb, 0xd0, 0xdc, + 0xa8, 0x2d, 0x63, 0x99, 0x49, 0x93, 0xe0, 0x09, 0x18, 0x63, 0x1e, 0xbd, 0xe3, 0xd9, 0x6b, 0xbe, + 0x48, 0xa9, 0xa9, 0x21, 0x42, 0x27, 0x1f, 0x37, 0xe0, 0xe9, 0x2c, 0x89, 0xc3, 0xcc, 0xd4, 0xd1, + 0x04, 0x63, 0x28, 0x04, 0x17, 0x93, 0xd9, 0x4c, 0xb2, 0xcc, 0xfc, 0xa1, 0xf5, 0x7f, 0x11, 0x68, + 0xee, 0xf4, 0xf1, 0x05, 0xd4, 0xf3, 0x8c, 0xf0, 0xac, 0x6a, 0xac, 0x92, 0x5c, 0xd7, 0xac, 0x92, + 0x93, 0x4b, 0x7c, 0x09, 0xcd, 0x5d, 0x4c, 0x68, 0xef, 0xf5, 0x56, 0xf2, 0xbb, 0xa3, 0xfd, 0x0d, + 0x34, 0x8a, 0xf5, 0xe1, 0xc3, 0x2a, 0x7b, 0x2b, 0xa5, 0xee, 0xd9, 0xdd, 0xb4, 0xda, 0xba, 0x47, + 0x9e, 0x92, 0x57, 0xfd, 0x9f, 0x2b, 0x9b, 0xdc, 0xac, 0x6c, 0xf2, 0x67, 0x65, 0x93, 0xef, 0x6b, + 0xbb, 0x76, 0xb3, 0xb6, 0x6b, 0xbf, 0xd7, 0x76, 0xed, 0xa3, 0x75, 0xe8, 0xef, 0xfc, 0x5c, 0x57, + 0xaf, 0x67, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xf0, 0xc9, 0xa1, 0x1f, 0xc0, 0x03, 0x00, 0x00, } func (m *Log) Marshal() (dAtA []byte, err error) { @@ -678,12 +687,23 @@ func (m *WatchLogRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.LogId) > 0 { - i -= len(m.LogId) - copy(dAtA[i:], m.LogId) - i = encodeVarintConsensus(dAtA, i, uint64(len(m.LogId))) - i-- - dAtA[i] = 0xa + if len(m.UnwatchIds) > 0 { + for iNdEx := len(m.UnwatchIds) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.UnwatchIds[iNdEx]) + copy(dAtA[i:], m.UnwatchIds[iNdEx]) + i = encodeVarintConsensus(dAtA, i, uint64(len(m.UnwatchIds[iNdEx]))) + i-- + dAtA[i] = 0x12 + } + } + if len(m.WatchIds) > 0 { + for iNdEx := len(m.WatchIds) - 1; iNdEx >= 0; iNdEx-- { + i -= len(m.WatchIds[iNdEx]) + copy(dAtA[i:], m.WatchIds[iNdEx]) + i = encodeVarintConsensus(dAtA, i, uint64(len(m.WatchIds[iNdEx]))) + i-- + dAtA[i] = 0xa + } } return len(dAtA) - i, nil } @@ -834,9 +854,17 @@ func (m *WatchLogRequest) Size() (n int) { } var l int _ = l - l = len(m.LogId) - if l > 0 { - n += 1 + l + sovConsensus(uint64(l)) + if len(m.WatchIds) > 0 { + for _, b := range m.WatchIds { + l = len(b) + n += 1 + l + sovConsensus(uint64(l)) + } + } + if len(m.UnwatchIds) > 0 { + for _, b := range m.UnwatchIds { + l = len(b) + n += 1 + l + sovConsensus(uint64(l)) + } } return n } @@ -1461,7 +1489,7 @@ func (m *WatchLogRequest) Unmarshal(dAtA []byte) error { switch fieldNum { case 1: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field LogId", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field WatchIds", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -1488,10 +1516,40 @@ func (m *WatchLogRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.LogId = append(m.LogId[:0], dAtA[iNdEx:postIndex]...) - if m.LogId == nil { - m.LogId = []byte{} + m.WatchIds = append(m.WatchIds, make([]byte, postIndex-iNdEx)) + copy(m.WatchIds[len(m.WatchIds)-1], dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UnwatchIds", wireType) } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowConsensus + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthConsensus + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthConsensus + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.UnwatchIds = append(m.UnwatchIds, make([]byte, postIndex-iNdEx)) + copy(m.UnwatchIds[len(m.UnwatchIds)-1], dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex diff --git a/consensus/consensusproto/consensus_drpc.pb.go b/consensus/consensusproto/consensus_drpc.pb.go index f573360f..ccdf5bc9 100644 --- a/consensus/consensusproto/consensus_drpc.pb.go +++ b/consensus/consensusproto/consensus_drpc.pb.go @@ -42,7 +42,7 @@ type DRPCConsensusClient interface { AddLog(ctx context.Context, in *AddLogRequest) (*Ok, error) AddRecord(ctx context.Context, in *AddRecordRequest) (*Ok, error) - WatchLog(ctx context.Context, in *WatchLogRequest) (DRPCConsensus_WatchLogClient, error) + WatchLog(ctx context.Context) (DRPCConsensus_WatchLogClient, error) } type drpcConsensusClient struct { @@ -73,23 +73,18 @@ func (c *drpcConsensusClient) AddRecord(ctx context.Context, in *AddRecordReques return out, nil } -func (c *drpcConsensusClient) WatchLog(ctx context.Context, in *WatchLogRequest) (DRPCConsensus_WatchLogClient, error) { +func (c *drpcConsensusClient) WatchLog(ctx context.Context) (DRPCConsensus_WatchLogClient, error) { stream, err := c.cc.NewStream(ctx, "/anyConsensus.Consensus/WatchLog", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}) if err != nil { return nil, err } x := &drpcConsensus_WatchLogClient{stream} - if err := x.MsgSend(in, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil { - return nil, err - } - if err := x.CloseSend(); err != nil { - return nil, err - } return x, nil } type DRPCConsensus_WatchLogClient interface { drpc.Stream + Send(*WatchLogRequest) error Recv() (*WatchLogEvent, error) } @@ -97,6 +92,10 @@ type drpcConsensus_WatchLogClient struct { drpc.Stream } +func (x *drpcConsensus_WatchLogClient) Send(m *WatchLogRequest) error { + return x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}) +} + func (x *drpcConsensus_WatchLogClient) Recv() (*WatchLogEvent, error) { m := new(WatchLogEvent) if err := x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil { @@ -112,7 +111,7 @@ func (x *drpcConsensus_WatchLogClient) RecvMsg(m *WatchLogEvent) error { type DRPCConsensusServer interface { AddLog(context.Context, *AddLogRequest) (*Ok, error) AddRecord(context.Context, *AddRecordRequest) (*Ok, error) - WatchLog(*WatchLogRequest, DRPCConsensus_WatchLogStream) error + WatchLog(DRPCConsensus_WatchLogStream) error } type DRPCConsensusUnimplementedServer struct{} @@ -125,7 +124,7 @@ func (s *DRPCConsensusUnimplementedServer) AddRecord(context.Context, *AddRecord return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } -func (s *DRPCConsensusUnimplementedServer) WatchLog(*WatchLogRequest, DRPCConsensus_WatchLogStream) error { +func (s *DRPCConsensusUnimplementedServer) WatchLog(DRPCConsensus_WatchLogStream) error { return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) } @@ -158,8 +157,7 @@ func (DRPCConsensusDescription) Method(n int) (string, drpc.Encoding, drpc.Recei func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { return nil, srv.(DRPCConsensusServer). WatchLog( - in1.(*WatchLogRequest), - &drpcConsensus_WatchLogStream{in2.(drpc.Stream)}, + &drpcConsensus_WatchLogStream{in1.(drpc.Stream)}, ) }, DRPCConsensusServer.WatchLog, true default: @@ -206,6 +204,7 @@ func (x *drpcConsensus_AddRecordStream) SendAndClose(m *Ok) error { type DRPCConsensus_WatchLogStream interface { drpc.Stream Send(*WatchLogEvent) error + Recv() (*WatchLogRequest, error) } type drpcConsensus_WatchLogStream struct { @@ -215,3 +214,15 @@ type drpcConsensus_WatchLogStream struct { func (x *drpcConsensus_WatchLogStream) Send(m *WatchLogEvent) error { return x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}) } + +func (x *drpcConsensus_WatchLogStream) Recv() (*WatchLogRequest, error) { + m := new(WatchLogRequest) + if err := x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil { + return nil, err + } + return m, nil +} + +func (x *drpcConsensus_WatchLogStream) RecvMsg(m *WatchLogRequest) error { + return x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}) +} diff --git a/consensus/consensusproto/protos/consensus.proto b/consensus/consensusproto/protos/consensus.proto index 256d2868..58d6c185 100644 --- a/consensus/consensusproto/protos/consensus.proto +++ b/consensus/consensusproto/protos/consensus.proto @@ -31,7 +31,7 @@ service Consensus { // AddRecord adds new record to log rpc AddRecord(AddRecordRequest) returns (Ok); // WatchLog fetches log and subscribes for a changes - rpc WatchLog(WatchLogRequest) returns (stream WatchLogEvent); + rpc WatchLog(stream WatchLogRequest) returns (stream WatchLogEvent); } message Ok {} @@ -46,7 +46,8 @@ message AddRecordRequest { } message WatchLogRequest { - bytes logId = 1; + repeated bytes watchIds = 1; + repeated bytes unwatchIds = 2; } message WatchLogEvent { diff --git a/consensus/consensusrpc/consensrpc.go b/consensus/consensusrpc/consensrpc.go index ecf458a7..5d18b34a 100644 --- a/consensus/consensusrpc/consensrpc.go +++ b/consensus/consensusrpc/consensrpc.go @@ -46,30 +46,19 @@ func (c *consensusRpc) AddRecord(ctx context.Context, req *consensusproto.AddRec return &consensusproto.Ok{}, nil } -func (c *consensusRpc) WatchLog(req *consensusproto.WatchLogRequest, rpcStream consensusproto.DRPCConsensus_WatchLogStream) error { - stream, err := c.stream.Subscribe(rpcStream.Context(), req.LogId) - if err != nil { - return err - } +func (c *consensusRpc) WatchLog(rpcStream consensusproto.DRPCConsensus_WatchLogStream) error { + stream := c.stream.NewStream() defer stream.Close() - var logSent bool + go c.readStream(stream, rpcStream) for { - if !logSent { - if err = rpcStream.Send(&consensusproto.WatchLogEvent{ - LogId: req.LogId, - Records: recordsToProto(stream.Records()), - }); err != nil { - return err - } - logSent = true - } else { - recs := stream.WaitRecords() - if len(recs) == 0 { - return rpcStream.Close() - } - if err = rpcStream.Send(&consensusproto.WatchLogEvent{ - LogId: req.LogId, - Records: recordsToProto(recs), + recs := stream.WaitLogs() + if len(recs) == 0 { + return rpcStream.Close() + } + for _, rec := range recs { + if err := rpcStream.Send(&consensusproto.WatchLogEvent{ + LogId: rec.Id, + Records: recordsToProto(rec.Records), }); err != nil { return err } @@ -77,6 +66,18 @@ func (c *consensusRpc) WatchLog(req *consensusproto.WatchLogRequest, rpcStream c } } +func (c *consensusRpc) readStream(st *stream.Stream, rpcStream consensusproto.DRPCConsensus_WatchLogStream) { + defer st.Close() + for { + req, err := rpcStream.Recv() + if err != nil { + return + } + st.UnwatchIds(rpcStream.Context(), req.UnwatchIds) + st.WatchIds(rpcStream.Context(), req.WatchIds) + } +} + func logFromProto(log *consensusproto.Log) consensus.Log { return consensus.Log{ Id: log.Id, diff --git a/consensus/stream/object.go b/consensus/stream/object.go index 6f4f3d07..d2714dd9 100644 --- a/consensus/stream/object.go +++ b/consensus/stream/object.go @@ -2,7 +2,6 @@ package stream import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus" - "github.com/cheggaaa/mb/v2" "sync" ) @@ -10,9 +9,7 @@ type object struct { logId []byte records []consensus.Record - streams map[uint32]*stream - - lastStreamId uint32 + streams map[uint64]*Stream mu sync.Mutex } @@ -27,7 +24,7 @@ func (o *object) AddRecords(recs []consensus.Record) { diff := recs[0 : len(recs)-len(o.records)] o.records = recs for _, st := range o.streams { - st.AddRecords(diff) + _ = st.AddRecords(o.logId, diff) } } @@ -37,18 +34,12 @@ func (o *object) Records() []consensus.Record { return o.records } -func (o *object) NewStream() Stream { +func (o *object) AddStream(s *Stream) { o.mu.Lock() defer o.mu.Unlock() - o.lastStreamId++ - st := &stream{ - id: o.lastStreamId, - obj: o, - records: o.records, - mb: mb.New(consensus.Record{}, 100), - } - o.streams[st.id] = st - return st + o.streams[s.id] = s + _ = s.AddRecords(o.logId, o.records) + return } func (o *object) Locked() bool { @@ -57,7 +48,7 @@ func (o *object) Locked() bool { return len(o.streams) > 0 } -func (o *object) removeStream(id uint32) { +func (o *object) RemoveStream(id uint64) { o.mu.Lock() defer o.mu.Unlock() delete(o.streams, id) diff --git a/consensus/stream/service.go b/consensus/stream/service.go index 11c5bc42..854846c5 100644 --- a/consensus/stream/service.go +++ b/consensus/stream/service.go @@ -7,8 +7,10 @@ import ( "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/db" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache" + "github.com/cheggaaa/mb/v2" "github.com/mr-tron/base58" "go.uber.org/zap" + "sync/atomic" "time" ) @@ -30,21 +32,15 @@ func New() Service { return &service{} } -type Stream interface { - LogId() []byte - Records() []consensus.Record - WaitRecords() []consensus.Record - Close() -} - type Service interface { - Subscribe(ctx context.Context, logId []byte) (stream Stream, err error) + NewStream() *Stream app.ComponentRunnable } type service struct { - db db.Service - cache ocache.OCache + db db.Service + cache ocache.OCache + lastStreamId uint64 } func (s *service) Init(a *app.App) (err error) { @@ -58,14 +54,6 @@ func (s *service) Init(a *app.App) (err error) { return s.db.SetChangeReceiver(s.receiveChange) } -func (s *service) Subscribe(ctx context.Context, logId []byte) (Stream, error) { - obj, err := s.getObject(ctx, logId) - if err != nil { - return nil, err - } - return obj.NewStream(), nil -} - func (s *service) Name() (name string) { return CName } @@ -74,12 +62,39 @@ func (s *service) Run(ctx context.Context) (err error) { return nil } +func (s *service) NewStream() *Stream { + return &Stream{ + id: atomic.AddUint64(&s.lastStreamId, 1), + logIds: make(map[string]struct{}), + mb: mb.New(consensus.Log{}, 100), + s: s, + } +} + +func (s *service) AddStream(ctx context.Context, logId []byte, stream *Stream) (err error) { + obj, err := s.getObject(ctx, logId) + if err != nil { + return err + } + obj.AddStream(stream) + return +} + +func (s *service) RemoveStream(ctx context.Context, logId []byte, streamId uint64) (err error) { + obj, err := s.getObject(ctx, logId) + if err != nil { + return err + } + obj.RemoveStream(streamId) + return +} + func (s *service) loadLog(ctx context.Context, id string) (value ocache.Object, err error) { if ctxLog := ctx.Value(ctxLogKey); ctxLog != nil { return &object{ logId: ctxLog.(consensus.Log).Id, records: ctxLog.(consensus.Log).Records, - streams: make(map[uint32]*stream), + streams: make(map[uint64]*Stream), }, nil } logId := logIdFromString(id) @@ -90,7 +105,7 @@ func (s *service) loadLog(ctx context.Context, id string) (value ocache.Object, return &object{ logId: dbLog.Id, records: dbLog.Records, - streams: make(map[uint32]*stream), + streams: make(map[uint64]*Stream), }, nil } diff --git a/consensus/stream/service_test.go b/consensus/stream/service_test.go index 8acf865c..80643de4 100644 --- a/consensus/stream/service_test.go +++ b/consensus/stream/service_test.go @@ -13,11 +13,12 @@ import ( var ctx = context.Background() -func TestService_Subscribe(t *testing.T) { +func TestService_NewStream(t *testing.T) { fx := newFixture(t) defer fx.Finish(t) var expLogId = []byte("logId") + var preloadLogId = []byte("preloadId") fx.mockDB.fetchLog = func(ctx context.Context, logId []byte) (log consensus.Log, err error) { require.Equal(t, expLogId, logId) @@ -31,24 +32,7 @@ func TestService_Subscribe(t *testing.T) { }, nil } - st1, err := fx.Subscribe(ctx, expLogId) - require.NoError(t, err) - require.Equal(t, expLogId, st1.LogId()) - sr1 := readStream(st1) - assert.Equal(t, uint32(1), sr1.id) - - st2, err := fx.Subscribe(ctx, expLogId) - require.NoError(t, err) - require.Equal(t, expLogId, st2.LogId()) - sr2 := readStream(st2) - assert.Equal(t, uint32(2), sr2.id) - - fx.mockDB.receiver(expLogId, []consensus.Record{ - { - Id: []byte{'1'}, - }, - }) - fx.mockDB.receiver([]byte("other id"), []consensus.Record{ + fx.mockDB.receiver(preloadLogId, []consensus.Record{ { Id: []byte{'2'}, PrevId: []byte{'1'}, @@ -57,7 +41,38 @@ func TestService_Subscribe(t *testing.T) { Id: []byte{'1'}, }, }) + + st1 := fx.NewStream() + sr1 := readStream(st1) + assert.Equal(t, uint64(1), sr1.id) + st1.WatchIds(ctx, [][]byte{expLogId, preloadLogId}) + st1.UnwatchIds(ctx, [][]byte{preloadLogId}) + assert.Equal(t, [][]byte{expLogId}, st1.LogIds()) + + st2 := fx.NewStream() + sr2 := readStream(st2) + assert.Equal(t, uint64(2), sr2.id) + st2.WatchIds(ctx, [][]byte{expLogId, preloadLogId}) + fx.mockDB.receiver(expLogId, []consensus.Record{ + { + Id: []byte{'1'}, + }, + }) + fx.mockDB.receiver(expLogId, []consensus.Record{ + { + Id: []byte{'2'}, + PrevId: []byte{'1'}, + }, + { + Id: []byte{'1'}, + }, + }) + fx.mockDB.receiver(preloadLogId, []consensus.Record{ + { + Id: []byte{'3'}, + PrevId: []byte{'4'}, + }, { Id: []byte{'2'}, PrevId: []byte{'1'}, @@ -77,9 +92,15 @@ func TestService_Subscribe(t *testing.T) { } } - require.Equal(t, sr1.records, sr2.records) - require.Len(t, sr1.records, 1) - assert.Equal(t, []byte{'2'}, sr1.records[0].Id) + require.Len(t, sr1.logs, 2) + assert.Len(t, sr1.logs[string(expLogId)].Records, 2) + assert.Equal(t, []byte{'2'}, sr1.logs[string(expLogId)].Records[0].Id) + assert.Equal(t, []byte{'2'}, sr1.logs[string(preloadLogId)].Records[0].Id) + + require.Len(t, sr2.logs, 2) + assert.Len(t, sr2.logs[string(expLogId)].Records, 2) + assert.Equal(t, []byte{'2'}, sr2.logs[string(expLogId)].Records[0].Id) + assert.Equal(t, []byte{'3'}, sr2.logs[string(preloadLogId)].Records[0].Id) } func newFixture(t *testing.T) *fixture { @@ -104,10 +125,11 @@ func (fx *fixture) Finish(t *testing.T) { require.NoError(t, fx.a.Close(ctx)) } -func readStream(st Stream) *streamReader { +func readStream(st *Stream) *streamReader { sr := &streamReader{ - id: st.(*stream).id, + id: st.id, stream: st, + logs: map[string]consensus.Log{}, finished: make(chan struct{}), } go sr.read() @@ -115,21 +137,28 @@ func readStream(st Stream) *streamReader { } type streamReader struct { - id uint32 - stream Stream + id uint64 + stream *Stream - records []consensus.Record + logs map[string]consensus.Log finished chan struct{} } func (sr *streamReader) read() { defer close(sr.finished) for { - records := sr.stream.WaitRecords() - if len(records) == 0 { + logs := sr.stream.WaitLogs() + if len(logs) == 0 { return } - sr.records = append(sr.records, records...) + for _, l := range logs { + if el, ok := sr.logs[string(l.Id)]; !ok { + sr.logs[string(l.Id)] = l + } else { + el.Records = append(l.Records, el.Records...) + sr.logs[string(l.Id)] = el + } + } } } diff --git a/consensus/stream/stream.go b/consensus/stream/stream.go index 4f8f4b3e..bf3b1fd6 100644 --- a/consensus/stream/stream.go +++ b/consensus/stream/stream.go @@ -1,34 +1,74 @@ package stream import ( + "context" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus" "github.com/cheggaaa/mb/v2" + "go.uber.org/zap" + "sync" ) -type stream struct { - id uint32 - obj *object - records []consensus.Record - mb *mb.MB[consensus.Record] +type Stream struct { + id uint64 + logIds map[string]struct{} + mu sync.Mutex + mb *mb.MB[consensus.Log] + s *service } -func (s *stream) LogId() []byte { - return s.obj.logId +func (s *Stream) LogIds() [][]byte { + s.mu.Lock() + defer s.mu.Unlock() + logIds := make([][]byte, 0, len(s.logIds)) + for logId := range s.logIds { + logIds = append(logIds, []byte(logId)) + } + return logIds } -func (s *stream) AddRecords(records []consensus.Record) { - _ = s.mb.Add(records...) +func (s *Stream) AddRecords(logId []byte, records []consensus.Record) (err error) { + return s.mb.Add(consensus.Log{Id: logId, Records: records}) } -func (s *stream) Records() []consensus.Record { - return s.records -} - -func (s *stream) WaitRecords() []consensus.Record { +func (s *Stream) WaitLogs() []consensus.Log { return s.mb.Wait() } -func (s *stream) Close() { - _ = s.mb.Close() - s.obj.removeStream(s.id) +func (s *Stream) WatchIds(ctx context.Context, logIds [][]byte) { + s.mu.Lock() + defer s.mu.Unlock() + for _, logId := range logIds { + logIdKey := string(logId) + if _, ok := s.logIds[logIdKey]; !ok { + s.logIds[logIdKey] = struct{}{} + if addErr := s.s.AddStream(ctx, logId, s); addErr != nil { + log.Warn("can't add stream for log", zap.Binary("logId", logId), zap.Error(addErr)) + } + } + } + return +} + +func (s *Stream) UnwatchIds(ctx context.Context, logIds [][]byte) { + s.mu.Lock() + defer s.mu.Unlock() + for _, logId := range logIds { + logIdKey := string(logId) + if _, ok := s.logIds[logIdKey]; ok { + delete(s.logIds, logIdKey) + if remErr := s.s.RemoveStream(ctx, logId, s.id); remErr != nil { + log.Warn("can't remove stream for log", zap.Binary("logId", logId), zap.Error(remErr)) + } + } + } + return +} + +func (s *Stream) Close() { + _ = s.mb.Close() + s.mu.Lock() + defer s.mu.Unlock() + for logId := range s.logIds { + _ = s.s.RemoveStream(context.TODO(), []byte(logId), s.id) + } }