Add request manager tests
This commit is contained in:
parent
4d1494a17a
commit
c8c0839a57
@ -13,7 +13,9 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
"storj.io/drpc"
|
||||
"storj.io/drpc/drpcconn"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type fixture struct {
|
||||
@ -49,7 +51,7 @@ func (fx *fixture) stop() {
|
||||
fx.ctrl.Finish()
|
||||
}
|
||||
|
||||
func TestRequestManager_Request(t *testing.T) {
|
||||
func TestRequestManager_SyncRequest(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
t.Run("send request", func(t *testing.T) {
|
||||
@ -95,3 +97,93 @@ func TestRequestManager_Request(t *testing.T) {
|
||||
fx.requestManager.requestAndHandle(peerId, msg)
|
||||
})
|
||||
}
|
||||
|
||||
func TestRequestManager_QueueRequest(t *testing.T) {
|
||||
t.Run("max concurrent reqs for peer, independent reqs for other peer", func(t *testing.T) {
|
||||
// testing 2 concurrent requests to one peer and simultaneous to another peer
|
||||
fx := newFixture(t)
|
||||
defer fx.stop()
|
||||
fx.requestManager.workers = 2
|
||||
msgRelease := make(chan struct{})
|
||||
msgWait := make(chan struct{})
|
||||
msgs := sync.Map{}
|
||||
doRequestAndHandle = func(manager *requestManager, peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
||||
msgs.Store(req.ObjectId, struct{}{})
|
||||
<-msgWait
|
||||
<-msgRelease
|
||||
}
|
||||
otherPeer := "otherPeer"
|
||||
msg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id1"}
|
||||
msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"}
|
||||
msg3 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id3"}
|
||||
otherMsg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "otherId1"}
|
||||
|
||||
// sending requests to first peer
|
||||
peerId := "peerId"
|
||||
err := fx.requestManager.QueueRequest(peerId, msg1)
|
||||
require.NoError(t, err)
|
||||
err = fx.requestManager.QueueRequest(peerId, msg2)
|
||||
require.NoError(t, err)
|
||||
err = fx.requestManager.QueueRequest(peerId, msg3)
|
||||
require.NoError(t, err)
|
||||
|
||||
// waiting until all the messages are loaded
|
||||
msgWait <- struct{}{}
|
||||
msgWait <- struct{}{}
|
||||
_, ok := msgs.Load("id1")
|
||||
require.True(t, ok)
|
||||
_, ok = msgs.Load("id2")
|
||||
require.True(t, ok)
|
||||
// third message should not be read
|
||||
_, ok = msgs.Load("id3")
|
||||
require.False(t, ok)
|
||||
|
||||
// request for other peer should pass
|
||||
err = fx.requestManager.QueueRequest(otherPeer, otherMsg1)
|
||||
require.NoError(t, err)
|
||||
msgWait <- struct{}{}
|
||||
|
||||
_, ok = msgs.Load("otherId1")
|
||||
require.True(t, ok)
|
||||
close(msgRelease)
|
||||
})
|
||||
|
||||
t.Run("no requests after close", func(t *testing.T) {
|
||||
fx := newFixture(t)
|
||||
defer fx.stop()
|
||||
fx.requestManager.workers = 1
|
||||
msgRelease := make(chan struct{})
|
||||
msgWait := make(chan struct{})
|
||||
msgs := sync.Map{}
|
||||
doRequestAndHandle = func(manager *requestManager, peerId string, req *spacesyncproto.ObjectSyncMessage) {
|
||||
msgs.Store(req.ObjectId, struct{}{})
|
||||
<-msgWait
|
||||
<-msgRelease
|
||||
}
|
||||
msg1 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id1"}
|
||||
msg2 := &spacesyncproto.ObjectSyncMessage{ObjectId: "id2"}
|
||||
|
||||
// sending requests to first peer
|
||||
peerId := "peerId"
|
||||
err := fx.requestManager.QueueRequest(peerId, msg1)
|
||||
require.NoError(t, err)
|
||||
err = fx.requestManager.QueueRequest(peerId, msg2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// waiting until all the message is loaded
|
||||
msgWait <- struct{}{}
|
||||
_, ok := msgs.Load("id1")
|
||||
require.True(t, ok)
|
||||
_, ok = msgs.Load("id2")
|
||||
require.False(t, ok)
|
||||
|
||||
fx.requestManager.Close(context.Background())
|
||||
close(msgRelease)
|
||||
// waiting to know if the second one is not taken
|
||||
// because the manager is now closed
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
_, ok = msgs.Load("id2")
|
||||
require.False(t, ok)
|
||||
|
||||
})
|
||||
}
|
||||
|
||||
@ -10,7 +10,10 @@ import (
|
||||
// workers - how many processes will execute tasks
|
||||
// maxSize - limit for queue size
|
||||
func NewExecPool(workers, maxSize int) *ExecPool {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
ss := &ExecPool{
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
workers: workers,
|
||||
batch: mb.New[func()](maxSize),
|
||||
}
|
||||
@ -19,6 +22,8 @@ func NewExecPool(workers, maxSize int) *ExecPool {
|
||||
|
||||
// ExecPool needed for parallel execution of the incoming send tasks
|
||||
type ExecPool struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
workers int
|
||||
batch *mb.MB[func()]
|
||||
}
|
||||
@ -39,7 +44,7 @@ func (ss *ExecPool) Run() {
|
||||
|
||||
func (ss *ExecPool) sendLoop() {
|
||||
for {
|
||||
f, err := ss.batch.WaitOne(context.Background())
|
||||
f, err := ss.batch.WaitOne(ss.ctx)
|
||||
if err != nil {
|
||||
log.Debug("close send loop", zap.Error(err))
|
||||
return
|
||||
@ -49,5 +54,6 @@ func (ss *ExecPool) sendLoop() {
|
||||
}
|
||||
|
||||
func (ss *ExecPool) Close() (err error) {
|
||||
ss.cancel()
|
||||
return ss.batch.Close()
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user