common streampool pkg

This commit is contained in:
Sergey Cherepanov 2023-01-18 13:46:49 +03:00
parent c1bf0e12cc
commit 330c97ab30
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
13 changed files with 1071 additions and 0 deletions

View File

@ -15,6 +15,7 @@ proto:
$(eval PKGMAP := $$(P_TREE_CHANGES),$$(P_ACL_RECORDS)) $(eval PKGMAP := $$(P_TREE_CHANGES),$$(P_ACL_RECORDS))
protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. commonspace/spacesyncproto/protos/*.proto protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. commonspace/spacesyncproto/protos/*.proto
protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. commonfile/fileproto/protos/*.proto protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. commonfile/fileproto/protos/*.proto
protoc --gogofaster_out=$(PKGMAP):. --go-drpc_out=protolib=github.com/gogo/protobuf:. net/streampool/testservice/protos/*.proto
deps: deps:
go mod download go mod download

1
go.mod
View File

@ -31,6 +31,7 @@ require (
go.uber.org/atomic v1.10.0 go.uber.org/atomic v1.10.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3
golang.org/x/net v0.3.0
gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
storj.io/drpc v0.0.32 storj.io/drpc v0.0.32

1
go.sum
View File

@ -574,6 +574,7 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.3.0 h1:VWL6FNY2bEEmsGVKabSlHu5Irp34xmMRoqb/9lF9lxk= golang.org/x/net v0.3.0 h1:VWL6FNY2bEEmsGVKabSlHu5Irp34xmMRoqb/9lF9lxk=
golang.org/x/net v0.3.0/go.mod h1:MBQ8lrhLObU/6UmLb4fmbmk5OcyYmqtbGd/9yIeKjEE=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=

View File

@ -0,0 +1,34 @@
package streampool
import (
"errors"
"github.com/gogo/protobuf/proto"
"storj.io/drpc"
)
var (
// EncodingProto drpc.Encoding implementation for gogo protobuf
EncodingProto drpc.Encoding = protoEncoding{}
)
var (
errNotAProtoMsg = errors.New("encoding: not a proto message")
)
type protoEncoding struct{}
func (p protoEncoding) Marshal(msg drpc.Message) ([]byte, error) {
pmsg, ok := msg.(proto.Message)
if !ok {
return nil, errNotAProtoMsg
}
return proto.Marshal(pmsg)
}
func (p protoEncoding) Unmarshal(buf []byte, msg drpc.Message) error {
pmsg, ok := msg.(proto.Message)
if !ok {
return errNotAProtoMsg
}
return proto.Unmarshal(buf, pmsg)
}

View File

@ -0,0 +1,24 @@
package streampool
import (
"github.com/anytypeio/any-sync/net/streampool/testservice"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)
func TestProtoEncoding(t *testing.T) {
t.Run("not a proto err", func(t *testing.T) {
_, err := EncodingProto.Marshal("string")
assert.Error(t, err)
err = EncodingProto.Unmarshal(nil, "sss")
assert.Error(t, err)
})
t.Run("encode", func(t *testing.T) {
data, err := EncodingProto.Marshal(&testservice.StreamMessage{ReqData: "1"})
require.NoError(t, err)
msg := &testservice.StreamMessage{}
require.NoError(t, EncodingProto.Unmarshal(data, msg))
assert.Equal(t, "1", msg.ReqData)
})
}

View File

@ -0,0 +1,44 @@
package streampool
import (
"context"
"github.com/cheggaaa/mb/v3"
"go.uber.org/zap"
)
// newStreamSender creates new sendPool
// workers - how many processes will execute tasks
// maxSize - limit for queue size
func newStreamSender(workers, maxSize int) *sendPool {
ss := &sendPool{
batch: mb.New[func()](maxSize),
}
for i := 0; i < workers; i++ {
go ss.sendLoop()
}
return ss
}
// sendPool needed for parallel execution of the incoming send tasks
type sendPool struct {
batch *mb.MB[func()]
}
func (ss *sendPool) Add(ctx context.Context, f ...func()) (err error) {
return ss.batch.Add(ctx, f...)
}
func (ss *sendPool) sendLoop() {
for {
f, err := ss.batch.WaitOne(context.Background())
if err != nil {
log.Debug("close send loop", zap.Error(err))
return
}
f()
}
}
func (ss *sendPool) Close() (err error) {
return ss.batch.Close()
}

45
net/streampool/stream.go Normal file
View File

@ -0,0 +1,45 @@
package streampool
import (
"go.uber.org/zap"
"storj.io/drpc"
"sync/atomic"
)
type stream struct {
peerId string
stream drpc.Stream
pool *streamPool
streamId uint32
closed atomic.Bool
l *zap.Logger
tags []string
}
func (sr *stream) write(msg drpc.Message) (err error) {
if err = sr.stream.MsgSend(msg, EncodingProto); err != nil {
sr.l.Info("stream write error", zap.Error(err))
sr.streamClose()
}
return err
}
func (sr *stream) readLoop() {
defer func() {
sr.streamClose()
}()
for {
msg := sr.pool.handler.NewReadMessage()
if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil {
sr.l.Info("msg receive error", zap.Error(err))
return
}
}
}
func (sr *stream) streamClose() {
if !sr.closed.Swap(true) {
_ = sr.stream.Close()
sr.pool.removeStream(sr.streamId)
}
}

View File

@ -0,0 +1,202 @@
package streampool
import (
"github.com/anytypeio/any-sync/net/peer"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"golang.org/x/net/context"
"storj.io/drpc"
"sync"
)
// StreamHandler handles incoming messages from streams
type StreamHandler interface {
// OpenStream opens stream with given peer
OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error)
// HandleMessage handles incoming message
HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error)
// NewReadMessage creates new empty message for unmarshalling into it
NewReadMessage() drpc.Message
}
// StreamPool keeps and read streams
type StreamPool interface {
// AddStream adds new incoming stream into the pool
AddStream(peerId string, stream drpc.Stream, tags ...string)
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error)
// Broadcast sends a message to all peers with given tags. Works async.
Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
// Close closes all streams
Close() error
}
type streamPool struct {
handler StreamHandler
streamIdsByPeer map[string][]uint32
streamIdsByTag map[string][]uint32
streams map[uint32]*stream
opening map[string]chan struct{}
exec *sendPool
mu sync.RWMutex
lastStreamId uint32
}
func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) {
s.mu.Lock()
defer s.mu.Unlock()
s.lastStreamId++
streamId := s.lastStreamId
st := &stream{
peerId: peerId,
stream: drpcStream,
pool: s,
streamId: streamId,
l: log.With(zap.String("peerId", peerId), zap.Uint32("streamId", streamId)),
tags: tags,
}
s.streams[streamId] = st
s.streamIdsByPeer[peerId] = append(s.streamIdsByPeer[peerId], streamId)
for _, tag := range tags {
s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId)
}
go st.readLoop()
}
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) {
var funcs []func()
for _, p := range peers {
funcs = append(funcs, func() {
if e := s.sendOne(ctx, p, msg); e != nil {
log.Info("send peer error", zap.Error(e))
}
})
}
return s.exec.Add(ctx, funcs...)
}
func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) {
// get all streams relates to peer
streams, err := s.getStreams(ctx, p)
if err != nil {
return
}
for _, st := range streams {
if err = st.write(msg); err != nil {
log.Info("stream write error", zap.Error(err))
// continue with next stream
continue
} else {
// stop sending on success
break
}
}
return
}
func (s *streamPool) getStreams(ctx context.Context, p peer.Peer) (streams []*stream, err error) {
s.mu.Lock()
// check cached streams
streamIds := s.streamIdsByPeer[p.Id()]
for _, streamId := range streamIds {
streams = append(streams, s.streams[streamId])
}
var openingCh chan struct{}
// no cached streams found
if len(streams) == 0 {
// start opening process
openingCh = s.openStream(ctx, p)
}
s.mu.Unlock()
// not empty openingCh means we should wait for the stream opening and try again
if openingCh != nil {
select {
case <-openingCh:
return s.getStreams(ctx, p)
case <-ctx.Done():
return nil, ctx.Err()
}
}
return streams, nil
}
func (s *streamPool) openStream(ctx context.Context, p peer.Peer) chan struct{} {
if ch, ok := s.opening[p.Id()]; ok {
// already have an opening process for this stream - return channel
return ch
}
ch := make(chan struct{})
s.opening[p.Id()] = ch
go func() {
// start stream opening in separate goroutine to avoid lock whole pool
defer func() {
s.mu.Lock()
defer s.mu.Unlock()
close(ch)
delete(s.opening, p.Id())
}()
// open new stream and add to pool
st, tags, err := s.handler.OpenStream(ctx, p)
if err != nil {
log.Warn("stream open error", zap.Error(err))
return
}
s.AddStream(p.Id(), st, tags...)
}()
return ch
}
func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error) {
s.mu.Lock()
var streams []*stream
for _, tag := range tags {
for _, streamId := range s.streamIdsByTag[tag] {
streams = append(streams, s.streams[streamId])
}
}
s.mu.Unlock()
var funcs []func()
for _, st := range streams {
funcs = append(funcs, func() {
if e := st.write(msg); e != nil {
log.Debug("broadcast write error", zap.Error(e))
}
})
}
return s.exec.Add(ctx, funcs...)
}
func (s *streamPool) removeStream(streamId uint32) {
s.mu.Lock()
defer s.mu.Unlock()
st := s.streams[streamId]
if st == nil {
log.Fatal("removeStream: stream does not exist", zap.Uint32("streamId", streamId))
}
var removeStream = func(m map[string][]uint32, key string) {
streamIds := m[key]
idx := slices.Index(streamIds, streamId)
if idx == -1 {
log.Fatal("removeStream: streamId does not exist", zap.Uint32("streamId", streamId))
}
streamIds = slices.Delete(streamIds, idx, idx+1)
if len(streamIds) == 0 {
delete(m, key)
} else {
m[key] = streamIds
}
}
removeStream(s.streamIdsByPeer, st.peerId)
for _, tag := range st.tags {
removeStream(s.streamIdsByTag, tag)
}
delete(s.streams, streamId)
}
func (s *streamPool) Close() (err error) {
return s.exec.Close()
}

View File

@ -0,0 +1,199 @@
package streampool
import (
"fmt"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/rpc/rpctest"
"github.com/anytypeio/any-sync/net/streampool/testservice"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
"sort"
"storj.io/drpc"
"sync"
"sync/atomic"
"testing"
"time"
)
var ctx = context.Background()
func TestStreamPool_AddStream(t *testing.T) {
newClientStream := func(fx *fixture, peerId string) (st testservice.DRPCTest_TestStreamClient, p peer.Peer) {
p, err := fx.tp.Dial(ctx, peerId)
require.NoError(t, err)
s, err := testservice.NewDRPCTestClient(p).TestStream(ctx)
require.NoError(t, err)
return s, p
}
t.Run("broadcast incoming", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish(t)
s1, _ := newClientStream(fx, "p1")
fx.AddStream("p1", s1, "space1", "common")
s2, _ := newClientStream(fx, "p2")
fx.AddStream("p2", s2, "space2", "common")
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space1"}, "space1"))
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "space2"}, "space2"))
require.NoError(t, fx.Broadcast(ctx, &testservice.StreamMessage{ReqData: "common"}, "common"))
var serverResults []string
for i := 0; i < 4; i++ {
select {
case msg := <-fx.tsh.receiveCh:
serverResults = append(serverResults, msg.ReqData)
case <-time.After(time.Second):
require.NoError(t, fmt.Errorf("timeout"))
}
}
sort.Strings(serverResults)
assert.Equal(t, []string{"common", "common", "space1", "space2"}, serverResults)
assert.NoError(t, s1.Close())
assert.NoError(t, s2.Close())
})
t.Run("send incoming", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish(t)
s1, p1 := newClientStream(fx, "p1")
defer s1.Close()
fx.AddStream("p1", s1, "space1", "common")
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, p1))
var msg *testservice.StreamMessage
select {
case msg = <-fx.tsh.receiveCh:
case <-time.After(time.Second):
require.NoError(t, fmt.Errorf("timeout"))
}
assert.Equal(t, "test", msg.ReqData)
})
}
func TestStreamPool_Send(t *testing.T) {
t.Run("open stream", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish(t)
p, err := fx.tp.Dial(ctx, "p1")
require.NoError(t, err)
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p))
var msg *testservice.StreamMessage
select {
case msg = <-fx.tsh.receiveCh:
case <-time.After(time.Second):
require.NoError(t, fmt.Errorf("timeout"))
}
assert.Equal(t, "should open stream", msg.ReqData)
})
t.Run("parallel open stream", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish(t)
p, err := fx.tp.Dial(ctx, "p1")
require.NoError(t, err)
fx.th.streamOpenDelay = time.Second / 3
var numMsgs = 5
for i := 0; i < numMsgs; i++ {
go require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p))
}
var msgs []*testservice.StreamMessage
for i := 0; i < numMsgs; i++ {
select {
case msg := <-fx.tsh.receiveCh:
msgs = append(msgs, msg)
case <-time.After(time.Second):
require.NoError(t, fmt.Errorf("timeout"))
}
}
assert.Len(t, msgs, numMsgs)
// make sure that we have only one stream
assert.Equal(t, int32(1), fx.tsh.streamsCount.Load())
})
}
func newFixture(t *testing.T) *fixture {
fx := &fixture{}
ts := rpctest.NewTestServer()
fx.tsh = &testServerHandler{receiveCh: make(chan *testservice.StreamMessage, 100)}
require.NoError(t, testservice.DRPCRegisterTest(ts, fx.tsh))
fx.tp = rpctest.NewTestPool().WithServer(ts)
fx.th = &testHandler{}
fx.StreamPool = New().NewStreamPool(fx.th)
return fx
}
type fixture struct {
StreamPool
tp *rpctest.TestPool
th *testHandler
tsh *testServerHandler
}
func (fx *fixture) Finish(t *testing.T) {
require.NoError(t, fx.Close())
require.NoError(t, fx.tp.Close(ctx))
}
type testHandler struct {
streamOpenDelay time.Duration
incomingMessages []drpc.Message
mu sync.Mutex
}
func (t *testHandler) OpenStream(ctx context.Context, p peer.Peer) (stream drpc.Stream, tags []string, err error) {
if t.streamOpenDelay > 0 {
time.Sleep(t.streamOpenDelay)
}
stream, err = testservice.NewDRPCTestClient(p).TestStream(ctx)
return
}
func (t *testHandler) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) (err error) {
t.mu.Lock()
defer t.mu.Unlock()
t.incomingMessages = append(t.incomingMessages, msg)
return nil
}
func (t *testHandler) DRPCEncoding() drpc.Encoding {
return EncodingProto
}
func (t *testHandler) NewReadMessage() drpc.Message {
return new(testservice.StreamMessage)
}
type testServerHandler struct {
receiveCh chan *testservice.StreamMessage
streamsCount atomic.Int32
mu sync.Mutex
}
func (t *testServerHandler) TestStream(st testservice.DRPCTest_TestStreamStream) error {
t.streamsCount.Add(1)
defer t.streamsCount.Add(-1)
for {
msg, err := st.Recv()
if err != nil {
return err
}
t.receiveCh <- msg
if err = st.Send(msg); err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,42 @@
package streampool
import (
"github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/app/logger"
)
const CName = "common.net.streampool"
var log = logger.NewNamed(CName)
func New() Service {
return new(service)
}
type Service interface {
NewStreamPool(h StreamHandler) StreamPool
app.Component
}
type service struct {
}
func (s *service) NewStreamPool(h StreamHandler) StreamPool {
return &streamPool{
handler: h,
streamIdsByPeer: map[string][]uint32{},
streamIdsByTag: map[string][]uint32{},
streams: map[uint32]*stream{},
opening: map[string]chan struct{}{},
exec: newStreamSender(10, 100),
lastStreamId: 0,
}
}
func (s *service) Init(a *app.App) (err error) {
return nil
}
func (s *service) Name() (name string) {
return CName
}

View File

@ -0,0 +1,13 @@
syntax = "proto3";
package testService;
option go_package = "net/streampool/testservice";
service Test {
rpc TestStream(stream StreamMessage) returns (stream StreamMessage);
}
message StreamMessage {
string reqData = 1;
}

View File

@ -0,0 +1,317 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: net/streampool/testservice/protos/testservice.proto
package testservice
import (
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type StreamMessage struct {
ReqData string `protobuf:"bytes,1,opt,name=reqData,proto3" json:"reqData,omitempty"`
}
func (m *StreamMessage) Reset() { *m = StreamMessage{} }
func (m *StreamMessage) String() string { return proto.CompactTextString(m) }
func (*StreamMessage) ProtoMessage() {}
func (*StreamMessage) Descriptor() ([]byte, []int) {
return fileDescriptor_1c28d5a3a78be18f, []int{0}
}
func (m *StreamMessage) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *StreamMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_StreamMessage.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *StreamMessage) XXX_Merge(src proto.Message) {
xxx_messageInfo_StreamMessage.Merge(m, src)
}
func (m *StreamMessage) XXX_Size() int {
return m.Size()
}
func (m *StreamMessage) XXX_DiscardUnknown() {
xxx_messageInfo_StreamMessage.DiscardUnknown(m)
}
var xxx_messageInfo_StreamMessage proto.InternalMessageInfo
func (m *StreamMessage) GetReqData() string {
if m != nil {
return m.ReqData
}
return ""
}
func init() {
proto.RegisterType((*StreamMessage)(nil), "testService.StreamMessage")
}
func init() {
proto.RegisterFile("net/streampool/testservice/protos/testservice.proto", fileDescriptor_1c28d5a3a78be18f)
}
var fileDescriptor_1c28d5a3a78be18f = []byte{
// 173 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x32, 0xce, 0x4b, 0x2d, 0xd1,
0x2f, 0x2e, 0x29, 0x4a, 0x4d, 0xcc, 0x2d, 0xc8, 0xcf, 0xcf, 0xd1, 0x2f, 0x49, 0x2d, 0x2e, 0x29,
0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x2f, 0x46, 0x16, 0xd2,
0x03, 0x0b, 0x09, 0x71, 0x83, 0x84, 0x82, 0x21, 0x42, 0x4a, 0x9a, 0x5c, 0xbc, 0xc1, 0x60, 0xfd,
0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x42, 0x12, 0x5c, 0xec, 0x45, 0xa9, 0x85, 0x2e, 0x89,
0x25, 0x89, 0x12, 0x8c, 0x0a, 0x8c, 0x1a, 0x9c, 0x41, 0x30, 0xae, 0x51, 0x00, 0x17, 0x4b, 0x48,
0x6a, 0x71, 0x89, 0x90, 0x07, 0x17, 0x17, 0x88, 0x86, 0x68, 0x13, 0x92, 0xd2, 0x43, 0x32, 0x4e,
0x0f, 0xc5, 0x2c, 0x29, 0x3c, 0x72, 0x1a, 0x8c, 0x06, 0x8c, 0x4e, 0x26, 0x27, 0x1e, 0xc9, 0x31,
0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x17, 0x1e, 0xcb,
0x31, 0xdc, 0x78, 0x2c, 0xc7, 0x10, 0x25, 0x85, 0xdb, 0x63, 0x49, 0x6c, 0x60, 0x6f, 0x18, 0x03,
0x02, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x59, 0x8d, 0x93, 0xfd, 0x00, 0x00, 0x00,
}
func (m *StreamMessage) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *StreamMessage) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *StreamMessage) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.ReqData) > 0 {
i -= len(m.ReqData)
copy(dAtA[i:], m.ReqData)
i = encodeVarintTestservice(dAtA, i, uint64(len(m.ReqData)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintTestservice(dAtA []byte, offset int, v uint64) int {
offset -= sovTestservice(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *StreamMessage) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.ReqData)
if l > 0 {
n += 1 + l + sovTestservice(uint64(l))
}
return n
}
func sovTestservice(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozTestservice(x uint64) (n int) {
return sovTestservice(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *StreamMessage) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTestservice
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: StreamMessage: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: StreamMessage: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field ReqData", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTestservice
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTestservice
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTestservice
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.ReqData = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTestservice(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTestservice
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipTestservice(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowTestservice
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowTestservice
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowTestservice
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthTestservice
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupTestservice
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthTestservice
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthTestservice = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowTestservice = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupTestservice = fmt.Errorf("proto: unexpected end of group")
)

View File

@ -0,0 +1,148 @@
// Code generated by protoc-gen-go-drpc. DO NOT EDIT.
// protoc-gen-go-drpc version: v0.0.32
// source: net/streampool/testservice/protos/testservice.proto
package testservice
import (
bytes "bytes"
context "context"
errors "errors"
jsonpb "github.com/gogo/protobuf/jsonpb"
proto "github.com/gogo/protobuf/proto"
drpc "storj.io/drpc"
drpcerr "storj.io/drpc/drpcerr"
)
type drpcEncoding_File_net_streampool_testservice_protos_testservice_proto struct{}
func (drpcEncoding_File_net_streampool_testservice_protos_testservice_proto) Marshal(msg drpc.Message) ([]byte, error) {
return proto.Marshal(msg.(proto.Message))
}
func (drpcEncoding_File_net_streampool_testservice_protos_testservice_proto) Unmarshal(buf []byte, msg drpc.Message) error {
return proto.Unmarshal(buf, msg.(proto.Message))
}
func (drpcEncoding_File_net_streampool_testservice_protos_testservice_proto) JSONMarshal(msg drpc.Message) ([]byte, error) {
var buf bytes.Buffer
err := new(jsonpb.Marshaler).Marshal(&buf, msg.(proto.Message))
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (drpcEncoding_File_net_streampool_testservice_protos_testservice_proto) JSONUnmarshal(buf []byte, msg drpc.Message) error {
return jsonpb.Unmarshal(bytes.NewReader(buf), msg.(proto.Message))
}
type DRPCTestClient interface {
DRPCConn() drpc.Conn
TestStream(ctx context.Context) (DRPCTest_TestStreamClient, error)
}
type drpcTestClient struct {
cc drpc.Conn
}
func NewDRPCTestClient(cc drpc.Conn) DRPCTestClient {
return &drpcTestClient{cc}
}
func (c *drpcTestClient) DRPCConn() drpc.Conn { return c.cc }
func (c *drpcTestClient) TestStream(ctx context.Context) (DRPCTest_TestStreamClient, error) {
stream, err := c.cc.NewStream(ctx, "/testService.Test/TestStream", drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{})
if err != nil {
return nil, err
}
x := &drpcTest_TestStreamClient{stream}
return x, nil
}
type DRPCTest_TestStreamClient interface {
drpc.Stream
Send(*StreamMessage) error
Recv() (*StreamMessage, error)
}
type drpcTest_TestStreamClient struct {
drpc.Stream
}
func (x *drpcTest_TestStreamClient) Send(m *StreamMessage) error {
return x.MsgSend(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{})
}
func (x *drpcTest_TestStreamClient) Recv() (*StreamMessage, error) {
m := new(StreamMessage)
if err := x.MsgRecv(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}); err != nil {
return nil, err
}
return m, nil
}
func (x *drpcTest_TestStreamClient) RecvMsg(m *StreamMessage) error {
return x.MsgRecv(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{})
}
type DRPCTestServer interface {
TestStream(DRPCTest_TestStreamStream) error
}
type DRPCTestUnimplementedServer struct{}
func (s *DRPCTestUnimplementedServer) TestStream(DRPCTest_TestStreamStream) error {
return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}
type DRPCTestDescription struct{}
func (DRPCTestDescription) NumMethods() int { return 1 }
func (DRPCTestDescription) Method(n int) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
switch n {
case 0:
return "/testService.Test/TestStream", drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{},
func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
return nil, srv.(DRPCTestServer).
TestStream(
&drpcTest_TestStreamStream{in1.(drpc.Stream)},
)
}, DRPCTestServer.TestStream, true
default:
return "", nil, nil, nil, false
}
}
func DRPCRegisterTest(mux drpc.Mux, impl DRPCTestServer) error {
return mux.Register(impl, DRPCTestDescription{})
}
type DRPCTest_TestStreamStream interface {
drpc.Stream
Send(*StreamMessage) error
Recv() (*StreamMessage, error)
}
type drpcTest_TestStreamStream struct {
drpc.Stream
}
func (x *drpcTest_TestStreamStream) Send(m *StreamMessage) error {
return x.MsgSend(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{})
}
func (x *drpcTest_TestStreamStream) Recv() (*StreamMessage, error) {
m := new(StreamMessage)
if err := x.MsgRecv(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{}); err != nil {
return nil, err
}
return m, nil
}
func (x *drpcTest_TestStreamStream) RecvMsg(m *StreamMessage) error {
return x.MsgRecv(m, drpcEncoding_File_net_streampool_testservice_protos_testservice_proto{})
}