rewrite for single stream

This commit is contained in:
Sergey Cherepanov 2022-10-07 11:04:48 +03:00 committed by Mikhail Iudin
parent 978a320b43
commit 7bb3d5e144
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
12 changed files with 503 additions and 192 deletions

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"bytes"
"context" "context"
"flag" "flag"
"fmt" "fmt"
@ -17,10 +18,12 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/node/account" "github.com/anytypeio/go-anytype-infrastructure-experiments/node/account"
"go.uber.org/zap" "go.uber.org/zap"
"gopkg.in/mgo.v2/bson" "gopkg.in/mgo.v2/bson"
"math/rand"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"sync"
"syscall" "syscall"
"time" "time"
) )
@ -77,6 +80,9 @@ func main() {
log.Info("test success!") log.Info("test success!")
} }
b := &bench{service: a.MustComponent(consensusclient.CName).(consensusclient.Service)}
b.run()
// wait exit signal // wait exit signal
exit := make(chan os.Signal, 1) exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT) signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT)
@ -206,11 +212,18 @@ func testStream(service consensusclient.Service) (err error) {
} }
log.Info("log created", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st))) log.Info("log created", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st)))
stream, err := service.WatchLog(ctx, newLogId) stream, err := service.WatchLog(ctx)
if err != nil { if err != nil {
return err return err
} }
defer stream.Close() defer stream.Close()
st = time.Now()
if err = stream.WatchIds([][]byte{newLogId}); err != nil {
return err
}
log.Info("watch", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st)))
sr := readStream(stream) sr := readStream(stream)
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
st = time.Now() st = time.Now()
@ -226,35 +239,129 @@ func testStream(service consensusclient.Service) (err error) {
lastRecId = recId lastRecId = recId
log.Info("record created", zap.String("id", bson.ObjectId(lastRecId).Hex()), zap.Duration("dur", time.Since(st))) log.Info("record created", zap.String("id", bson.ObjectId(lastRecId).Hex()), zap.Duration("dur", time.Since(st)))
} }
fmt.Println(sr.log.Records) sr.validate()
return nil return nil
} }
func readStream(stream consensusproto.DRPCConsensus_WatchLogClient) *streamReader { func readStream(stream consensusclient.Stream) *streamReader {
sr := &streamReader{stream: stream} sr := &streamReader{stream: stream, logs: map[string]*consensusproto.Log{}}
go sr.read() go sr.read()
return sr return sr
} }
type streamReader struct { type streamReader struct {
stream consensusproto.DRPCConsensus_WatchLogClient stream consensusclient.Stream
log *consensusproto.Log logs map[string]*consensusproto.Log
} }
func (sr *streamReader) read() { func (sr *streamReader) read() {
for { for {
event, err := sr.stream.Recv() recs := sr.stream.WaitLogs()
if err != nil { if len(recs) == 0 {
return return
} }
fmt.Println("received event", event) for _, rec := range recs {
if sr.log == nil { if el, ok := sr.logs[string(rec.Id)]; !ok {
sr.log = &consensusproto.Log{ sr.logs[string(rec.Id)] = &consensusproto.Log{
Id: event.LogId, Id: rec.Id,
Records: event.Records, Records: rec.Records,
}
} else {
el.Records = append(rec.Records, el.Records...)
sr.logs[string(rec.Id)] = el
} }
} else {
sr.log.Records = append(event.Records, sr.log.Records...)
} }
} }
} }
func (sr *streamReader) validate() {
var lc, rc int
for _, log := range sr.logs {
lc++
rc += len(log.Records)
validateLog(log)
}
fmt.Println("logs valid; log count:", lc, "records:", rc)
}
func validateLog(log *consensusproto.Log) {
var prevId []byte
for _, rec := range log.Records {
if len(prevId) != 0 {
if !bytes.Equal(prevId, rec.Id) {
panic(fmt.Sprintf("invalid log: %+v", log))
}
}
prevId = rec.PrevId
}
}
type bench struct {
service consensusclient.Service
stream consensusclient.Stream
}
func (b *bench) run() {
var err error
b.stream, err = b.service.WatchLog(context.Background())
if err != nil {
panic(err)
}
defer b.stream.Close()
sr := readStream(b.stream)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
time.Sleep(time.Second / 100)
wg.Add(1)
go func() {
defer wg.Done()
b.client()
}()
fmt.Println("total streams:", i+1)
}
wg.Wait()
sr.validate()
}
func (b *bench) client() {
ctx := context.Background()
// create log
newLogId := []byte(bson.NewObjectId())
lastRecId := []byte(bson.NewObjectId())
err := b.service.AddLog(ctx, &consensusproto.Log{
Id: newLogId,
Records: []*consensusproto.Record{
{
Id: lastRecId,
Payload: []byte("test"),
CreatedUnix: uint64(time.Now().Unix()),
},
},
})
for i := 0; i < 5; i++ {
fmt.Println("watch", bson.ObjectId(newLogId).Hex())
if err = b.stream.WatchIds([][]byte{newLogId}); err != nil {
panic(err)
}
for i := 0; i < rand.Intn(20); i++ {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(10000)))
recId := []byte(bson.NewObjectId())
err = b.service.AddRecord(ctx, newLogId, &consensusproto.Record{
Id: recId,
PrevId: lastRecId,
Payload: []byte("some payload 1 2 3 4 5 6 6 7 oijoj"),
CreatedUnix: uint64(time.Now().Unix()),
})
if err != nil {
panic(err)
}
lastRecId = recId
}
if err = b.stream.UnwatchIds([][]byte{newLogId}); err != nil {
panic(err)
}
fmt.Println("unwatch", bson.ObjectId(newLogId).Hex())
time.Sleep(time.Minute * 1)
}
}

View File

@ -13,10 +13,3 @@ type Record struct {
Payload []byte `bson:"payload"` Payload []byte `bson:"payload"`
Created time.Time `bson:"created"'` Created time.Time `bson:"created"'`
} }
func (l Log) CopyRecords() Log {
l2 := l
l2.Records = make([]Record, len(l.Records))
copy(l2.Records, l.Records)
return l2
}

View File

@ -21,7 +21,7 @@ func New() Service {
type Service interface { type Service interface {
AddLog(ctx context.Context, clog *consensusproto.Log) (err error) AddLog(ctx context.Context, clog *consensusproto.Log) (err error)
AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error) AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error)
WatchLog(ctx context.Context, logId []byte) (stream consensusproto.DRPCConsensus_WatchLogClient, err error) WatchLog(ctx context.Context) (stream Stream, err error)
app.Component app.Component
} }
@ -83,15 +83,14 @@ func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensuspr
return return
} }
func (s *service) WatchLog(ctx context.Context, logId []byte) (stream consensusproto.DRPCConsensus_WatchLogClient, err error) { func (s *service) WatchLog(ctx context.Context) (st Stream, err error) {
cl, err := s.dialClient(ctx) cl, err := s.dialClient(ctx)
if err != nil { if err != nil {
return return
} }
if stream, err = cl.WatchLog(ctx, &consensusproto.WatchLogRequest{ rpcStream, err := cl.WatchLog(ctx)
LogId: logId, if err != nil {
}); err != nil {
return nil, rpcerr.Unwrap(err) return nil, rpcerr.Unwrap(err)
} }
return return runStream(rpcStream), nil
} }

View File

@ -0,0 +1,66 @@
package consensusclient
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
"github.com/cheggaaa/mb/v2"
)
type Stream interface {
WatchIds(logIds [][]byte) (err error)
UnwatchIds(logIds [][]byte) (err error)
WaitLogs() []*consensusproto.Log
Close() error
}
func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) Stream {
st := &stream{
rpcStream: rpcStream,
mb: mb.New((*consensusproto.Log)(nil), 100),
}
go st.readStream()
return st
}
type stream struct {
rpcStream consensusproto.DRPCConsensus_WatchLogClient
mb *mb.MB[*consensusproto.Log]
}
func (s *stream) WatchIds(logIds [][]byte) (err error) {
return s.rpcStream.Send(&consensusproto.WatchLogRequest{
WatchIds: logIds,
})
}
func (s *stream) UnwatchIds(logIds [][]byte) (err error) {
return s.rpcStream.Send(&consensusproto.WatchLogRequest{
UnwatchIds: logIds,
})
}
func (s *stream) WaitLogs() []*consensusproto.Log {
return s.mb.Wait()
}
func (s *stream) readStream() {
defer s.Close()
for {
event, err := s.rpcStream.Recv()
if err != nil {
return
}
if err = s.mb.Add(&consensusproto.Log{
Id: event.LogId,
Records: event.Records,
}); err != nil {
return
}
}
}
func (s *stream) Close() error {
if err := s.mb.Close(); err == nil {
return s.rpcStream.Close()
}
return nil
}

View File

@ -317,7 +317,8 @@ func (m *AddRecordRequest) GetRecord() *Record {
} }
type WatchLogRequest struct { type WatchLogRequest struct {
LogId []byte `protobuf:"bytes,1,opt,name=logId,proto3" json:"logId,omitempty"` WatchIds [][]byte `protobuf:"bytes,1,rep,name=watchIds,proto3" json:"watchIds,omitempty"`
UnwatchIds [][]byte `protobuf:"bytes,2,rep,name=unwatchIds,proto3" json:"unwatchIds,omitempty"`
} }
func (m *WatchLogRequest) Reset() { *m = WatchLogRequest{} } func (m *WatchLogRequest) Reset() { *m = WatchLogRequest{} }
@ -353,9 +354,16 @@ func (m *WatchLogRequest) XXX_DiscardUnknown() {
var xxx_messageInfo_WatchLogRequest proto.InternalMessageInfo var xxx_messageInfo_WatchLogRequest proto.InternalMessageInfo
func (m *WatchLogRequest) GetLogId() []byte { func (m *WatchLogRequest) GetWatchIds() [][]byte {
if m != nil { if m != nil {
return m.LogId return m.WatchIds
}
return nil
}
func (m *WatchLogRequest) GetUnwatchIds() [][]byte {
if m != nil {
return m.UnwatchIds
} }
return nil return nil
} }
@ -428,36 +436,37 @@ func init() {
} }
var fileDescriptor_b8d7f1c16b400059 = []byte{ var fileDescriptor_b8d7f1c16b400059 = []byte{
// 458 bytes of a gzipped FileDescriptorProto // 480 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x41, 0x8b, 0xd3, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x41, 0x8b, 0xd3, 0x40,
0x14, 0xee, 0x24, 0x35, 0xbb, 0x7d, 0x69, 0xbb, 0xf1, 0xb1, 0x48, 0xd8, 0xc5, 0x50, 0xe2, 0xc1, 0x14, 0xee, 0x24, 0x6b, 0xb7, 0x7d, 0x69, 0xbb, 0xf1, 0xb1, 0x48, 0xe8, 0x62, 0x08, 0xf1, 0x12,
0x22, 0xd2, 0x95, 0x2a, 0x78, 0xf2, 0xb0, 0x96, 0x8a, 0x0b, 0xc5, 0x42, 0xa0, 0x0a, 0x9e, 0x8c, 0x44, 0xba, 0x52, 0x05, 0x4f, 0x1e, 0xd6, 0x52, 0xa1, 0x52, 0x2d, 0x04, 0xaa, 0xe0, 0xc9, 0x98,
0x99, 0x69, 0x0c, 0x1b, 0x32, 0x75, 0x66, 0xba, 0x74, 0xff, 0x85, 0x3f, 0xc4, 0x1f, 0xe2, 0x45, 0x99, 0xc6, 0xb0, 0x21, 0x53, 0x67, 0xd2, 0xb5, 0xfb, 0x2f, 0xfc, 0x21, 0xfe, 0x10, 0x6f, 0xee,
0xd8, 0xa3, 0x47, 0x69, 0xff, 0x88, 0x74, 0xd2, 0xac, 0xc9, 0xb6, 0x3d, 0xec, 0x25, 0xc9, 0x7c, 0xd1, 0xa3, 0xb4, 0x7f, 0x44, 0x3a, 0x69, 0x6a, 0xb2, 0xdd, 0x1e, 0xbc, 0x24, 0x79, 0xdf, 0x37,
0xdf, 0xfb, 0xde, 0xf7, 0xe6, 0x7b, 0x04, 0xce, 0x22, 0x9e, 0x49, 0x96, 0xc9, 0xb9, 0xfc, 0xff, 0xef, 0x7d, 0xdf, 0xbc, 0x8f, 0xc0, 0x79, 0xc8, 0x53, 0xc9, 0x52, 0xb9, 0x90, 0xff, 0xbe, 0xe6,
0x35, 0x13, 0x5c, 0xf1, 0x33, 0xfd, 0x2c, 0xa1, 0x3d, 0x0d, 0x60, 0x33, 0xcc, 0xae, 0x07, 0x05, 0x82, 0x67, 0xfc, 0x5c, 0x3d, 0x4b, 0x68, 0x4f, 0x01, 0xd8, 0x0a, 0xd2, 0xeb, 0x41, 0x81, 0xb9,
0xe6, 0xc7, 0x60, 0x8e, 0x78, 0x8c, 0x6d, 0x30, 0x12, 0xea, 0x92, 0x0e, 0xe9, 0x36, 0x03, 0x23, 0x11, 0xe8, 0x63, 0x1e, 0x61, 0x07, 0xb4, 0x98, 0x5a, 0xc4, 0x21, 0x5e, 0xcb, 0xd7, 0x62, 0x8a,
0xa1, 0xd8, 0x83, 0x03, 0xc1, 0x22, 0x2e, 0xa8, 0x74, 0x8d, 0x8e, 0xd9, 0xb5, 0xfb, 0xc7, 0xbd, 0x3d, 0x38, 0x16, 0x2c, 0xe4, 0x82, 0x4a, 0x4b, 0x73, 0x74, 0xcf, 0xe8, 0x9f, 0xf6, 0xca, 0x6d,
0xb2, 0xac, 0x17, 0x68, 0x32, 0x28, 0x8a, 0xb0, 0x03, 0x76, 0x24, 0x58, 0xa8, 0x18, 0x9d, 0x64, 0x3d, 0x5f, 0x91, 0x7e, 0x71, 0x08, 0x1d, 0x30, 0x42, 0xc1, 0x82, 0x8c, 0xd1, 0x69, 0x1a, 0x2f,
0xc9, 0xc2, 0x35, 0x3b, 0xa4, 0x5b, 0x0f, 0xca, 0x90, 0x9f, 0x82, 0x95, 0x8b, 0xb6, 0xbc, 0x1e, 0x2d, 0xdd, 0x21, 0xde, 0x91, 0x5f, 0x86, 0xdc, 0x04, 0xea, 0x79, 0xd3, 0x9e, 0xd6, 0x03, 0xa8,
0x81, 0x35, 0x13, 0xec, 0xea, 0x82, 0xba, 0x86, 0xc6, 0x36, 0x27, 0x74, 0xe1, 0x60, 0x16, 0x5e, 0xcf, 0x05, 0xbb, 0x1a, 0x51, 0x4b, 0x53, 0xd8, 0xb6, 0x42, 0x0b, 0x8e, 0xe7, 0xc1, 0x75, 0xc2,
0xa7, 0x3c, 0xa4, 0xba, 0x5f, 0x33, 0x28, 0x8e, 0x77, 0xdd, 0xea, 0xdb, 0x6e, 0x75, 0x30, 0xc6, 0x03, 0xaa, 0xe6, 0xb5, 0xfc, 0xa2, 0xbc, 0xad, 0x76, 0xb4, 0xaf, 0x76, 0x04, 0xda, 0xe4, 0xd2,
0x97, 0xfe, 0x2b, 0x68, 0x9d, 0x53, 0x3a, 0xe2, 0x71, 0xc0, 0xbe, 0xcf, 0x99, 0x54, 0xf8, 0x04, 0x7d, 0x0e, 0xed, 0x0b, 0x4a, 0xc7, 0x3c, 0xf2, 0xd9, 0xd7, 0x05, 0x93, 0x19, 0x3e, 0x02, 0x3d,
0xcc, 0x94, 0xc7, 0xda, 0xdb, 0xee, 0x3f, 0xac, 0x5e, 0x69, 0x5d, 0xb6, 0x66, 0xfd, 0x8f, 0xe0, 0xe1, 0x91, 0xd2, 0x36, 0xfa, 0xf7, 0xab, 0x57, 0xda, 0x1c, 0xdb, 0xb0, 0xee, 0x7b, 0x30, 0x2f,
0x9c, 0x53, 0xba, 0xb9, 0xe1, 0x46, 0x78, 0x0c, 0x0f, 0x52, 0x1e, 0x5f, 0x14, 0x63, 0xe7, 0x07, 0x28, 0xdd, 0xde, 0x70, 0xdb, 0x78, 0x0a, 0xf7, 0x12, 0x1e, 0x8d, 0x0a, 0xdb, 0x79, 0x81, 0x4f,
0x7c, 0x0e, 0x56, 0x1e, 0x80, 0x9e, 0x7c, 0x5f, 0x48, 0x9b, 0x1a, 0xff, 0x29, 0x1c, 0x7d, 0x0a, 0xa0, 0x9e, 0x2f, 0x40, 0x39, 0x3f, 0xb4, 0xa4, 0xed, 0x19, 0xf7, 0x2d, 0x9c, 0x7c, 0x08, 0xb2,
0x55, 0xf4, 0xad, 0x34, 0xcf, 0xce, 0xb6, 0xfe, 0x04, 0x5a, 0x45, 0xe1, 0xf0, 0x8a, 0x65, 0xfb, 0xf0, 0x4b, 0xc9, 0x4f, 0x17, 0x1a, 0xdf, 0x36, 0xd0, 0x88, 0x4a, 0x8b, 0x38, 0xba, 0xd7, 0xf2,
0xdc, 0xef, 0xb9, 0xa3, 0x67, 0x5f, 0xe0, 0x70, 0x28, 0xc4, 0x80, 0x53, 0x26, 0xb1, 0x0d, 0x30, 0x77, 0x35, 0xda, 0x00, 0x8b, 0x74, 0xc7, 0x6a, 0x8a, 0x2d, 0x21, 0xee, 0x14, 0xda, 0xc5, 0xb8,
0xc9, 0xd8, 0x62, 0xc6, 0x22, 0xc5, 0xa8, 0x53, 0xc3, 0x16, 0x34, 0xd6, 0x6e, 0x8b, 0x44, 0x2a, 0xe1, 0x15, 0x4b, 0x0f, 0x79, 0xfc, 0xcf, 0x24, 0x1f, 0x7f, 0x82, 0xc6, 0x50, 0x88, 0x01, 0xa7,
0xe9, 0x10, 0x3c, 0x02, 0x7b, 0xc4, 0xe3, 0x0f, 0x5c, 0xbd, 0xe3, 0xf3, 0x8c, 0x3a, 0x06, 0x22, 0x4c, 0x62, 0x07, 0x60, 0x9a, 0xb2, 0xe5, 0x9c, 0x85, 0x19, 0xa3, 0x66, 0x0d, 0xdb, 0xd0, 0xdc,
0xb4, 0xf3, 0x76, 0x03, 0x9e, 0x4d, 0xd3, 0x24, 0x52, 0x8e, 0x89, 0x0e, 0xd8, 0x43, 0x21, 0xb8, 0xa8, 0x2d, 0x63, 0x99, 0x49, 0x93, 0xe0, 0x09, 0x18, 0x63, 0x1e, 0xbd, 0xe3, 0xd9, 0x6b, 0xbe,
0x18, 0x4f, 0xa7, 0x92, 0x29, 0xe7, 0xa7, 0xd1, 0xff, 0x4d, 0xa0, 0x71, 0xeb, 0x8f, 0xaf, 0xc1, 0x48, 0xa9, 0xa9, 0x21, 0x42, 0x27, 0x1f, 0x37, 0xe0, 0xe9, 0x2c, 0x89, 0xc3, 0xcc, 0xd4, 0xd1,
0xca, 0xd3, 0xc7, 0xd3, 0xea, 0x60, 0x95, 0x9d, 0x9c, 0x38, 0x55, 0x72, 0x7c, 0x89, 0x6f, 0xa0, 0x04, 0x63, 0x28, 0x04, 0x17, 0x93, 0xd9, 0x4c, 0xb2, 0xcc, 0xfc, 0xa1, 0xf5, 0x7f, 0x11, 0x68,
0x71, 0xbb, 0x00, 0xf4, 0xb6, 0xb4, 0x95, 0xcd, 0xec, 0x90, 0xbf, 0x87, 0xc3, 0x22, 0x3e, 0x7c, 0xee, 0xf4, 0xf1, 0x05, 0xd4, 0xf3, 0x8c, 0xf0, 0xac, 0x6a, 0xac, 0x92, 0x5c, 0xd7, 0xac, 0x92,
0x5c, 0x65, 0xef, 0xe4, 0x7f, 0x72, 0xba, 0x9b, 0xd6, 0xa9, 0xbf, 0x20, 0x6f, 0xfb, 0xbf, 0x96, 0x93, 0x4b, 0x7c, 0x09, 0xcd, 0x5d, 0x4c, 0x68, 0xef, 0xf5, 0x56, 0xf2, 0xbb, 0xa3, 0xfd, 0x0d,
0x1e, 0xb9, 0x59, 0x7a, 0xe4, 0xef, 0xd2, 0x23, 0x3f, 0x56, 0x5e, 0xed, 0x66, 0xe5, 0xd5, 0xfe, 0x34, 0x8a, 0xf5, 0xe1, 0xc3, 0x2a, 0x7b, 0x2b, 0xa5, 0xee, 0xd9, 0xdd, 0xb4, 0xda, 0xba, 0x47,
0xac, 0xbc, 0xda, 0x67, 0x77, 0xdf, 0x5f, 0xf7, 0xd5, 0xd2, 0xaf, 0x97, 0xff, 0x02, 0x00, 0x00, 0x9e, 0x92, 0x57, 0xfd, 0x9f, 0x2b, 0x9b, 0xdc, 0xac, 0x6c, 0xf2, 0x67, 0x65, 0x93, 0xef, 0x6b,
0xff, 0xff, 0x2f, 0xe9, 0x67, 0xad, 0x98, 0x03, 0x00, 0x00, 0xbb, 0x76, 0xb3, 0xb6, 0x6b, 0xbf, 0xd7, 0x76, 0xed, 0xa3, 0x75, 0xe8, 0xef, 0xfc, 0x5c, 0x57,
0xaf, 0x67, 0x7f, 0x03, 0x00, 0x00, 0xff, 0xff, 0xf0, 0xc9, 0xa1, 0x1f, 0xc0, 0x03, 0x00, 0x00,
} }
func (m *Log) Marshal() (dAtA []byte, err error) { func (m *Log) Marshal() (dAtA []byte, err error) {
@ -678,12 +687,23 @@ func (m *WatchLogRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if len(m.LogId) > 0 { if len(m.UnwatchIds) > 0 {
i -= len(m.LogId) for iNdEx := len(m.UnwatchIds) - 1; iNdEx >= 0; iNdEx-- {
copy(dAtA[i:], m.LogId) i -= len(m.UnwatchIds[iNdEx])
i = encodeVarintConsensus(dAtA, i, uint64(len(m.LogId))) copy(dAtA[i:], m.UnwatchIds[iNdEx])
i-- i = encodeVarintConsensus(dAtA, i, uint64(len(m.UnwatchIds[iNdEx])))
dAtA[i] = 0xa i--
dAtA[i] = 0x12
}
}
if len(m.WatchIds) > 0 {
for iNdEx := len(m.WatchIds) - 1; iNdEx >= 0; iNdEx-- {
i -= len(m.WatchIds[iNdEx])
copy(dAtA[i:], m.WatchIds[iNdEx])
i = encodeVarintConsensus(dAtA, i, uint64(len(m.WatchIds[iNdEx])))
i--
dAtA[i] = 0xa
}
} }
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
@ -834,9 +854,17 @@ func (m *WatchLogRequest) Size() (n int) {
} }
var l int var l int
_ = l _ = l
l = len(m.LogId) if len(m.WatchIds) > 0 {
if l > 0 { for _, b := range m.WatchIds {
n += 1 + l + sovConsensus(uint64(l)) l = len(b)
n += 1 + l + sovConsensus(uint64(l))
}
}
if len(m.UnwatchIds) > 0 {
for _, b := range m.UnwatchIds {
l = len(b)
n += 1 + l + sovConsensus(uint64(l))
}
} }
return n return n
} }
@ -1461,7 +1489,7 @@ func (m *WatchLogRequest) Unmarshal(dAtA []byte) error {
switch fieldNum { switch fieldNum {
case 1: case 1:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LogId", wireType) return fmt.Errorf("proto: wrong wireType = %d for field WatchIds", wireType)
} }
var byteLen int var byteLen int
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
@ -1488,10 +1516,40 @@ func (m *WatchLogRequest) Unmarshal(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.LogId = append(m.LogId[:0], dAtA[iNdEx:postIndex]...) m.WatchIds = append(m.WatchIds, make([]byte, postIndex-iNdEx))
if m.LogId == nil { copy(m.WatchIds[len(m.WatchIds)-1], dAtA[iNdEx:postIndex])
m.LogId = []byte{} iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field UnwatchIds", wireType)
} }
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowConsensus
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthConsensus
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthConsensus
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.UnwatchIds = append(m.UnwatchIds, make([]byte, postIndex-iNdEx))
copy(m.UnwatchIds[len(m.UnwatchIds)-1], dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex

View File

@ -42,7 +42,7 @@ type DRPCConsensusClient interface {
AddLog(ctx context.Context, in *AddLogRequest) (*Ok, error) AddLog(ctx context.Context, in *AddLogRequest) (*Ok, error)
AddRecord(ctx context.Context, in *AddRecordRequest) (*Ok, error) AddRecord(ctx context.Context, in *AddRecordRequest) (*Ok, error)
WatchLog(ctx context.Context, in *WatchLogRequest) (DRPCConsensus_WatchLogClient, error) WatchLog(ctx context.Context) (DRPCConsensus_WatchLogClient, error)
} }
type drpcConsensusClient struct { type drpcConsensusClient struct {
@ -73,23 +73,18 @@ func (c *drpcConsensusClient) AddRecord(ctx context.Context, in *AddRecordReques
return out, nil return out, nil
} }
func (c *drpcConsensusClient) WatchLog(ctx context.Context, in *WatchLogRequest) (DRPCConsensus_WatchLogClient, error) { func (c *drpcConsensusClient) WatchLog(ctx context.Context) (DRPCConsensus_WatchLogClient, error) {
stream, err := c.cc.NewStream(ctx, "/anyConsensus.Consensus/WatchLog", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}) stream, err := c.cc.NewStream(ctx, "/anyConsensus.Consensus/WatchLog", drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
if err != nil { if err != nil {
return nil, err return nil, err
} }
x := &drpcConsensus_WatchLogClient{stream} x := &drpcConsensus_WatchLogClient{stream}
if err := x.MsgSend(in, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil {
return nil, err
}
if err := x.CloseSend(); err != nil {
return nil, err
}
return x, nil return x, nil
} }
type DRPCConsensus_WatchLogClient interface { type DRPCConsensus_WatchLogClient interface {
drpc.Stream drpc.Stream
Send(*WatchLogRequest) error
Recv() (*WatchLogEvent, error) Recv() (*WatchLogEvent, error)
} }
@ -97,6 +92,10 @@ type drpcConsensus_WatchLogClient struct {
drpc.Stream drpc.Stream
} }
func (x *drpcConsensus_WatchLogClient) Send(m *WatchLogRequest) error {
return x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
}
func (x *drpcConsensus_WatchLogClient) Recv() (*WatchLogEvent, error) { func (x *drpcConsensus_WatchLogClient) Recv() (*WatchLogEvent, error) {
m := new(WatchLogEvent) m := new(WatchLogEvent)
if err := x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil { if err := x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil {
@ -112,7 +111,7 @@ func (x *drpcConsensus_WatchLogClient) RecvMsg(m *WatchLogEvent) error {
type DRPCConsensusServer interface { type DRPCConsensusServer interface {
AddLog(context.Context, *AddLogRequest) (*Ok, error) AddLog(context.Context, *AddLogRequest) (*Ok, error)
AddRecord(context.Context, *AddRecordRequest) (*Ok, error) AddRecord(context.Context, *AddRecordRequest) (*Ok, error)
WatchLog(*WatchLogRequest, DRPCConsensus_WatchLogStream) error WatchLog(DRPCConsensus_WatchLogStream) error
} }
type DRPCConsensusUnimplementedServer struct{} type DRPCConsensusUnimplementedServer struct{}
@ -125,7 +124,7 @@ func (s *DRPCConsensusUnimplementedServer) AddRecord(context.Context, *AddRecord
return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
} }
func (s *DRPCConsensusUnimplementedServer) WatchLog(*WatchLogRequest, DRPCConsensus_WatchLogStream) error { func (s *DRPCConsensusUnimplementedServer) WatchLog(DRPCConsensus_WatchLogStream) error {
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented) return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
} }
@ -158,8 +157,7 @@ func (DRPCConsensusDescription) Method(n int) (string, drpc.Encoding, drpc.Recei
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) { func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return nil, srv.(DRPCConsensusServer). return nil, srv.(DRPCConsensusServer).
WatchLog( WatchLog(
in1.(*WatchLogRequest), &drpcConsensus_WatchLogStream{in1.(drpc.Stream)},
&drpcConsensus_WatchLogStream{in2.(drpc.Stream)},
) )
}, DRPCConsensusServer.WatchLog, true }, DRPCConsensusServer.WatchLog, true
default: default:
@ -206,6 +204,7 @@ func (x *drpcConsensus_AddRecordStream) SendAndClose(m *Ok) error {
type DRPCConsensus_WatchLogStream interface { type DRPCConsensus_WatchLogStream interface {
drpc.Stream drpc.Stream
Send(*WatchLogEvent) error Send(*WatchLogEvent) error
Recv() (*WatchLogRequest, error)
} }
type drpcConsensus_WatchLogStream struct { type drpcConsensus_WatchLogStream struct {
@ -215,3 +214,15 @@ type drpcConsensus_WatchLogStream struct {
func (x *drpcConsensus_WatchLogStream) Send(m *WatchLogEvent) error { func (x *drpcConsensus_WatchLogStream) Send(m *WatchLogEvent) error {
return x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}) return x.MsgSend(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
} }
func (x *drpcConsensus_WatchLogStream) Recv() (*WatchLogRequest, error) {
m := new(WatchLogRequest)
if err := x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{}); err != nil {
return nil, err
}
return m, nil
}
func (x *drpcConsensus_WatchLogStream) RecvMsg(m *WatchLogRequest) error {
return x.MsgRecv(m, drpcEncoding_File_consensus_consensusproto_protos_consensus_proto{})
}

View File

@ -31,7 +31,7 @@ service Consensus {
// AddRecord adds new record to log // AddRecord adds new record to log
rpc AddRecord(AddRecordRequest) returns (Ok); rpc AddRecord(AddRecordRequest) returns (Ok);
// WatchLog fetches log and subscribes for a changes // WatchLog fetches log and subscribes for a changes
rpc WatchLog(WatchLogRequest) returns (stream WatchLogEvent); rpc WatchLog(stream WatchLogRequest) returns (stream WatchLogEvent);
} }
message Ok {} message Ok {}
@ -46,7 +46,8 @@ message AddRecordRequest {
} }
message WatchLogRequest { message WatchLogRequest {
bytes logId = 1; repeated bytes watchIds = 1;
repeated bytes unwatchIds = 2;
} }
message WatchLogEvent { message WatchLogEvent {

View File

@ -46,30 +46,19 @@ func (c *consensusRpc) AddRecord(ctx context.Context, req *consensusproto.AddRec
return &consensusproto.Ok{}, nil return &consensusproto.Ok{}, nil
} }
func (c *consensusRpc) WatchLog(req *consensusproto.WatchLogRequest, rpcStream consensusproto.DRPCConsensus_WatchLogStream) error { func (c *consensusRpc) WatchLog(rpcStream consensusproto.DRPCConsensus_WatchLogStream) error {
stream, err := c.stream.Subscribe(rpcStream.Context(), req.LogId) stream := c.stream.NewStream()
if err != nil {
return err
}
defer stream.Close() defer stream.Close()
var logSent bool go c.readStream(stream, rpcStream)
for { for {
if !logSent { recs := stream.WaitLogs()
if err = rpcStream.Send(&consensusproto.WatchLogEvent{ if len(recs) == 0 {
LogId: req.LogId, return rpcStream.Close()
Records: recordsToProto(stream.Records()), }
}); err != nil { for _, rec := range recs {
return err if err := rpcStream.Send(&consensusproto.WatchLogEvent{
} LogId: rec.Id,
logSent = true Records: recordsToProto(rec.Records),
} else {
recs := stream.WaitRecords()
if len(recs) == 0 {
return rpcStream.Close()
}
if err = rpcStream.Send(&consensusproto.WatchLogEvent{
LogId: req.LogId,
Records: recordsToProto(recs),
}); err != nil { }); err != nil {
return err return err
} }
@ -77,6 +66,18 @@ func (c *consensusRpc) WatchLog(req *consensusproto.WatchLogRequest, rpcStream c
} }
} }
func (c *consensusRpc) readStream(st *stream.Stream, rpcStream consensusproto.DRPCConsensus_WatchLogStream) {
defer st.Close()
for {
req, err := rpcStream.Recv()
if err != nil {
return
}
st.UnwatchIds(rpcStream.Context(), req.UnwatchIds)
st.WatchIds(rpcStream.Context(), req.WatchIds)
}
}
func logFromProto(log *consensusproto.Log) consensus.Log { func logFromProto(log *consensusproto.Log) consensus.Log {
return consensus.Log{ return consensus.Log{
Id: log.Id, Id: log.Id,

View File

@ -2,7 +2,6 @@ package stream
import ( import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus"
"github.com/cheggaaa/mb/v2"
"sync" "sync"
) )
@ -10,9 +9,7 @@ type object struct {
logId []byte logId []byte
records []consensus.Record records []consensus.Record
streams map[uint32]*stream streams map[uint64]*Stream
lastStreamId uint32
mu sync.Mutex mu sync.Mutex
} }
@ -27,7 +24,7 @@ func (o *object) AddRecords(recs []consensus.Record) {
diff := recs[0 : len(recs)-len(o.records)] diff := recs[0 : len(recs)-len(o.records)]
o.records = recs o.records = recs
for _, st := range o.streams { for _, st := range o.streams {
st.AddRecords(diff) _ = st.AddRecords(o.logId, diff)
} }
} }
@ -37,18 +34,12 @@ func (o *object) Records() []consensus.Record {
return o.records return o.records
} }
func (o *object) NewStream() Stream { func (o *object) AddStream(s *Stream) {
o.mu.Lock() o.mu.Lock()
defer o.mu.Unlock() defer o.mu.Unlock()
o.lastStreamId++ o.streams[s.id] = s
st := &stream{ _ = s.AddRecords(o.logId, o.records)
id: o.lastStreamId, return
obj: o,
records: o.records,
mb: mb.New(consensus.Record{}, 100),
}
o.streams[st.id] = st
return st
} }
func (o *object) Locked() bool { func (o *object) Locked() bool {
@ -57,7 +48,7 @@ func (o *object) Locked() bool {
return len(o.streams) > 0 return len(o.streams) > 0
} }
func (o *object) removeStream(id uint32) { func (o *object) RemoveStream(id uint64) {
o.mu.Lock() o.mu.Lock()
defer o.mu.Unlock() defer o.mu.Unlock()
delete(o.streams, id) delete(o.streams, id)

View File

@ -7,8 +7,10 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/db" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/db"
"github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache" "github.com/anytypeio/go-anytype-infrastructure-experiments/pkg/ocache"
"github.com/cheggaaa/mb/v2"
"github.com/mr-tron/base58" "github.com/mr-tron/base58"
"go.uber.org/zap" "go.uber.org/zap"
"sync/atomic"
"time" "time"
) )
@ -30,21 +32,15 @@ func New() Service {
return &service{} return &service{}
} }
type Stream interface {
LogId() []byte
Records() []consensus.Record
WaitRecords() []consensus.Record
Close()
}
type Service interface { type Service interface {
Subscribe(ctx context.Context, logId []byte) (stream Stream, err error) NewStream() *Stream
app.ComponentRunnable app.ComponentRunnable
} }
type service struct { type service struct {
db db.Service db db.Service
cache ocache.OCache cache ocache.OCache
lastStreamId uint64
} }
func (s *service) Init(a *app.App) (err error) { func (s *service) Init(a *app.App) (err error) {
@ -58,14 +54,6 @@ func (s *service) Init(a *app.App) (err error) {
return s.db.SetChangeReceiver(s.receiveChange) return s.db.SetChangeReceiver(s.receiveChange)
} }
func (s *service) Subscribe(ctx context.Context, logId []byte) (Stream, error) {
obj, err := s.getObject(ctx, logId)
if err != nil {
return nil, err
}
return obj.NewStream(), nil
}
func (s *service) Name() (name string) { func (s *service) Name() (name string) {
return CName return CName
} }
@ -74,12 +62,39 @@ func (s *service) Run(ctx context.Context) (err error) {
return nil return nil
} }
func (s *service) NewStream() *Stream {
return &Stream{
id: atomic.AddUint64(&s.lastStreamId, 1),
logIds: make(map[string]struct{}),
mb: mb.New(consensus.Log{}, 100),
s: s,
}
}
func (s *service) AddStream(ctx context.Context, logId []byte, stream *Stream) (err error) {
obj, err := s.getObject(ctx, logId)
if err != nil {
return err
}
obj.AddStream(stream)
return
}
func (s *service) RemoveStream(ctx context.Context, logId []byte, streamId uint64) (err error) {
obj, err := s.getObject(ctx, logId)
if err != nil {
return err
}
obj.RemoveStream(streamId)
return
}
func (s *service) loadLog(ctx context.Context, id string) (value ocache.Object, err error) { func (s *service) loadLog(ctx context.Context, id string) (value ocache.Object, err error) {
if ctxLog := ctx.Value(ctxLogKey); ctxLog != nil { if ctxLog := ctx.Value(ctxLogKey); ctxLog != nil {
return &object{ return &object{
logId: ctxLog.(consensus.Log).Id, logId: ctxLog.(consensus.Log).Id,
records: ctxLog.(consensus.Log).Records, records: ctxLog.(consensus.Log).Records,
streams: make(map[uint32]*stream), streams: make(map[uint64]*Stream),
}, nil }, nil
} }
logId := logIdFromString(id) logId := logIdFromString(id)
@ -90,7 +105,7 @@ func (s *service) loadLog(ctx context.Context, id string) (value ocache.Object,
return &object{ return &object{
logId: dbLog.Id, logId: dbLog.Id,
records: dbLog.Records, records: dbLog.Records,
streams: make(map[uint32]*stream), streams: make(map[uint64]*Stream),
}, nil }, nil
} }

View File

@ -13,11 +13,12 @@ import (
var ctx = context.Background() var ctx = context.Background()
func TestService_Subscribe(t *testing.T) { func TestService_NewStream(t *testing.T) {
fx := newFixture(t) fx := newFixture(t)
defer fx.Finish(t) defer fx.Finish(t)
var expLogId = []byte("logId") var expLogId = []byte("logId")
var preloadLogId = []byte("preloadId")
fx.mockDB.fetchLog = func(ctx context.Context, logId []byte) (log consensus.Log, err error) { fx.mockDB.fetchLog = func(ctx context.Context, logId []byte) (log consensus.Log, err error) {
require.Equal(t, expLogId, logId) require.Equal(t, expLogId, logId)
@ -31,24 +32,7 @@ func TestService_Subscribe(t *testing.T) {
}, nil }, nil
} }
st1, err := fx.Subscribe(ctx, expLogId) fx.mockDB.receiver(preloadLogId, []consensus.Record{
require.NoError(t, err)
require.Equal(t, expLogId, st1.LogId())
sr1 := readStream(st1)
assert.Equal(t, uint32(1), sr1.id)
st2, err := fx.Subscribe(ctx, expLogId)
require.NoError(t, err)
require.Equal(t, expLogId, st2.LogId())
sr2 := readStream(st2)
assert.Equal(t, uint32(2), sr2.id)
fx.mockDB.receiver(expLogId, []consensus.Record{
{
Id: []byte{'1'},
},
})
fx.mockDB.receiver([]byte("other id"), []consensus.Record{
{ {
Id: []byte{'2'}, Id: []byte{'2'},
PrevId: []byte{'1'}, PrevId: []byte{'1'},
@ -57,7 +41,38 @@ func TestService_Subscribe(t *testing.T) {
Id: []byte{'1'}, Id: []byte{'1'},
}, },
}) })
st1 := fx.NewStream()
sr1 := readStream(st1)
assert.Equal(t, uint64(1), sr1.id)
st1.WatchIds(ctx, [][]byte{expLogId, preloadLogId})
st1.UnwatchIds(ctx, [][]byte{preloadLogId})
assert.Equal(t, [][]byte{expLogId}, st1.LogIds())
st2 := fx.NewStream()
sr2 := readStream(st2)
assert.Equal(t, uint64(2), sr2.id)
st2.WatchIds(ctx, [][]byte{expLogId, preloadLogId})
fx.mockDB.receiver(expLogId, []consensus.Record{ fx.mockDB.receiver(expLogId, []consensus.Record{
{
Id: []byte{'1'},
},
})
fx.mockDB.receiver(expLogId, []consensus.Record{
{
Id: []byte{'2'},
PrevId: []byte{'1'},
},
{
Id: []byte{'1'},
},
})
fx.mockDB.receiver(preloadLogId, []consensus.Record{
{
Id: []byte{'3'},
PrevId: []byte{'4'},
},
{ {
Id: []byte{'2'}, Id: []byte{'2'},
PrevId: []byte{'1'}, PrevId: []byte{'1'},
@ -77,9 +92,15 @@ func TestService_Subscribe(t *testing.T) {
} }
} }
require.Equal(t, sr1.records, sr2.records) require.Len(t, sr1.logs, 2)
require.Len(t, sr1.records, 1) assert.Len(t, sr1.logs[string(expLogId)].Records, 2)
assert.Equal(t, []byte{'2'}, sr1.records[0].Id) assert.Equal(t, []byte{'2'}, sr1.logs[string(expLogId)].Records[0].Id)
assert.Equal(t, []byte{'2'}, sr1.logs[string(preloadLogId)].Records[0].Id)
require.Len(t, sr2.logs, 2)
assert.Len(t, sr2.logs[string(expLogId)].Records, 2)
assert.Equal(t, []byte{'2'}, sr2.logs[string(expLogId)].Records[0].Id)
assert.Equal(t, []byte{'3'}, sr2.logs[string(preloadLogId)].Records[0].Id)
} }
func newFixture(t *testing.T) *fixture { func newFixture(t *testing.T) *fixture {
@ -104,10 +125,11 @@ func (fx *fixture) Finish(t *testing.T) {
require.NoError(t, fx.a.Close(ctx)) require.NoError(t, fx.a.Close(ctx))
} }
func readStream(st Stream) *streamReader { func readStream(st *Stream) *streamReader {
sr := &streamReader{ sr := &streamReader{
id: st.(*stream).id, id: st.id,
stream: st, stream: st,
logs: map[string]consensus.Log{},
finished: make(chan struct{}), finished: make(chan struct{}),
} }
go sr.read() go sr.read()
@ -115,21 +137,28 @@ func readStream(st Stream) *streamReader {
} }
type streamReader struct { type streamReader struct {
id uint32 id uint64
stream Stream stream *Stream
records []consensus.Record logs map[string]consensus.Log
finished chan struct{} finished chan struct{}
} }
func (sr *streamReader) read() { func (sr *streamReader) read() {
defer close(sr.finished) defer close(sr.finished)
for { for {
records := sr.stream.WaitRecords() logs := sr.stream.WaitLogs()
if len(records) == 0 { if len(logs) == 0 {
return return
} }
sr.records = append(sr.records, records...) for _, l := range logs {
if el, ok := sr.logs[string(l.Id)]; !ok {
sr.logs[string(l.Id)] = l
} else {
el.Records = append(l.Records, el.Records...)
sr.logs[string(l.Id)] = el
}
}
} }
} }

View File

@ -1,34 +1,74 @@
package stream package stream
import ( import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus" "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus"
"github.com/cheggaaa/mb/v2" "github.com/cheggaaa/mb/v2"
"go.uber.org/zap"
"sync"
) )
type stream struct { type Stream struct {
id uint32 id uint64
obj *object logIds map[string]struct{}
records []consensus.Record mu sync.Mutex
mb *mb.MB[consensus.Record] mb *mb.MB[consensus.Log]
s *service
} }
func (s *stream) LogId() []byte { func (s *Stream) LogIds() [][]byte {
return s.obj.logId s.mu.Lock()
defer s.mu.Unlock()
logIds := make([][]byte, 0, len(s.logIds))
for logId := range s.logIds {
logIds = append(logIds, []byte(logId))
}
return logIds
} }
func (s *stream) AddRecords(records []consensus.Record) { func (s *Stream) AddRecords(logId []byte, records []consensus.Record) (err error) {
_ = s.mb.Add(records...) return s.mb.Add(consensus.Log{Id: logId, Records: records})
} }
func (s *stream) Records() []consensus.Record { func (s *Stream) WaitLogs() []consensus.Log {
return s.records
}
func (s *stream) WaitRecords() []consensus.Record {
return s.mb.Wait() return s.mb.Wait()
} }
func (s *stream) Close() { func (s *Stream) WatchIds(ctx context.Context, logIds [][]byte) {
_ = s.mb.Close() s.mu.Lock()
s.obj.removeStream(s.id) defer s.mu.Unlock()
for _, logId := range logIds {
logIdKey := string(logId)
if _, ok := s.logIds[logIdKey]; !ok {
s.logIds[logIdKey] = struct{}{}
if addErr := s.s.AddStream(ctx, logId, s); addErr != nil {
log.Warn("can't add stream for log", zap.Binary("logId", logId), zap.Error(addErr))
}
}
}
return
}
func (s *Stream) UnwatchIds(ctx context.Context, logIds [][]byte) {
s.mu.Lock()
defer s.mu.Unlock()
for _, logId := range logIds {
logIdKey := string(logId)
if _, ok := s.logIds[logIdKey]; ok {
delete(s.logIds, logIdKey)
if remErr := s.s.RemoveStream(ctx, logId, s.id); remErr != nil {
log.Warn("can't remove stream for log", zap.Binary("logId", logId), zap.Error(remErr))
}
}
}
return
}
func (s *Stream) Close() {
_ = s.mb.Close()
s.mu.Lock()
defer s.mu.Unlock()
for logId := range s.logIds {
_ = s.s.RemoveStream(context.TODO(), []byte(logId), s.id)
}
} }