diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index 9db4d3db..5ec3ce98 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -132,7 +132,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) d.syncStatus.RemoveAllExcept(p.Id(), filteredIds, stateCounter) - d.pingTreesInCache(ctx, filteredIds) + d.syncTrees(ctx, p.Id(), filteredIds) d.log.Info("sync done:", zap.Int("newIds", len(newIds)), zap.Int("changedIds", len(changedIds)), @@ -143,7 +143,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) return } -func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { +func (d *diffSyncer) syncTrees(ctx context.Context, peerId string, trees []string) { for _, tId := range trees { tree, err := d.cache.GetTree(ctx, d.spaceId, tId) if err != nil { @@ -159,10 +159,10 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { // it may be already there (i.e. loaded) // and build func will not be called, thus we won't sync the tree // therefore we just do it manually - if err = syncTree.Ping(ctx); err != nil { - d.log.WarnCtx(ctx, "synctree.Ping error", zap.Error(err), zap.String("treeId", tId)) + if err = syncTree.SyncWithPeer(ctx, peerId); err != nil { + d.log.WarnCtx(ctx, "synctree.SyncWithPeer error", zap.Error(err), zap.String("treeId", tId)) } else { - d.log.DebugCtx(ctx, "success tree ping", zap.String("treeId", tId)) + d.log.DebugCtx(ctx, "success synctree.SyncWithPeer", zap.String("treeId", tId)) } } } diff --git a/commonspace/object/tree/objecttree/treegraph.go b/commonspace/object/tree/objecttree/treegraph.go index 9bb9611e..722ef975 100644 --- a/commonspace/object/tree/objecttree/treegraph.go +++ b/commonspace/object/tree/objecttree/treegraph.go @@ -1,6 +1,5 @@ -//go:build ((!linux && !darwin) || android || ios || nographviz) && !amd64 -// +build !linux,!darwin android ios nographviz -// +build !amd64 +//go:build (!linux && !darwin) || android || ios || nographviz || windows +// +build !linux,!darwin android ios nographviz windows package objecttree diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index 8218aa2e..992ae5c6 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -54,20 +54,6 @@ func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1) } -// BroadcastAsyncOrSendResponsible mocks base method. -func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible. -func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1) -} - // CreateFullSyncRequest mocks base method. func (m *MockSyncClient) CreateFullSyncRequest(arg0 objecttree.ObjectTree, arg1, arg2 []string) (*treechangeproto.TreeSyncMessage, error) { m.ctrl.T.Helper() @@ -394,20 +380,6 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Lock", reflect.TypeOf((*MockSyncTree)(nil).Lock)) } -// Ping mocks base method. -func (m *MockSyncTree) Ping(arg0 context.Context) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Ping", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// Ping indicates an expected call of Ping. -func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0) -} - // RLock mocks base method. func (m *MockSyncTree) RLock() { m.ctrl.T.Helper() @@ -486,6 +458,20 @@ func (mr *MockSyncTreeMockRecorder) Storage() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Storage", reflect.TypeOf((*MockSyncTree)(nil).Storage)) } +// SyncWithPeer mocks base method. +func (m *MockSyncTree) SyncWithPeer(arg0 context.Context, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SyncWithPeer", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SyncWithPeer indicates an expected call of SyncWithPeer. +func (mr *MockSyncTreeMockRecorder) SyncWithPeer(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncWithPeer", reflect.TypeOf((*MockSyncTree)(nil).SyncWithPeer), arg0, arg1) +} + // Unlock mocks base method. func (m *MockSyncTree) Unlock() { m.ctrl.T.Helper() diff --git a/commonspace/object/tree/synctree/syncclient.go b/commonspace/object/tree/synctree/syncclient.go index e858f190..6ad98c7c 100644 --- a/commonspace/object/tree/synctree/syncclient.go +++ b/commonspace/object/tree/synctree/syncclient.go @@ -12,7 +12,6 @@ import ( type SyncClient interface { RequestFactory Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) - BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) } @@ -52,18 +51,6 @@ func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *tree return s.MessagePool.SendPeer(ctx, peerId, objMsg) } -func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { - objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "") - if err != nil { - return - } - - if s.configuration.IsResponsible(s.spaceId) { - return s.MessagePool.SendResponsible(ctx, objMsg) - } - return s.Broadcast(ctx, message) -} - func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { payload, err := message.Marshal() if err != nil { diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index e2e1c2b1..c63dcc96 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -3,12 +3,10 @@ package synctree import ( "context" "errors" - "fmt" "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" - "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" @@ -16,7 +14,6 @@ import ( "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/nodeconf" - "github.com/gogo/protobuf/proto" "go.uber.org/zap" "sync/atomic" ) @@ -38,7 +35,7 @@ type SyncTree interface { objecttree.ObjectTree synchandler.SyncHandler ListenerSetter - Ping(ctx context.Context) (err error) + SyncWithPeer(ctx context.Context, peerId string) (err error) } // SyncTree sends head updates to sync service and also sends new changes to update listener @@ -59,6 +56,10 @@ var log = logger.NewNamed("commonspace.synctree") var buildObjectTree = objecttree.BuildObjectTree var createSyncClient = newSyncClient +type ResponsiblePeersGetter interface { + GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) +} + type BuildDeps struct { SpaceId string ObjectSync objectsync.ObjectSync @@ -70,94 +71,13 @@ type BuildDeps struct { TreeStorage treestorage.TreeStorage TreeUsage *atomic.Int32 SyncStatus syncstatus.StatusUpdater + PeerGetter ResponsiblePeersGetter WaitTreeRemoteSync bool } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { - getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { - peerId, err := peer.CtxPeerId(ctx) - if err != nil { - log.WarnCtx(ctx, "peer not found in context, use first responsible") - peerId = deps.Configuration.NodeIds(deps.SpaceId)[0] - } - - newTreeRequest := GetRequestFactory().CreateNewTreeRequest() - objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "") - if err != nil { - return - } - - resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) - if err != nil { - return - } - - msg = &treechangeproto.TreeSyncMessage{} - err = proto.Unmarshal(resp.Payload, msg) - return - } - - waitTree := func(wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { - if !wait { - return getTreeRemote() - } - for { - msg, err = getTreeRemote() - if err == nil { - return - } - select { - case <-ctx.Done(): - err = fmt.Errorf("waiting for object %s interrupted, context closed", id) - return - default: - break - } - } - } - - deps.TreeStorage, err = deps.SpaceStorage.TreeStorage(id) - if err == nil { - return buildSyncTree(ctx, false, deps) - } - - if err != nil && err != treestorage.ErrUnknownTreeId { - return - } - - status, err := deps.SpaceStorage.TreeDeletedStatus(id) - if err != nil { - return - } - if status != "" { - err = spacestorage.ErrTreeStorageAlreadyDeleted - return - } - - resp, err := waitTree(deps.WaitTreeRemoteSync) - if err != nil { - return - } - if resp.GetContent().GetFullSyncResponse() == nil { - err = fmt.Errorf("expected to get full sync response, but got something else") - return - } - fullSyncResp := resp.GetContent().GetFullSyncResponse() - - payload := treestorage.TreeStorageCreatePayload{ - RootRawChange: resp.RootChange, - Changes: fullSyncResp.Changes, - Heads: fullSyncResp.Heads, - } - - // basically building tree with in-memory storage and validating that it was without errors - log.With(zap.String("id", id)).DebugCtx(ctx, "validating tree") - err = objecttree.ValidateRawTree(payload, deps.AclList) - if err != nil { - return - } - // now we are sure that we can save it to the storage - deps.TreeStorage, err = deps.SpaceStorage.CreateTreeStorage(payload) + remoteGetter := treeRemoteGetter{treeId: id, deps: deps} + deps.TreeStorage, err = remoteGetter.getTree(ctx) if err != nil { return } @@ -308,11 +228,11 @@ func (s *syncTree) checkAlive() (err error) { return } -func (s *syncTree) Ping(ctx context.Context) (err error) { +func (s *syncTree) SyncWithPeer(ctx context.Context, peerId string) (err error) { s.Lock() defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate) + return s.syncClient.SendWithReply(ctx, peerId, headUpdate, "") } func (s *syncTree) afterBuild() { diff --git a/commonspace/object/tree/synctree/treeremotegetter.go b/commonspace/object/tree/synctree/treeremotegetter.go new file mode 100644 index 00000000..561d850c --- /dev/null +++ b/commonspace/object/tree/synctree/treeremotegetter.go @@ -0,0 +1,140 @@ +package synctree + +import ( + "context" + "fmt" + "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" + "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" + "github.com/anytypeio/any-sync/commonspace/spacestorage" + "github.com/anytypeio/any-sync/net/peer" + "github.com/gogo/protobuf/proto" + "go.uber.org/zap" + "time" +) + +type treeRemoteGetter struct { + deps BuildDeps + treeId string +} + +func newRemoteGetter(treeId string, deps BuildDeps) treeRemoteGetter { + return treeRemoteGetter{treeId: treeId, deps: deps} +} + +func (t treeRemoteGetter) getPeers(ctx context.Context) (peerIds []string, err error) { + peerId, err := peer.CtxPeerId(ctx) + if err == nil { + peerIds = []string{peerId} + return + } + err = nil + log.WarnCtx(ctx, "peer not found in context, use responsible") + respPeers, err := t.deps.PeerGetter.GetResponsiblePeers(ctx) + if err != nil { + return + } + if len(respPeers) == 0 { + err = fmt.Errorf("no responsible peers") + return + } + for _, p := range respPeers { + peerIds = append(peerIds, p.Id()) + } + return +} + +func (t treeRemoteGetter) treeRequest(ctx context.Context, peerId string) (msg *treechangeproto.TreeSyncMessage, err error) { + newTreeRequest := GetRequestFactory().CreateNewTreeRequest() + objMsg, err := marshallTreeMessage(newTreeRequest, t.deps.SpaceId, t.treeId, "") + if err != nil { + return + } + + resp, err := t.deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) + if err != nil { + return + } + + msg = &treechangeproto.TreeSyncMessage{} + err = proto.Unmarshal(resp.Payload, msg) + return +} + +func (t treeRemoteGetter) treeRequestLoop(ctx context.Context, wait bool) (msg *treechangeproto.TreeSyncMessage, err error) { + peerIdx := 0 +Loop: + for { + select { + case <-ctx.Done(): + return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId) + default: + break + } + availablePeers, err := t.getPeers(ctx) + if err != nil { + if !wait { + return nil, err + } + select { + // wait for peers to connect + case <-time.After(1 * time.Second): + continue Loop + case <-ctx.Done(): + return nil, fmt.Errorf("waiting for object %s interrupted, context closed", t.treeId) + } + } + + peerIdx = peerIdx % len(availablePeers) + msg, err = t.treeRequest(ctx, availablePeers[peerIdx]) + if err == nil || !wait { + return msg, err + } + peerIdx++ + } +} + +func (t treeRemoteGetter) getTree(ctx context.Context) (treeStorage treestorage.TreeStorage, err error) { + treeStorage, err = t.deps.SpaceStorage.TreeStorage(t.treeId) + if err == nil { + return + } + + if err != nil && err != treestorage.ErrUnknownTreeId { + return + } + + status, err := t.deps.SpaceStorage.TreeDeletedStatus(t.treeId) + if err != nil { + return + } + if status != "" { + err = spacestorage.ErrTreeStorageAlreadyDeleted + return + } + + resp, err := t.treeRequestLoop(ctx, t.deps.WaitTreeRemoteSync) + if err != nil { + return + } + if resp.GetContent().GetFullSyncResponse() == nil { + err = fmt.Errorf("expected to get full sync response, but got something else") + return + } + fullSyncResp := resp.GetContent().GetFullSyncResponse() + + payload := treestorage.TreeStorageCreatePayload{ + RootRawChange: resp.RootChange, + Changes: fullSyncResp.Changes, + Heads: fullSyncResp.Heads, + } + + // basically building tree with in-memory storage and validating that it was without errors + log.With(zap.String("id", t.treeId)).DebugCtx(ctx, "validating tree") + err = objecttree.ValidateRawTree(payload, t.deps.AclList) + if err != nil { + return + } + // now we are sure that we can save it to the storage + return t.deps.SpaceStorage.CreateTreeStorage(payload) +} diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go index 47ba14c4..c47fc30d 100644 --- a/commonspace/objectsync/msgpool.go +++ b/commonspace/objectsync/msgpool.go @@ -86,10 +86,6 @@ func (s *messagePool) SendPeer(ctx context.Context, peerId string, msg *spacesyn return s.PeerManager.SendPeer(ctx, peerId, msg) } -func (s *messagePool) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { - s.updateLastUsage() - return s.PeerManager.SendResponsible(ctx, msg) -} func (s *messagePool) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { s.updateLastUsage() return s.PeerManager.Broadcast(ctx, msg) diff --git a/commonspace/peermanager/mock_peermanager/mock_peermanager.go b/commonspace/peermanager/mock_peermanager/mock_peermanager.go index ae97fe7b..f7d0ce74 100644 --- a/commonspace/peermanager/mock_peermanager/mock_peermanager.go +++ b/commonspace/peermanager/mock_peermanager/mock_peermanager.go @@ -78,17 +78,3 @@ func (mr *MockPeerManagerMockRecorder) SendPeer(arg0, arg1, arg2 interface{}) *g mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendPeer", reflect.TypeOf((*MockPeerManager)(nil).SendPeer), arg0, arg1, arg2) } - -// SendResponsible mocks base method. -func (m *MockPeerManager) SendResponsible(arg0 context.Context, arg1 *spacesyncproto.ObjectSyncMessage) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendResponsible", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// SendResponsible indicates an expected call of SendResponsible. -func (mr *MockPeerManagerMockRecorder) SendResponsible(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendResponsible", reflect.TypeOf((*MockPeerManager)(nil).SendResponsible), arg0, arg1) -} diff --git a/commonspace/peermanager/peermanager.go b/commonspace/peermanager/peermanager.go index 373c3a51..676d000c 100644 --- a/commonspace/peermanager/peermanager.go +++ b/commonspace/peermanager/peermanager.go @@ -13,8 +13,6 @@ const CName = "common.commonspace.peermanager" type PeerManager interface { // SendPeer sends a message to a stream by peerId SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) - // SendResponsible sends a message to responsible peers streams - SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) // Broadcast sends a message to all subscribed peers Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) // GetResponsiblePeers dials or gets from cache responsible peers to unary operations diff --git a/commonspace/space.go b/commonspace/space.go index 378490d3..4cb02e9f 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -16,6 +16,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/objectsync" + "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/spacestorage" @@ -122,6 +123,7 @@ type space struct { aclList *syncacl.SyncAcl configuration nodeconf.Configuration settingsObject settings.SettingsObject + peerManager peermanager.PeerManager handleQueue multiqueue.MultiQueue[HandleMessage] @@ -295,6 +297,7 @@ func (s *space) PutTree(ctx context.Context, payload treestorage.TreeStorageCrea SpaceStorage: s.storage, TreeUsage: &s.treesUsed, SyncStatus: s.syncStatus, + PeerGetter: s.peerManager, } return synctree.PutSyncTree(ctx, payload, deps) } @@ -326,6 +329,7 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t TreeUsage: &s.treesUsed, SyncStatus: s.syncStatus, WaitTreeRemoteSync: opts.WaitTreeRemoteSync, + PeerGetter: s.peerManager, } return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 80e48ff0..c26839b8 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -139,6 +139,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { cache: getter, account: s.account, configuration: lastConfiguration, + peerManager: peerManager, storage: st, } return sp, nil diff --git a/go.mod b/go.mod index 599c7e82..5b385c0e 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/anytypeio/go-chash v0.0.2 github.com/awalterschulze/gographviz v2.0.3+incompatible github.com/cespare/xxhash v1.1.0 - github.com/cheggaaa/mb/v3 v3.0.0 + github.com/cheggaaa/mb/v3 v3.0.1 github.com/goccy/go-graphviz v0.0.9 github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.6.0 diff --git a/go.sum b/go.sum index a4f1a4ba..f1b9201c 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,8 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/cheggaaa/mb/v3 v3.0.0 h1:+FkV4fAefQfJSsfMtWC9cnSrVYKd3TXcerPTwRuWWfE= -github.com/cheggaaa/mb/v3 v3.0.0/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI= +github.com/cheggaaa/mb/v3 v3.0.1 h1:BuEOipGTqybXYi5KXVCpqhR1LWN2lrurq6UrH+VBhXc= +github.com/cheggaaa/mb/v3 v3.0.1/go.mod h1:zCt2QeYukhd/g0bIdNqF+b/kKz1hnLFNDkP49qN5kqI= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= diff --git a/net/dialer/dialer.go b/net/dialer/dialer.go index 24718286..2076f959 100644 --- a/net/dialer/dialer.go +++ b/net/dialer/dialer.go @@ -34,6 +34,7 @@ func New() Dialer { type Dialer interface { Dial(ctx context.Context, peerId string) (peer peer.Peer, err error) UpdateAddrs(addrs map[string][]string) + SetPeerAddrs(peerId string, addrs []string) app.Component } @@ -62,6 +63,15 @@ func (d *dialer) UpdateAddrs(addrs map[string][]string) { d.mu.Unlock() } +func (d *dialer) SetPeerAddrs(peerId string, addrs []string) { + d.mu.Lock() + defer d.mu.Unlock() + if d.peerAddrs == nil { + return + } + d.peerAddrs[peerId] = addrs +} + func (d *dialer) Dial(ctx context.Context, peerId string) (p peer.Peer, err error) { d.mu.RLock() defer d.mu.RUnlock() diff --git a/net/pool/pool_test.go b/net/pool/pool_test.go index 94fde050..f913333c 100644 --- a/net/pool/pool_test.go +++ b/net/pool/pool_test.go @@ -160,6 +160,10 @@ func (d *dialerMock) UpdateAddrs(addrs map[string][]string) { return } +func (d *dialerMock) SetPeerAddrs(peerId string, addrs []string) { + return +} + func (d *dialerMock) Init(a *app.App) (err error) { return } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 8319e972..01527d14 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -40,6 +40,8 @@ type StreamPool interface { AddTagsCtx(ctx context.Context, tags ...string) error // RemoveTagsCtx removes tags from stream, stream will be extracted from ctx RemoveTagsCtx(ctx context.Context, tags ...string) error + // Streams gets all streams for specific tags + Streams(tags ...string) (streams []drpc.Stream) // Close closes all streams Close() error } @@ -73,6 +75,17 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st }() } +func (s *streamPool) Streams(tags ...string) (streams []drpc.Stream) { + s.mu.Lock() + defer s.mu.Unlock() + for _, tag := range tags { + for _, id := range s.streamIdsByTag[tag] { + streams = append(streams, s.streams[id].stream) + } + } + return +} + func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream { s.mu.Lock() defer s.mu.Unlock()