Helper methods for local streams etc

This commit is contained in:
mcrakhman 2023-02-01 23:21:02 +01:00
parent 8ff7cc9e16
commit 3630744086
No known key found for this signature in database
GPG Key ID: DED12CFEF5B8396B
3 changed files with 27 additions and 0 deletions

View File

@ -34,6 +34,7 @@ func New() Dialer {
type Dialer interface {
Dial(ctx context.Context, peerId string) (peer peer.Peer, err error)
UpdateAddrs(addrs map[string][]string)
SetPeerAddrs(peerId string, addrs []string)
app.Component
}
@ -62,6 +63,15 @@ func (d *dialer) UpdateAddrs(addrs map[string][]string) {
d.mu.Unlock()
}
func (d *dialer) SetPeerAddrs(peerId string, addrs []string) {
d.mu.Lock()
defer d.mu.Unlock()
if d.peerAddrs == nil {
return
}
d.peerAddrs[peerId] = addrs
}
func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) {
d.mu.RLock()
defer d.mu.RUnlock()

View File

@ -160,6 +160,10 @@ func (d *dialerMock) UpdateAddrs(addrs map[string][]string) {
return
}
func (d *dialerMock) SetPeerAddrs(peerId string, addrs []string) {
return
}
func (d *dialerMock) Init(a *app.App) (err error) {
return
}

View File

@ -40,6 +40,8 @@ type StreamPool interface {
AddTagsCtx(ctx context.Context, tags ...string) error
// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
RemoveTagsCtx(ctx context.Context, tags ...string) error
// Streams gets all streams for specific tags
Streams(tags ...string) (streams []drpc.Stream)
// Close closes all streams
Close() error
}
@ -73,6 +75,17 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st
}()
}
func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) {
s.mu.Lock()
defer s.mu.Unlock()
for _, tag := range tags {
for _, id := range s.streamIdsByTag[tag] {
streams = append(streams, s.streams[id].stream)
}
}
return
}
func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream {
s.mu.Lock()
defer s.mu.Unlock()