Helper methods for local streams etc
This commit is contained in:
parent
56e6cba575
commit
8097445e6e
@ -34,6 +34,7 @@ func New() Dialer {
|
|||||||
type Dialer interface {
|
type Dialer interface {
|
||||||
Dial(ctx context.Context, peerId string) (peer peer.Peer, err error)
|
Dial(ctx context.Context, peerId string) (peer peer.Peer, err error)
|
||||||
UpdateAddrs(addrs map[string][]string)
|
UpdateAddrs(addrs map[string][]string)
|
||||||
|
SetPeerAddrs(peerId string, addrs []string)
|
||||||
app.Component
|
app.Component
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,6 +63,15 @@ func (d *dialer) UpdateAddrs(addrs map[string][]string) {
|
|||||||
d.mu.Unlock()
|
d.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *dialer) SetPeerAddrs(peerId string, addrs []string) {
|
||||||
|
d.mu.Lock()
|
||||||
|
defer d.mu.Unlock()
|
||||||
|
if d.peerAddrs == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
d.peerAddrs[peerId] = addrs
|
||||||
|
}
|
||||||
|
|
||||||
func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) {
|
func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) {
|
||||||
d.mu.RLock()
|
d.mu.RLock()
|
||||||
defer d.mu.RUnlock()
|
defer d.mu.RUnlock()
|
||||||
|
|||||||
@ -160,6 +160,10 @@ func (d *dialerMock) UpdateAddrs(addrs map[string][]string) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *dialerMock) SetPeerAddrs(peerId string, addrs []string) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (d *dialerMock) Init(a *app.App) (err error) {
|
func (d *dialerMock) Init(a *app.App) (err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@ -40,6 +40,8 @@ type StreamPool interface {
|
|||||||
AddTagsCtx(ctx context.Context, tags ...string) error
|
AddTagsCtx(ctx context.Context, tags ...string) error
|
||||||
// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
|
// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
|
||||||
RemoveTagsCtx(ctx context.Context, tags ...string) error
|
RemoveTagsCtx(ctx context.Context, tags ...string) error
|
||||||
|
// Streams gets all streams for specific tags
|
||||||
|
Streams(tags ...string) (streams []drpc.Stream)
|
||||||
// Close closes all streams
|
// Close closes all streams
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
@ -73,6 +75,17 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
for _, tag := range tags {
|
||||||
|
for _, id := range s.streamIdsByTag[tag] {
|
||||||
|
streams = append(streams, s.streams[id].stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream {
|
func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user