Test stream pool

mcrakhman 2022-10-24 21:57:19 +02:00
parent 8263bbfde2
commit f5b9275192
GPG Key ID: DED12CFEF5B8396B
4 changed files with 399 additions and 4 deletions


@ -245,7 +245,8 @@ func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStre
Loop:
for {
msg, err := stream.Recv()
var msg *spacesyncproto.ObjectSyncMessage
msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix())
if err != nil {
break
@ -260,7 +261,8 @@ Loop:
limiter <- struct{}{}
}()
}
return s.removePeer(peerId)
s.removePeer(peerId)
return
}
func (s *streamPool) removePeer(peerId string) (err error) {
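
For orientation, a minimal sketch of how readPeerLoop could look after this change. Only the lines shown in the hunk above come from the commit; the limiter setup, the stop channel and the handler field are assumptions, not the actual implementation:

// --- sketch, not part of this commit ---
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.SpaceStream) (err error) {
    // assumed: a pre-filled token bucket limiting concurrent handler calls per stream
    limiter := make(chan struct{}, 10)
    for i := 0; i < cap(limiter); i++ {
        limiter <- struct{}{}
    }
Loop:
    for {
        var msg *spacesyncproto.ObjectSyncMessage
        msg, err = stream.Recv() // err is the named return, so the caller sees the Recv error
        s.lastUsage.Store(time.Now().Unix())
        if err != nil {
            break
        }
        select {
        case <-s.stopCh: // assumed stop channel
            break Loop
        case <-limiter: // acquire a token
        }
        go func() {
            s.messageHandler(context.Background(), peerId, msg) // handler field and context choice are assumptions
            limiter <- struct{}{} // release the token
        }()
    }
    s.removePeer(peerId) // cleanup; its error is deliberately not returned
    return
}
// --- end sketch ---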


@ -0,0 +1,339 @@
package syncservice
import (
"context"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/commonspace/spacesyncproto"
"github.com/anytypeio/go-anytype-infrastructure-experiments/common/net/rpc/rpctest"
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/require"
"storj.io/drpc"
"testing"
"time"
)
type testPeer struct {
id string
drpc.Conn
}
func (t testPeer) Id() string {
return t.id
}
func (t testPeer) LastUsage() time.Time {
return time.Now()
}
func (t testPeer) UpdateLastUsage() {}
type testServer struct {
stream chan spacesyncproto.DRPCSpace_StreamStream
addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error
addRecord func(ctx context.Context, req *consensusproto.AddRecordRequest) error
releaseStream chan error
watchErrOnce bool
}
func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
panic("implement me")
}
func (t *testServer) PushSpace(ctx context.Context, request *spacesyncproto.PushSpaceRequest) (*spacesyncproto.PushSpaceResponse, error) {
panic("implement me")
}
func (t *testServer) Stream(stream spacesyncproto.DRPCSpace_StreamStream) error {
t.stream <- stream
return <-t.releaseStream
}
func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpace_StreamStream {
select {
case <-time.After(time.Second * 5):
test.Fatalf("waitStream timeout")
case st := <-t.stream:
return st
}
return nil
}
type fixture struct {
testServer *testServer
drpcTS *rpctest.TesServer
client spacesyncproto.DRPCSpaceClient
clientStream spacesyncproto.DRPCSpace_StreamStream
serverStream spacesyncproto.DRPCSpace_StreamStream
pool *streamPool
localId peer.ID
remoteId peer.ID
}
func newFixture(t *testing.T, localId, remoteId peer.ID, handler MessageHandler) *fixture {
fx := &fixture{
testServer: &testServer{},
drpcTS: rpctest.NewTestServer(),
localId: localId,
remoteId: remoteId,
}
fx.testServer.stream = make(chan spacesyncproto.DRPCSpace_StreamStream, 1)
require.NoError(t, spacesyncproto.DRPCRegisterSpace(fx.drpcTS.Mux, fx.testServer))
clientWrapper := rpctest.NewSecConnWrapper(nil, nil, localId, remoteId)
p := &testPeer{id: localId.String(), Conn: fx.drpcTS.DialWrapConn(nil, clientWrapper)}
fx.client = spacesyncproto.NewDRPCSpaceClient(p)
var err error
fx.clientStream, err = fx.client.Stream(context.Background())
require.NoError(t, err)
fx.serverStream = fx.testServer.waitStream(t)
fx.pool = newStreamPool(handler).(*streamPool)
return fx
}
func (fx *fixture) run(t *testing.T) chan error {
waitCh := make(chan error)
go func() {
err := fx.pool.AddAndReadStreamSync(fx.clientStream)
waitCh <- err
}()
time.Sleep(time.Millisecond * 10) // let AddAndReadStreamSync register the stream before inspecting the pool
fx.pool.Lock()
require.Equal(t, fx.pool.peerStreams[fx.remoteId.String()], fx.clientStream)
fx.pool.Unlock()
return waitCh
}
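
The tests below pin down the pool's surface. The following is a reconstruction from the call sites in this file; the interface name, the stream parameter type and the return values are assumptions beyond what those calls show:

// --- sketch, not part of this commit ---
type StreamPool interface {
    AddAndReadStreamSync(stream spacesyncproto.SpaceStream) (err error)
    AddAndReadStreamAsync(stream spacesyncproto.SpaceStream)
    SendSync(peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
    SendAsync(peers []string, msg *spacesyncproto.ObjectSyncMessage) (err error)
    BroadcastAsync(msg *spacesyncproto.ObjectSyncMessage) (err error)
    HasActiveStream(peerId string) bool
    Close() (err error)
}
// --- end sketch ---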
func TestStreamPool_AddAndReadStreamAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("client close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("server close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.serverStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
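
run drives the pool through the blocking AddAndReadStreamSync, while the production path in syncservice (last file of this commit) uses AddAndReadStreamAsync. One plausible relationship between the two, sketched under the assumption that the async variant simply detaches the sync one:

// --- sketch, not part of this commit ---
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.SpaceStream) {
    go func() {
        // the returned error ends the read loop; how it is logged is an assumption
        _ = s.AddAndReadStreamSync(stream)
    }()
}
// --- end sketch ---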
func TestStreamPool_Close(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("client close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
fx.run(t)
var events []string
recvChan := make(chan struct{})
go func() {
fx.pool.Close()
events = append(events, "pool_close")
recvChan <- struct{}{}
}()
time.Sleep(50 * time.Millisecond) // give pool.Close a moment to start waiting on the open stream
events = append(events, "stream_close")
err := fx.clientStream.Close()
require.NoError(t, err)
<-recvChan
require.Equal(t, []string{"stream_close", "pool_close"}, events)
})
t.Run("server close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
fx.run(t)
var events []string
recvChan := make(chan struct{})
go func() {
fx.pool.Close()
events = append(events, "pool_close")
recvChan <- struct{}{}
}()
time.Sleep(50 * time.Millisecond) // give pool.Close a moment to start waiting on the open stream
events = append(events, "stream_close")
err := fx.serverStream.Close()
require.NoError(t, err)
<-recvChan
require.Equal(t, []string{"stream_close", "pool_close"}, events)
})
}
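
Both subtests assert that pool.Close() returns only after the stream is closed from outside (the events must read stream_close, then pool_close). That rules out Close() tearing the streams down itself; it has to wait for the read loops to finish. A sketch of that behaviour, with the stop channel and the wait group as assumptions:

// --- sketch, not part of this commit ---
func (s *streamPool) Close() (err error) {
    close(s.stopCh) // assumed: stop dispatching new handler calls
    s.wg.Wait()     // assumed: block until every readPeerLoop has returned,
                    // which only happens once its stream is closed externally
    return nil
}
// --- end sketch ---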
func TestStreamPool_ReceiveMessage(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool receive message from server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
recvChan := make(chan struct{})
fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
require.Equal(t, msg, message)
recvChan <- struct{}{}
return nil
})
waitCh := fx.run(t)
err := fx.serverStream.Send(msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
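
The literal passed to newFixture above fixes the shape of MessageHandler, the type the fixture accepts; reconstructed from that call site:

// --- sketch, not part of this commit ---
// the pool invokes this for every message read from a peer stream
type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
// --- end sketch ---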
func TestStreamPool_HasActiveStream(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool has active stream", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
require.True(t, fx.pool.HasActiveStream(remId.String()))
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("pool has no active stream", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.False(t, fx.pool.HasActiveStream(remId.String()))
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
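
A sketch of HasActiveStream consistent with both subtests, assuming peerStreams is the pool's only bookkeeping of live streams and removePeer clears the entry when a read loop exits:

// --- sketch, not part of this commit ---
func (s *streamPool) HasActiveStream(peerId string) bool {
    s.Lock()
    defer s.Unlock()
    stream, ok := s.peerStreams[peerId]
    return ok && stream != nil
}
// --- end sketch ---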
func TestStreamPool_SendAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool send async to server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
recvChan := make(chan struct{})
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg, message)
recvChan <- struct{}{}
}()
waitCh := fx.run(t)
err := fx.pool.SendAsync([]string{remId.String()}, msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
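
SendAsync, as exercised here, sends the message to every listed peer that currently has a stream. A sketch; error aggregation and the handling of unknown peers are assumptions:

// --- sketch, not part of this commit ---
func (s *streamPool) SendAsync(peers []string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
    s.Lock()
    var streams []spacesyncproto.SpaceStream
    for _, peerId := range peers {
        if st, ok := s.peerStreams[peerId]; ok {
            streams = append(streams, st)
        }
    }
    s.Unlock()
    for _, st := range streams {
        if sendErr := st.Send(msg); sendErr != nil {
            err = sendErr // keep the last error (assumption)
        }
    }
    return
}
// --- end sketch ---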
func TestStreamPool_SendSync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool send sync to server", func(t *testing.T) {
objectId := "objectId"
payload := []byte("payload")
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg.ObjectId, message.ObjectId)
require.NotEmpty(t, message.ReplyId)
message.Payload = payload
err = fx.serverStream.Send(message)
require.NoError(t, err)
}()
waitCh := fx.run(t)
res, err := fx.pool.SendSync(remId.String(), msg)
require.NoError(t, err)
require.Equal(t, payload, res.Payload)
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
t.Run("pool send sync timeout", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
syncWaitPeriod = time.Millisecond * 30 // shorten the package-level sync timeout for this test
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg.ObjectId, message.ObjectId)
require.NotEmpty(t, message.ReplyId)
}()
waitCh := fx.run(t)
_, err := fx.pool.SendSync(remId.String(), msg)
require.Equal(t, ErrSyncTimeout, err)
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
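
The two subtests fix the SendSync contract: the outgoing message carries a non-empty ReplyId, the reply with that id is handed back to the caller, and waiting longer than syncWaitPeriod yields ErrSyncTimeout. A sketch of an implementation satisfying this; the waiters map and the id generator are assumptions:

// --- sketch, not part of this commit ---
func (s *streamPool) SendSync(peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
    msg.ReplyId = genReplyId() // hypothetical helper producing a unique id
    ch := make(chan *spacesyncproto.ObjectSyncMessage, 1)
    s.Lock()
    s.waiters[msg.ReplyId] = ch // assumed: the read path routes replies into this map
    s.Unlock()
    defer func() {
        s.Lock()
        delete(s.waiters, msg.ReplyId)
        s.Unlock()
    }()
    if err = s.SendAsync([]string{peerId}, msg); err != nil {
        return
    }
    select {
    case reply = <-ch:
    case <-time.After(syncWaitPeriod):
        err = ErrSyncTimeout
    }
    return
}
// --- end sketch ---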
func TestStreamPool_BroadcastAsync(t *testing.T) {
remId := peer.ID("remoteId")
t.Run("pool broadcast async to server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
recvChan := make(chan struct{})
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg, message)
recvChan <- struct{}{}
}()
waitCh := fx.run(t)
err := fx.pool.BroadcastAsync(msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId.String()])
})
}
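
BroadcastAsync behaves like SendAsync addressed to every connected peer; a sketch that builds on the SendAsync sketch above:

// --- sketch, not part of this commit ---
func (s *streamPool) BroadcastAsync(msg *spacesyncproto.ObjectSyncMessage) (err error) {
    s.Lock()
    peers := make([]string, 0, len(s.peerStreams))
    for peerId := range s.peerStreams {
        peers = append(peers, peerId)
    }
    s.Unlock()
    return s.SendAsync(peers, msg)
}
// --- end sketch ---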


@ -104,7 +104,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
stream, err := s.clientFactory.Client(peer).Stream(ctx)
if err != nil {
err = rpcerr.Unwrap(err)
log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err)
log.With("spaceId", s.spaceId).Errorf("failed to open clientStream: %v", err)
// here the request probably failed because there is no such space,
// but diffService should handle such cases by sending pushSpace
continue
@ -113,7 +113,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
if err != nil {
err = rpcerr.Unwrap(err)
log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err)
log.With("spaceId", s.spaceId).Errorf("failed to send first message to clientStream: %v", err)
continue
}
s.streamPool.AddAndReadStreamAsync(stream)


@ -2,6 +2,8 @@ package rpctest
import (
"context"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"net"
"storj.io/drpc"
"storj.io/drpc/drpcconn"
@ -9,6 +11,48 @@ import (
"storj.io/drpc/drpcserver"
)
type SecConnMock struct {
net.Conn
localPrivKey crypto.PrivKey
remotePubKey crypto.PubKey
localId peer.ID
remoteId peer.ID
}
func (s *SecConnMock) LocalPeer() peer.ID {
return s.localId
}
func (s *SecConnMock) LocalPrivateKey() crypto.PrivKey {
return s.localPrivKey
}
func (s *SecConnMock) RemotePeer() peer.ID {
return s.remoteId
}
func (s *SecConnMock) RemotePublicKey() crypto.PubKey {
return s.remotePubKey
}
type ConnWrapper func(conn net.Conn) net.Conn
func NewSecConnWrapper(
localPrivKey crypto.PrivKey,
remotePubKey crypto.PubKey,
localId peer.ID,
remoteId peer.ID) ConnWrapper {
return func(conn net.Conn) net.Conn {
return &SecConnMock{
Conn: conn,
localPrivKey: localPrivKey,
remotePubKey: remotePubKey,
localId: localId,
remoteId: remoteId,
}
}
}
func NewTestServer() *TesServer {
ts := &TesServer{
Mux: drpcmux.New(),
@ -23,7 +67,17 @@ type TesServer struct {
}
func (ts *TesServer) Dial() drpc.Conn {
return ts.DialWrapConn(nil, nil)
}
func (ts *TesServer) DialWrapConn(serverWrapper ConnWrapper, clientWrapper ConnWrapper) drpc.Conn {
sc, cc := net.Pipe()
if serverWrapper != nil {
sc = serverWrapper(sc)
}
if clientWrapper != nil {
cc = clientWrapper(cc)
}
go ts.Server.ServeOne(context.Background(), sc)
return drpcconn.New(cc)
}
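
Outside the fixture above, the new wrapper can be used on its own to give a pipe-backed test connection a libp2p identity. A minimal usage sketch inside the rpctest package; the peer ids are arbitrary placeholders:

// --- sketch, not part of this commit ---
func exampleDial() drpc.Conn {
    ts := NewTestServer()
    // register DRPC services on ts.Mux before dialing
    clientWrapper := NewSecConnWrapper(nil, nil, peer.ID("client"), peer.ID("server"))
    return ts.DialWrapConn(nil, clientWrapper) // hand the result to a generated DRPC client constructor
}
// --- end sketch ---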