Refactor tests
This commit is contained in:
parent
f4027b433d
commit
9a202207b4
@ -7,26 +7,10 @@ import (
|
|||||||
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
|
"github.com/anytypeio/go-anytype-infrastructure-experiments/consensus/consensusproto"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"storj.io/drpc"
|
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type testPeer struct {
|
|
||||||
id string
|
|
||||||
drpc.Conn
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t testPeer) Id() string {
|
|
||||||
return t.id
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t testPeer) LastUsage() time.Time {
|
|
||||||
return time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t testPeer) UpdateLastUsage() {}
|
|
||||||
|
|
||||||
type testServer struct {
|
type testServer struct {
|
||||||
stream chan spacesyncproto.DRPCSpace_StreamStream
|
stream chan spacesyncproto.DRPCSpace_StreamStream
|
||||||
addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error
|
addLog func(ctx context.Context, req *consensusproto.AddLogRequest) error
|
||||||
@ -79,8 +63,7 @@ func newFixture(t *testing.T, localId, remoteId peer.ID, handler MessageHandler)
|
|||||||
fx.testServer.stream = make(chan spacesyncproto.DRPCSpace_StreamStream, 1)
|
fx.testServer.stream = make(chan spacesyncproto.DRPCSpace_StreamStream, 1)
|
||||||
require.NoError(t, spacesyncproto.DRPCRegisterSpace(fx.drpcTS.Mux, fx.testServer))
|
require.NoError(t, spacesyncproto.DRPCRegisterSpace(fx.drpcTS.Mux, fx.testServer))
|
||||||
clientWrapper := rpctest.NewSecConnWrapper(nil, nil, localId, remoteId)
|
clientWrapper := rpctest.NewSecConnWrapper(nil, nil, localId, remoteId)
|
||||||
p := &testPeer{id: localId.String(), Conn: fx.drpcTS.DialWrapConn(nil, clientWrapper)}
|
fx.client = spacesyncproto.NewDRPCSpaceClient(fx.drpcTS.DialWrapConn(nil, clientWrapper))
|
||||||
fx.client = spacesyncproto.NewDRPCSpaceClient(p)
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
fx.clientStream, err = fx.client.Stream(context.Background())
|
fx.clientStream, err = fx.client.Stream(context.Background())
|
||||||
|
|||||||
@ -104,7 +104,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
|
|||||||
stream, err := s.clientFactory.Client(peer).Stream(ctx)
|
stream, err := s.clientFactory.Client(peer).Stream(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = rpcerr.Unwrap(err)
|
err = rpcerr.Unwrap(err)
|
||||||
log.With("spaceId", s.spaceId).Errorf("failed to open clientStream: %v", err)
|
log.With("spaceId", s.spaceId).Errorf("failed to open stream: %v", err)
|
||||||
// so here probably the request is failed because there is no such space,
|
// so here probably the request is failed because there is no such space,
|
||||||
// but diffService should handle such cases by sending pushSpace
|
// but diffService should handle such cases by sending pushSpace
|
||||||
continue
|
continue
|
||||||
@ -113,7 +113,7 @@ func (s *syncService) responsibleStreamCheckLoop(ctx context.Context) {
|
|||||||
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
|
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = rpcerr.Unwrap(err)
|
err = rpcerr.Unwrap(err)
|
||||||
log.With("spaceId", s.spaceId).Errorf("failed to send first message to clientStream: %v", err)
|
log.With("spaceId", s.spaceId).Errorf("failed to send first message to stream: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
s.streamPool.AddAndReadStreamAsync(stream)
|
s.streamPool.AddAndReadStreamAsync(stream)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user