Merge branch 'main' into add-client-functionality

# Conflicts:
#	common/commonspace/syncservice/streampool.go
#	consensus/stream/service.go
This commit is contained in:
mcrakhman 2022-10-24 14:40:44 +02:00
commit 3546fabdfa
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
31 changed files with 1041 additions and 558 deletions

View File

@ -6,7 +6,7 @@ import (
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/sec"
"storj.io/drpc/drpcctx"
"sync"
"sync/atomic"

View File

@ -3,6 +3,7 @@ package synctree
import (
"context"
"errors"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/diffservice"
spacestorage "github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/storage"
@ -15,6 +16,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/tree"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/acl/treechangeproto"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
)
var ErrSyncTreeClosed = errors.New("sync tree is closed")
@ -146,6 +148,10 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
if err != nil {
return
}
if resp.GetContent().GetFullSyncResponse() == nil {
err = fmt.Errorf("expected to get full sync response, but got something else")
return
}
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := storage.TreeStorageCreatePayload{
@ -156,6 +162,7 @@ func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t
}
// basically building tree with in-memory storage and validating that it was without errors
log.With(zap.String("id", id)).Debug("validating tree")
err = tree.ValidateRawTree(payload, deps.AclList)
if err != nil {
return

View File

@ -12,7 +12,6 @@ require (
github.com/huandu/skiplist v1.2.0
github.com/ipfs/go-cid v0.3.2
github.com/libp2p/go-libp2p v0.23.2
github.com/libp2p/go-libp2p-core v0.20.1
github.com/minio/sha256-simd v1.0.0
github.com/multiformats/go-multibase v0.1.1
github.com/multiformats/go-multihash v0.2.1

View File

@ -189,8 +189,6 @@ github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6
github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg=
github.com/libp2p/go-libp2p v0.23.2 h1:yqyTeKQJyofWXxEv/eEVUvOrGdt/9x+0PIQ4N1kaxmE=
github.com/libp2p/go-libp2p v0.23.2/go.mod h1:s9DEa5NLR4g+LZS+md5uGU4emjMWFiqkZr6hBTY8UxI=
github.com/libp2p/go-libp2p-core v0.20.1 h1:fQz4BJyIFmSZAiTbKV8qoYhEH5Dtv/cVhZbG3Ib/+Cw=
github.com/libp2p/go-libp2p-core v0.20.1/go.mod h1:6zR8H7CvQWgYLsbG4on6oLNSGcyKaYFSEYyDt51+bIY=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/libp2p/go-openssl v0.1.0/go.mod h1:OiOxwPpL3n4xlenjx2h7AwSGaFSC/KZvf6gNdOBQMtc=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=

View File

@ -8,7 +8,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/sec"
"go.uber.org/zap"
"net"
"storj.io/drpc"

View File

@ -2,7 +2,7 @@ package peer
import (
"context"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/sec"
"storj.io/drpc"
"sync/atomic"
"time"

View File

@ -0,0 +1,98 @@
package rpctest
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"math/rand"
"storj.io/drpc"
"sync"
"time"
)
var ErrCantConnect = errors.New("can't connect to test server")
func NewTestPool() *TestPool {
return &TestPool{}
}
type TestPool struct {
ts *TesServer
mu sync.Mutex
}
func (t *TestPool) WithServer(ts *TesServer) *TestPool {
t.mu.Lock()
defer t.mu.Unlock()
t.ts = ts
return t
}
func (t *TestPool) Get(ctx context.Context, id string) (peer.Peer, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.ts == nil {
return nil, ErrCantConnect
}
return &testPeer{id: id, Conn: t.ts.Dial()}, nil
}
func (t *TestPool) Dial(ctx context.Context, id string) (peer.Peer, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.ts == nil {
return nil, ErrCantConnect
}
return &testPeer{id: id, Conn: t.ts.Dial()}, nil
}
func (t *TestPool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.ts == nil {
return nil, ErrCantConnect
}
return &testPeer{id: peerIds[rand.Intn(len(peerIds))], Conn: t.ts.Dial()}, nil
}
func (t *TestPool) DialOneOf(ctx context.Context, peerIds []string) (peer.Peer, error) {
t.mu.Lock()
defer t.mu.Unlock()
if t.ts == nil {
return nil, ErrCantConnect
}
return &testPeer{id: peerIds[rand.Intn(len(peerIds))], Conn: t.ts.Dial()}, nil
}
func (t *TestPool) Init(a *app.App) (err error) {
return nil
}
func (t *TestPool) Name() (name string) {
return pool.CName
}
func (t *TestPool) Run(ctx context.Context) (err error) {
return nil
}
func (t *TestPool) Close(ctx context.Context) (err error) {
return nil
}
type testPeer struct {
id string
drpc.Conn
}
func (t testPeer) Id() string {
return t.id
}
func (t testPeer) LastUsage() time.Time {
return time.Now()
}
func (t testPeer) UpdateLastUsage() {}

View File

@ -0,0 +1,29 @@
package rpctest
import (
"context"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
)
func NewTestServer() *TesServer {
ts := &TesServer{
Mux: drpcmux.New(),
}
ts.Server = drpcserver.New(ts.Mux)
return ts
}
type TesServer struct {
*drpcmux.Mux
*drpcserver.Server
}
func (ts *TesServer) Dial() drpc.Conn {
sc, cc := net.Pipe()
go ts.Server.ServeOne(context.Background(), sc)
return drpcconn.New(cc)
}

View File

@ -4,9 +4,9 @@ import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/metric"
secure2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/prometheus/client_golang/prometheus"
"github.com/zeebo/errs"
"go.uber.org/zap"
@ -32,22 +32,22 @@ type DRPCServer interface {
}
type configGetter interface {
GetGRPCServer() config2.GrpcServer
GetGRPCServer() config.GrpcServer
}
type drpcServer struct {
config config2.GrpcServer
config config.GrpcServer
drpcServer *drpcserver.Server
transport secure2.Service
listeners []secure2.ContextListener
transport secure.Service
listeners []secure.ContextListener
metric metric.Metric
cancel func()
*drpcmux.Mux
}
func (s *drpcServer) Init(a *app.App) (err error) {
s.config = a.MustComponent(config2.CName).(configGetter).GetGRPCServer()
s.transport = a.MustComponent(secure2.CName).(secure2.Service)
s.config = a.MustComponent(config.CName).(configGetter).GetGRPCServer()
s.transport = a.MustComponent(secure.CName).(secure.Service)
s.metric = a.MustComponent(metric.CName).(metric.Metric)
return nil
}
@ -87,7 +87,7 @@ func (s *drpcServer) Run(ctx context.Context) (err error) {
return
}
func (s *drpcServer) serve(ctx context.Context, lis secure2.ContextListener) {
func (s *drpcServer) serve(ctx context.Context, lis secure.ContextListener) {
l := log.With(zap.String("localAddr", lis.Addr().String()))
l.Info("drpc listener started")
defer func() {
@ -111,7 +111,7 @@ func (s *drpcServer) serve(ctx context.Context, lis secure2.ContextListener) {
}
continue
}
if _, ok := err.(secure2.HandshakeError); ok {
if _, ok := err.(secure.HandshakeError); ok {
l.Warn("listener handshake error", zap.Error(err))
continue
}

View File

@ -3,7 +3,7 @@ package secure
import (
"context"
"errors"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/sec"
)
var (

View File

@ -2,7 +2,7 @@ package secure
import (
"context"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p/core/crypto"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"net"
)

View File

@ -2,13 +2,13 @@ package secure
import (
"context"
commonaccount "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
commonaccount "github.com/anytypeio/go-anytype-infrastructure-experiments/common/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/sec"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/sec"
libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls"
"go.uber.org/zap"
"net"

View File

@ -1,4 +1,4 @@
//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf Configuration,ConfConnector
//go:generate mockgen -destination mock_nodeconf/mock_nodeconf.go github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf Service,Configuration,ConfConnector
package nodeconf
import (

View File

@ -1,5 +1,5 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf (interfaces: Configuration,ConfConnector)
// Source: github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf (interfaces: Service,Configuration,ConfConnector)
// Package mock_nodeconf is a generated GoMock package.
package mock_nodeconf
@ -8,10 +8,105 @@ import (
context "context"
reflect "reflect"
app "github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
peer "github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/peer"
nodeconf "github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
gomock "github.com/golang/mock/gomock"
)
// MockService is a mock of Service interface.
type MockService struct {
ctrl *gomock.Controller
recorder *MockServiceMockRecorder
}
// MockServiceMockRecorder is the mock recorder for MockService.
type MockServiceMockRecorder struct {
mock *MockService
}
// NewMockService creates a new mock instance.
func NewMockService(ctrl *gomock.Controller) *MockService {
mock := &MockService{ctrl: ctrl}
mock.recorder = &MockServiceMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockService) EXPECT() *MockServiceMockRecorder {
return m.recorder
}
// ConsensusPeers mocks base method.
func (m *MockService) ConsensusPeers() []string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ConsensusPeers")
ret0, _ := ret[0].([]string)
return ret0
}
// ConsensusPeers indicates an expected call of ConsensusPeers.
func (mr *MockServiceMockRecorder) ConsensusPeers() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConsensusPeers", reflect.TypeOf((*MockService)(nil).ConsensusPeers))
}
// GetById mocks base method.
func (m *MockService) GetById(arg0 string) nodeconf.Configuration {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetById", arg0)
ret0, _ := ret[0].(nodeconf.Configuration)
return ret0
}
// GetById indicates an expected call of GetById.
func (mr *MockServiceMockRecorder) GetById(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetById", reflect.TypeOf((*MockService)(nil).GetById), arg0)
}
// GetLast mocks base method.
func (m *MockService) GetLast() nodeconf.Configuration {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetLast")
ret0, _ := ret[0].(nodeconf.Configuration)
return ret0
}
// GetLast indicates an expected call of GetLast.
func (mr *MockServiceMockRecorder) GetLast() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetLast", reflect.TypeOf((*MockService)(nil).GetLast))
}
// Init mocks base method.
func (m *MockService) Init(arg0 *app.App) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Init", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Init indicates an expected call of Init.
func (mr *MockServiceMockRecorder) Init(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockService)(nil).Init), arg0)
}
// Name mocks base method.
func (m *MockService) Name() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Name")
ret0, _ := ret[0].(string)
return ret0
}
// Name indicates an expected call of Name.
func (mr *MockServiceMockRecorder) Name() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockService)(nil).Name))
}
// MockConfiguration is a mock of Configuration interface.
type MockConfiguration struct {
ctrl *gomock.Controller

View File

@ -3,10 +3,10 @@ package nodeconf
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
config2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys"
encryptionkey2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
signingkey2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/encryptionkey"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/anytypeio/go-chash"
)
@ -36,8 +36,8 @@ type service struct {
type Node struct {
Address string
PeerId string
SigningKey signingkey2.PubKey
EncryptionKey encryptionkey2.PubKey
SigningKey signingkey.PubKey
EncryptionKey encryptionkey.PubKey
}
func (n *Node) Id() string {
@ -49,7 +49,7 @@ func (n *Node) Capacity() float64 {
}
func (s *service) Init(a *app.App) (err error) {
conf := a.MustComponent(config2.CName).(*config2.Config)
conf := a.MustComponent(config.CName).(*config.Config)
s.accountId = conf.Account.PeerId
config := &configuration{
@ -100,10 +100,10 @@ func (s *service) ConsensusPeers() []string {
}
func nodeFromConfigNode(
n config2.Node) (*Node, error) {
n config.Node) (*Node, error) {
decodedSigningKey, err := keys.DecodeKeyFromString(
n.SigningKey,
signingkey2.UnmarshalEd25519PrivateKey,
signingkey.UnmarshalEd25519PrivateKey,
nil)
if err != nil {
return nil, err
@ -111,7 +111,7 @@ func nodeFromConfigNode(
decodedEncryptionKey, err := keys.DecodeKeyFromString(
n.EncryptionKey,
encryptionkey2.NewEncryptionRsaPrivKeyFromBytes,
encryptionkey.NewEncryptionRsaPrivKeyFromBytes,
nil)
if err != nil {
return nil, err

View File

@ -63,20 +63,17 @@ message ACLUserAdd {
ACLUserPermissions permissions = 4;
}
// signing accept key
// rsa encryption key -> read keys
// accept key, encrypt key, invite id
// GetSpace(id) -> ... (space header + acl root) -> diff
// Join(ACLJoinRecord) -> Ok
//
message ACLUserInvite {
bytes acceptPublicKey = 1;
// TODO: change to read key
bytes encryptPublicKey = 2;
repeated bytes encryptedReadKeys = 3;
ACLUserPermissions permissions = 4;
// TODO: either derive inviteId from pub keys or think if it is possible to just use ACL record id
string inviteId = 5;
}

View File

@ -2,8 +2,8 @@ package peer
import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/util/keys/asymmetric/signingkey"
"github.com/libp2p/go-libp2p-core/crypto"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
)
func IDFromSigningPubKey(pubKey signingkey.PubKey) (peer.ID, error) {

View File

@ -1,367 +0,0 @@
package main
import (
"bytes"
"context"
"flag"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/config"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/dialer"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/secure"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/account"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusclient"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr"
"go.uber.org/zap"
"gopkg.in/mgo.v2/bson"
"math/rand"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
var log = logger.NewNamed("main")
var (
flagConfigFile = flag.String("c", "etc/consensus-config.yml", "path to config file")
flagVersion = flag.Bool("v", false, "show version and exit")
flagHelp = flag.Bool("h", false, "show help and exit")
)
func main() {
flag.Parse()
rand.Seed(time.Now().UnixNano())
if *flagVersion {
fmt.Println(app.VersionDescription())
return
}
if *flagHelp {
flag.PrintDefaults()
return
}
if debug, ok := os.LookupEnv("ANYPROF"); ok && debug != "" {
go func() {
http.ListenAndServe(debug, nil)
}()
}
// create app
ctx := context.Background()
a := new(app.App)
// open config file
conf, err := config.NewFromFile(*flagConfigFile)
if err != nil {
log.Fatal("can't open config file", zap.Error(err))
}
// bootstrap components
a.Register(conf)
Bootstrap(a)
// start app
if err := a.Start(ctx); err != nil {
log.Fatal("can't start app", zap.Error(err))
}
log.Info("app started", zap.String("version", a.Version()))
if err := testClient(a.MustComponent(consensusclient.CName).(consensusclient.Service)); err != nil {
log.Fatal("test error", zap.Error(err))
} else {
log.Info("test success!")
}
b := &bench{service: a.MustComponent(consensusclient.CName).(consensusclient.Service)}
b.run()
// wait exit signal
exit := make(chan os.Signal, 1)
signal.Notify(exit, os.Interrupt, syscall.SIGKILL, syscall.SIGTERM, syscall.SIGQUIT)
sig := <-exit
log.Info("received exit signal, stop app...", zap.String("signal", fmt.Sprint(sig)))
// close app
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
if err := a.Close(ctx); err != nil {
log.Fatal("close error", zap.Error(err))
} else {
log.Info("goodbye!")
}
time.Sleep(time.Second / 3)
}
func Bootstrap(a *app.App) {
a.Register(account.New()).
Register(secure.New()).
Register(nodeconf.New()).
Register(dialer.New()).
Register(pool.New()).
Register(consensusclient.New())
}
func testClient(service consensusclient.Service) (err error) {
if err = testCreateLogAndRecord(service); err != nil {
return err
}
if err = testStream(service); err != nil {
return err
}
return
}
func testCreateLogAndRecord(service consensusclient.Service) (err error) {
ctx := context.Background()
// create log
newLogId := []byte(bson.NewObjectId())
st := time.Now()
lastRecId := []byte(bson.NewObjectId())
err = service.AddLog(ctx, &consensusproto.Log{
Id: newLogId,
Records: []*consensusproto.Record{
{
Id: lastRecId,
Payload: []byte("test"),
CreatedUnix: uint64(time.Now().Unix()),
},
},
})
if err != nil {
return err
}
log.Info("log created", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st)))
// create log with same id
st = time.Now()
err = service.AddLog(ctx, &consensusproto.Log{
Id: newLogId,
Records: []*consensusproto.Record{
{
Id: lastRecId,
Payload: []byte("test"),
CreatedUnix: uint64(time.Now().Unix()),
},
},
})
if err != consensuserr.ErrLogExists {
return fmt.Errorf("unexpected error: '%v' want LogExists", zap.Error(err))
}
err = nil
log.Info("log duplicate checked", zap.Duration("dur", time.Since(st)))
// create record
st = time.Now()
recId := []byte(bson.NewObjectId())
err = service.AddRecord(ctx, newLogId, &consensusproto.Record{
Id: []byte(bson.NewObjectId()),
PrevId: lastRecId,
CreatedUnix: uint64(time.Now().Unix()),
})
if err != nil {
return err
}
lastRecId = recId
log.Info("record created", zap.String("id", bson.ObjectId(lastRecId).Hex()), zap.Duration("dur", time.Since(st)))
// record conflict
st = time.Now()
err = service.AddRecord(ctx, newLogId, &consensusproto.Record{
Id: []byte(bson.NewObjectId()),
PrevId: []byte(bson.NewObjectId()),
CreatedUnix: uint64(time.Now().Unix()),
})
if err != consensuserr.ErrConflict {
return fmt.Errorf("unexpected error: '%v' want Conflict", zap.Error(err))
}
err = nil
log.Info("conflict record checked", zap.Duration("dur", time.Since(st)))
return
}
func testStream(service consensusclient.Service) (err error) {
ctx := context.Background()
// create log
newLogId := []byte(bson.NewObjectId())
st := time.Now()
lastRecId := []byte(bson.NewObjectId())
err = service.AddLog(ctx, &consensusproto.Log{
Id: newLogId,
Records: []*consensusproto.Record{
{
Id: lastRecId,
Payload: []byte("test"),
CreatedUnix: uint64(time.Now().Unix()),
},
},
})
if err != nil {
return err
}
log.Info("log created", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st)))
stream, err := service.WatchLog(ctx)
if err != nil {
return err
}
defer stream.Close()
st = time.Now()
if err = stream.WatchIds([][]byte{newLogId}); err != nil {
return err
}
log.Info("watch", zap.String("id", bson.ObjectId(newLogId).Hex()), zap.Duration("dur", time.Since(st)))
sr := readStream(stream)
for i := 0; i < 10; i++ {
st = time.Now()
recId := []byte(bson.NewObjectId())
err = service.AddRecord(ctx, newLogId, &consensusproto.Record{
Id: recId,
PrevId: lastRecId,
CreatedUnix: uint64(time.Now().Unix()),
})
if err != nil {
return err
}
lastRecId = recId
log.Info("record created", zap.String("id", bson.ObjectId(lastRecId).Hex()), zap.Duration("dur", time.Since(st)))
}
sr.validate()
return nil
}
func readStream(stream consensusclient.Stream) *streamReader {
sr := &streamReader{stream: stream, logs: map[string]*consensusproto.Log{}}
go sr.read()
return sr
}
type streamReader struct {
stream consensusclient.Stream
logs map[string]*consensusproto.Log
}
func (sr *streamReader) read() {
for {
recs := sr.stream.WaitLogs()
if len(recs) == 0 {
return
}
for _, rec := range recs {
if el, ok := sr.logs[string(rec.Id)]; !ok {
sr.logs[string(rec.Id)] = &consensusproto.Log{
Id: rec.Id,
Records: rec.Records,
}
} else {
el.Records = append(rec.Records, el.Records...)
sr.logs[string(rec.Id)] = el
}
}
}
}
func (sr *streamReader) validate() {
var lc, rc int
for _, log := range sr.logs {
lc++
rc += len(log.Records)
validateLog(log)
}
fmt.Println("logs valid; log count:", lc, "records:", rc)
}
func validateLog(log *consensusproto.Log) {
var prevId []byte
for _, rec := range log.Records {
if len(prevId) != 0 {
if !bytes.Equal(prevId, rec.Id) {
panic(fmt.Sprintf("invalid log: %+v", log))
}
}
prevId = rec.PrevId
}
}
type bench struct {
service consensusclient.Service
stream consensusclient.Stream
}
func (b *bench) run() {
var err error
b.stream, err = b.service.WatchLog(context.Background())
if err != nil {
panic(err)
}
defer b.stream.Close()
sr := readStream(b.stream)
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
time.Sleep(time.Second / 100)
wg.Add(1)
go func() {
defer wg.Done()
b.client()
}()
fmt.Println("total streams:", i+1)
}
wg.Wait()
sr.validate()
}
func (b *bench) client() {
ctx := context.Background()
// create log
newLogId := []byte(bson.NewObjectId())
lastRecId := []byte(bson.NewObjectId())
err := b.service.AddLog(ctx, &consensusproto.Log{
Id: newLogId,
Records: []*consensusproto.Record{
{
Id: lastRecId,
Payload: []byte("test"),
CreatedUnix: uint64(time.Now().Unix()),
},
},
})
for i := 0; i < 5; i++ {
fmt.Println("watch", bson.ObjectId(newLogId).Hex())
if err = b.stream.WatchIds([][]byte{newLogId}); err != nil {
panic(err)
}
for i := 0; i < rand.Intn(20); i++ {
time.Sleep(time.Millisecond * time.Duration(rand.Intn(10000)))
recId := []byte(bson.NewObjectId())
err = b.service.AddRecord(ctx, newLogId, &consensusproto.Record{
Id: recId,
PrevId: lastRecId,
Payload: []byte("some payload 1 2 3 4 5 6 6 7 oijoj"),
CreatedUnix: uint64(time.Now().Unix()),
})
if err != nil {
panic(err)
}
lastRecId = recId
}
if err = b.stream.UnwatchIds([][]byte{newLogId}); err != nil {
panic(err)
}
fmt.Println("unwatch", bson.ObjectId(newLogId).Hex())
time.Sleep(time.Minute * 1)
}
}

View File

@ -5,6 +5,7 @@ import "time"
type Log struct {
Id []byte `bson:"_id"`
Records []Record `bson:"records"`
Err error `bson:"-"`
}
type Record struct {

View File

@ -2,37 +2,65 @@ package consensusclient
import (
"context"
"errors"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpcerr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
_ "github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr"
"go.uber.org/zap"
"sync"
"time"
)
const CName = "consensus.client"
var log = logger.NewNamed(CName)
var (
ErrWatcherExists = errors.New("watcher exists")
ErrWatcherNotExists = errors.New("watcher not exists")
)
func New() Service {
return new(service)
}
// Watcher watches new events by specified logId
type Watcher interface {
AddConsensusRecords(recs []*consensusproto.Record)
AddConsensusError(err error)
}
type Service interface {
// AddLog adds new log to consensus servers
AddLog(ctx context.Context, clog *consensusproto.Log) (err error)
// AddRecord adds new record to consensus servers
AddRecord(ctx context.Context, logId []byte, clog *consensusproto.Record) (err error)
WatchLog(ctx context.Context) (stream Stream, err error)
app.Component
// Watch starts watching to given logId and calls watcher when any relative event received
Watch(logId []byte, w Watcher) (err error)
// UnWatch stops watching given logId and removes watcher
UnWatch(logId []byte) (err error)
app.ComponentRunnable
}
type service struct {
pool pool.Pool
nodeconf nodeconf.Service
watchers map[string]Watcher
stream *stream
close chan struct{}
mu sync.Mutex
}
func (s *service) Init(a *app.App) (err error) {
s.pool = a.MustComponent(pool.CName).(pool.Pool)
s.nodeconf = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.watchers = make(map[string]Watcher)
s.close = make(chan struct{})
return nil
}
@ -40,6 +68,11 @@ func (s *service) Name() (name string) {
return CName
}
func (s *service) Run(_ context.Context) error {
go s.streamWatcher()
return nil
}
func (s *service) getClient(ctx context.Context) (consensusproto.DRPCConsensusClient, error) {
peer, err := s.pool.GetOneOf(ctx, s.nodeconf.ConsensusPeers())
if err != nil {
@ -83,7 +116,37 @@ func (s *service) AddRecord(ctx context.Context, logId []byte, clog *consensuspr
return
}
func (s *service) WatchLog(ctx context.Context) (st Stream, err error) {
func (s *service) Watch(logId []byte, w Watcher) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.watchers[string(logId)]; ok {
return ErrWatcherExists
}
s.watchers[string(logId)] = w
if s.stream != nil {
if wErr := s.stream.WatchIds([][]byte{logId}); wErr != nil {
log.Warn("WatchIds error", zap.Error(wErr))
}
}
return
}
func (s *service) UnWatch(logId []byte) (err error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.watchers[string(logId)]; !ok {
return ErrWatcherNotExists
}
delete(s.watchers, string(logId))
if s.stream != nil {
if wErr := s.stream.UnwatchIds([][]byte{logId}); wErr != nil {
log.Warn("UnWatchIds error", zap.Error(wErr))
}
}
return
}
func (s *service) openStream(ctx context.Context) (st *stream, err error) {
cl, err := s.dialClient(ctx)
if err != nil {
return
@ -94,3 +157,87 @@ func (s *service) WatchLog(ctx context.Context) (st Stream, err error) {
}
return runStream(rpcStream), nil
}
func (s *service) streamWatcher() {
var (
err error
st *stream
i int
)
for {
// open stream
if st, err = s.openStream(context.Background()); err != nil {
// can't open stream, we will retry until success connection or close
if i < 60 {
i++
}
sleepTime := time.Second * time.Duration(i)
log.Error("watch log error", zap.Error(err), zap.Duration("waitTime", sleepTime))
select {
case <-time.After(sleepTime):
continue
case <-s.close:
return
}
}
i = 0
// collect ids and setup stream
s.mu.Lock()
var logIds = make([][]byte, 0, len(s.watchers))
for id := range s.watchers {
logIds = append(logIds, []byte(id))
}
s.stream = st
s.mu.Unlock()
// restore subscriptions
if len(logIds) > 0 {
if err = s.stream.WatchIds(logIds); err != nil {
log.Error("watch ids error", zap.Error(err))
continue
}
}
// read stream
if err = s.streamReader(); err != nil {
log.Error("stream read error", zap.Error(err))
continue
}
return
}
}
func (s *service) streamReader() error {
for {
events := s.stream.WaitLogs()
if len(events) == 0 {
return s.stream.Err()
}
for _, e := range events {
if w, ok := s.watchers[string(e.LogId)]; ok {
if e.Error == nil {
w.AddConsensusRecords(e.Records)
} else {
w.AddConsensusError(rpcerr.Err(uint64(e.Error.Error)))
}
} else {
log.Warn("received unexpected log id", zap.Binary("logId", e.LogId))
}
}
}
}
func (s *service) Close(_ context.Context) error {
s.mu.Lock()
if s.stream != nil {
_ = s.stream.Close()
}
s.mu.Unlock()
select {
case <-s.close:
default:
close(s.close)
}
return nil
}

View File

@ -0,0 +1,221 @@
package consensusclient
import (
"context"
"fmt"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/pool"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpctest"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/nodeconf/mock_nodeconf"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
"time"
)
func TestService_Watch(t *testing.T) {
t.Run("not found error", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId = []byte{'1'}
w := &testWatcher{}
require.NoError(t, fx.Watch(logId, w))
st := fx.testServer.waitStream(t)
req, err := st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId}, req.WatchIds)
require.NoError(t, st.Send(&consensusproto.WatchLogEvent{
LogId: logId,
Error: &consensusproto.Err{
Error: consensusproto.ErrCodes_ErrorOffset + consensusproto.ErrCodes_LogNotFound,
},
}))
assert.Equal(t, consensuserr.ErrLogNotFound, w.err)
fx.testServer.releaseStream <- nil
})
t.Run("watcherExists error", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId = []byte{'1'}
w := &testWatcher{}
require.NoError(t, fx.Watch(logId, w))
require.Error(t, fx.Watch(logId, w))
st := fx.testServer.waitStream(t)
st.Recv()
fx.testServer.releaseStream <- nil
})
t.Run("watch", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
var logId1 = []byte{'1'}
w := &testWatcher{}
require.NoError(t, fx.Watch(logId1, w))
st := fx.testServer.waitStream(t)
req, err := st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId1}, req.WatchIds)
var logId2 = []byte{'2'}
w = &testWatcher{}
require.NoError(t, fx.Watch(logId2, w))
req, err = st.Recv()
require.NoError(t, err)
assert.Equal(t, [][]byte{logId2}, req.WatchIds)
fx.testServer.releaseStream <- nil
})
}
func TestService_UnWatch(t *testing.T) {
t.Run("no watcher", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
require.Error(t, fx.UnWatch([]byte{'1'}))
})
t.Run("success", func(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
w := &testWatcher{}
require.NoError(t, fx.Watch([]byte{'1'}, w))
assert.NoError(t, fx.UnWatch([]byte{'1'}))
})
}
func TestService_Init(t *testing.T) {
t.Run("reconnect on watch err", func(t *testing.T) {
fx := newFixture(t)
fx.testServer.watchErrOnce = true
fx.run(t)
defer fx.Finish()
fx.testServer.waitStream(t)
fx.testServer.releaseStream <- nil
})
t.Run("reconnect on start", func(t *testing.T) {
fx := newFixture(t)
fx.a.MustComponent(pool.CName).(*rpctest.TestPool).WithServer(nil)
fx.run(t)
defer fx.Finish()
time.Sleep(time.Millisecond * 50)
fx.a.MustComponent(pool.CName).(*rpctest.TestPool).WithServer(fx.drpcTS)
fx.testServer.waitStream(t)
fx.testServer.releaseStream <- nil
})
}
func TestService_AddLog(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
assert.NoError(t, fx.AddLog(ctx, &consensusproto.Log{}))
}
func TestService_AddRecord(t *testing.T) {
fx := newFixture(t).run(t)
defer fx.Finish()
assert.NoError(t, fx.AddRecord(ctx, []byte{'1'}, &consensusproto.Record{}))
}
var ctx = context.Background()
func newFixture(t *testing.T) *fixture {
fx := &fixture{
Service: New(),
a: &app.App{},
ctrl: gomock.NewController(t),
testServer: &testServer{
stream: make(chan consensusproto.DRPCConsensus_WatchLogStream),
releaseStream: make(chan error),
},
}
fx.nodeconf = mock_nodeconf.NewMockService(fx.ctrl)
fx.nodeconf.EXPECT().Init(gomock.Any())
fx.nodeconf.EXPECT().Name().Return(nodeconf.CName).AnyTimes()
fx.nodeconf.EXPECT().ConsensusPeers().Return([]string{"c1", "c2"}).AnyTimes()
fx.drpcTS = rpctest.NewTestServer()
require.NoError(t, consensusproto.DRPCRegisterConsensus(fx.drpcTS.Mux, fx.testServer))
fx.a.Register(fx.Service).
Register(fx.nodeconf).
Register(rpctest.NewTestPool().WithServer(fx.drpcTS))
return fx
}
type fixture struct {
Service
nodeconf *mock_nodeconf.MockService
a *app.App
ctrl *gomock.Controller
testServer *testServer
drpcTS *rpctest.TesServer
}
func (fx *fixture) run(t *testing.T) *fixture {
require.NoError(t, fx.a.Start(ctx))
return fx
}
func (fx *fixture) Finish() {
assert.NoError(fx.ctrl.T, fx.a.Close(ctx))
fx.ctrl.Finish()
}
type testServer struct {
stream chan consensusproto.DRPCConsensus_WatchLogStream
addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error
addRecord func(ctx context.Context, req *consensusproto.AddRecordRequest) error
releaseStream chan error
watchErrOnce bool
}
func (t *testServer) AddLog(ctx context.Context, req *consensusproto.AddLogRequest) (*consensusproto.Ok, error) {
if t.addLog != nil {
if err := t.addLog(ctx, req); err != nil {
return nil, err
}
}
return &consensusproto.Ok{}, nil
}
func (t *testServer) AddRecord(ctx context.Context, req *consensusproto.AddRecordRequest) (*consensusproto.Ok, error) {
if t.addRecord != nil {
if err := t.addRecord(ctx, req); err != nil {
return nil, err
}
}
return &consensusproto.Ok{}, nil
}
func (t *testServer) WatchLog(stream consensusproto.DRPCConsensus_WatchLogStream) error {
fmt.Println("watchLog", t.watchErrOnce)
if t.watchErrOnce {
t.watchErrOnce = false
return fmt.Errorf("error")
}
t.stream <- stream
return <-t.releaseStream
}
func (t *testServer) waitStream(test *testing.T) consensusproto.DRPCConsensus_WatchLogStream {
select {
case <-time.After(time.Second * 5):
test.Fatalf("waiteStream timeout")
case st := <-t.stream:
return st
}
return nil
}
type testWatcher struct {
recs [][]*consensusproto.Record
err error
}
func (t *testWatcher) AddConsensusRecords(recs []*consensusproto.Record) {
t.recs = append(t.recs, recs)
}
func (t *testWatcher) AddConsensusError(err error) {
t.err = err
}

View File

@ -5,17 +5,10 @@ import (
"github.com/cheggaaa/mb/v2"
)
type Stream interface {
WatchIds(logIds [][]byte) (err error)
UnwatchIds(logIds [][]byte) (err error)
WaitLogs() []*consensusproto.Log
Close() error
}
func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) Stream {
func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) *stream {
st := &stream{
rpcStream: rpcStream,
mb: mb.New((*consensusproto.Log)(nil), 100),
mb: mb.New((*consensusproto.WatchLogEvent)(nil), 100),
}
go st.readStream()
return st
@ -23,7 +16,8 @@ func runStream(rpcStream consensusproto.DRPCConsensus_WatchLogClient) Stream {
type stream struct {
rpcStream consensusproto.DRPCConsensus_WatchLogClient
mb *mb.MB[*consensusproto.Log]
mb *mb.MB[*consensusproto.WatchLogEvent]
err error
}
func (s *stream) WatchIds(logIds [][]byte) (err error) {
@ -38,21 +32,23 @@ func (s *stream) UnwatchIds(logIds [][]byte) (err error) {
})
}
func (s *stream) WaitLogs() []*consensusproto.Log {
func (s *stream) WaitLogs() []*consensusproto.WatchLogEvent {
return s.mb.Wait()
}
func (s *stream) Err() error {
return s.err
}
func (s *stream) readStream() {
defer s.Close()
for {
event, err := s.rpcStream.Recv()
if err != nil {
s.err = err
return
}
if err = s.mb.Add(&consensusproto.Log{
Id: event.LogId,
Records: event.Records,
}); err != nil {
if err = s.mb.Add(event); err != nil {
return
}
}

View File

@ -363,6 +363,7 @@ func (m *WatchLogRequest) GetUnwatchIds() [][]byte {
type WatchLogEvent struct {
LogId []byte `protobuf:"bytes,1,opt,name=logId,proto3" json:"logId,omitempty"`
Records []*Record `protobuf:"bytes,2,rep,name=records,proto3" json:"records,omitempty"`
Error *Err `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`
}
func (m *WatchLogEvent) Reset() { *m = WatchLogEvent{} }
@ -412,6 +413,57 @@ func (m *WatchLogEvent) GetRecords() []*Record {
return nil
}
func (m *WatchLogEvent) GetError() *Err {
if m != nil {
return m.Error
}
return nil
}
type Err struct {
Error ErrCodes `protobuf:"varint,1,opt,name=error,proto3,enum=anyConsensus.ErrCodes" json:"error,omitempty"`
}
func (m *Err) Reset() { *m = Err{} }
func (m *Err) String() string { return proto.CompactTextString(m) }
func (*Err) ProtoMessage() {}
func (*Err) Descriptor() ([]byte, []int) {
return fileDescriptor_6b92aaf7feaf5a54, []int{7}
}
func (m *Err) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Err) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Err.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Err) XXX_Merge(src proto.Message) {
xxx_messageInfo_Err.Merge(m, src)
}
func (m *Err) XXX_Size() int {
return m.Size()
}
func (m *Err) XXX_DiscardUnknown() {
xxx_messageInfo_Err.DiscardUnknown(m)
}
var xxx_messageInfo_Err proto.InternalMessageInfo
func (m *Err) GetError() ErrCodes {
if m != nil {
return m.Error
}
return ErrCodes_Unexpected
}
func init() {
proto.RegisterEnum("anyConsensus.ErrCodes", ErrCodes_name, ErrCodes_value)
proto.RegisterType((*Log)(nil), "anyConsensus.Log")
@ -421,6 +473,7 @@ func init() {
proto.RegisterType((*AddRecordRequest)(nil), "anyConsensus.AddRecordRequest")
proto.RegisterType((*WatchLogRequest)(nil), "anyConsensus.WatchLogRequest")
proto.RegisterType((*WatchLogEvent)(nil), "anyConsensus.WatchLogEvent")
proto.RegisterType((*Err)(nil), "anyConsensus.Err")
}
func init() {
@ -428,37 +481,39 @@ func init() {
}
var fileDescriptor_6b92aaf7feaf5a54 = []byte{
// 475 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0x4d, 0x8b, 0xd3, 0x50,
0x14, 0xed, 0x4b, 0xc6, 0x4e, 0x7b, 0xd3, 0x8f, 0x78, 0x19, 0x24, 0x74, 0x30, 0x84, 0x88, 0x10,
0x45, 0x5a, 0xa9, 0x82, 0x2b, 0x17, 0x63, 0xa9, 0x50, 0xa9, 0x16, 0x02, 0x55, 0x70, 0x65, 0xcc,
0x7b, 0x8d, 0x61, 0x42, 0x5e, 0x7d, 0x2f, 0x1d, 0x3b, 0xff, 0xc2, 0x1f, 0xe2, 0x0f, 0x71, 0xe7,
0x2c, 0x5d, 0x4a, 0xfb, 0x47, 0xa4, 0x2f, 0x4d, 0x68, 0x9c, 0x99, 0x85, 0x9b, 0x84, 0x7b, 0xce,
0x3d, 0xf7, 0x9e, 0xdc, 0x43, 0xe0, 0x61, 0xc8, 0x53, 0xc9, 0x52, 0xb9, 0x92, 0x4b, 0xc1, 0x33,
0x3e, 0x50, 0x4f, 0x39, 0x28, 0xd1, 0xbe, 0x02, 0xb0, 0x15, 0xa4, 0x97, 0xa3, 0x02, 0x73, 0xc7,
0xa0, 0x4f, 0x79, 0x84, 0x1d, 0xd0, 0x62, 0x6a, 0x11, 0x87, 0x78, 0x2d, 0x5f, 0x8b, 0x29, 0xf6,
0xe1, 0x58, 0xb0, 0x90, 0x0b, 0x2a, 0x2d, 0xcd, 0xd1, 0x3d, 0x63, 0x78, 0xd2, 0x3f, 0x94, 0xf5,
0x7d, 0x45, 0xfa, 0x45, 0x93, 0x9b, 0x40, 0x3d, 0x87, 0xae, 0x4d, 0xba, 0x07, 0xf5, 0xa5, 0x60,
0x17, 0x13, 0x6a, 0x69, 0x0a, 0xdb, 0x57, 0x68, 0xc1, 0xf1, 0x32, 0xb8, 0x4c, 0x78, 0x40, 0x2d,
0x5d, 0x11, 0x45, 0x89, 0x0e, 0x18, 0xa1, 0x60, 0x41, 0xc6, 0xe8, 0x3c, 0x8d, 0xd7, 0xd6, 0x91,
0x43, 0xbc, 0x23, 0xff, 0x10, 0x72, 0x8f, 0x40, 0x9b, 0x9d, 0xbb, 0xcf, 0xa1, 0x7d, 0x46, 0xe9,
0x94, 0x47, 0x3e, 0xfb, 0xba, 0x62, 0x32, 0xc3, 0x07, 0xa0, 0x27, 0x3c, 0x52, 0xbb, 0x8d, 0xe1,
0xdd, 0xaa, 0xe1, 0x5d, 0xdb, 0x8e, 0x75, 0xdf, 0x83, 0x79, 0x46, 0xe9, 0xde, 0xff, 0x5e, 0x78,
0x02, 0x77, 0x12, 0x1e, 0x4d, 0x0a, 0xdb, 0x79, 0x81, 0x4f, 0xa0, 0x9e, 0x7f, 0x9e, 0x72, 0x7e,
0xdb, 0x09, 0xf6, 0x3d, 0xee, 0x5b, 0xe8, 0x7e, 0x08, 0xb2, 0xf0, 0xcb, 0x81, 0x9f, 0x1e, 0x34,
0xbe, 0xed, 0xa0, 0x09, 0x95, 0x16, 0x71, 0x74, 0xaf, 0xe5, 0x97, 0x35, 0xda, 0x00, 0xab, 0xb4,
0x64, 0x35, 0xc5, 0x1e, 0x20, 0xee, 0x1c, 0xda, 0xc5, 0xb8, 0xf1, 0x05, 0x4b, 0x6f, 0xf3, 0xf8,
0x9f, 0x39, 0x3d, 0xfe, 0x04, 0x8d, 0xb1, 0x10, 0x23, 0x4e, 0x99, 0xc4, 0x0e, 0xc0, 0x3c, 0x65,
0xeb, 0x25, 0x0b, 0x33, 0x46, 0xcd, 0x1a, 0xb6, 0xa1, 0xb9, 0xdb, 0xb6, 0x8e, 0x65, 0x26, 0x4d,
0x82, 0x5d, 0x30, 0xa6, 0x3c, 0x7a, 0xc7, 0xb3, 0xd7, 0x7c, 0x95, 0x52, 0x53, 0x43, 0x84, 0x4e,
0x3e, 0x6e, 0xc4, 0xd3, 0x45, 0x12, 0x87, 0x99, 0xa9, 0xa3, 0x09, 0xc6, 0x58, 0x08, 0x2e, 0x66,
0x8b, 0x85, 0x64, 0x99, 0xf9, 0x43, 0x1b, 0xfe, 0x22, 0xd0, 0x2c, 0xf7, 0xe3, 0x0b, 0xa8, 0xe7,
0x19, 0xe1, 0x69, 0xd5, 0x58, 0x25, 0xb9, 0x9e, 0x59, 0x25, 0x67, 0xe7, 0xf8, 0x12, 0x9a, 0x65,
0x4c, 0x68, 0x5f, 0xd3, 0x56, 0xf2, 0xbb, 0x41, 0xfe, 0x06, 0x1a, 0xc5, 0xf9, 0xf0, 0x7e, 0x95,
0xfd, 0x27, 0xa5, 0xde, 0xe9, 0xcd, 0xb4, 0xba, 0xba, 0x47, 0x9e, 0x92, 0x57, 0x8f, 0x7e, 0x6e,
0x6c, 0x72, 0xb5, 0xb1, 0xc9, 0x9f, 0x8d, 0x4d, 0xbe, 0x6f, 0xed, 0xda, 0xd5, 0xd6, 0xae, 0xfd,
0xde, 0xda, 0xb5, 0x8f, 0xdd, 0x41, 0xf5, 0x97, 0xfb, 0x5c, 0x57, 0xaf, 0x67, 0x7f, 0x03, 0x00,
0x00, 0xff, 0xff, 0x40, 0xd0, 0xc6, 0x8e, 0x8b, 0x03, 0x00, 0x00,
// 509 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x53, 0xcd, 0x8a, 0xd3, 0x50,
0x14, 0xee, 0x4d, 0x3a, 0x9d, 0xf6, 0xf4, 0x2f, 0x1e, 0x86, 0x21, 0x74, 0x30, 0x94, 0x88, 0x58,
0x65, 0x68, 0xa5, 0x23, 0xb8, 0x72, 0x31, 0x96, 0x0a, 0x95, 0x6a, 0x21, 0x30, 0x0a, 0xae, 0xac,
0xb9, 0xb7, 0x31, 0x4c, 0xc8, 0xad, 0xf7, 0xa6, 0x63, 0x67, 0xe3, 0x33, 0xf8, 0x20, 0x3e, 0x88,
0x3b, 0x67, 0xe9, 0x52, 0xda, 0x17, 0x91, 0xdc, 0xfc, 0xd0, 0xd8, 0x99, 0x85, 0x9b, 0x96, 0xf3,
0x7d, 0xe7, 0xe7, 0xbb, 0xe7, 0x7c, 0x81, 0x87, 0x2e, 0x0f, 0x25, 0x0b, 0xe5, 0x4a, 0x2e, 0x05,
0x8f, 0xf8, 0x40, 0xfd, 0xca, 0x41, 0x8e, 0xf6, 0x15, 0x80, 0x8d, 0x79, 0x78, 0x3d, 0xca, 0x30,
0x7b, 0x0c, 0xfa, 0x94, 0x7b, 0xd8, 0x02, 0xcd, 0xa7, 0x26, 0xe9, 0x92, 0x5e, 0xc3, 0xd1, 0x7c,
0x8a, 0x7d, 0x38, 0x14, 0xcc, 0xe5, 0x82, 0x4a, 0x53, 0xeb, 0xea, 0xbd, 0xfa, 0xf0, 0xa8, 0xbf,
0x5b, 0xd6, 0x77, 0x14, 0xe9, 0x64, 0x49, 0x76, 0x00, 0x95, 0x04, 0xda, 0xeb, 0x74, 0x0c, 0x95,
0xa5, 0x60, 0x57, 0x13, 0x6a, 0x6a, 0x0a, 0x4b, 0x23, 0x34, 0xe1, 0x70, 0x39, 0xbf, 0x0e, 0xf8,
0x9c, 0x9a, 0xba, 0x22, 0xb2, 0x10, 0xbb, 0x50, 0x77, 0x05, 0x9b, 0x47, 0x8c, 0x5e, 0x84, 0xfe,
0xda, 0x2c, 0x77, 0x49, 0xaf, 0xec, 0xec, 0x42, 0x76, 0x19, 0xb4, 0xd9, 0xa5, 0xfd, 0x0c, 0x9a,
0xe7, 0x94, 0x4e, 0xb9, 0xe7, 0xb0, 0x2f, 0x2b, 0x26, 0x23, 0x7c, 0x00, 0x7a, 0xc0, 0x3d, 0x35,
0xbb, 0x3e, 0xbc, 0x57, 0x14, 0x1c, 0xa7, 0xc5, 0xac, 0xfd, 0x0e, 0x8c, 0x73, 0x4a, 0x53, 0xfd,
0x69, 0xe1, 0x11, 0x1c, 0x04, 0xdc, 0x9b, 0x64, 0xb2, 0x93, 0x00, 0x4f, 0xa1, 0x92, 0x3c, 0x4f,
0x29, 0xbf, 0x6b, 0x05, 0x69, 0x8e, 0xfd, 0x06, 0xda, 0xef, 0xe7, 0x91, 0xfb, 0x79, 0x47, 0x4f,
0x07, 0xaa, 0x5f, 0x63, 0x68, 0x42, 0xa5, 0x49, 0xba, 0x7a, 0xaf, 0xe1, 0xe4, 0x31, 0x5a, 0x00,
0xab, 0x30, 0x67, 0x35, 0xc5, 0xee, 0x20, 0xf6, 0x37, 0x68, 0x66, 0xed, 0xc6, 0x57, 0x2c, 0xbc,
0x4b, 0xe3, 0x7f, 0xde, 0x09, 0x1f, 0xc1, 0x01, 0x13, 0x82, 0x0b, 0xb5, 0xf3, 0xbd, 0x25, 0x8d,
0x85, 0x70, 0x12, 0xde, 0x3e, 0x03, 0x7d, 0x2c, 0x04, 0x9e, 0x66, 0xf9, 0xf1, 0xd4, 0xd6, 0xf0,
0x78, 0x2f, 0x7f, 0xc4, 0x29, 0x93, 0x69, 0xd1, 0x93, 0x8f, 0x50, 0xcd, 0x20, 0x6c, 0x01, 0x5c,
0x84, 0x6c, 0xbd, 0x64, 0x6e, 0xc4, 0xa8, 0x51, 0xc2, 0x26, 0xd4, 0xe2, 0xb7, 0xac, 0x7d, 0x19,
0x49, 0x83, 0x60, 0x1b, 0xea, 0x53, 0xee, 0xbd, 0xe5, 0xd1, 0x2b, 0xbe, 0x0a, 0xa9, 0xa1, 0x21,
0x42, 0x2b, 0x11, 0x3b, 0xe2, 0xe1, 0x22, 0xf0, 0xdd, 0xc8, 0xd0, 0xd1, 0x80, 0xfa, 0x38, 0x6e,
0x3c, 0x5b, 0x2c, 0x24, 0x8b, 0x8c, 0x1f, 0xda, 0xf0, 0x17, 0x81, 0x5a, 0x3e, 0x1f, 0x9f, 0x43,
0x25, 0x71, 0x00, 0x9e, 0x14, 0x85, 0x15, 0x7c, 0xd1, 0x31, 0x8a, 0xe4, 0xec, 0x12, 0x5f, 0x40,
0x2d, 0x37, 0x01, 0x5a, 0x7b, 0xb5, 0x05, 0x77, 0xdc, 0x52, 0xfe, 0x1a, 0xaa, 0xd9, 0x71, 0xf0,
0x7e, 0x91, 0xfd, 0xc7, 0x03, 0x9d, 0x93, 0xdb, 0x69, 0x75, 0xd3, 0x1e, 0x79, 0x4a, 0x5e, 0x3e,
0xfe, 0xb9, 0xb1, 0xc8, 0xcd, 0xc6, 0x22, 0x7f, 0x36, 0x16, 0xf9, 0xbe, 0xb5, 0x4a, 0x37, 0x5b,
0xab, 0xf4, 0x7b, 0x6b, 0x95, 0x3e, 0xb4, 0x07, 0xc5, 0x0f, 0xfa, 0x53, 0x45, 0xfd, 0x9d, 0xfd,
0x0d, 0x00, 0x00, 0xff, 0xff, 0xaa, 0x39, 0x8f, 0xb0, 0xe9, 0x03, 0x00, 0x00,
}
func (m *Log) Marshal() (dAtA []byte, err error) {
@ -715,6 +770,18 @@ func (m *WatchLogEvent) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i
var l int
_ = l
if m.Error != nil {
{
size, err := m.Error.MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintConsensus(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0x1a
}
if len(m.Records) > 0 {
for iNdEx := len(m.Records) - 1; iNdEx >= 0; iNdEx-- {
{
@ -739,6 +806,34 @@ func (m *WatchLogEvent) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil
}
func (m *Err) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Err) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Err) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.Error != 0 {
i = encodeVarintConsensus(dAtA, i, uint64(m.Error))
i--
dAtA[i] = 0x8
}
return len(dAtA) - i, nil
}
func encodeVarintConsensus(dAtA []byte, offset int, v uint64) int {
offset -= sovConsensus(v)
base := offset
@ -869,6 +964,22 @@ func (m *WatchLogEvent) Size() (n int) {
n += 1 + l + sovConsensus(uint64(l))
}
}
if m.Error != nil {
l = m.Error.Size()
n += 1 + l + sovConsensus(uint64(l))
}
return n
}
func (m *Err) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if m.Error != 0 {
n += 1 + sovConsensus(uint64(m.Error))
}
return n
}
@ -1634,6 +1745,111 @@ func (m *WatchLogEvent) Unmarshal(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowConsensus
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthConsensus
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthConsensus
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if m.Error == nil {
m.Error = &Err{}
}
if err := m.Error.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipConsensus(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthConsensus
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *Err) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowConsensus
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Err: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Err: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Error", wireType)
}
m.Error = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowConsensus
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.Error |= ErrCodes(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipConsensus(dAtA[iNdEx:])

View File

@ -52,4 +52,9 @@ message WatchLogRequest {
message WatchLogEvent {
bytes logId = 1;
repeated Record records = 2;
Err error = 3;
}
message Err {
ErrCodes error = 1;
}

View File

@ -6,8 +6,10 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/server"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/db"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/stream"
"storj.io/drpc/drpcerr"
"time"
)
@ -57,11 +59,26 @@ func (c *consensusRpc) WatchLog(rpcStream consensusproto.DRPCConsensus_WatchLogS
return rpcStream.Close()
}
for _, rec := range recs {
if err := rpcStream.Send(&consensusproto.WatchLogEvent{
LogId: rec.Id,
Records: recordsToProto(rec.Records),
}); err != nil {
return err
if rec.Err == nil {
if err := rpcStream.Send(&consensusproto.WatchLogEvent{
LogId: rec.Id,
Records: recordsToProto(rec.Records),
}); err != nil {
return err
}
} else {
errCode := consensusproto.ErrCodes(drpcerr.Code(rec.Err))
if errCode == 0 {
errCode = consensusproto.ErrCodes(drpcerr.Code(consensuserr.ErrUnexpected))
}
if err := rpcStream.Send(&consensusproto.WatchLogEvent{
LogId: rec.Id,
Error: &consensusproto.Err{
Error: errCode,
},
}); err != nil {
return err
}
}
}
}

View File

@ -5,7 +5,7 @@ import (
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app/logger"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/metric"
ocache2 "github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/pkg/ocache"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/db"
"github.com/cheggaaa/mb/v2"
@ -42,21 +42,20 @@ type Service interface {
type service struct {
db db.Service
cache ocache2.OCache
cache ocache.OCache
lastStreamId uint64
}
func (s *service) Init(a *app.App) (err error) {
s.db = a.MustComponent(db.CName).(db.Service)
cacheOpts := []ocache2.Option{
ocache2.WithTTL(cacheTTL),
ocache2.WithLogger(log.Named("cache").Sugar()),
cacheOpts := []ocache.Option{
ocache.WithTTL(cacheTTL),
ocache.WithLogger(log.Named("cache").Sugar()),
}
if ms := a.Component(metric.CName); ms != nil {
cacheOpts = append(cacheOpts, ocache2.WithPrometheus(ms.(metric.Metric).Registry(), "consensus", "logcache"))
cacheOpts = append(cacheOpts, ocache.WithPrometheus(ms.(metric.Metric).Registry(), "consensus", "logcache"))
}
s.cache = ocache2.New(s.loadLog, cacheOpts...)
s.cache = ocache.New(s.loadLog, cacheOpts...)
return s.db.SetChangeReceiver(s.receiveChange)
}
@ -98,7 +97,7 @@ func (s *service) RemoveStream(ctx context.Context, logId []byte, streamId uint6
return
}
func (s *service) loadLog(ctx context.Context, id string) (value ocache2.Object, err error) {
func (s *service) loadLog(ctx context.Context, id string) (value ocache.Object, err error) {
if ctxLog := ctx.Value(ctxLogKey); ctxLog != nil {
return &object{
logId: ctxLog.(consensus.Log).Id,

View File

@ -4,6 +4,7 @@ import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/app"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto/consensuserr"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/db"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -14,93 +15,111 @@ import (
var ctx = context.Background()
func TestService_NewStream(t *testing.T) {
fx := newFixture(t)
defer fx.Finish(t)
t.Run("watch/unwatch", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish(t)
var expLogId = []byte("logId")
var preloadLogId = []byte("preloadId")
var expLogId = []byte("logId")
var preloadLogId = []byte("preloadId")
fx.mockDB.fetchLog = func(ctx context.Context, logId []byte) (log consensus.Log, err error) {
require.Equal(t, expLogId, logId)
return consensus.Log{
Id: logId,
Records: []consensus.Record{
{
Id: []byte{'1'},
fx.mockDB.fetchLog = func(ctx context.Context, logId []byte) (log consensus.Log, err error) {
require.Equal(t, expLogId, logId)
return consensus.Log{
Id: logId,
Records: []consensus.Record{
{
Id: []byte{'1'},
},
},
},
}, nil
}
fx.mockDB.receiver(preloadLogId, []consensus.Record{
{
Id: []byte{'2'},
PrevId: []byte{'1'},
},
{
Id: []byte{'1'},
},
})
st1 := fx.NewStream()
sr1 := readStream(st1)
assert.Equal(t, uint64(1), sr1.id)
st1.WatchIds(ctx, [][]byte{expLogId, preloadLogId})
st1.UnwatchIds(ctx, [][]byte{preloadLogId})
assert.Equal(t, [][]byte{expLogId}, st1.LogIds())
st2 := fx.NewStream()
sr2 := readStream(st2)
assert.Equal(t, uint64(2), sr2.id)
st2.WatchIds(ctx, [][]byte{expLogId, preloadLogId})
fx.mockDB.receiver(expLogId, []consensus.Record{
{
Id: []byte{'1'},
},
})
fx.mockDB.receiver(expLogId, []consensus.Record{
{
Id: []byte{'2'},
PrevId: []byte{'1'},
},
{
Id: []byte{'1'},
},
})
fx.mockDB.receiver(preloadLogId, []consensus.Record{
{
Id: []byte{'3'},
PrevId: []byte{'4'},
},
{
Id: []byte{'2'},
PrevId: []byte{'1'},
},
{
Id: []byte{'1'},
},
})
st1.Close()
st2.Close()
for _, sr := range []*streamReader{sr1, sr2} {
select {
case <-time.After(time.Second / 3):
require.False(t, true, "timeout")
case <-sr.finished:
}, nil
}
}
require.Len(t, sr1.logs, 2)
assert.Len(t, sr1.logs[string(expLogId)].Records, 2)
assert.Equal(t, []byte{'2'}, sr1.logs[string(expLogId)].Records[0].Id)
assert.Equal(t, []byte{'2'}, sr1.logs[string(preloadLogId)].Records[0].Id)
fx.mockDB.receiver(preloadLogId, []consensus.Record{
{
Id: []byte{'2'},
PrevId: []byte{'1'},
},
{
Id: []byte{'1'},
},
})
require.Len(t, sr2.logs, 2)
assert.Len(t, sr2.logs[string(expLogId)].Records, 2)
assert.Equal(t, []byte{'2'}, sr2.logs[string(expLogId)].Records[0].Id)
assert.Equal(t, []byte{'3'}, sr2.logs[string(preloadLogId)].Records[0].Id)
st1 := fx.NewStream()
sr1 := readStream(st1)
assert.Equal(t, uint64(1), sr1.id)
st1.WatchIds(ctx, [][]byte{expLogId, preloadLogId})
st1.UnwatchIds(ctx, [][]byte{preloadLogId})
assert.Equal(t, [][]byte{expLogId}, st1.LogIds())
st2 := fx.NewStream()
sr2 := readStream(st2)
assert.Equal(t, uint64(2), sr2.id)
st2.WatchIds(ctx, [][]byte{expLogId, preloadLogId})
fx.mockDB.receiver(expLogId, []consensus.Record{
{
Id: []byte{'1'},
},
})
fx.mockDB.receiver(expLogId, []consensus.Record{
{
Id: []byte{'2'},
PrevId: []byte{'1'},
},
{
Id: []byte{'1'},
},
})
fx.mockDB.receiver(preloadLogId, []consensus.Record{
{
Id: []byte{'3'},
PrevId: []byte{'4'},
},
{
Id: []byte{'2'},
PrevId: []byte{'1'},
},
{
Id: []byte{'1'},
},
})
st1.Close()
st2.Close()
for _, sr := range []*streamReader{sr1, sr2} {
select {
case <-time.After(time.Second / 3):
require.False(t, true, "timeout")
case <-sr.finished:
}
}
require.Len(t, sr1.logs, 2)
assert.Len(t, sr1.logs[string(expLogId)].Records, 2)
assert.Equal(t, []byte{'2'}, sr1.logs[string(expLogId)].Records[0].Id)
assert.Equal(t, []byte{'2'}, sr1.logs[string(preloadLogId)].Records[0].Id)
require.Len(t, sr2.logs, 2)
assert.Len(t, sr2.logs[string(expLogId)].Records, 2)
assert.Equal(t, []byte{'2'}, sr2.logs[string(expLogId)].Records[0].Id)
assert.Equal(t, []byte{'3'}, sr2.logs[string(preloadLogId)].Records[0].Id)
})
t.Run("error", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish(t)
fx.mockDB.fetchLog = func(ctx context.Context, logId []byte) (log consensus.Log, err error) {
return log, consensuserr.ErrLogNotFound
}
st1 := fx.NewStream()
sr1 := readStream(st1)
id := []byte("nonExists")
assert.Equal(t, uint64(1), sr1.id)
st1.WatchIds(ctx, [][]byte{id})
st1.Close()
<-sr1.finished
require.Len(t, sr1.logs, 1)
assert.Equal(t, consensuserr.ErrLogNotFound, sr1.logs[string(id)].Err)
})
}
func newFixture(t *testing.T) *fixture {

View File

@ -48,7 +48,11 @@ func (s *Stream) WatchIds(ctx context.Context, logIds [][]byte) {
if _, ok := s.logIds[logIdKey]; !ok {
s.logIds[logIdKey] = struct{}{}
if addErr := s.s.AddStream(ctx, logId, s); addErr != nil {
log.Warn("can't add stream for log", zap.Binary("logId", logId), zap.Error(addErr))
log.Info("can't add stream for log", zap.Binary("logId", logId), zap.Error(addErr))
_ = s.mb.Add(consensus.Log{
Id: logId,
Err: addErr,
})
}
}
}

View File

@ -4,9 +4,12 @@ go 1.19
replace github.com/anytypeio/go-anytype-infrastructure-experiments/common => ../common
replace github.com/anytypeio/go-anytype-infrastructure-experiments/consensus => ../consensus
require (
github.com/akrylysov/pogreb v0.10.1
github.com/anytypeio/go-anytype-infrastructure-experiments/common v0.0.0-00010101000000-000000000000
github.com/anytypeio/go-anytype-infrastructure-experiments/consensus v0.0.0-00010101000000-000000000000
go.uber.org/zap v1.23.0
)
@ -15,6 +18,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheggaaa/mb/v2 v2.0.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/fogleman/gg v1.3.0 // indirect
github.com/goccy/go-graphviz v0.0.9 // indirect
@ -52,7 +56,7 @@ require (
github.com/zeebo/errs v1.3.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/image v0.0.0-20200119044424-58c23975cae1 // indirect
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab // indirect
google.golang.org/protobuf v1.28.1 // indirect

View File

@ -57,6 +57,8 @@ github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghf
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheggaaa/mb/v2 v2.0.1 h1:gn0khbEbKlw3i5VOYi0VnHEHayjZKfUDOyGSpHAybBs=
github.com/cheggaaa/mb/v2 v2.0.1/go.mod h1:XGeZw20Iqgjky26KL0mvCwk3+4NyZCUbshSo6ALne+c=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
@ -310,8 +312,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d h1:sK3txAijHtOK88l68nt020reeT1ZdKLIYetKl95FzVY=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=

View File

@ -53,10 +53,6 @@ func (s *service) Name() (name string) {
}
func (s *service) Run(ctx context.Context) (err error) {
go func() {
time.Sleep(time.Second * 5)
_, _ = s.GetSpace(ctx, "testDSpace")
}()
return
}