commonspace with new streampool

This commit is contained in:
Sergey Cherepanov 2023-01-19 15:17:04 +03:00 committed by Mikhail Iudin
parent e4d8b4578d
commit 34848254be
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
18 changed files with 309 additions and 990 deletions

View File

@ -2,6 +2,7 @@ package headsync
import ( import (
"context" "context"
"fmt"
"github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/app/ldiff"
"github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
@ -91,7 +92,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
return err return err
} }
for _, p := range peers { for _, p := range peers {
if err := d.syncWithPeer(ctx, p); err != nil { if err = d.syncWithPeer(ctx, p); err != nil {
d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
} }
} }
@ -110,7 +111,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
err = rpcerr.Unwrap(err) err = rpcerr.Unwrap(err)
if err != nil && err != spacesyncproto.ErrSpaceMissing { if err != nil && err != spacesyncproto.ErrSpaceMissing {
d.syncStatus.SetNodesOnline(p.Id(), false) d.syncStatus.SetNodesOnline(p.Id(), false)
return err return fmt.Errorf("diff error: %v", err)
} }
d.syncStatus.SetNodesOnline(p.Id(), true) d.syncStatus.SetNodesOnline(p.Id(), true)
@ -148,7 +149,7 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
// it may be already there (i.e. loaded) // it may be already there (i.e. loaded)
// and build func will not be called, thus we won't sync the tree // and build func will not be called, thus we won't sync the tree
// therefore we just do it manually // therefore we just do it manually
syncTree.Ping() _ = syncTree.Ping(ctx)
} }
} }

View File

@ -9,10 +9,10 @@ import (
type SyncAcl struct { type SyncAcl struct {
list.AclList list.AclList
synchandler.SyncHandler synchandler.SyncHandler
streamPool objectsync.StreamPool streamPool objectsync.MessagePool
} }
func NewSyncAcl(aclList list.AclList, streamPool objectsync.StreamPool) *SyncAcl { func NewSyncAcl(aclList list.AclList, streamPool objectsync.MessagePool) *SyncAcl {
return &SyncAcl{ return &SyncAcl{
AclList: aclList, AclList: aclList,
SyncHandler: nil, SyncHandler: nil,

View File

@ -40,32 +40,32 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder {
return m.recorder return m.recorder
} }
// BroadcastAsync mocks base method. // Broadcast mocks base method.
func (m *MockSyncClient) BroadcastAsync(arg0 *treechangeproto.TreeSyncMessage) error { func (m *MockSyncClient) Broadcast(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BroadcastAsync", arg0) ret := m.ctrl.Call(m, "Broadcast", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// BroadcastAsync indicates an expected call of BroadcastAsync. // Broadcast indicates an expected call of Broadcast.
func (mr *MockSyncClientMockRecorder) BroadcastAsync(arg0 interface{}) *gomock.Call { func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsync", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsync), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1)
} }
// BroadcastAsyncOrSendResponsible mocks base method. // BroadcastAsyncOrSendResponsible mocks base method.
func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 *treechangeproto.TreeSyncMessage) error { func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0) ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible. // BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible.
func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0 interface{}) *gomock.Call { func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1)
} }
// CreateFullSyncRequest mocks base method. // CreateFullSyncRequest mocks base method.
@ -126,18 +126,18 @@ func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest() *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest))
} }
// SendAsync mocks base method. // SendWithReply mocks base method.
func (m *MockSyncClient) SendAsync(arg0 string, arg1 *treechangeproto.TreeSyncMessage, arg2 string) error { func (m *MockSyncClient) SendWithReply(arg0 context.Context, arg1 string, arg2 *treechangeproto.TreeSyncMessage, arg3 string) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "SendAsync", arg0, arg1, arg2) ret := m.ctrl.Call(m, "SendWithReply", arg0, arg1, arg2, arg3)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// SendAsync indicates an expected call of SendAsync. // SendWithReply indicates an expected call of SendWithReply.
func (mr *MockSyncClientMockRecorder) SendAsync(arg0, arg1, arg2 interface{}) *gomock.Call { func (mr *MockSyncClientMockRecorder) SendWithReply(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendAsync", reflect.TypeOf((*MockSyncClient)(nil).SendAsync), arg0, arg1, arg2) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithReply", reflect.TypeOf((*MockSyncClient)(nil).SendWithReply), arg0, arg1, arg2, arg3)
} }
// MockSyncTree is a mock of SyncTree interface. // MockSyncTree is a mock of SyncTree interface.
@ -395,17 +395,17 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call {
} }
// Ping mocks base method. // Ping mocks base method.
func (m *MockSyncTree) Ping() error { func (m *MockSyncTree) Ping(arg0 context.Context) error {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Ping") ret := m.ctrl.Call(m, "Ping", arg0)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0
} }
// Ping indicates an expected call of Ping. // Ping indicates an expected call of Ping.
func (mr *MockSyncTreeMockRecorder) Ping() *gomock.Call { func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping)) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0)
} }
// RLock mocks base method. // RLock mocks base method.

View File

@ -1,6 +1,7 @@
package synctree package synctree
import ( import (
"context"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync"
) )
@ -17,20 +18,20 @@ func newQueuedClient(client SyncClient, queue objectsync.ActionQueue) SyncClient
} }
} }
func (q *queuedClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) { func (q *queuedClient) Broadcast(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) {
return q.queue.Send(func() error { return q.queue.Send(func() error {
return q.SyncClient.BroadcastAsync(message) return q.SyncClient.Broadcast(ctx, message)
}) })
} }
func (q *queuedClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { func (q *queuedClient) SendWithReply(ctx context.Context, peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) {
return q.queue.Send(func() error { return q.queue.Send(func() error {
return q.SyncClient.SendAsync(peerId, message, replyId) return q.SyncClient.SendWithReply(ctx, peerId, message, replyId)
}) })
} }
func (q *queuedClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) { func (q *queuedClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) {
return q.queue.Send(func() error { return q.queue.Send(func() error {
return q.SyncClient.BroadcastAsyncOrSendResponsible(message) return q.SyncClient.BroadcastAsyncOrSendResponsible(ctx, message)
}) })
} }

View File

@ -2,6 +2,7 @@
package synctree package synctree
import ( import (
"context"
"github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync"
@ -11,72 +12,61 @@ import (
type SyncClient interface { type SyncClient interface {
RequestFactory RequestFactory
BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
} }
type syncClient struct { type syncClient struct {
objectsync.StreamPool objectsync.MessagePool
RequestFactory RequestFactory
spaceId string spaceId string
connector confconnector.ConfConnector connector confconnector.ConfConnector
configuration nodeconf.Configuration configuration nodeconf.Configuration
checker objectsync.StreamChecker
} }
func newSyncClient( func newSyncClient(
spaceId string, spaceId string,
pool objectsync.StreamPool, pool objectsync.MessagePool,
factory RequestFactory, factory RequestFactory,
configuration nodeconf.Configuration, configuration nodeconf.Configuration) SyncClient {
checker objectsync.StreamChecker) SyncClient {
return &syncClient{ return &syncClient{
StreamPool: pool, MessagePool: pool,
RequestFactory: factory, RequestFactory: factory,
configuration: configuration, configuration: configuration,
checker: checker,
spaceId: spaceId, spaceId: spaceId,
} }
} }
func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "") objMsg, err := marshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "")
if err != nil { if err != nil {
return return
} }
s.checker.CheckResponsiblePeers() return s.MessagePool.Broadcast(ctx, objMsg)
return s.StreamPool.BroadcastAsync(objMsg)
} }
func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) {
err = s.checker.CheckPeerConnection(peerId) objMsg, err := marshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, replyId)
if err != nil { if err != nil {
return return
} }
return s.MessagePool.SendPeer(ctx, peerId, objMsg)
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, replyId)
if err != nil {
return
}
return s.StreamPool.SendAsync([]string{peerId}, objMsg)
} }
func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) { func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) {
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "") objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "")
if err != nil { if err != nil {
return return
} }
if s.configuration.IsResponsible(s.spaceId) { if s.configuration.IsResponsible(s.spaceId) {
s.checker.CheckResponsiblePeers() return s.MessagePool.SendResponsible(ctx, objMsg)
return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg)
} }
return s.BroadcastAsync(message) return s.Broadcast(ctx, message)
} }
func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
payload, err := message.Marshal() payload, err := message.Marshal()
if err != nil { if err != nil {
return return
@ -84,7 +74,8 @@ func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId s
objMsg = &spacesyncproto.ObjectSyncMessage{ objMsg = &spacesyncproto.ObjectSyncMessage{
ReplyId: replyId, ReplyId: replyId,
Payload: payload, Payload: payload,
ObjectId: id, ObjectId: objectId,
SpaceId: spaceId,
} }
return return
} }

View File

@ -30,15 +30,10 @@ type HeadNotifiable interface {
UpdateHeads(id string, heads []string) UpdateHeads(id string, heads []string)
} }
type ListenerSetter interface {
SetListener(listener updatelistener.UpdateListener)
}
type SyncTree interface { type SyncTree interface {
objecttree.ObjectTree objecttree.ObjectTree
synchandler.SyncHandler synchandler.SyncHandler
ListenerSetter Ping(ctx context.Context) (err error)
Ping() (err error)
} }
// SyncTree sends head updates to sync service and also sends new changes to update listener // SyncTree sends head updates to sync service and also sends new changes to update listener
@ -78,34 +73,24 @@ func newWrappedSyncClient(
factory RequestFactory, factory RequestFactory,
objectSync objectsync.ObjectSync, objectSync objectsync.ObjectSync,
configuration nodeconf.Configuration) SyncClient { configuration nodeconf.Configuration) SyncClient {
syncClient := newSyncClient(spaceId, objectSync.StreamPool(), factory, configuration, objectSync.StreamChecker()) syncClient := newSyncClient(spaceId, objectSync.MessagePool(), factory, configuration)
return newQueuedClient(syncClient, objectSync.ActionQueue()) return newQueuedClient(syncClient, objectSync.ActionQueue())
} }
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
streamChecker := deps.ObjectSync.StreamChecker()
peerId, err := peer.CtxPeerId(ctx) peerId, err := peer.CtxPeerId(ctx)
if err != nil { if err != nil {
streamChecker.CheckResponsiblePeers() peerId = deps.Configuration.NodeIds(deps.SpaceId)[0]
peerId, err = streamChecker.FirstResponsiblePeer()
if err != nil {
return
}
} }
newTreeRequest := GetRequestFactory().CreateNewTreeRequest() newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
objMsg, err := marshallTreeMessage(newTreeRequest, id, "") objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "")
if err != nil { if err != nil {
return return
} }
err = deps.ObjectSync.StreamChecker().CheckPeerConnection(peerId) resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
if err != nil {
return
}
resp, err := deps.ObjectSync.StreamPool().SendSync(peerId, objMsg)
if err != nil { if err != nil {
return return
} }
@ -218,16 +203,13 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
if isFirstBuild { if isFirstBuild {
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
// send to everybody, because everybody should know that the node or client got new tree // send to everybody, because everybody should know that the node or client got new tree
syncTree.syncClient.BroadcastAsync(headUpdate) if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil {
log.Error("broadcast error", zap.Error(e))
}
} }
return return
} }
func (s *syncTree) SetListener(listener updatelistener.UpdateListener) {
// this should be called under lock
s.listener = listener
}
func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) { func (s *syncTree) IterateFrom(id string, convert objecttree.ChangeConvertFunc, iterate objecttree.ChangeIterateFunc) (err error) {
if err = s.checkAlive(); err != nil { if err = s.checkAlive(); err != nil {
return return
@ -255,7 +237,7 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh
} }
s.syncStatus.HeadsChange(s.Id(), res.Heads) s.syncStatus.HeadsChange(s.Id(), res.Heads)
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate) err = s.syncClient.Broadcast(ctx, headUpdate)
return return
} }
@ -282,7 +264,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
s.notifiable.UpdateHeads(s.Id(), res.Heads) s.notifiable.UpdateHeads(s.Id(), res.Heads)
} }
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
err = s.syncClient.BroadcastAsync(headUpdate) err = s.syncClient.Broadcast(ctx, headUpdate)
} }
return return
} }
@ -324,11 +306,11 @@ func (s *syncTree) checkAlive() (err error) {
return return
} }
func (s *syncTree) Ping() (err error) { func (s *syncTree) Ping(ctx context.Context) (err error) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
headUpdate := s.syncClient.CreateHeadUpdate(s, nil) headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate) return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate)
} }
func (s *syncTree) afterBuild() { func (s *syncTree) afterBuild() {

View File

@ -111,7 +111,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
return return
} }
return s.syncClient.SendAsync(senderId, fullRequest, replyId) return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId)
} }
if s.alreadyHasHeads(objTree, update.Heads) { if s.alreadyHasHeads(objTree, update.Heads) {
@ -135,7 +135,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
return return
} }
return s.syncClient.SendAsync(senderId, fullRequest, replyId) return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId)
} }
func (s *syncTreeHandler) handleFullSyncRequest( func (s *syncTreeHandler) handleFullSyncRequest(
@ -159,7 +159,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
if err != nil { if err != nil {
log.With(zap.Error(err)).Debug("full sync request finished with error") log.With(zap.Error(err)).Debug("full sync request finished with error")
s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId) s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId)
return return
} else if fullResponse != nil { } else if fullResponse != nil {
log.Debug("full sync response sent") log.Debug("full sync response sent")
@ -180,7 +180,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
return return
} }
return s.syncClient.SendAsync(senderId, fullResponse, replyId) return s.syncClient.SendWithReply(ctx, senderId, fullResponse, replyId)
} }
func (s *syncTreeHandler) handleFullSyncResponse( func (s *syncTreeHandler) handleFullSyncResponse(

View File

@ -0,0 +1,120 @@
package objectsync
import (
"context"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"go.uber.org/zap"
"strconv"
"strings"
"sync"
"sync/atomic"
)
type StreamManager interface {
SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
}
// MessagePool can be made generic to work with different streams
type MessagePool interface {
synchandler.SyncHandler
StreamManager
SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
}
type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
type responseWaiter struct {
ch chan *spacesyncproto.ObjectSyncMessage
}
type messagePool struct {
sync.Mutex
StreamManager
messageHandler MessageHandler
waiters map[string]responseWaiter
waitersMx sync.Mutex
counter atomic.Uint64
queue ActionQueue
}
func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool {
s := &messagePool{
StreamManager: streamManager,
messageHandler: messageHandler,
waiters: make(map[string]responseWaiter),
queue: NewDefaultActionQueue(),
}
return s
}
func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
newCounter := s.counter.Add(1)
msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter)
s.waitersMx.Lock()
waiter := responseWaiter{
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
}
s.waiters[msg.ReplyId] = waiter
s.waitersMx.Unlock()
err = s.SendPeer(ctx, peerId, msg)
if err != nil {
return
}
select {
case <-ctx.Done():
s.waitersMx.Lock()
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
log.With(zap.String("replyId", msg.ReplyId)).Info("time elapsed when waiting")
err = ctx.Err()
case reply = <-waiter.ch:
// success
}
return
}
func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
if msg.ReplyId != "" {
// we got reply, send it to waiter
if s.stopWaiter(msg) {
return
}
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
return
}
return s.queue.Send(func() error {
if e := s.messageHandler(ctx, senderId, msg); e != nil {
log.Info("handle message error", zap.Error(e))
}
return nil
})
}
func (s *messagePool) stopWaiter(msg *spacesyncproto.ObjectSyncMessage) bool {
s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId]
if exists {
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
waiter.ch <- msg
return true
}
s.waitersMx.Unlock()
return false
}
func genReplyKey(peerId, treeId string, counter uint64) string {
b := &strings.Builder{}
b.WriteString(peerId)
b.WriteString(".")
b.WriteString(treeId)
b.WriteString(".")
b.WriteString(strconv.FormatUint(counter, 36))
return b.String()
}

View File

@ -5,7 +5,6 @@ import (
"context" "context"
"github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/logger"
"github.com/anytypeio/any-sync/app/ocache" "github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
@ -18,19 +17,17 @@ var log = logger.NewNamed("commonspace.objectsync")
type ObjectSync interface { type ObjectSync interface {
ocache.ObjectLastUsage ocache.ObjectLastUsage
synchandler.SyncHandler synchandler.SyncHandler
StreamPool() StreamPool MessagePool() MessagePool
StreamChecker() StreamChecker
ActionQueue() ActionQueue ActionQueue() ActionQueue
Init() Init(getter syncobjectgetter.SyncObjectGetter)
Close() (err error) Close() (err error)
} }
type objectSync struct { type objectSync struct {
spaceId string spaceId string
streamPool StreamPool streamPool MessagePool
checker StreamChecker
objectGetter syncobjectgetter.SyncObjectGetter objectGetter syncobjectgetter.SyncObjectGetter
actionQueue ActionQueue actionQueue ActionQueue
@ -38,28 +35,14 @@ type objectSync struct {
cancelSync context.CancelFunc cancelSync context.CancelFunc
} }
func NewObjectSync( func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync ObjectSync) {
spaceId string, msgPool := newMessagePool(streamManager, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
confConnector confconnector.ConfConnector,
objectGetter syncobjectgetter.SyncObjectGetter) (objectSync ObjectSync) {
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
return objectSync.HandleMessage(ctx, senderId, message) return objectSync.HandleMessage(ctx, senderId, message)
}) })
clientFactory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
syncLog := log.With(zap.String("id", spaceId))
syncCtx, cancel := context.WithCancel(context.Background()) syncCtx, cancel := context.WithCancel(context.Background())
checker := NewStreamChecker(
spaceId,
confConnector,
streamPool,
clientFactory,
syncCtx,
syncLog)
objectSync = newObjectSync( objectSync = newObjectSync(
spaceId, spaceId,
streamPool, msgPool,
checker,
objectGetter,
syncCtx, syncCtx,
cancel) cancel)
return return
@ -67,36 +50,33 @@ func NewObjectSync(
func newObjectSync( func newObjectSync(
spaceId string, spaceId string,
streamPool StreamPool, streamPool MessagePool,
checker StreamChecker,
objectGetter syncobjectgetter.SyncObjectGetter,
syncCtx context.Context, syncCtx context.Context,
cancel context.CancelFunc, cancel context.CancelFunc,
) *objectSync { ) *objectSync {
return &objectSync{ return &objectSync{
objectGetter: objectGetter, streamPool: streamPool,
streamPool: streamPool, spaceId: spaceId,
spaceId: spaceId, syncCtx: syncCtx,
checker: checker, cancelSync: cancel,
syncCtx: syncCtx, actionQueue: NewDefaultActionQueue(),
cancelSync: cancel,
actionQueue: NewDefaultActionQueue(),
} }
} }
func (s *objectSync) Init() { func (s *objectSync) Init(objectGetter syncobjectgetter.SyncObjectGetter) {
s.objectGetter = objectGetter
s.actionQueue.Run() s.actionQueue.Run()
go s.checker.CheckResponsiblePeers()
} }
func (s *objectSync) Close() (err error) { func (s *objectSync) Close() (err error) {
s.actionQueue.Close() s.actionQueue.Close()
s.cancelSync() s.cancelSync()
return s.streamPool.Close() return
} }
func (s *objectSync) LastUsage() time.Time { func (s *objectSync) LastUsage() time.Time {
return s.streamPool.LastUsage() // TODO: [che]
return time.Now()
} }
func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
@ -108,14 +88,10 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message
return obj.HandleMessage(ctx, senderId, message) return obj.HandleMessage(ctx, senderId, message)
} }
func (s *objectSync) StreamPool() StreamPool { func (s *objectSync) MessagePool() MessagePool {
return s.streamPool return s.streamPool
} }
func (s *objectSync) StreamChecker() StreamChecker {
return s.checker
}
func (s *objectSync) ActionQueue() ActionQueue { func (s *objectSync) ActionQueue() ActionQueue {
return s.actionQueue return s.actionQueue
} }

View File

@ -1,146 +0,0 @@
package objectsync
import (
"context"
"fmt"
"github.com/anytypeio/any-sync/commonspace/confconnector"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/rpc/rpcerr"
"go.uber.org/atomic"
"go.uber.org/zap"
"golang.org/x/exp/slices"
"time"
)
type StreamChecker interface {
CheckResponsiblePeers()
CheckPeerConnection(peerId string) (err error)
FirstResponsiblePeer() (peerId string, err error)
}
type streamChecker struct {
spaceId string
connector confconnector.ConfConnector
streamPool StreamPool
clientFactory spacesyncproto.ClientFactory
log *zap.Logger
syncCtx context.Context
lastCheck *atomic.Time
}
const streamCheckerInterval = time.Second * 5
func NewStreamChecker(
spaceId string,
connector confconnector.ConfConnector,
streamPool StreamPool,
clientFactory spacesyncproto.ClientFactory,
syncCtx context.Context,
log *zap.Logger) StreamChecker {
return &streamChecker{
spaceId: spaceId,
connector: connector,
streamPool: streamPool,
clientFactory: clientFactory,
log: log,
syncCtx: syncCtx,
lastCheck: atomic.NewTime(time.Time{}),
}
}
func (s *streamChecker) CheckResponsiblePeers() {
lastCheck := s.lastCheck.Load()
now := time.Now()
if lastCheck.Add(streamCheckerInterval).After(now) {
return
}
s.lastCheck.Store(now)
var (
activeNodeIds []string
configuration = s.connector.Configuration()
)
nodeIds := configuration.NodeIds(s.spaceId)
for _, nodeId := range nodeIds {
if s.streamPool.HasActiveStream(nodeId) {
s.log.Debug("has active stream for", zap.String("id", nodeId))
activeNodeIds = append(activeNodeIds, nodeId)
continue
}
}
s.log.Debug("total streams", zap.Int("total", len(activeNodeIds)))
newPeers, err := s.connector.DialInactiveResponsiblePeers(s.syncCtx, s.spaceId, activeNodeIds)
if err != nil {
s.log.Error("failed to dial peers", zap.Error(err))
return
}
for _, p := range newPeers {
err := s.createStream(p)
if err != nil {
log.With(zap.Error(err)).Error("failed to create stream")
continue
}
s.log.Debug("reading stream for", zap.String("id", p.Id()))
}
return
}
func (s *streamChecker) CheckPeerConnection(peerId string) (err error) {
if s.streamPool.HasActiveStream(peerId) {
return
}
var (
configuration = s.connector.Configuration()
pool = s.connector.Pool()
)
nodeIds := configuration.NodeIds(s.spaceId)
// we don't know the address of the peer
if !slices.Contains(nodeIds, peerId) {
err = fmt.Errorf("don't know the address of peer %s", peerId)
return
}
newPeer, err := pool.Dial(s.syncCtx, peerId)
if err != nil {
return
}
return s.createStream(newPeer)
}
func (s *streamChecker) createStream(p peer.Peer) (err error) {
stream, err := s.clientFactory.Client(p).ObjectSyncStream(s.syncCtx)
if err != nil {
// so here probably the request is failed because there is no such space,
// but diffService should handle such cases by sending pushSpace
err = fmt.Errorf("failed to open stream: %w", rpcerr.Unwrap(err))
return
}
// sending empty message for the server to understand from which space is it coming
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
if err != nil {
err = fmt.Errorf("failed to send first message to stream: %w", rpcerr.Unwrap(err))
return
}
err = s.streamPool.AddAndReadStreamAsync(stream)
if err != nil {
err = fmt.Errorf("failed to read from stream async: %w", err)
return
}
return
}
func (s *streamChecker) FirstResponsiblePeer() (peerId string, err error) {
nodeIds := s.connector.Configuration().NodeIds(s.spaceId)
for _, nodeId := range nodeIds {
if s.streamPool.HasActiveStream(nodeId) {
peerId = nodeId
return
}
}
err = fmt.Errorf("no responsible peers are connected")
return
}

View File

@ -1,332 +0,0 @@
package objectsync
import (
"context"
"errors"
"fmt"
"github.com/anytypeio/any-sync/app/ocache"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/net/peer"
"go.uber.org/zap"
"sync"
"sync/atomic"
"time"
)
var ErrEmptyPeer = errors.New("don't have such a peer")
var ErrStreamClosed = errors.New("stream is already closed")
var maxStreamReaders = 10
var syncWaitPeriod = 2 * time.Second
var ErrSyncTimeout = errors.New("too long wait on sync receive")
// StreamPool can be made generic to work with different streams
type StreamPool interface {
ocache.ObjectLastUsage
AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error)
AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error)
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
HasActiveStream(peerId string) bool
Close() (err error)
}
type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
type responseWaiter struct {
ch chan *spacesyncproto.ObjectSyncMessage
}
type streamPool struct {
sync.Mutex
peerStreams map[string]spacesyncproto.ObjectSyncStream
messageHandler MessageHandler
wg *sync.WaitGroup
waiters map[string]responseWaiter
waitersMx sync.Mutex
counter atomic.Uint64
lastUsage atomic.Int64
}
func newStreamPool(messageHandler MessageHandler) StreamPool {
s := &streamPool{
peerStreams: make(map[string]spacesyncproto.ObjectSyncStream),
messageHandler: messageHandler,
waiters: make(map[string]responseWaiter),
wg: &sync.WaitGroup{},
}
s.lastUsage.Store(time.Now().Unix())
return s
}
func (s *streamPool) LastUsage() time.Time {
return time.Unix(s.lastUsage.Load(), 0)
}
func (s *streamPool) HasActiveStream(peerId string) (res bool) {
s.Lock()
defer s.Unlock()
_, err := s.getOrDeleteStream(peerId)
return err == nil
}
func (s *streamPool) SendSync(
peerId string,
msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
newCounter := s.counter.Add(1)
msg.ReplyId = genStreamPoolKey(peerId, msg.ObjectId, newCounter)
s.waitersMx.Lock()
waiter := responseWaiter{
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
}
s.waiters[msg.ReplyId] = waiter
s.waitersMx.Unlock()
err = s.SendAsync([]string{peerId}, msg)
if err != nil {
return
}
delay := time.NewTimer(syncWaitPeriod)
select {
case <-delay.C:
s.waitersMx.Lock()
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
log.With(zap.String("replyId", msg.ReplyId)).Error("time elapsed when waiting")
err = ErrSyncTimeout
case reply = <-waiter.ch:
if !delay.Stop() {
<-delay.C
}
}
return
}
func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) {
s.lastUsage.Store(time.Now().Unix())
getStreams := func() (streams []spacesyncproto.ObjectSyncStream) {
for _, pId := range peers {
stream, err := s.getOrDeleteStream(pId)
if err != nil {
continue
}
streams = append(streams, stream)
}
return streams
}
s.Lock()
streams := getStreams()
s.Unlock()
log.With(zap.String("objectId", message.ObjectId), zap.Int("existing peers len", len(streams)), zap.Strings("wanted peers", peers)).
Debug("sending message to peers")
for _, stream := range streams {
err = stream.Send(message)
if err != nil {
log.Debug("error sending message to stream", zap.Error(err))
}
}
if len(peers) != 1 {
err = nil
}
return err
}
func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectSyncStream, err error) {
stream, exists := s.peerStreams[id]
if !exists {
err = ErrEmptyPeer
return
}
select {
case <-stream.Context().Done():
delete(s.peerStreams, id)
err = ErrStreamClosed
default:
}
return
}
func (s *streamPool) getAllStreams() (streams []spacesyncproto.ObjectSyncStream) {
s.Lock()
defer s.Unlock()
Loop:
for id, stream := range s.peerStreams {
select {
case <-stream.Context().Done():
delete(s.peerStreams, id)
continue Loop
default:
break
}
log.With(zap.String("id", id)).Debug("getting peer stream")
streams = append(streams, stream)
}
return
}
func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
streams := s.getAllStreams()
log.With(zap.String("objectId", message.ObjectId), zap.Int("peers", len(streams))).
Debug("broadcasting message to peers")
for _, stream := range streams {
if err = stream.Send(message); err != nil {
log.Debug("error sending message to stream", zap.Error(err))
}
}
return nil
}
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) {
peerId, err := s.addStream(stream)
if err != nil {
return
}
go s.readPeerLoop(peerId, stream)
return
}
func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) {
peerId, err := s.addStream(stream)
if err != nil {
return
}
return s.readPeerLoop(peerId, stream)
}
func (s *streamPool) addStream(stream spacesyncproto.ObjectSyncStream) (peerId string, err error) {
s.Lock()
peerId, err = peer.CtxPeerId(stream.Context())
if err != nil {
s.Unlock()
return
}
log.With(zap.String("peer id", peerId)).Debug("adding stream")
if oldStream, ok := s.peerStreams[peerId]; ok {
s.Unlock()
oldStream.Close()
s.Lock()
log.With(zap.String("peer id", peerId)).Debug("closed old stream before adding")
}
s.peerStreams[peerId] = stream
s.wg.Add(1)
s.Unlock()
return
}
func (s *streamPool) Close() (err error) {
s.Lock()
wg := s.wg
s.Unlock()
streams := s.getAllStreams()
log.Debug("closing streams on lock")
for _, stream := range streams {
stream.Close()
}
log.Debug("closed streams")
if wg != nil {
wg.Wait()
}
return nil
}
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
var (
log = log.With(zap.String("peerId", peerId))
queue = NewDefaultActionQueue()
)
queue.Run()
defer func() {
log.Debug("stopped reading stream from peer")
s.removePeer(peerId, stream)
queue.Close()
s.wg.Done()
}()
log.Debug("started reading stream from peer")
stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool {
s.waitersMx.Lock()
waiter, exists := s.waiters[msg.ReplyId]
if exists {
delete(s.waiters, msg.ReplyId)
s.waitersMx.Unlock()
waiter.ch <- msg
return true
}
s.waitersMx.Unlock()
return false
}
process := func(msg *spacesyncproto.ObjectSyncMessage) error {
log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId))
log.Debug("getting message with reply id")
err = s.messageHandler(stream.Context(), peerId, msg)
if err != nil {
log.With(zap.Error(err)).Debug("message handling failed")
}
return nil
}
for {
select {
case <-stream.Context().Done():
return
default:
}
var msg *spacesyncproto.ObjectSyncMessage
msg, err = stream.Recv()
s.lastUsage.Store(time.Now().Unix())
if err != nil {
stream.Close()
return
}
if msg.ReplyId != "" {
// then we can send it directly to waiters without adding to queue or starting a reader
if stopWaiter(msg) {
continue
}
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
}
queue.Send(func() error {
return process(msg)
})
}
}
func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
s.Lock()
defer s.Unlock()
mapStream, ok := s.peerStreams[peerId]
if !ok {
return ErrEmptyPeer
}
// it can be the case that the stream was already replaced
if mapStream == stream {
delete(s.peerStreams, peerId)
}
return
}
func genStreamPoolKey(peerId, treeId string, counter uint64) string {
return fmt.Sprintf("%s.%s.%d", peerId, treeId, counter)
}

View File

@ -1,299 +0,0 @@
package objectsync
import (
"context"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/rpc/rpctest"
"github.com/stretchr/testify/require"
"testing"
"time"
)
type testServer struct {
stream chan spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream
releaseStream chan error
watchErrOnce bool
}
func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
panic("implement me")
}
func (t *testServer) SpacePush(ctx context.Context, request *spacesyncproto.SpacePushRequest) (*spacesyncproto.SpacePushResponse, error) {
panic("implement me")
}
func (t *testServer) SpacePull(ctx context.Context, request *spacesyncproto.SpacePullRequest) (*spacesyncproto.SpacePullResponse, error) {
panic("implement me")
}
func (t *testServer) ObjectSyncStream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error {
t.stream <- stream
return <-t.releaseStream
}
func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream {
select {
case <-time.After(time.Second * 5):
test.Fatalf("waiteStream timeout")
case st := <-t.stream:
return st
}
return nil
}
type fixture struct {
testServer *testServer
drpcTS *rpctest.TesServer
client spacesyncproto.DRPCSpaceSyncClient
clientStream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream
serverStream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream
pool *streamPool
clientId string
serverId string
}
func newFixture(t *testing.T, clientId, serverId string, handler MessageHandler) *fixture {
fx := &fixture{
testServer: &testServer{},
drpcTS: rpctest.NewTestServer(),
clientId: clientId,
serverId: serverId,
}
fx.testServer.stream = make(chan spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream, 1)
require.NoError(t, spacesyncproto.DRPCRegisterSpaceSync(fx.drpcTS.Mux, fx.testServer))
fx.client = spacesyncproto.NewDRPCSpaceSyncClient(fx.drpcTS.Dial(peer.CtxWithPeerId(context.Background(), clientId)))
var err error
fx.clientStream, err = fx.client.ObjectSyncStream(peer.CtxWithPeerId(context.Background(), serverId))
require.NoError(t, err)
fx.serverStream = fx.testServer.waitStream(t)
fx.pool = newStreamPool(handler).(*streamPool)
return fx
}
func (fx *fixture) run(t *testing.T) chan error {
waitCh := make(chan error)
go func() {
err := fx.pool.AddAndReadStreamSync(fx.clientStream)
waitCh <- err
}()
time.Sleep(time.Millisecond * 10)
fx.pool.Lock()
require.Equal(t, fx.pool.peerStreams[fx.serverId], fx.clientStream)
fx.pool.Unlock()
return waitCh
}
func TestStreamPool_AddAndReadStreamAsync(t *testing.T) {
remId := "remoteId"
t.Run("client close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId])
})
t.Run("server close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.serverStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId])
})
}
func TestStreamPool_Close(t *testing.T) {
remId := "remoteId"
t.Run("close", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
fx.run(t)
fx.pool.Close()
select {
case <-fx.clientStream.Context().Done():
break
case <-time.After(time.Millisecond * 100):
t.Fatal("context should be closed")
}
})
}
func TestStreamPool_ReceiveMessage(t *testing.T) {
remId := "remoteId"
t.Run("pool receive message from server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
recvChan := make(chan struct{})
fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
require.Equal(t, msg, message)
recvChan <- struct{}{}
return nil
})
waitCh := fx.run(t)
err := fx.serverStream.Send(msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId])
})
}
func TestStreamPool_HasActiveStream(t *testing.T) {
remId := "remoteId"
t.Run("pool has active stream", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
require.True(t, fx.pool.HasActiveStream(remId))
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId])
})
t.Run("pool has no active stream", func(t *testing.T) {
fx := newFixture(t, "", remId, nil)
waitCh := fx.run(t)
err := fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.False(t, fx.pool.HasActiveStream(remId))
require.Nil(t, fx.pool.peerStreams[remId])
})
}
func TestStreamPool_SendAsync(t *testing.T) {
remId := "remoteId"
t.Run("pool send async to server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
recvChan := make(chan struct{})
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg, message)
recvChan <- struct{}{}
}()
waitCh := fx.run(t)
err := fx.pool.SendAsync([]string{remId}, msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId])
})
}
func TestStreamPool_SendSync(t *testing.T) {
remId := "remoteId"
t.Run("pool send sync to server", func(t *testing.T) {
objectId := "objectId"
payload := []byte("payload")
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg.ObjectId, message.ObjectId)
require.NotEmpty(t, message.ReplyId)
message.Payload = payload
err = fx.serverStream.Send(message)
require.NoError(t, err)
}()
waitCh := fx.run(t)
res, err := fx.pool.SendSync(remId, msg)
require.NoError(t, err)
require.Equal(t, payload, res.Payload)
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId])
})
t.Run("pool send sync timeout", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
syncWaitPeriod = time.Millisecond * 30
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg.ObjectId, message.ObjectId)
require.NotEmpty(t, message.ReplyId)
}()
waitCh := fx.run(t)
_, err := fx.pool.SendSync(remId, msg)
require.Equal(t, ErrSyncTimeout, err)
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId])
})
}
func TestStreamPool_BroadcastAsync(t *testing.T) {
remId := "remoteId"
t.Run("pool broadcast async to server", func(t *testing.T) {
objectId := "objectId"
msg := &spacesyncproto.ObjectSyncMessage{
ObjectId: objectId,
}
fx := newFixture(t, "", remId, nil)
recvChan := make(chan struct{})
go func() {
message, err := fx.serverStream.Recv()
require.NoError(t, err)
require.Equal(t, msg, message)
recvChan <- struct{}{}
}()
waitCh := fx.run(t)
err := fx.pool.BroadcastAsync(msg)
require.NoError(t, err)
<-recvChan
err = fx.clientStream.Close()
require.NoError(t, err)
err = <-waitCh
require.Error(t, err)
require.Nil(t, fx.pool.peerStreams[remId])
})
}

View File

@ -1,24 +0,0 @@
package commonspace
import (
"context"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
)
type RpcHandler interface {
HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error)
Stream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error
}
type rpcHandler struct {
s *space
}
func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
return r.s.HeadSync().HandleRangeRequest(ctx, req)
}
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) (err error) {
// TODO: if needed we can launch full sync here
return r.s.ObjectSync().StreamPool().AddAndReadStreamSync(stream)
}

View File

@ -14,6 +14,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree/updatelistener"
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
"github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage"
"github.com/anytypeio/any-sync/commonspace/object/treegetter"
"github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/settings" "github.com/anytypeio/any-sync/commonspace/settings"
"github.com/anytypeio/any-sync/commonspace/settings/deletionstate" "github.com/anytypeio/any-sync/commonspace/settings/deletionstate"
@ -80,16 +81,14 @@ type Space interface {
DebugAllHeads() []headsync.TreeHeads DebugAllHeads() []headsync.TreeHeads
Description() (SpaceDescription, error) Description() (SpaceDescription, error)
SpaceSyncRpc() RpcHandler
DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error)
CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error)
PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error)
BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error)
DeleteTree(ctx context.Context, id string) (err error) DeleteTree(ctx context.Context, id string) (err error)
BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error)
HeadSync() headsync.HeadSync HeadSync() headsync.HeadSync
ObjectSync() objectsync.ObjectSync
SyncStatus() syncstatus.StatusUpdater SyncStatus() syncstatus.StatusUpdater
Storage() spacestorage.SpaceStorage Storage() spacestorage.SpaceStorage
@ -101,13 +100,11 @@ type space struct {
mu sync.RWMutex mu sync.RWMutex
header *spacesyncproto.RawSpaceHeaderWithId header *spacesyncproto.RawSpaceHeaderWithId
rpc *rpcHandler
objectSync objectsync.ObjectSync objectSync objectsync.ObjectSync
headSync headsync.HeadSync headSync headsync.HeadSync
syncStatus syncstatus.StatusUpdater syncStatus syncstatus.StatusUpdater
storage spacestorage.SpaceStorage storage spacestorage.SpaceStorage
cache *commonGetter cache treegetter.TreeGetter
account accountservice.Service account accountservice.Service
aclList *syncacl.SyncAcl aclList *syncacl.SyncAcl
configuration nodeconf.Configuration configuration nodeconf.Configuration
@ -161,7 +158,6 @@ func (s *space) Init(ctx context.Context) (err error) {
return return
} }
s.header = header s.header = header
s.rpc = &rpcHandler{s: s}
initialIds, err := s.storage.StoredIds() initialIds, err := s.storage.StoredIds()
if err != nil { if err != nil {
return return
@ -174,8 +170,7 @@ func (s *space) Init(ctx context.Context) (err error) {
if err != nil { if err != nil {
return return
} }
s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.StreamPool()) s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool())
s.cache.AddObject(s.aclList)
deletionState := deletionstate.NewDeletionState(s.storage) deletionState := deletionstate.NewDeletionState(s.storage)
deps := settings.Deps{ deps := settings.Deps{
@ -196,8 +191,9 @@ func (s *space) Init(ctx context.Context) (err error) {
DeletionState: deletionState, DeletionState: deletionState,
} }
s.settingsObject = settings.NewSettingsObject(deps, s.id) s.settingsObject = settings.NewSettingsObject(deps, s.id)
s.cache.AddObject(s.settingsObject)
s.objectSync.Init() objectGetter := newCommonSpaceGetter(s.id, s.aclList, s.cache, s.settingsObject)
s.objectSync.Init(objectGetter)
s.headSync.Init(initialIds, deletionState) s.headSync.Init(initialIds, deletionState)
err = s.settingsObject.Init(ctx) err = s.settingsObject.Init(ctx)
if err != nil { if err != nil {
@ -208,10 +204,6 @@ func (s *space) Init(ctx context.Context) (err error) {
return nil return nil
} }
func (s *space) SpaceSyncRpc() RpcHandler {
return s.rpc
}
func (s *space) ObjectSync() objectsync.ObjectSync { func (s *space) ObjectSync() objectsync.ObjectSync {
return s.objectSync return s.objectSync
} }
@ -297,11 +289,6 @@ type BuildTreeOpts struct {
WaitTreeRemoteSync bool WaitTreeRemoteSync bool
} }
type HistoryTreeOpts struct {
BeforeId string
Include bool
}
func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) { func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t objecttree.ObjectTree, err error) {
if s.isClosed.Load() { if s.isClosed.Load() {
err = ErrSpaceClosed err = ErrSpaceClosed
@ -323,24 +310,6 @@ func (s *space) BuildTree(ctx context.Context, id string, opts BuildTreeOpts) (t
return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps) return synctree.BuildSyncTreeOrGetRemote(ctx, id, deps)
} }
func (s *space) BuildHistoryTree(ctx context.Context, id string, opts HistoryTreeOpts) (t objecttree.HistoryTree, err error) {
if s.isClosed.Load() {
err = ErrSpaceClosed
return
}
params := objecttree.HistoryTreeParams{
AclList: s.aclList,
BeforeId: opts.BeforeId,
IncludeBeforeId: opts.Include,
}
params.TreeStorage, err = s.storage.TreeStorage(id)
if err != nil {
return
}
return objecttree.BuildHistoryTree(params)
}
func (s *space) DeleteTree(ctx context.Context, id string) (err error) { func (s *space) DeleteTree(ctx context.Context, id string) (err error) {
return s.settingsObject.DeleteObject(id) return s.settingsObject.DeleteObject(id)
} }

View File

@ -13,6 +13,7 @@ import (
"github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/objectsync"
"github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacestorage"
"github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/spacesyncproto"
"github.com/anytypeio/any-sync/commonspace/streammanager"
"github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/commonspace/syncstatus"
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/pool" "github.com/anytypeio/any-sync/net/pool"
@ -39,12 +40,13 @@ type SpaceService interface {
} }
type spaceService struct { type spaceService struct {
config Config config Config
account accountservice.Service account accountservice.Service
configurationService nodeconf.Service configurationService nodeconf.Service
storageProvider spacestorage.SpaceStorageProvider storageProvider spacestorage.SpaceStorageProvider
treeGetter treegetter.TreeGetter streamManagerProvider streammanager.StreamManagerProvider
pool pool.Pool treeGetter treegetter.TreeGetter
pool pool.Pool
} }
func (s *spaceService) Init(a *app.App) (err error) { func (s *spaceService) Init(a *app.App) (err error) {
@ -53,6 +55,7 @@ func (s *spaceService) Init(a *app.App) (err error) {
s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter) s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter)
s.streamManagerProvider = a.MustComponent(streammanager.CName).(streammanager.StreamManagerProvider)
s.pool = a.MustComponent(pool.CName).(pool.Pool) s.pool = a.MustComponent(pool.CName).(pool.Pool)
return nil return nil
} }
@ -115,7 +118,7 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
lastConfiguration := s.configurationService.GetLast() lastConfiguration := s.configurationService.GetLast()
confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool) confConnector := confconnector.NewConfConnector(lastConfiguration, s.pool)
getter := newCommonGetter(st.Id(), s.treeGetter)
syncStatus := syncstatus.NewNoOpSyncStatus() syncStatus := syncstatus.NewNoOpSyncStatus()
// this will work only for clients, not the best solution, but... // this will work only for clients, not the best solution, but...
if !lastConfiguration.IsResponsible(st.Id()) { if !lastConfiguration.IsResponsible(st.Id()) {
@ -123,14 +126,21 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st))
} }
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, getter, syncStatus, log) // TODO: [che] remove *5
objectSync := objectsync.NewObjectSync(id, confConnector, getter) headSync := headsync.NewHeadSync(id, s.config.SyncPeriod*5, st, confConnector, s.treeGetter, syncStatus, log)
streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id)
if err != nil {
return nil, err
}
objectSync := objectsync.NewObjectSync(streamManager, id)
sp := &space{ sp := &space{
id: id, id: id,
objectSync: objectSync, objectSync: objectSync,
headSync: headSync, headSync: headSync,
syncStatus: syncStatus, syncStatus: syncStatus,
cache: getter, cache: s.treeGetter,
account: s.account, account: s.account,
configuration: lastConfiguration, configuration: lastConfiguration,
storage: st, storage: st,

View File

@ -0,0 +1,14 @@
package streammanager
import (
"context"
"github.com/anytypeio/any-sync/app"
"github.com/anytypeio/any-sync/commonspace/objectsync"
)
const CName = "common.commonspace.streammanager"
type StreamManagerProvider interface {
app.Component
NewStreamManager(ctx context.Context, spaceId string) (sm objectsync.StreamManager, err error)
}

View File

@ -1,6 +1,7 @@
package streampool package streampool
import ( import (
"fmt"
"go.uber.org/zap" "go.uber.org/zap"
"storj.io/drpc" "storj.io/drpc"
"sync/atomic" "sync/atomic"
@ -17,6 +18,9 @@ type stream struct {
} }
func (sr *stream) write(msg drpc.Message) (err error) { func (sr *stream) write(msg drpc.Message) (err error) {
defer func() {
sr.l.Debug("write", zap.String("msg", msg.(fmt.Stringer).String()), zap.Error(err))
}()
if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { if err = sr.stream.MsgSend(msg, EncodingProto); err != nil {
sr.l.Info("stream write error", zap.Error(err)) sr.l.Info("stream write error", zap.Error(err))
sr.streamClose() sr.streamClose()
@ -24,7 +28,7 @@ func (sr *stream) write(msg drpc.Message) (err error) {
return err return err
} }
func (sr *stream) readLoop() { func (sr *stream) readLoop() error {
defer func() { defer func() {
sr.streamClose() sr.streamClose()
}() }()
@ -32,7 +36,12 @@ func (sr *stream) readLoop() {
msg := sr.pool.handler.NewReadMessage() msg := sr.pool.handler.NewReadMessage()
if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil { if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil {
sr.l.Info("msg receive error", zap.Error(err)) sr.l.Info("msg receive error", zap.Error(err))
return return err
}
sr.l.Debug("read msg", zap.String("msg", msg.(fmt.Stringer).String()))
if err := sr.pool.handler.HandleMessage(sr.stream.Context(), sr.peerId, msg); err != nil {
sr.l.Info("msg handle error", zap.Error(err))
return err
} }
} }
} }

View File

@ -1,7 +1,9 @@
package streampool package streampool
import ( import (
"fmt"
"github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/peer"
"github.com/anytypeio/any-sync/net/pool"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -21,10 +23,14 @@ type StreamHandler interface {
// StreamPool keeps and read streams // StreamPool keeps and read streams
type StreamPool interface { type StreamPool interface {
// AddStream adds new incoming stream into the pool // AddStream adds new outgoing stream into the pool
AddStream(peerId string, stream drpc.Stream, tags ...string) AddStream(peerId string, stream drpc.Stream, tags ...string)
// ReadStream adds new incoming stream and synchronously read it
ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error)
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error)
// SendById sends a message to given peerIds. Works only if stream exists
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
// Broadcast sends a message to all peers with given tags. Works async. // Broadcast sends a message to all peers with given tags. Works async.
Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error) Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
// Close closes all streams // Close closes all streams
@ -42,7 +48,19 @@ type streamPool struct {
lastStreamId uint32 lastStreamId uint32
} }
func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error {
st := s.addStream(peerId, drpcStream, tags...)
return st.readLoop()
}
func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) { func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) {
st := s.addStream(peerId, drpcStream, tags...)
go func() {
_ = st.readLoop()
}()
}
func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
s.lastStreamId++ s.lastStreamId++
@ -60,7 +78,8 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st
for _, tag := range tags { for _, tag := range tags {
s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId)
} }
go st.readLoop() st.l.Debug("stream added", zap.Strings("tags", st.tags))
return st
} }
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) { func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) {
@ -75,6 +94,30 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P
return s.exec.Add(ctx, funcs...) return s.exec.Add(ctx, funcs...)
} }
func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) {
s.mu.Lock()
var streams []*stream
for _, peerId := range peerIds {
for _, streamId := range s.streamIdsByPeer[peerId] {
streams = append(streams, s.streams[streamId])
}
}
s.mu.Unlock()
log.Debug("sendById", zap.String("msg", msg.(fmt.Stringer).String()), zap.Int("streams", len(streams)))
var funcs []func()
for _, st := range streams {
funcs = append(funcs, func() {
if e := st.write(msg); e != nil {
log.Debug("sendById write error", zap.Error(e))
}
})
}
if len(funcs) == 0 {
return pool.ErrUnableToConnect
}
return s.exec.Add(ctx, funcs...)
}
func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) { func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) {
// get all streams relates to peer // get all streams relates to peer
streams, err := s.getStreams(ctx, p) streams, err := s.getStreams(ctx, p)
@ -164,6 +207,9 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
} }
}) })
} }
if len(funcs) == 0 {
return
}
return s.exec.Add(ctx, funcs...) return s.exec.Add(ctx, funcs...)
} }
@ -195,6 +241,7 @@ func (s *streamPool) removeStream(streamId uint32) {
} }
delete(s.streams, streamId) delete(s.streams, streamId)
st.l.Debug("stream removed", zap.Strings("tags", st.tags))
} }
func (s *streamPool) Close() (err error) { func (s *streamPool) Close() (err error) {