Merge pull request #16 from anytypeio/middleware-compatibility

Middleware fixes etc
This commit is contained in:
Sergey Cherepanov 2023-02-07 15:33:00 +03:00 committed by GitHub
commit 6b5b3d2b6b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 206 additions and 162 deletions

View File

@ -132,7 +132,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter)
d.pingTreesInCache(ctx, filteredIds)
d.syncTrees(ctx, p.Id(), filteredIds)
d.log.Info("sync done:", zap.Int("newIds", len(newIds)),
zap.Int("changedIds", len(changedIds)),
@ -143,7 +143,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
return
}
func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) {
for _, tId := range trees {
tree, err := d.cache.GetTree(ctx, d.spaceId, tId)
if err != nil {
@ -159,10 +159,10 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
// it may be already there (i.e. loaded)
// and build func will not be called, thus we won't sync the tree
// therefore we just do it manually
if err = syncTree.Ping(ctx); err != nil {
d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId))
if err = syncTree.SyncWithPeer(ctx, peerId); err != nil {
d.log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err), zap.String("treeId", tId))
} else {
d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId))
d.log.DebugCtx(ctx, "success synctree.SyncWithPeer", zap.String("treeId", tId))
}
}
}

View File

@ -1,6 +1,5 @@
//go:build ((!linux && !darwin) || android || ios || nographviz) && !amd64
// +build !linux,!darwin android ios nographviz
// +build !amd64
//go:build (!linux && !darwin) || android || ios || nographviz || windows
// +build !linux,!darwin android ios nographviz windows
package objecttree

View File

@ -54,20 +54,6 @@ func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1)
}
// BroadcastAsyncOrSendResponsible mocks base method.
func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible.
func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1)
}
// CreateFullSyncRequest mocks base method.
func (m *MockSyncClient) CreateFullSyncRequest(arg0 objecttree.ObjectTree, arg1, arg2 []string) (*treechangeproto.TreeSyncMessage, error) {
m.ctrl.T.Helper()
@ -394,20 +380,6 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockSyncTree)(nil).Lock))
}
// Ping mocks base method.
func (m *MockSyncTree) Ping(arg0 context.Context) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Ping", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// Ping indicates an expected call of Ping.
func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0)
}
// RLock mocks base method.
func (m *MockSyncTree) RLock() {
m.ctrl.T.Helper()
@ -486,6 +458,20 @@ func (mr *MockSyncTreeMockRecorder) Storage() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockSyncTree)(nil).Storage))
}
// SyncWithPeer mocks base method.
func (m *MockSyncTree) SyncWithPeer(arg0 context.Context, arg1 string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SyncWithPeer", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SyncWithPeer indicates an expected call of SyncWithPeer.
func (mr *MockSyncTreeMockRecorder) SyncWithPeer(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncWithPeer", reflect.TypeOf((*MockSyncTree)(nil).SyncWithPeer), arg0, arg1)
}
// Unlock mocks base method.
func (m *MockSyncTree) Unlock() {
m.ctrl.T.Helper()

View File

@ -12,7 +12,6 @@ import (
type SyncClient interface {
RequestFactory
Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
}
@ -52,18 +51,6 @@ func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *tree
return s.MessagePool.SendPeer(ctx, peerId, objMsg)
}
func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "")
if err != nil {
return
}
if s.configuration.IsResponsible(s.spaceId) {
return s.MessagePool.SendResponsible(ctx, objMsg)
}
return s.Broadcast(ctx, message)
}
func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
payload, err := message.Marshal()
if err != nil {

View File

@ -3,12 +3,10 @@ package synctree
import (
"context"
"errors"
"fmt"
"github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/commonspace/object/acl/list"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
@ -16,7 +14,6 @@ import (
"github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/nodeconf"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"sync/atomic"
)
@ -38,7 +35,7 @@ type SyncTree interface {
objecttree.ObjectTree
synchandler.SyncHandler
ListenerSetter
Ping(ctx context.Context) (err error)
SyncWithPeer(ctx context.Context, peerId string) (err error)
}
// SyncTree sends head updates to sync service and also sends new changes to update listener
@ -59,6 +56,10 @@ var log = logger.NewNamed("commonspace.synctree")
var buildObjectTree = objecttree.BuildObjectTree
var createSyncClient = newSyncClient
type ResponsiblePeersGetter interface {
GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error)
}
type BuildDeps struct {
SpaceId string
ObjectSync objectsync.ObjectSync
@ -70,94 +71,13 @@ type BuildDeps struct {
TreeStorage treestorage.TreeStorage
TreeUsage *atomic.Int32
SyncStatus syncstatus.StatusUpdater
PeerGetter ResponsiblePeersGetter
WaitTreeRemoteSync bool
}
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err != nil {
log.WarnCtx(ctx, "peer not found in context, use first responsible")
peerId = deps.Configuration.NodeIds(deps.SpaceId)[0]
}
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "")
if err != nil {
return
}
resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
if err != nil {
return
}
msg = &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(resp.Payload, msg)
return
}
waitTree := func(wait bool) (msg *treechangeproto.TreeSyncMessage, err error) {
if !wait {
return getTreeRemote()
}
for {
msg, err = getTreeRemote()
if err == nil {
return
}
select {
case <-ctx.Done():
err = fmt.Errorf("waiting for object %s interrupted, context closed", id)
return
default:
break
}
}
}
deps.TreeStorage, err = deps.SpaceStorage.TreeStorage(id)
if err == nil {
return buildSyncTree(ctx, false, deps)
}
if err != nil && err != treestorage.ErrUnknownTreeId {
return
}
status, err := deps.SpaceStorage.TreeDeletedStatus(id)
if err != nil {
return
}
if status != "" {
err = spacestorage.ErrTreeStorageAlreadyDeleted
return
}
resp, err := waitTree(deps.WaitTreeRemoteSync)
if err != nil {
return
}
if resp.GetContent().GetFullSyncResponse() == nil {
err = fmt.Errorf("expected to get full sync response, but got something else")
return
}
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := treestorage.TreeStorageCreatePayload{
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
}
// basically building tree with in-memory storage and validating that it was without errors
log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree")
err = objecttree.ValidateRawTree(payload, deps.AclList)
if err != nil {
return
}
// now we are sure that we can save it to the storage
deps.TreeStorage, err = deps.SpaceStorage.CreateTreeStorage(payload)
remoteGetter := treeRemoteGetter{treeId: id, deps: deps}
deps.TreeStorage, err = remoteGetter.getTree(ctx)
if err != nil {
return
}
@ -308,11 +228,11 @@ func (s *syncTree) checkAlive() (err error) {
return
}
func (s *syncTree) Ping(ctx context.Context) (err error) {
func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error) {
s.Lock()
defer s.Unlock()
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate)
return s.syncClient.SendWithReply(ctx, peerId, headUpdate, "")
}
func (s *syncTree) afterBuild() {

View File

@ -0,0 +1,140 @@
package synctree
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/commonspace/object/tree/objecttree"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/net/peer"
"github.com/gogo/protobuf/proto"
"go.uber.org/zap"
"time"
)
type treeRemoteGetter struct {
deps BuildDeps
treeId string
}
func newRemoteGetter(treeId string, deps BuildDeps) treeRemoteGetter {
return treeRemoteGetter{treeId: treeId, deps: deps}
}
func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err error) {
peerId, err := peer.CtxPeerId(ctx)
if err == nil {
peerIds = []string{peerId}
return
}
err = nil
log.WarnCtx(ctx, "peer not found in context, use responsible")
respPeers, err := t.deps.PeerGetter.GetResponsiblePeers(ctx)
if err != nil {
return
}
if len(respPeers) == 0 {
err = fmt.Errorf("no responsible peers")
return
}
for _, p := range respPeers {
peerIds = append(peerIds, p.Id())
}
return
}
func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) {
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
objMsg, err := marshallTreeMessage(newTreeRequest, t.deps.SpaceId, t.treeId, "")
if err != nil {
return
}
resp, err := t.deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
if err != nil {
return
}
msg = &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(resp.Payload, msg)
return
}
func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) {
peerIdx := 0
Loop:
for {
select {
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
default:
break
}
availablePeers, err := t.getPeers(ctx)
if err != nil {
if !wait {
return nil, err
}
select {
// wait for peers to connect
case <-time.After(1 * time.Second):
continue Loop
case <-ctx.Done():
return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId)
}
}
peerIdx = peerIdx % len(availablePeers)
msg, err = t.treeRequest(ctx, availablePeers[peerIdx])
if err == nil || !wait {
return msg, err
}
peerIdx++
}
}
func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) {
treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId)
if err == nil {
return
}
if err != nil && err != treestorage.ErrUnknownTreeId {
return
}
status, err := t.deps.SpaceStorage.TreeDeletedStatus(t.treeId)
if err != nil {
return
}
if status != "" {
err = spacestorage.ErrTreeStorageAlreadyDeleted
return
}
resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync)
if err != nil {
return
}
if resp.GetContent().GetFullSyncResponse() == nil {
err = fmt.Errorf("expected to get full sync response, but got something else")
return
}
fullSyncResp := resp.GetContent().GetFullSyncResponse()
payload := treestorage.TreeStorageCreatePayload{
RootRawChange: resp.RootChange,
Changes: fullSyncResp.Changes,
Heads: fullSyncResp.Heads,
}
// basically building tree with in-memory storage and validating that it was without errors
log.With(zap.String("id", t.treeId)).DebugCtx(ctx, "validating tree")
err = objecttree.ValidateRawTree(payload, t.deps.AclList)
if err != nil {
return
}
// now we are sure that we can save it to the storage
return t.deps.SpaceStorage.CreateTreeStorage(payload)
}

View File

@ -86,10 +86,6 @@ func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyn
return s.PeerManager.SendPeer(ctx, peerId, msg)
}
func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage()
return s.PeerManager.SendResponsible(ctx, msg)
}
func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
s.updateLastUsage()
return s.PeerManager.Broadcast(ctx, msg)

View File

@ -78,17 +78,3 @@ func (mr *MockPeerManagerMockRecorder) SendPeer(arg0, arg1, arg2 interface{}) *g
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPeer", reflect.TypeOf((*MockPeerManager)(nil).SendPeer), arg0, arg1, arg2)
}
// SendResponsible mocks base method.
func (m *MockPeerManager) SendResponsible(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendResponsible", arg0, arg1)
ret0, _ := ret[0].(error)
return ret0
}
// SendResponsible indicates an expected call of SendResponsible.
func (mr *MockPeerManagerMockRecorder) SendResponsible(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendResponsible", reflect.TypeOf((*MockPeerManager)(nil).SendResponsible), arg0, arg1)
}

View File

@ -13,8 +13,6 @@ const CName = "common.commonspace.peermanager"
type PeerManager interface {
// SendPeer sends a message to a stream by peerId
SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
// SendResponsible sends a message to responsible peers streams
SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
// Broadcast sends a message to all subscribed peers
Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
// GetResponsiblePeers dials or gets from cache responsible peers to unary operations

View File

@ -16,6 +16,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/peermanager"
"github.com/anytypeio/any-sync/commonspace/settings"
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
"github.com/anytypeio/any-sync/commonspace/spacestorage"
@ -122,6 +123,7 @@ type space struct {
aclList *syncacl.SyncAcl
configuration nodeconf.Configuration
settingsObject settings.SettingsObject
peerManager peermanager.PeerManager
handleQueue multiqueue.MultiQueue[HandleMessage]
@ -295,6 +297,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea
SpaceStorage: s.storage,
TreeUsage: &s.treesUsed,
SyncStatus: s.syncStatus,
PeerGetter: s.peerManager,
}
return synctree.PutSyncTree(ctx, payload, deps)
}
@ -326,6 +329,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
TreeUsage: &s.treesUsed,
SyncStatus: s.syncStatus,
WaitTreeRemoteSync: opts.WaitTreeRemoteSync,
PeerGetter: s.peerManager,
}
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
}

View File

@ -139,6 +139,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
cache: getter,
account: s.account,
configuration: lastConfiguration,
peerManager: peerManager,
storage: st,
}
return sp, nil

2
go.mod
View File

@ -6,7 +6,7 @@ require (
github.com/anytypeio/go-chash v0.0.2
github.com/awalterschulze/gographviz v2.0.3+incompatible
github.com/cespare/xxhash v1.1.0
github.com/cheggaaa/mb/v3 v3.0.0
github.com/cheggaaa/mb/v3 v3.0.1
github.com/goccy/go-graphviz v0.0.9
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0

4
go.sum
View File

@ -59,8 +59,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cheggaaa/mb/v3 v3.0.0 h1:+FkV4fAefQfJSsfMtWC9cnSrVYKd3TXcerPTwRuWWfE=
github.com/cheggaaa/mb/v3 v3.0.0/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI=
github.com/cheggaaa/mb/v3 v3.0.1 h1:BuEOipGTqybXYi5KXVCpqhR1LWN2lrurq6UrH+VBhXc=
github.com/cheggaaa/mb/v3 v3.0.1/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=

View File

@ -34,6 +34,7 @@ func New() Dialer {
type Dialer interface {
Dial(ctx context.Context, peerId string) (peer peer.Peer, err error)
UpdateAddrs(addrs map[string][]string)
SetPeerAddrs(peerId string, addrs []string)
app.Component
}
@ -62,6 +63,15 @@ func (d *dialer) UpdateAddrs(addrs map[string][]string) {
d.mu.Unlock()
}
func (d *dialer) SetPeerAddrs(peerId string, addrs []string) {
d.mu.Lock()
defer d.mu.Unlock()
if d.peerAddrs == nil {
return
}
d.peerAddrs[peerId] = addrs
}
func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) {
d.mu.RLock()
defer d.mu.RUnlock()

View File

@ -160,6 +160,10 @@ func (d *dialerMock) UpdateAddrs(addrs map[string][]string) {
return
}
func (d *dialerMock) SetPeerAddrs(peerId string, addrs []string) {
return
}
func (d *dialerMock) Init(a *app.App) (err error) {
return
}

View File

@ -40,6 +40,8 @@ type StreamPool interface {
AddTagsCtx(ctx context.Context, tags ...string) error
// RemoveTagsCtx removes tags from stream, stream will be extracted from ctx
RemoveTagsCtx(ctx context.Context, tags ...string) error
// Streams gets all streams for specific tags
Streams(tags ...string) (streams []drpc.Stream)
// Close closes all streams
Close() error
}
@ -73,6 +75,17 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st
}()
}
func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) {
s.mu.Lock()
defer s.mu.Unlock()
for _, tag := range tags {
for _, id := range s.streamIdsByTag[tag] {
streams = append(streams, s.streams[id].stream)
}
}
return
}
func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream {
s.mu.Lock()
defer s.mu.Unlock()