commonspace with new streampool

commit 79d81662ce (parent 330c97ab30)
@@ -2,6 +2,7 @@ package headsync

 import (
 	"context"
+	"fmt"
 	"github.com/anytypeio/any-sync/app/ldiff"
 	"github.com/anytypeio/any-sync/commonspace/confconnector"
 	"github.com/anytypeio/any-sync/commonspace/object/tree/synctree"
@@ -91,7 +92,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error {
 		return err
 	}
 	for _, p := range peers {
-		if err := d.syncWithPeer(ctx, p); err != nil {
+		if err = d.syncWithPeer(ctx, p); err != nil {
 			d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err))
 		}
 	}
@@ -110,7 +111,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error)
 	err = rpcerr.Unwrap(err)
 	if err != nil && err != spacesyncproto.ErrSpaceMissing {
 		d.syncStatus.SetNodesOnline(p.Id(), false)
-		return err
+		return fmt.Errorf("diff error: %v", err)
 	}
 	d.syncStatus.SetNodesOnline(p.Id(), true)

@@ -148,7 +149,7 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) {
 		// it may be already there (i.e. loaded)
 		// and build func will not be called, thus we won't sync the tree
 		// therefore we just do it manually
-		syncTree.Ping()
+		_ = syncTree.Ping(ctx)
 	}
 }
@@ -9,10 +9,10 @@ import (
 type SyncAcl struct {
 	list.AclList
 	synchandler.SyncHandler
-	streamPool objectsync.StreamPool
+	streamPool objectsync.MessagePool
 }

-func NewSyncAcl(aclList list.AclList, streamPool objectsync.StreamPool) *SyncAcl {
+func NewSyncAcl(aclList list.AclList, streamPool objectsync.MessagePool) *SyncAcl {
 	return &SyncAcl{
 		AclList:     aclList,
 		SyncHandler: nil,
@ -38,32 +38,32 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder {
|
||||
return m.recorder
|
||||
}
|
||||
|
||||
// BroadcastAsync mocks base method.
|
||||
func (m *MockSyncClient) BroadcastAsync(arg0 *treechangeproto.TreeSyncMessage) error {
|
||||
// Broadcast mocks base method.
|
||||
func (m *MockSyncClient) Broadcast(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "BroadcastAsync", arg0)
|
||||
ret := m.ctrl.Call(m, "Broadcast", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// BroadcastAsync indicates an expected call of BroadcastAsync.
|
||||
func (mr *MockSyncClientMockRecorder) BroadcastAsync(arg0 interface{}) *gomock.Call {
|
||||
// Broadcast indicates an expected call of Broadcast.
|
||||
func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsync", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsync), arg0)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1)
|
||||
}
|
||||
|
||||
// BroadcastAsyncOrSendResponsible mocks base method.
|
||||
func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 *treechangeproto.TreeSyncMessage) error {
|
||||
func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0)
|
||||
ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible.
|
||||
func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0 interface{}) *gomock.Call {
|
||||
func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1)
|
||||
}
|
||||
|
||||
// CreateFullSyncRequest mocks base method.
|
||||
@ -124,18 +124,18 @@ func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest() *gomock.Call {
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest))
|
||||
}
|
||||
|
||||
// SendAsync mocks base method.
|
||||
func (m *MockSyncClient) SendAsync(arg0 string, arg1 *treechangeproto.TreeSyncMessage, arg2 string) error {
|
||||
// SendWithReply mocks base method.
|
||||
func (m *MockSyncClient) SendWithReply(arg0 context.Context, arg1 string, arg2 *treechangeproto.TreeSyncMessage, arg3 string) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "SendAsync", arg0, arg1, arg2)
|
||||
ret := m.ctrl.Call(m, "SendWithReply", arg0, arg1, arg2, arg3)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// SendAsync indicates an expected call of SendAsync.
|
||||
func (mr *MockSyncClientMockRecorder) SendAsync(arg0, arg1, arg2 interface{}) *gomock.Call {
|
||||
// SendWithReply indicates an expected call of SendWithReply.
|
||||
func (mr *MockSyncClientMockRecorder) SendWithReply(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendAsync", reflect.TypeOf((*MockSyncClient)(nil).SendAsync), arg0, arg1, arg2)
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithReply", reflect.TypeOf((*MockSyncClient)(nil).SendWithReply), arg0, arg1, arg2, arg3)
|
||||
}
|
||||
|
||||
// MockSyncTree is a mock of SyncTree interface.
|
||||
@ -364,17 +364,17 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call {
|
||||
}
|
||||
|
||||
// Ping mocks base method.
|
||||
func (m *MockSyncTree) Ping() error {
|
||||
func (m *MockSyncTree) Ping(arg0 context.Context) error {
|
||||
m.ctrl.T.Helper()
|
||||
ret := m.ctrl.Call(m, "Ping")
|
||||
ret := m.ctrl.Call(m, "Ping", arg0)
|
||||
ret0, _ := ret[0].(error)
|
||||
return ret0
|
||||
}
|
||||
|
||||
// Ping indicates an expected call of Ping.
|
||||
func (mr *MockSyncTreeMockRecorder) Ping() *gomock.Call {
|
||||
func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call {
|
||||
mr.mock.ctrl.T.Helper()
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping))
|
||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0)
|
||||
}
|
||||
|
||||
// RLock mocks base method.
|
||||
|
||||
@ -1,6 +1,7 @@
|
||||
package synctree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
||||
)
|
||||
@ -17,20 +18,20 @@ func newQueuedClient(client SyncClient, queue objectsync.ActionQueue) SyncClient
|
||||
}
|
||||
}
|
||||
|
||||
func (q *queuedClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
func (q *queuedClient) Broadcast(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
return q.queue.Send(func() error {
|
||||
return q.SyncClient.BroadcastAsync(message)
|
||||
return q.SyncClient.Broadcast(ctx, message)
|
||||
})
|
||||
}
|
||||
|
||||
func (q *queuedClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) {
|
||||
func (q *queuedClient) SendWithReply(ctx context.Context, peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) {
|
||||
return q.queue.Send(func() error {
|
||||
return q.SyncClient.SendAsync(peerId, message, replyId)
|
||||
return q.SyncClient.SendWithReply(ctx, peerId, message, replyId)
|
||||
})
|
||||
}
|
||||
|
||||
func (q *queuedClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
func (q *queuedClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
return q.queue.Send(func() error {
|
||||
return q.SyncClient.BroadcastAsyncOrSendResponsible(message)
|
||||
return q.SyncClient.BroadcastAsyncOrSendResponsible(ctx, message)
|
||||
})
|
||||
}
|
||||
|
||||
@ -2,6 +2,7 @@
|
||||
package synctree
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
||||
@ -11,72 +12,61 @@ import (
|
||||
|
||||
type SyncClient interface {
|
||||
RequestFactory
|
||||
BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error)
|
||||
BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error)
|
||||
SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error)
|
||||
Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
|
||||
BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error)
|
||||
SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error)
|
||||
}
|
||||
|
||||
type syncClient struct {
|
||||
objectsync.StreamPool
|
||||
objectsync.MessagePool
|
||||
RequestFactory
|
||||
spaceId string
|
||||
connector confconnector.ConfConnector
|
||||
configuration nodeconf.Configuration
|
||||
|
||||
checker objectsync.StreamChecker
|
||||
}
|
||||
|
||||
func newSyncClient(
|
||||
spaceId string,
|
||||
pool objectsync.StreamPool,
|
||||
pool objectsync.MessagePool,
|
||||
factory RequestFactory,
|
||||
configuration nodeconf.Configuration,
|
||||
checker objectsync.StreamChecker) SyncClient {
|
||||
configuration nodeconf.Configuration) SyncClient {
|
||||
return &syncClient{
|
||||
StreamPool: pool,
|
||||
MessagePool: pool,
|
||||
RequestFactory: factory,
|
||||
configuration: configuration,
|
||||
checker: checker,
|
||||
spaceId: spaceId,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "")
|
||||
func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) {
|
||||
objMsg, err := marshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.checker.CheckResponsiblePeers()
|
||||
return s.StreamPool.BroadcastAsync(objMsg)
|
||||
return s.MessagePool.Broadcast(ctx, objMsg)
|
||||
}
|
||||
|
||||
func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) {
|
||||
err = s.checker.CheckPeerConnection(peerId)
|
||||
func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) {
|
||||
objMsg, err := marshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, replyId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, replyId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.StreamPool.SendAsync([]string{peerId}, objMsg)
|
||||
return s.MessagePool.SendPeer(ctx, peerId, objMsg)
|
||||
}
|
||||
|
||||
func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "")
|
||||
func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) {
|
||||
objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if s.configuration.IsResponsible(s.spaceId) {
|
||||
s.checker.CheckResponsiblePeers()
|
||||
return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg)
|
||||
return s.MessagePool.SendResponsible(ctx, objMsg)
|
||||
}
|
||||
return s.BroadcastAsync(message)
|
||||
return s.Broadcast(ctx, message)
|
||||
}
|
||||
|
||||
func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
payload, err := message.Marshal()
|
||||
if err != nil {
|
||||
return
|
||||
@ -84,7 +74,8 @@ func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId s
|
||||
objMsg = &spacesyncproto.ObjectSyncMessage{
|
||||
ReplyId: replyId,
|
||||
Payload: payload,
|
||||
ObjectId: id,
|
||||
ObjectId: objectId,
|
||||
SpaceId: spaceId,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@ -33,7 +33,7 @@ type HeadNotifiable interface {
|
||||
type SyncTree interface {
|
||||
objecttree.ObjectTree
|
||||
synchandler.SyncHandler
|
||||
Ping() (err error)
|
||||
Ping(ctx context.Context) (err error)
|
||||
}
|
||||
|
||||
// SyncTree sends head updates to sync service and also sends new changes to update listener
|
||||
@ -73,34 +73,24 @@ func newWrappedSyncClient(
|
||||
factory RequestFactory,
|
||||
objectSync objectsync.ObjectSync,
|
||||
configuration nodeconf.Configuration) SyncClient {
|
||||
syncClient := newSyncClient(spaceId, objectSync.StreamPool(), factory, configuration, objectSync.StreamChecker())
|
||||
syncClient := newSyncClient(spaceId, objectSync.MessagePool(), factory, configuration)
|
||||
return newQueuedClient(syncClient, objectSync.ActionQueue())
|
||||
}
|
||||
|
||||
func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) {
|
||||
getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) {
|
||||
streamChecker := deps.ObjectSync.StreamChecker()
|
||||
peerId, err := peer.CtxPeerId(ctx)
|
||||
if err != nil {
|
||||
streamChecker.CheckResponsiblePeers()
|
||||
peerId, err = streamChecker.FirstResponsiblePeer()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
peerId = deps.Configuration.NodeIds(deps.SpaceId)[0]
|
||||
}
|
||||
|
||||
newTreeRequest := GetRequestFactory().CreateNewTreeRequest()
|
||||
objMsg, err := marshallTreeMessage(newTreeRequest, id, "")
|
||||
objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
err = deps.ObjectSync.StreamChecker().CheckPeerConnection(peerId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := deps.ObjectSync.StreamPool().SendSync(peerId, objMsg)
|
||||
resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -213,7 +203,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy
|
||||
if isFirstBuild {
|
||||
headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil)
|
||||
// send to everybody, because everybody should know that the node or client got new tree
|
||||
syncTree.syncClient.BroadcastAsync(headUpdate)
|
||||
if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil {
|
||||
log.Error("broadcast error", zap.Error(e))
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -245,7 +237,7 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh
|
||||
}
|
||||
s.syncStatus.HeadsChange(s.Id(), res.Heads)
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
||||
err = s.syncClient.Broadcast(ctx, headUpdate)
|
||||
return
|
||||
}
|
||||
|
||||
@ -272,7 +264,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree.
|
||||
s.notifiable.UpdateHeads(s.Id(), res.Heads)
|
||||
}
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added)
|
||||
err = s.syncClient.BroadcastAsync(headUpdate)
|
||||
err = s.syncClient.Broadcast(ctx, headUpdate)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -314,11 +306,11 @@ func (s *syncTree) checkAlive() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (s *syncTree) Ping() (err error) {
|
||||
func (s *syncTree) Ping(ctx context.Context) (err error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
headUpdate := s.syncClient.CreateHeadUpdate(s, nil)
|
||||
return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate)
|
||||
return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate)
|
||||
}
|
||||
|
||||
func (s *syncTree) afterBuild() {
|
||||
|
||||
@ -111,7 +111,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
||||
return
|
||||
}
|
||||
|
||||
return s.syncClient.SendAsync(senderId, fullRequest, replyId)
|
||||
return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId)
|
||||
}
|
||||
|
||||
if s.alreadyHasHeads(objTree, update.Heads) {
|
||||
@ -135,7 +135,7 @@ func (s *syncTreeHandler) handleHeadUpdate(
|
||||
return
|
||||
}
|
||||
|
||||
return s.syncClient.SendAsync(senderId, fullRequest, replyId)
|
||||
return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId)
|
||||
}
|
||||
|
||||
func (s *syncTreeHandler) handleFullSyncRequest(
|
||||
@ -159,7 +159,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Debug("full sync request finished with error")
|
||||
|
||||
s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId)
|
||||
s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId)
|
||||
return
|
||||
} else if fullResponse != nil {
|
||||
log.Debug("full sync response sent")
|
||||
@ -180,7 +180,7 @@ func (s *syncTreeHandler) handleFullSyncRequest(
|
||||
return
|
||||
}
|
||||
|
||||
return s.syncClient.SendAsync(senderId, fullResponse, replyId)
|
||||
return s.syncClient.SendWithReply(ctx, senderId, fullResponse, replyId)
|
||||
}
|
||||
|
||||
func (s *syncTreeHandler) handleFullSyncResponse(
|
||||
|
||||
commonspace/objectsync/msgpool.go (new file, +120)
@@ -0,0 +1,120 @@
package objectsync

import (
	"context"
	"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
	"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
	"go.uber.org/zap"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
)

type StreamManager interface {
	SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error)
	SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
	Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error)
}

// MessagePool can be made generic to work with different streams
type MessagePool interface {
	synchandler.SyncHandler
	StreamManager
	SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
}

type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)

type responseWaiter struct {
	ch chan *spacesyncproto.ObjectSyncMessage
}

type messagePool struct {
	sync.Mutex
	StreamManager
	messageHandler MessageHandler
	waiters        map[string]responseWaiter
	waitersMx      sync.Mutex
	counter        atomic.Uint64
	queue          ActionQueue
}

func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool {
	s := &messagePool{
		StreamManager:  streamManager,
		messageHandler: messageHandler,
		waiters:        make(map[string]responseWaiter),
		queue:          NewDefaultActionQueue(),
	}
	return s
}

func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
	newCounter := s.counter.Add(1)
	msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter)

	s.waitersMx.Lock()
	waiter := responseWaiter{
		ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
	}
	s.waiters[msg.ReplyId] = waiter
	s.waitersMx.Unlock()

	err = s.SendPeer(ctx, peerId, msg)
	if err != nil {
		return
	}
	select {
	case <-ctx.Done():
		s.waitersMx.Lock()
		delete(s.waiters, msg.ReplyId)
		s.waitersMx.Unlock()

		log.With(zap.String("replyId", msg.ReplyId)).Info("time elapsed when waiting")
		err = ctx.Err()
	case reply = <-waiter.ch:
		// success
	}
	return
}

func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
	if msg.ReplyId != "" {
		// we got reply, send it to waiter
		if s.stopWaiter(msg) {
			return
		}
		log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
		return
	}
	return s.queue.Send(func() error {
		if e := s.messageHandler(ctx, senderId, msg); e != nil {
			log.Info("handle message error", zap.Error(e))
		}
		return nil
	})
}

func (s *messagePool) stopWaiter(msg *spacesyncproto.ObjectSyncMessage) bool {
	s.waitersMx.Lock()
	waiter, exists := s.waiters[msg.ReplyId]
	if exists {
		delete(s.waiters, msg.ReplyId)
		s.waitersMx.Unlock()
		waiter.ch <- msg
		return true
	}
	s.waitersMx.Unlock()
	return false
}

func genReplyKey(peerId, treeId string, counter uint64) string {
	b := &strings.Builder{}
	b.WriteString(peerId)
	b.WriteString(".")
	b.WriteString(treeId)
	b.WriteString(".")
	b.WriteString(strconv.FormatUint(counter, 36))
	return b.String()
}
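SendSync above is the heart of the new pool: it stamps the outgoing message with a ReplyId from genReplyKey, parks a responseWaiter, and HandleMessage later short-circuits any message carrying a ReplyId straight to that waiter instead of the handler queue. A minimal sketch of that round trip as a same-package test, using a hypothetical loopback StreamManager; the loopback type and the test itself are illustrative, not part of this commit.

// msgpool_loopback_test.go — illustrative sketch only, not part of this commit.
package objectsync

import (
	"context"
	"testing"

	"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
	"github.com/stretchr/testify/require"
)

// loopbackManager is a hypothetical StreamManager that "delivers" every
// SendPeer by feeding the same message back into the pool, the way a real
// peer's reply would arrive from a stream.
type loopbackManager struct {
	pool MessagePool
}

func (l *loopbackManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) error {
	// msg already carries the ReplyId stamped by SendSync, so HandleMessage
	// routes it to the parked waiter instead of the handler queue
	return l.pool.HandleMessage(ctx, peerId, msg)
}

func (l *loopbackManager) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) error {
	return nil
}

func (l *loopbackManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) error {
	return nil
}

func TestMessagePoolRoundTrip(t *testing.T) {
	loop := &loopbackManager{}
	pool := newMessagePool(loop, func(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) error {
		return nil // non-reply messages would be queued here
	})
	loop.pool = pool

	reply, err := pool.SendSync(context.Background(), "peerA", &spacesyncproto.ObjectSyncMessage{ObjectId: "treeA"})
	require.NoError(t, err)
	require.NotEmpty(t, reply.ReplyId) // correlation key produced by genReplyKey
}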
@ -5,7 +5,6 @@ import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/app/logger"
|
||||
"github.com/anytypeio/any-sync/app/ocache"
|
||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
||||
"github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter"
|
||||
"github.com/anytypeio/any-sync/commonspace/objectsync/synchandler"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
@ -18,8 +17,7 @@ var log = logger.NewNamed("commonspace.objectsync")
|
||||
type ObjectSync interface {
|
||||
ocache.ObjectLastUsage
|
||||
synchandler.SyncHandler
|
||||
StreamPool() StreamPool
|
||||
StreamChecker() StreamChecker
|
||||
MessagePool() MessagePool
|
||||
ActionQueue() ActionQueue
|
||||
|
||||
Init(getter syncobjectgetter.SyncObjectGetter)
|
||||
@ -29,8 +27,7 @@ type ObjectSync interface {
|
||||
type objectSync struct {
|
||||
spaceId string
|
||||
|
||||
streamPool StreamPool
|
||||
checker StreamChecker
|
||||
streamPool MessagePool
|
||||
objectGetter syncobjectgetter.SyncObjectGetter
|
||||
actionQueue ActionQueue
|
||||
|
||||
@ -38,26 +35,14 @@ type objectSync struct {
|
||||
cancelSync context.CancelFunc
|
||||
}
|
||||
|
||||
func NewObjectSync(
|
||||
spaceId string,
|
||||
confConnector confconnector.ConfConnector) (objectSync ObjectSync) {
|
||||
streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync ObjectSync) {
|
||||
msgPool := newMessagePool(streamManager, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
return objectSync.HandleMessage(ctx, senderId, message)
|
||||
})
|
||||
clientFactory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient)
|
||||
syncLog := log.With(zap.String("id", spaceId))
|
||||
syncCtx, cancel := context.WithCancel(context.Background())
|
||||
checker := NewStreamChecker(
|
||||
spaceId,
|
||||
confConnector,
|
||||
streamPool,
|
||||
clientFactory,
|
||||
syncCtx,
|
||||
syncLog)
|
||||
objectSync = newObjectSync(
|
||||
spaceId,
|
||||
streamPool,
|
||||
checker,
|
||||
msgPool,
|
||||
syncCtx,
|
||||
cancel)
|
||||
return
|
||||
@ -65,15 +50,13 @@ func NewObjectSync(
|
||||
|
||||
func newObjectSync(
|
||||
spaceId string,
|
||||
streamPool StreamPool,
|
||||
checker StreamChecker,
|
||||
streamPool MessagePool,
|
||||
syncCtx context.Context,
|
||||
cancel context.CancelFunc,
|
||||
) *objectSync {
|
||||
return &objectSync{
|
||||
streamPool: streamPool,
|
||||
spaceId: spaceId,
|
||||
checker: checker,
|
||||
syncCtx: syncCtx,
|
||||
cancelSync: cancel,
|
||||
actionQueue: NewDefaultActionQueue(),
|
||||
@ -83,17 +66,17 @@ func newObjectSync(
|
||||
func (s *objectSync) Init(objectGetter syncobjectgetter.SyncObjectGetter) {
|
||||
s.objectGetter = objectGetter
|
||||
s.actionQueue.Run()
|
||||
go s.checker.CheckResponsiblePeers()
|
||||
}
|
||||
|
||||
func (s *objectSync) Close() (err error) {
|
||||
s.actionQueue.Close()
|
||||
s.cancelSync()
|
||||
return s.streamPool.Close()
|
||||
return
|
||||
}
|
||||
|
||||
func (s *objectSync) LastUsage() time.Time {
|
||||
return s.streamPool.LastUsage()
|
||||
// TODO: [che]
|
||||
return time.Now()
|
||||
}
|
||||
|
||||
func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
@ -105,14 +88,10 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message
|
||||
return obj.HandleMessage(ctx, senderId, message)
|
||||
}
|
||||
|
||||
func (s *objectSync) StreamPool() StreamPool {
|
||||
func (s *objectSync) MessagePool() MessagePool {
|
||||
return s.streamPool
|
||||
}
|
||||
|
||||
func (s *objectSync) StreamChecker() StreamChecker {
|
||||
return s.checker
|
||||
}
|
||||
|
||||
func (s *objectSync) ActionQueue() ActionQueue {
|
||||
return s.actionQueue
|
||||
}
|
||||
|
||||
@ -1,146 +0,0 @@
|
||||
package objectsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/commonspace/confconnector"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/rpc/rpcerr"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"time"
|
||||
)
|
||||
|
||||
type StreamChecker interface {
|
||||
CheckResponsiblePeers()
|
||||
CheckPeerConnection(peerId string) (err error)
|
||||
FirstResponsiblePeer() (peerId string, err error)
|
||||
}
|
||||
|
||||
type streamChecker struct {
|
||||
spaceId string
|
||||
connector confconnector.ConfConnector
|
||||
streamPool StreamPool
|
||||
clientFactory spacesyncproto.ClientFactory
|
||||
log *zap.Logger
|
||||
syncCtx context.Context
|
||||
lastCheck *atomic.Time
|
||||
}
|
||||
|
||||
const streamCheckerInterval = time.Second * 5
|
||||
|
||||
func NewStreamChecker(
|
||||
spaceId string,
|
||||
connector confconnector.ConfConnector,
|
||||
streamPool StreamPool,
|
||||
clientFactory spacesyncproto.ClientFactory,
|
||||
syncCtx context.Context,
|
||||
log *zap.Logger) StreamChecker {
|
||||
return &streamChecker{
|
||||
spaceId: spaceId,
|
||||
connector: connector,
|
||||
streamPool: streamPool,
|
||||
clientFactory: clientFactory,
|
||||
log: log,
|
||||
syncCtx: syncCtx,
|
||||
lastCheck: atomic.NewTime(time.Time{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *streamChecker) CheckResponsiblePeers() {
|
||||
lastCheck := s.lastCheck.Load()
|
||||
now := time.Now()
|
||||
if lastCheck.Add(streamCheckerInterval).After(now) {
|
||||
return
|
||||
}
|
||||
s.lastCheck.Store(now)
|
||||
|
||||
var (
|
||||
activeNodeIds []string
|
||||
configuration = s.connector.Configuration()
|
||||
)
|
||||
nodeIds := configuration.NodeIds(s.spaceId)
|
||||
for _, nodeId := range nodeIds {
|
||||
if s.streamPool.HasActiveStream(nodeId) {
|
||||
s.log.Debug("has active stream for", zap.String("id", nodeId))
|
||||
activeNodeIds = append(activeNodeIds, nodeId)
|
||||
continue
|
||||
}
|
||||
}
|
||||
s.log.Debug("total streams", zap.Int("total", len(activeNodeIds)))
|
||||
newPeers, err := s.connector.DialInactiveResponsiblePeers(s.syncCtx, s.spaceId, activeNodeIds)
|
||||
if err != nil {
|
||||
s.log.Error("failed to dial peers", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
for _, p := range newPeers {
|
||||
err := s.createStream(p)
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Error("failed to create stream")
|
||||
continue
|
||||
}
|
||||
s.log.Debug("reading stream for", zap.String("id", p.Id()))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamChecker) CheckPeerConnection(peerId string) (err error) {
|
||||
if s.streamPool.HasActiveStream(peerId) {
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
configuration = s.connector.Configuration()
|
||||
pool = s.connector.Pool()
|
||||
)
|
||||
nodeIds := configuration.NodeIds(s.spaceId)
|
||||
// we don't know the address of the peer
|
||||
if !slices.Contains(nodeIds, peerId) {
|
||||
err = fmt.Errorf("don't know the address of peer %s", peerId)
|
||||
return
|
||||
}
|
||||
|
||||
newPeer, err := pool.Dial(s.syncCtx, peerId)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.createStream(newPeer)
|
||||
}
|
||||
|
||||
func (s *streamChecker) createStream(p peer.Peer) (err error) {
|
||||
stream, err := s.clientFactory.Client(p).ObjectSyncStream(s.syncCtx)
|
||||
if err != nil {
|
||||
// so here probably the request is failed because there is no such space,
|
||||
// but diffService should handle such cases by sending pushSpace
|
||||
err = fmt.Errorf("failed to open stream: %w", rpcerr.Unwrap(err))
|
||||
return
|
||||
}
|
||||
|
||||
// sending empty message for the server to understand from which space is it coming
|
||||
err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId})
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to send first message to stream: %w", rpcerr.Unwrap(err))
|
||||
return
|
||||
}
|
||||
err = s.streamPool.AddAndReadStreamAsync(stream)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("failed to read from stream async: %w", err)
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamChecker) FirstResponsiblePeer() (peerId string, err error) {
|
||||
nodeIds := s.connector.Configuration().NodeIds(s.spaceId)
|
||||
for _, nodeId := range nodeIds {
|
||||
if s.streamPool.HasActiveStream(nodeId) {
|
||||
peerId = nodeId
|
||||
return
|
||||
}
|
||||
}
|
||||
err = fmt.Errorf("no responsible peers are connected")
|
||||
return
|
||||
}
|
||||
@ -1,332 +0,0 @@
|
||||
package objectsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/app/ocache"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"go.uber.org/zap"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
var ErrEmptyPeer = errors.New("don't have such a peer")
|
||||
var ErrStreamClosed = errors.New("stream is already closed")
|
||||
|
||||
var maxStreamReaders = 10
|
||||
var syncWaitPeriod = 2 * time.Second
|
||||
|
||||
var ErrSyncTimeout = errors.New("too long wait on sync receive")
|
||||
|
||||
// StreamPool can be made generic to work with different streams
|
||||
type StreamPool interface {
|
||||
ocache.ObjectLastUsage
|
||||
AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error)
|
||||
AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error)
|
||||
|
||||
SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error)
|
||||
SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error)
|
||||
BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error)
|
||||
|
||||
HasActiveStream(peerId string) bool
|
||||
Close() (err error)
|
||||
}
|
||||
|
||||
type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error)
|
||||
|
||||
type responseWaiter struct {
|
||||
ch chan *spacesyncproto.ObjectSyncMessage
|
||||
}
|
||||
|
||||
type streamPool struct {
|
||||
sync.Mutex
|
||||
peerStreams map[string]spacesyncproto.ObjectSyncStream
|
||||
messageHandler MessageHandler
|
||||
wg *sync.WaitGroup
|
||||
waiters map[string]responseWaiter
|
||||
waitersMx sync.Mutex
|
||||
counter atomic.Uint64
|
||||
lastUsage atomic.Int64
|
||||
}
|
||||
|
||||
func newStreamPool(messageHandler MessageHandler) StreamPool {
|
||||
s := &streamPool{
|
||||
peerStreams: make(map[string]spacesyncproto.ObjectSyncStream),
|
||||
messageHandler: messageHandler,
|
||||
waiters: make(map[string]responseWaiter),
|
||||
wg: &sync.WaitGroup{},
|
||||
}
|
||||
s.lastUsage.Store(time.Now().Unix())
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *streamPool) LastUsage() time.Time {
|
||||
return time.Unix(s.lastUsage.Load(), 0)
|
||||
}
|
||||
|
||||
func (s *streamPool) HasActiveStream(peerId string) (res bool) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
_, err := s.getOrDeleteStream(peerId)
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (s *streamPool) SendSync(
|
||||
peerId string,
|
||||
msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) {
|
||||
newCounter := s.counter.Add(1)
|
||||
msg.ReplyId = genStreamPoolKey(peerId, msg.ObjectId, newCounter)
|
||||
|
||||
s.waitersMx.Lock()
|
||||
waiter := responseWaiter{
|
||||
ch: make(chan *spacesyncproto.ObjectSyncMessage, 1),
|
||||
}
|
||||
s.waiters[msg.ReplyId] = waiter
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
err = s.SendAsync([]string{peerId}, msg)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
delay := time.NewTimer(syncWaitPeriod)
|
||||
select {
|
||||
case <-delay.C:
|
||||
s.waitersMx.Lock()
|
||||
delete(s.waiters, msg.ReplyId)
|
||||
s.waitersMx.Unlock()
|
||||
|
||||
log.With(zap.String("replyId", msg.ReplyId)).Error("time elapsed when waiting")
|
||||
err = ErrSyncTimeout
|
||||
case reply = <-waiter.ch:
|
||||
if !delay.Stop() {
|
||||
<-delay.C
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
s.lastUsage.Store(time.Now().Unix())
|
||||
getStreams := func() (streams []spacesyncproto.ObjectSyncStream) {
|
||||
for _, pId := range peers {
|
||||
stream, err := s.getOrDeleteStream(pId)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
streams = append(streams, stream)
|
||||
}
|
||||
return streams
|
||||
}
|
||||
|
||||
s.Lock()
|
||||
streams := getStreams()
|
||||
s.Unlock()
|
||||
|
||||
log.With(zap.String("objectId", message.ObjectId), zap.Int("existing peers len", len(streams)), zap.Strings("wanted peers", peers)).
|
||||
Debug("sending message to peers")
|
||||
for _, stream := range streams {
|
||||
err = stream.Send(message)
|
||||
if err != nil {
|
||||
log.Debug("error sending message to stream", zap.Error(err))
|
||||
}
|
||||
}
|
||||
if len(peers) != 1 {
|
||||
err = nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectSyncStream, err error) {
|
||||
stream, exists := s.peerStreams[id]
|
||||
if !exists {
|
||||
err = ErrEmptyPeer
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
delete(s.peerStreams, id)
|
||||
err = ErrStreamClosed
|
||||
default:
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamPool) getAllStreams() (streams []spacesyncproto.ObjectSyncStream) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
Loop:
|
||||
for id, stream := range s.peerStreams {
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
delete(s.peerStreams, id)
|
||||
continue Loop
|
||||
default:
|
||||
break
|
||||
}
|
||||
log.With(zap.String("id", id)).Debug("getting peer stream")
|
||||
streams = append(streams, stream)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
streams := s.getAllStreams()
|
||||
log.With(zap.String("objectId", message.ObjectId), zap.Int("peers", len(streams))).
|
||||
Debug("broadcasting message to peers")
|
||||
for _, stream := range streams {
|
||||
if err = stream.Send(message); err != nil {
|
||||
log.Debug("error sending message to stream", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
peerId, err := s.addStream(stream)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
go s.readPeerLoop(peerId, stream)
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
peerId, err := s.addStream(stream)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
return s.readPeerLoop(peerId, stream)
|
||||
}
|
||||
|
||||
func (s *streamPool) addStream(stream spacesyncproto.ObjectSyncStream) (peerId string, err error) {
|
||||
s.Lock()
|
||||
peerId, err = peer.CtxPeerId(stream.Context())
|
||||
if err != nil {
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
log.With(zap.String("peer id", peerId)).Debug("adding stream")
|
||||
|
||||
if oldStream, ok := s.peerStreams[peerId]; ok {
|
||||
s.Unlock()
|
||||
oldStream.Close()
|
||||
s.Lock()
|
||||
log.With(zap.String("peer id", peerId)).Debug("closed old stream before adding")
|
||||
}
|
||||
|
||||
s.peerStreams[peerId] = stream
|
||||
s.wg.Add(1)
|
||||
s.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
func (s *streamPool) Close() (err error) {
|
||||
s.Lock()
|
||||
wg := s.wg
|
||||
s.Unlock()
|
||||
streams := s.getAllStreams()
|
||||
|
||||
log.Debug("closing streams on lock")
|
||||
for _, stream := range streams {
|
||||
stream.Close()
|
||||
}
|
||||
log.Debug("closed streams")
|
||||
|
||||
if wg != nil {
|
||||
wg.Wait()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
var (
|
||||
log = log.With(zap.String("peerId", peerId))
|
||||
queue = NewDefaultActionQueue()
|
||||
)
|
||||
queue.Run()
|
||||
|
||||
defer func() {
|
||||
log.Debug("stopped reading stream from peer")
|
||||
s.removePeer(peerId, stream)
|
||||
queue.Close()
|
||||
s.wg.Done()
|
||||
}()
|
||||
|
||||
log.Debug("started reading stream from peer")
|
||||
|
||||
stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool {
|
||||
s.waitersMx.Lock()
|
||||
waiter, exists := s.waiters[msg.ReplyId]
|
||||
if exists {
|
||||
delete(s.waiters, msg.ReplyId)
|
||||
s.waitersMx.Unlock()
|
||||
waiter.ch <- msg
|
||||
return true
|
||||
}
|
||||
s.waitersMx.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
process := func(msg *spacesyncproto.ObjectSyncMessage) error {
|
||||
log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId))
|
||||
log.Debug("getting message with reply id")
|
||||
err = s.messageHandler(stream.Context(), peerId, msg)
|
||||
if err != nil {
|
||||
log.With(zap.Error(err)).Debug("message handling failed")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stream.Context().Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
var msg *spacesyncproto.ObjectSyncMessage
|
||||
msg, err = stream.Recv()
|
||||
s.lastUsage.Store(time.Now().Unix())
|
||||
if err != nil {
|
||||
stream.Close()
|
||||
return
|
||||
}
|
||||
|
||||
if msg.ReplyId != "" {
|
||||
// then we can send it directly to waiters without adding to queue or starting a reader
|
||||
if stopWaiter(msg) {
|
||||
continue
|
||||
}
|
||||
log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist")
|
||||
}
|
||||
|
||||
queue.Send(func() error {
|
||||
return process(msg)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
mapStream, ok := s.peerStreams[peerId]
|
||||
if !ok {
|
||||
return ErrEmptyPeer
|
||||
}
|
||||
|
||||
// it can be the case that the stream was already replaced
|
||||
if mapStream == stream {
|
||||
delete(s.peerStreams, peerId)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func genStreamPoolKey(peerId, treeId string, counter uint64) string {
|
||||
return fmt.Sprintf("%s.%s.%d", peerId, treeId, counter)
|
||||
}
|
||||
@ -1,299 +0,0 @@
|
||||
package objectsync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/rpc/rpctest"
|
||||
"github.com/stretchr/testify/require"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type testServer struct {
|
||||
stream chan spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream
|
||||
releaseStream chan error
|
||||
watchErrOnce bool
|
||||
}
|
||||
|
||||
func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testServer) SpacePush(ctx context.Context, request *spacesyncproto.SpacePushRequest) (*spacesyncproto.SpacePushResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testServer) SpacePull(ctx context.Context, request *spacesyncproto.SpacePullRequest) (*spacesyncproto.SpacePullResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (t *testServer) ObjectSyncStream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error {
|
||||
t.stream <- stream
|
||||
return <-t.releaseStream
|
||||
}
|
||||
|
||||
func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream {
|
||||
select {
|
||||
case <-time.After(time.Second * 5):
|
||||
test.Fatalf("waiteStream timeout")
|
||||
case st := <-t.stream:
|
||||
return st
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type fixture struct {
|
||||
testServer *testServer
|
||||
drpcTS *rpctest.TesServer
|
||||
client spacesyncproto.DRPCSpaceSyncClient
|
||||
clientStream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream
|
||||
serverStream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream
|
||||
pool *streamPool
|
||||
clientId string
|
||||
serverId string
|
||||
}
|
||||
|
||||
func newFixture(t *testing.T, clientId, serverId string, handler MessageHandler) *fixture {
|
||||
fx := &fixture{
|
||||
testServer: &testServer{},
|
||||
drpcTS: rpctest.NewTestServer(),
|
||||
clientId: clientId,
|
||||
serverId: serverId,
|
||||
}
|
||||
fx.testServer.stream = make(chan spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream, 1)
|
||||
require.NoError(t, spacesyncproto.DRPCRegisterSpaceSync(fx.drpcTS.Mux, fx.testServer))
|
||||
fx.client = spacesyncproto.NewDRPCSpaceSyncClient(fx.drpcTS.Dial(peer.CtxWithPeerId(context.Background(), clientId)))
|
||||
|
||||
var err error
|
||||
fx.clientStream, err = fx.client.ObjectSyncStream(peer.CtxWithPeerId(context.Background(), serverId))
|
||||
require.NoError(t, err)
|
||||
fx.serverStream = fx.testServer.waitStream(t)
|
||||
fx.pool = newStreamPool(handler).(*streamPool)
|
||||
|
||||
return fx
|
||||
}
|
||||
|
||||
func (fx *fixture) run(t *testing.T) chan error {
|
||||
waitCh := make(chan error)
|
||||
go func() {
|
||||
err := fx.pool.AddAndReadStreamSync(fx.clientStream)
|
||||
waitCh <- err
|
||||
}()
|
||||
|
||||
time.Sleep(time.Millisecond * 10)
|
||||
fx.pool.Lock()
|
||||
require.Equal(t, fx.pool.peerStreams[fx.serverId], fx.clientStream)
|
||||
fx.pool.Unlock()
|
||||
|
||||
return waitCh
|
||||
}
|
||||
|
||||
func TestStreamPool_AddAndReadStreamAsync(t *testing.T) {
|
||||
remId := "remoteId"
|
||||
|
||||
t.Run("client close", func(t *testing.T) {
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
waitCh := fx.run(t)
|
||||
|
||||
err := fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
err = <-waitCh
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
t.Run("server close", func(t *testing.T) {
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
waitCh := fx.run(t)
|
||||
|
||||
err := fx.serverStream.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
err = <-waitCh
|
||||
require.Error(t, err)
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamPool_Close(t *testing.T) {
|
||||
remId := "remoteId"
|
||||
|
||||
t.Run("close", func(t *testing.T) {
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
fx.run(t)
|
||||
fx.pool.Close()
|
||||
select {
|
||||
case <-fx.clientStream.Context().Done():
|
||||
break
|
||||
case <-time.After(time.Millisecond * 100):
|
||||
t.Fatal("context should be closed")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamPool_ReceiveMessage(t *testing.T) {
|
||||
remId := "remoteId"
|
||||
t.Run("pool receive message from server", func(t *testing.T) {
|
||||
objectId := "objectId"
|
||||
msg := &spacesyncproto.ObjectSyncMessage{
|
||||
ObjectId: objectId,
|
||||
}
|
||||
recvChan := make(chan struct{})
|
||||
fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) {
|
||||
require.Equal(t, msg, message)
|
||||
recvChan <- struct{}{}
|
||||
return nil
|
||||
})
|
||||
waitCh := fx.run(t)
|
||||
|
||||
err := fx.serverStream.Send(msg)
|
||||
require.NoError(t, err)
|
||||
<-recvChan
|
||||
err = fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
err = <-waitCh
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamPool_HasActiveStream(t *testing.T) {
|
||||
remId := "remoteId"
|
||||
t.Run("pool has active stream", func(t *testing.T) {
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
waitCh := fx.run(t)
|
||||
require.True(t, fx.pool.HasActiveStream(remId))
|
||||
|
||||
err := fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
err = <-waitCh
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
t.Run("pool has no active stream", func(t *testing.T) {
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
waitCh := fx.run(t)
|
||||
err := fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
err = <-waitCh
|
||||
require.Error(t, err)
|
||||
require.False(t, fx.pool.HasActiveStream(remId))
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamPool_SendAsync(t *testing.T) {
|
||||
remId := "remoteId"
|
||||
t.Run("pool send async to server", func(t *testing.T) {
|
||||
objectId := "objectId"
|
||||
msg := &spacesyncproto.ObjectSyncMessage{
|
||||
ObjectId: objectId,
|
||||
}
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
recvChan := make(chan struct{})
|
||||
go func() {
|
||||
message, err := fx.serverStream.Recv()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, msg, message)
|
||||
recvChan <- struct{}{}
|
||||
}()
|
||||
waitCh := fx.run(t)
|
||||
|
||||
err := fx.pool.SendAsync([]string{remId}, msg)
|
||||
require.NoError(t, err)
|
||||
<-recvChan
|
||||
err = fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
err = <-waitCh
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamPool_SendSync(t *testing.T) {
|
||||
remId := "remoteId"
|
||||
t.Run("pool send sync to server", func(t *testing.T) {
|
||||
objectId := "objectId"
|
||||
payload := []byte("payload")
|
||||
msg := &spacesyncproto.ObjectSyncMessage{
|
||||
ObjectId: objectId,
|
||||
}
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
go func() {
|
||||
message, err := fx.serverStream.Recv()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, msg.ObjectId, message.ObjectId)
|
||||
require.NotEmpty(t, message.ReplyId)
|
||||
message.Payload = payload
|
||||
err = fx.serverStream.Send(message)
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
waitCh := fx.run(t)
|
||||
res, err := fx.pool.SendSync(remId, msg)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, payload, res.Payload)
|
||||
err = fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
err = <-waitCh
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
|
||||
t.Run("pool send sync timeout", func(t *testing.T) {
|
||||
objectId := "objectId"
|
||||
msg := &spacesyncproto.ObjectSyncMessage{
|
||||
ObjectId: objectId,
|
||||
}
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
syncWaitPeriod = time.Millisecond * 30
|
||||
go func() {
|
||||
message, err := fx.serverStream.Recv()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, msg.ObjectId, message.ObjectId)
|
||||
require.NotEmpty(t, message.ReplyId)
|
||||
}()
|
||||
waitCh := fx.run(t)
|
||||
_, err := fx.pool.SendSync(remId, msg)
|
||||
require.Equal(t, ErrSyncTimeout, err)
|
||||
err = fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
err = <-waitCh
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
}
|
||||
|
||||
func TestStreamPool_BroadcastAsync(t *testing.T) {
|
||||
remId := "remoteId"
|
||||
t.Run("pool broadcast async to server", func(t *testing.T) {
|
||||
objectId := "objectId"
|
||||
msg := &spacesyncproto.ObjectSyncMessage{
|
||||
ObjectId: objectId,
|
||||
}
|
||||
fx := newFixture(t, "", remId, nil)
|
||||
recvChan := make(chan struct{})
|
||||
go func() {
|
||||
message, err := fx.serverStream.Recv()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, msg, message)
|
||||
recvChan <- struct{}{}
|
||||
}()
|
||||
waitCh := fx.run(t)
|
||||
|
||||
err := fx.pool.BroadcastAsync(msg)
|
||||
require.NoError(t, err)
|
||||
<-recvChan
|
||||
err = fx.clientStream.Close()
|
||||
require.NoError(t, err)
|
||||
err = <-waitCh
|
||||
|
||||
require.Error(t, err)
|
||||
require.Nil(t, fx.pool.peerStreams[remId])
|
||||
})
|
||||
}
|
||||
@ -1,24 +0,0 @@
|
||||
package commonspace
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
)
|
||||
|
||||
type RpcHandler interface {
|
||||
HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error)
|
||||
Stream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error
|
||||
}
|
||||
|
||||
type rpcHandler struct {
|
||||
s *space
|
||||
}
|
||||
|
||||
func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) {
|
||||
return r.s.HeadSync().HandleRangeRequest(ctx, req)
|
||||
}
|
||||
|
||||
func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) (err error) {
|
||||
// TODO: if needed we can launch full sync here
|
||||
return r.s.ObjectSync().StreamPool().AddAndReadStreamSync(stream)
|
||||
}
|
||||
@ -81,8 +81,6 @@ type Space interface {
|
||||
DebugAllHeads() []headsync.TreeHeads
|
||||
Description() (SpaceDescription, error)
|
||||
|
||||
SpaceSyncRpc() RpcHandler
|
||||
|
||||
DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error)
|
||||
CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error)
|
||||
PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error)
|
||||
@ -90,6 +88,7 @@ type Space interface {
|
||||
DeleteTree(ctx context.Context, id string) (err error)
|
||||
|
||||
HeadSync() headsync.HeadSync
|
||||
ObjectSync() objectsync.ObjectSync
|
||||
SyncStatus() syncstatus.StatusUpdater
|
||||
Storage() spacestorage.SpaceStorage
|
||||
|
||||
@ -101,8 +100,6 @@ type space struct {
|
||||
mu sync.RWMutex
|
||||
header *spacesyncproto.RawSpaceHeaderWithId
|
||||
|
||||
rpc *rpcHandler
|
||||
|
||||
objectSync objectsync.ObjectSync
|
||||
headSync headsync.HeadSync
|
||||
syncStatus syncstatus.StatusUpdater
|
||||
@ -161,7 +158,6 @@ func (s *space) Init(ctx context.Context) (err error) {
|
||||
return
|
||||
}
|
||||
s.header = header
|
||||
s.rpc = &rpcHandler{s: s}
|
||||
initialIds, err := s.storage.StoredIds()
|
||||
if err != nil {
|
||||
return
|
||||
@ -174,7 +170,7 @@ func (s *space) Init(ctx context.Context) (err error) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.StreamPool())
|
||||
s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool())
|
||||
|
||||
deletionState := deletionstate.NewDeletionState(s.storage)
|
||||
deps := settings.Deps{
|
||||
@ -208,10 +204,6 @@ func (s *space) Init(ctx context.Context) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *space) SpaceSyncRpc() RpcHandler {
|
||||
return s.rpc
|
||||
}
|
||||
|
||||
func (s *space) ObjectSync() objectsync.ObjectSync {
|
||||
return s.objectSync
|
||||
}
|
||||
|
||||
@ -13,6 +13,7 @@ import (
|
||||
"github.com/anytypeio/any-sync/commonspace/objectsync"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacestorage"
|
||||
"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
|
||||
"github.com/anytypeio/any-sync/commonspace/streammanager"
|
||||
"github.com/anytypeio/any-sync/commonspace/syncstatus"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/pool"
|
||||
@ -39,12 +40,13 @@ type SpaceService interface {
|
||||
}
|
||||
|
||||
type spaceService struct {
|
||||
config Config
|
||||
account accountservice.Service
|
||||
configurationService nodeconf.Service
|
||||
storageProvider spacestorage.SpaceStorageProvider
|
||||
treeGetter treegetter.TreeGetter
|
||||
pool pool.Pool
|
||||
config Config
|
||||
account accountservice.Service
|
||||
configurationService nodeconf.Service
|
||||
storageProvider spacestorage.SpaceStorageProvider
|
||||
streamManagerProvider streammanager.StreamManagerProvider
|
||||
treeGetter treegetter.TreeGetter
|
||||
pool pool.Pool
|
||||
}
|
||||
|
||||
func (s *spaceService) Init(a *app.App) (err error) {
|
||||
@ -53,6 +55,7 @@ func (s *spaceService) Init(a *app.App) (err error) {
|
||||
s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider)
|
||||
s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service)
|
||||
s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter)
|
||||
s.streamManagerProvider = a.MustComponent(streammanager.CName).(streammanager.StreamManagerProvider)
|
||||
s.pool = a.MustComponent(pool.CName).(pool.Pool)
|
||||
return nil
|
||||
}
|
||||
@ -123,8 +126,15 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) {
|
||||
syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st))
|
||||
}
|
||||
|
||||
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, syncStatus, log)
|
||||
objectSync := objectsync.NewObjectSync(id, confConnector)
|
||||
// TODO: [che] remove *5
|
||||
headSync := headsync.NewHeadSync(id, s.config.SyncPeriod*5, st, confConnector, s.treeGetter, syncStatus, log)
|
||||
|
||||
streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objectSync := objectsync.NewObjectSync(streamManager, id)
|
||||
sp := &space{
|
||||
id: id,
|
||||
objectSync: objectSync,
|
||||
|
||||
commonspace/streammanager/streammanager.go (new file, +14)
@@ -0,0 +1,14 @@
package streammanager

import (
	"context"
	"github.com/anytypeio/any-sync/app"
	"github.com/anytypeio/any-sync/commonspace/objectsync"
)

const CName = "common.commonspace.streammanager"

type StreamManagerProvider interface {
	app.Component
	NewStreamManager(ctx context.Context, spaceId string) (sm objectsync.StreamManager, err error)
}
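StreamManagerProvider is the seam the host application now fills in: commonspace no longer dials and tracks streams itself, it asks this component for a per-space StreamManager and hands it to objectsync.NewObjectSync. A rough sketch of what an implementation might look like, assuming the usual app.Component Init/Name contract and delegating actual delivery to a caller-supplied send callback; everything below is illustrative, not part of this commit.

// streammanagerprovider_example.go — illustrative sketch only, not part of this commit.
package streammanagerexample

import (
	"context"

	"github.com/anytypeio/any-sync/app"
	"github.com/anytypeio/any-sync/commonspace/objectsync"
	"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
	"github.com/anytypeio/any-sync/commonspace/streammanager"
)

// sendFunc stands in for whatever transport the host application uses
// (for example the generic streampool changed later in this commit).
type sendFunc func(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) error

type provider struct{ send sendFunc }

func New(send sendFunc) streammanager.StreamManagerProvider { return &provider{send: send} }

func (p *provider) Init(a *app.App) error { return nil }
func (p *provider) Name() string          { return streammanager.CName }

// NewStreamManager hands out one StreamManager per space.
func (p *provider) NewStreamManager(ctx context.Context, spaceId string) (objectsync.StreamManager, error) {
	return &manager{spaceId: spaceId, send: p.send}, nil
}

type manager struct {
	spaceId string
	send    sendFunc
}

func (m *manager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) error {
	return m.send(ctx, peerId, msg)
}

func (m *manager) SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) error {
	// a real implementation would resolve the responsible node ids for m.spaceId
	return m.send(ctx, "", msg)
}

func (m *manager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) error {
	// a real implementation would fan out to every peer holding a stream for m.spaceId
	return m.send(ctx, "", msg)
}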
@ -1,6 +1,7 @@
|
||||
package streampool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"go.uber.org/zap"
|
||||
"storj.io/drpc"
|
||||
"sync/atomic"
|
||||
@ -17,6 +18,9 @@ type stream struct {
|
||||
}
|
||||
|
||||
func (sr *stream) write(msg drpc.Message) (err error) {
|
||||
defer func() {
|
||||
sr.l.Debug("write", zap.String("msg", msg.(fmt.Stringer).String()), zap.Error(err))
|
||||
}()
|
||||
if err = sr.stream.MsgSend(msg, EncodingProto); err != nil {
|
||||
sr.l.Info("stream write error", zap.Error(err))
|
||||
sr.streamClose()
|
||||
@ -24,7 +28,7 @@ func (sr *stream) write(msg drpc.Message) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
func (sr *stream) readLoop() {
|
||||
func (sr *stream) readLoop() error {
|
||||
defer func() {
|
||||
sr.streamClose()
|
||||
}()
|
||||
@ -32,7 +36,12 @@ func (sr *stream) readLoop() {
|
||||
msg := sr.pool.handler.NewReadMessage()
|
||||
if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil {
|
||||
sr.l.Info("msg receive error", zap.Error(err))
|
||||
return
|
||||
return err
|
||||
}
|
||||
sr.l.Debug("read msg", zap.String("msg", msg.(fmt.Stringer).String()))
|
||||
if err := sr.pool.handler.HandleMessage(sr.stream.Context(), sr.peerId, msg); err != nil {
|
||||
sr.l.Info("msg handle error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
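readLoop now returns the receive or handling error and delegates every decoded frame to the pool's handler. The StreamHandler method set is not shown in this diff, so the sketch below infers it from how readLoop calls it (NewReadMessage, HandleMessage); treat the whole thing as an assumption and verify against the real interface before relying on it.

// objectsynchandler_example.go — illustrative sketch only, not part of this commit.
package handlerexample

import (
	"context"

	"github.com/anytypeio/any-sync/commonspace/objectsync"
	"github.com/anytypeio/any-sync/commonspace/spacesyncproto"
	"storj.io/drpc"
)

// syncHandler forwards every ObjectSyncMessage read from a stream into the
// space's ObjectSync, which routes it to the matching tree or acl object.
type syncHandler struct {
	objectSync objectsync.ObjectSync
}

// NewReadMessage returns the prototype readLoop decodes each frame into.
func (h *syncHandler) NewReadMessage() drpc.Message {
	return &spacesyncproto.ObjectSyncMessage{}
}

// HandleMessage is called by readLoop for every decoded frame.
func (h *syncHandler) HandleMessage(ctx context.Context, peerId string, msg drpc.Message) error {
	syncMsg, ok := msg.(*spacesyncproto.ObjectSyncMessage)
	if !ok {
		return nil // not an object sync frame; a real handler would reject it
	}
	return h.objectSync.HandleMessage(ctx, peerId, syncMsg)
}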
@ -1,7 +1,9 @@
|
||||
package streampool
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/anytypeio/any-sync/net/peer"
|
||||
"github.com/anytypeio/any-sync/net/pool"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
"golang.org/x/net/context"
|
||||
@ -21,10 +23,14 @@ type StreamHandler interface {
|
||||
|
||||
// StreamPool keeps and read streams
|
||||
type StreamPool interface {
|
||||
// AddStream adds new incoming stream into the pool
|
||||
// AddStream adds new outgoing stream into the pool
|
||||
AddStream(peerId string, stream drpc.Stream, tags ...string)
|
||||
// ReadStream adds new incoming stream and synchronously read it
|
||||
ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error)
|
||||
// Send sends a message to given peers. A stream will be opened if it is not cached before. Works async.
|
||||
Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error)
|
||||
// SendById sends a message to given peerIds. Works only if stream exists
|
||||
SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error)
|
||||
// Broadcast sends a message to all peers with given tags. Works async.
|
||||
Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error)
|
||||
// Close closes all streams
|
||||
@ -42,7 +48,19 @@ type streamPool struct {
|
||||
lastStreamId uint32
|
||||
}
|
||||
|
||||
func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error {
|
||||
st := s.addStream(peerId, drpcStream, tags...)
|
||||
return st.readLoop()
|
||||
}
|
||||
|
||||
func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) {
|
||||
st := s.addStream(peerId, drpcStream, tags...)
|
||||
go func() {
|
||||
_ = st.readLoop()
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.lastStreamId++
|
||||
@ -60,7 +78,8 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st
|
||||
for _, tag := range tags {
|
||||
s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId)
|
||||
}
|
||||
go st.readLoop()
|
||||
st.l.Debug("stream added", zap.Strings("tags", st.tags))
|
||||
return st
|
||||
}
|
||||
|
||||
func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) {
|
||||
@ -75,6 +94,30 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P
|
||||
return s.exec.Add(ctx, funcs...)
|
||||
}
|
||||
|
||||
func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) {
|
||||
s.mu.Lock()
|
||||
var streams []*stream
|
||||
for _, peerId := range peerIds {
|
||||
for _, streamId := range s.streamIdsByPeer[peerId] {
|
||||
streams = append(streams, s.streams[streamId])
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
log.Debug("sendById", zap.String("msg", msg.(fmt.Stringer).String()), zap.Int("streams", len(streams)))
|
||||
var funcs []func()
|
||||
for _, st := range streams {
|
||||
funcs = append(funcs, func() {
|
||||
if e := st.write(msg); e != nil {
|
||||
log.Debug("sendById write error", zap.Error(e))
|
||||
}
|
||||
})
|
||||
}
|
||||
if len(funcs) == 0 {
|
||||
return pool.ErrUnableToConnect
|
||||
}
|
||||
return s.exec.Add(ctx, funcs...)
|
||||
}
|
||||
|
||||
func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) {
|
||||
// get all streams relates to peer
|
||||
streams, err := s.getStreams(ctx, p)
|
||||
@ -164,6 +207,9 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st
|
||||
}
|
||||
})
|
||||
}
|
||||
if len(funcs) == 0 {
|
||||
return
|
||||
}
|
||||
return s.exec.Add(ctx, funcs...)
|
||||
}
|
||||
|
||||
@ -195,6 +241,7 @@ func (s *streamPool) removeStream(streamId uint32) {
|
||||
}
|
||||
|
||||
delete(s.streams, streamId)
|
||||
st.l.Debug("stream removed", zap.Strings("tags", st.tags))
|
||||
}
|
||||
|
||||
func (s *streamPool) Close() (err error) {
|
||||
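Taken together, the pool API now separates stream intake (AddStream registers an outgoing stream and reads it in the background, ReadStream blocks on an incoming one for its lifetime) from delivery (SendById writes only to streams that are already open, Broadcast fans out to streams sharing a tag). A short usage sketch against the interface above; the helper names and the use of spaceId as a tag are illustrative, not from this commit.

// streampool_usage_test.go — illustrative sketch only, not part of this commit;
// it leans only on the StreamPool interface above (no constructor appears in this diff).
package streampool

import (
	"context"

	"storj.io/drpc"
)

// serveIncoming is what an rpc handler could do with an inbound stream: tag it
// with its space id and block in ReadStream for the stream's lifetime, so a
// later Broadcast(ctx, msg, spaceId) reaches this peer.
func serveIncoming(pool StreamPool, peerId, spaceId string, st drpc.Stream) error {
	return pool.ReadStream(peerId, st, spaceId)
}

// pushUpdate tries peers we already hold streams for and falls back to a
// space-wide broadcast over streams tagged with spaceId.
func pushUpdate(ctx context.Context, pool StreamPool, spaceId string, msg drpc.Message, peerIds ...string) error {
	if err := pool.SendById(ctx, msg, peerIds...); err != nil {
		return pool.Broadcast(ctx, msg, spaceId)
	}
	return nil
}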