async dial

This commit is contained in:
Sergey Cherepanov 2023-01-30 13:16:07 +03:00
parent c6910bc2fe
commit b2578a19ba
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
2 changed files with 24 additions and 11 deletions

View File

@ -21,6 +21,9 @@ type StreamHandler interface {
NewReadMessage() drpc.Message NewReadMessage() drpc.Message
} }
// PeerGetter should dial or return cached peers
type PeerGetter func(ctx context.Context) (peers []peer.Peer, err error)
// StreamPool keeps and read streams // StreamPool keeps and read streams
type StreamPool interface { type StreamPool interface {
// AddStream adds new outgoing stream into the pool // AddStream adds new outgoing stream into the pool
@ -28,7 +31,7 @@ type StreamPool interface {
// ReadStream adds new incoming stream and synchronously read it // ReadStream adds new incoming stream and synchronously read it
ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error) ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error)
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) Send(ctx context.Context, msg drpc.Message, target PeerGetter) (err error)
// SendById sends a message to given peerIds. Works only if stream exists // SendById sends a message to given peerIds. Works only if stream exists
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
// Broadcast sends a message to all peers with given tags. Works async. // Broadcast sends a message to all peers with given tags. Works async.
@ -95,7 +98,7 @@ func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...st
return st return st
} }
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) { func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peerGetter PeerGetter) (err error) {
var sendOneFunc = func(sp peer.Peer) func() { var sendOneFunc = func(sp peer.Peer) func() {
return func() { return func() {
if e := s.sendOne(ctx, sp, msg); e != nil { if e := s.sendOne(ctx, sp, msg); e != nil {
@ -105,13 +108,17 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P
} }
} }
} }
return s.exec.Add(ctx, func() {
for _, p := range peers { peers, dialErr := peerGetter(ctx)
if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil { if dialErr != nil {
return log.InfoCtx(ctx, "can't get peers", zap.Error(dialErr))
} }
} for _, p := range peers {
return if err = s.exec.Add(ctx, sendOneFunc(p)); err != nil {
return
}
}
})
} }
func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) { func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) {

View File

@ -66,7 +66,9 @@ func TestStreamPool_AddStream(t *testing.T) {
defer s1.Close() defer s1.Close()
fx.AddStream("p1", s1, "space1", "common") fx.AddStream("p1", s1, "space1", "common")
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, p1)) require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "test"}, func(ctx context.Context) (peers []peer.Peer, err error) {
return []peer.Peer{p1}, nil
}))
var msg *testservice.StreamMessage var msg *testservice.StreamMessage
select { select {
case msg = <-fx.tsh.receiveCh: case msg = <-fx.tsh.receiveCh:
@ -85,7 +87,9 @@ func TestStreamPool_Send(t *testing.T) {
p, err := fx.tp.Dial(ctx, "p1") p, err := fx.tp.Dial(ctx, "p1")
require.NoError(t, err) require.NoError(t, err)
require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p)) require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, func(ctx context.Context) (peers []peer.Peer, err error) {
return []peer.Peer{p}, nil
}))
var msg *testservice.StreamMessage var msg *testservice.StreamMessage
select { select {
@ -107,7 +111,9 @@ func TestStreamPool_Send(t *testing.T) {
var numMsgs = 5 var numMsgs = 5
for i := 0; i < numMsgs; i++ { for i := 0; i < numMsgs; i++ {
go require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, p)) go require.NoError(t, fx.Send(ctx, &testservice.StreamMessage{ReqData: "should open stream"}, func(ctx context.Context) (peers []peer.Peer, err error) {
return []peer.Peer{p}, nil
}))
} }
var msgs []*testservice.StreamMessage var msgs []*testservice.StreamMessage