pool.AddPeer close previous peer

This commit is contained in:
Sergey Cherepanov 2023-05-29 16:33:33 +02:00
parent 12a7dfe05a
commit a898c6fc9c
No known key found for this signature in database
GPG Key ID: 87F8EDE8FBDF637C
2 changed files with 36 additions and 3 deletions

View File

@ -17,7 +17,7 @@ type Pool interface {
// GetOneOf searches at least one existing connection in outgoing or creates a new one from a randomly selected id from given list
GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error)
// AddPeer adds incoming peer to the pool
AddPeer(p peer.Peer) (err error)
AddPeer(ctx context.Context, p peer.Peer) (err error)
}
type pool struct {
@ -89,6 +89,18 @@ func (p *pool) GetOneOf(ctx context.Context, peerIds []string) (peer.Peer, error
return nil, lastErr
}
func (p *pool) AddPeer(pr peer.Peer) (err error) {
func (p *pool) AddPeer(ctx context.Context, pr peer.Peer) (err error) {
if err = p.incoming.Add(pr.Id(), pr); err != nil {
if err == ocache.ErrExists {
// in case when an incoming connection with a peer already exists, we close and remove an existing connection
if v, e := p.incoming.Pick(ctx, pr.Id()); e == nil {
_ = v.Close()
_, _ = p.incoming.Remove(ctx, pr.Id())
return p.incoming.Add(pr.Id(), pr)
}
} else {
return err
}
}
return
}

View File

@ -132,6 +132,27 @@ func TestPool_GetOneOf(t *testing.T) {
})
}
func TestPool_AddPeer(t *testing.T) {
t.Run("success", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
require.NoError(t, fx.AddPeer(ctx, newTestPeer("p1")))
})
t.Run("two peers", func(t *testing.T) {
fx := newFixture(t)
defer fx.Finish()
p1, p2 := newTestPeer("p1"), newTestPeer("p1")
require.NoError(t, fx.AddPeer(ctx, p1))
require.NoError(t, fx.AddPeer(ctx, p2))
select {
case <-p1.closed:
default:
assert.Truef(t, false, "peer not closed")
}
})
}
func newFixture(t *testing.T) *fixture {
fx := &fixture{
Service: New(),