From 79d81662ce01bc9c8e3042f52f4ef783617a2c5a Mon Sep 17 00:00:00 2001 From: Sergey Cherepanov Date: Thu, 19 Jan 2023 15:17:04 +0300 Subject: [PATCH] commonspace with new streampool --- commonspace/headsync/diffsyncer.go | 7 +- commonspace/object/acl/syncacl/syncacl.go | 4 +- .../synctree/mock_synctree/mock_synctree.go | 40 +-- .../object/tree/synctree/queuedclient.go | 13 +- .../object/tree/synctree/syncclient.go | 51 ++- commonspace/object/tree/synctree/synctree.go | 32 +- .../object/tree/synctree/synctreehandler.go | 8 +- commonspace/objectsync/msgpool.go | 120 +++++++ commonspace/objectsync/objectsync.go | 41 +-- commonspace/objectsync/streamchecker.go | 146 -------- commonspace/objectsync/streampool.go | 332 ------------------ commonspace/objectsync/streampool_test.go | 299 ---------------- commonspace/rpchandler.go | 24 -- commonspace/space.go | 12 +- commonspace/spaceservice.go | 26 +- commonspace/streammanager/streammanager.go | 14 + net/streampool/stream.go | 13 +- net/streampool/streampool.go | 51 ++- 18 files changed, 294 insertions(+), 939 deletions(-) create mode 100644 commonspace/objectsync/msgpool.go delete mode 100644 commonspace/objectsync/streamchecker.go delete mode 100644 commonspace/objectsync/streampool.go delete mode 100644 commonspace/objectsync/streampool_test.go delete mode 100644 commonspace/rpchandler.go create mode 100644 commonspace/streammanager/streammanager.go diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index c58cb58e..76a3cae9 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -2,6 +2,7 @@ package headsync import ( "context" + "fmt" "github.com/anytypeio/any-sync/app/ldiff" "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/synctree" @@ -91,7 +92,7 @@ func (d *diffSyncer) Sync(ctx context.Context) error { return err } for _, p := range peers { - if err := d.syncWithPeer(ctx, p); err != nil { + if err = d.syncWithPeer(ctx, p); err != nil { d.log.Error("can't sync with peer", zap.String("peer", p.Id()), zap.Error(err)) } } @@ -110,7 +111,7 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) err = rpcerr.Unwrap(err) if err != nil && err != spacesyncproto.ErrSpaceMissing { d.syncStatus.SetNodesOnline(p.Id(), false) - return err + return fmt.Errorf("diff error: %v", err) } d.syncStatus.SetNodesOnline(p.Id(), true) @@ -148,7 +149,7 @@ func (d *diffSyncer) pingTreesInCache(ctx context.Context, trees []string) { // it may be already there (i.e. loaded) // and build func will not be called, thus we won't sync the tree // therefore we just do it manually - syncTree.Ping() + _ = syncTree.Ping(ctx) } } diff --git a/commonspace/object/acl/syncacl/syncacl.go b/commonspace/object/acl/syncacl/syncacl.go index 2a29cd0c..d18fea0e 100644 --- a/commonspace/object/acl/syncacl/syncacl.go +++ b/commonspace/object/acl/syncacl/syncacl.go @@ -9,10 +9,10 @@ import ( type SyncAcl struct { list.AclList synchandler.SyncHandler - streamPool objectsync.StreamPool + streamPool objectsync.MessagePool } -func NewSyncAcl(aclList list.AclList, streamPool objectsync.StreamPool) *SyncAcl { +func NewSyncAcl(aclList list.AclList, streamPool objectsync.MessagePool) *SyncAcl { return &SyncAcl{ AclList: aclList, SyncHandler: nil, diff --git a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go index 1ca1e659..abdbfb10 100644 --- a/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go +++ b/commonspace/object/tree/synctree/mock_synctree/mock_synctree.go @@ -38,32 +38,32 @@ func (m *MockSyncClient) EXPECT() *MockSyncClientMockRecorder { return m.recorder } -// BroadcastAsync mocks base method. -func (m *MockSyncClient) BroadcastAsync(arg0 *treechangeproto.TreeSyncMessage) error { +// Broadcast mocks base method. +func (m *MockSyncClient) Broadcast(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BroadcastAsync", arg0) + ret := m.ctrl.Call(m, "Broadcast", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } -// BroadcastAsync indicates an expected call of BroadcastAsync. -func (mr *MockSyncClientMockRecorder) BroadcastAsync(arg0 interface{}) *gomock.Call { +// Broadcast indicates an expected call of Broadcast. +func (mr *MockSyncClientMockRecorder) Broadcast(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsync", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsync), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Broadcast", reflect.TypeOf((*MockSyncClient)(nil).Broadcast), arg0, arg1) } // BroadcastAsyncOrSendResponsible mocks base method. -func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 *treechangeproto.TreeSyncMessage) error { +func (m *MockSyncClient) BroadcastAsyncOrSendResponsible(arg0 context.Context, arg1 *treechangeproto.TreeSyncMessage) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0) + ret := m.ctrl.Call(m, "BroadcastAsyncOrSendResponsible", arg0, arg1) ret0, _ := ret[0].(error) return ret0 } // BroadcastAsyncOrSendResponsible indicates an expected call of BroadcastAsyncOrSendResponsible. -func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0 interface{}) *gomock.Call { +func (mr *MockSyncClientMockRecorder) BroadcastAsyncOrSendResponsible(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastAsyncOrSendResponsible", reflect.TypeOf((*MockSyncClient)(nil).BroadcastAsyncOrSendResponsible), arg0, arg1) } // CreateFullSyncRequest mocks base method. @@ -124,18 +124,18 @@ func (mr *MockSyncClientMockRecorder) CreateNewTreeRequest() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateNewTreeRequest", reflect.TypeOf((*MockSyncClient)(nil).CreateNewTreeRequest)) } -// SendAsync mocks base method. -func (m *MockSyncClient) SendAsync(arg0 string, arg1 *treechangeproto.TreeSyncMessage, arg2 string) error { +// SendWithReply mocks base method. +func (m *MockSyncClient) SendWithReply(arg0 context.Context, arg1 string, arg2 *treechangeproto.TreeSyncMessage, arg3 string) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SendAsync", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "SendWithReply", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(error) return ret0 } -// SendAsync indicates an expected call of SendAsync. -func (mr *MockSyncClientMockRecorder) SendAsync(arg0, arg1, arg2 interface{}) *gomock.Call { +// SendWithReply indicates an expected call of SendWithReply. +func (mr *MockSyncClientMockRecorder) SendWithReply(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendAsync", reflect.TypeOf((*MockSyncClient)(nil).SendAsync), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendWithReply", reflect.TypeOf((*MockSyncClient)(nil).SendWithReply), arg0, arg1, arg2, arg3) } // MockSyncTree is a mock of SyncTree interface. @@ -364,17 +364,17 @@ func (mr *MockSyncTreeMockRecorder) Lock() *gomock.Call { } // Ping mocks base method. -func (m *MockSyncTree) Ping() error { +func (m *MockSyncTree) Ping(arg0 context.Context) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Ping") + ret := m.ctrl.Call(m, "Ping", arg0) ret0, _ := ret[0].(error) return ret0 } // Ping indicates an expected call of Ping. -func (mr *MockSyncTreeMockRecorder) Ping() *gomock.Call { +func (mr *MockSyncTreeMockRecorder) Ping(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockSyncTree)(nil).Ping), arg0) } // RLock mocks base method. diff --git a/commonspace/object/tree/synctree/queuedclient.go b/commonspace/object/tree/synctree/queuedclient.go index 5af9d95e..49ea3922 100644 --- a/commonspace/object/tree/synctree/queuedclient.go +++ b/commonspace/object/tree/synctree/queuedclient.go @@ -1,6 +1,7 @@ package synctree import ( + "context" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/objectsync" ) @@ -17,20 +18,20 @@ func newQueuedClient(client SyncClient, queue objectsync.ActionQueue) SyncClient } } -func (q *queuedClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) { +func (q *queuedClient) Broadcast(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { return q.queue.Send(func() error { - return q.SyncClient.BroadcastAsync(message) + return q.SyncClient.Broadcast(ctx, message) }) } -func (q *queuedClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { +func (q *queuedClient) SendWithReply(ctx context.Context, peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { return q.queue.Send(func() error { - return q.SyncClient.SendAsync(peerId, message, replyId) + return q.SyncClient.SendWithReply(ctx, peerId, message, replyId) }) } -func (q *queuedClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) { +func (q *queuedClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { return q.queue.Send(func() error { - return q.SyncClient.BroadcastAsyncOrSendResponsible(message) + return q.SyncClient.BroadcastAsyncOrSendResponsible(ctx, message) }) } diff --git a/commonspace/object/tree/synctree/syncclient.go b/commonspace/object/tree/synctree/syncclient.go index a610e2b5..5c7f2e2f 100644 --- a/commonspace/object/tree/synctree/syncclient.go +++ b/commonspace/object/tree/synctree/syncclient.go @@ -2,6 +2,7 @@ package synctree import ( + "context" "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/objectsync" @@ -11,72 +12,61 @@ import ( type SyncClient interface { RequestFactory - BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) - BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) - SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) + Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) + BroadcastAsyncOrSendResponsible(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) + SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) } type syncClient struct { - objectsync.StreamPool + objectsync.MessagePool RequestFactory spaceId string connector confconnector.ConfConnector configuration nodeconf.Configuration - - checker objectsync.StreamChecker } func newSyncClient( spaceId string, - pool objectsync.StreamPool, + pool objectsync.MessagePool, factory RequestFactory, - configuration nodeconf.Configuration, - checker objectsync.StreamChecker) SyncClient { + configuration nodeconf.Configuration) SyncClient { return &syncClient{ - StreamPool: pool, + MessagePool: pool, RequestFactory: factory, configuration: configuration, - checker: checker, spaceId: spaceId, } } -func (s *syncClient) BroadcastAsync(message *treechangeproto.TreeSyncMessage) (err error) { - objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "") +func (s *syncClient) Broadcast(ctx context.Context, msg *treechangeproto.TreeSyncMessage) (err error) { + objMsg, err := marshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, "") if err != nil { return } - s.checker.CheckResponsiblePeers() - return s.StreamPool.BroadcastAsync(objMsg) + return s.MessagePool.Broadcast(ctx, objMsg) } -func (s *syncClient) SendAsync(peerId string, message *treechangeproto.TreeSyncMessage, replyId string) (err error) { - err = s.checker.CheckPeerConnection(peerId) +func (s *syncClient) SendWithReply(ctx context.Context, peerId string, msg *treechangeproto.TreeSyncMessage, replyId string) (err error) { + objMsg, err := marshallTreeMessage(msg, s.spaceId, msg.RootChange.Id, replyId) if err != nil { return } - - objMsg, err := marshallTreeMessage(message, message.RootChange.Id, replyId) - if err != nil { - return - } - return s.StreamPool.SendAsync([]string{peerId}, objMsg) + return s.MessagePool.SendPeer(ctx, peerId, objMsg) } -func (s *syncClient) BroadcastAsyncOrSendResponsible(message *treechangeproto.TreeSyncMessage) (err error) { - objMsg, err := marshallTreeMessage(message, message.RootChange.Id, "") +func (s *syncClient) BroadcastAsyncOrSendResponsible(ctx context.Context, message *treechangeproto.TreeSyncMessage) (err error) { + objMsg, err := marshallTreeMessage(message, s.spaceId, message.RootChange.Id, "") if err != nil { return } if s.configuration.IsResponsible(s.spaceId) { - s.checker.CheckResponsiblePeers() - return s.StreamPool.SendAsync(s.configuration.NodeIds(s.spaceId), objMsg) + return s.MessagePool.SendResponsible(ctx, objMsg) } - return s.BroadcastAsync(message) + return s.Broadcast(ctx, message) } -func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { +func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, spaceId, objectId, replyId string) (objMsg *spacesyncproto.ObjectSyncMessage, err error) { payload, err := message.Marshal() if err != nil { return @@ -84,7 +74,8 @@ func marshallTreeMessage(message *treechangeproto.TreeSyncMessage, id, replyId s objMsg = &spacesyncproto.ObjectSyncMessage{ ReplyId: replyId, Payload: payload, - ObjectId: id, + ObjectId: objectId, + SpaceId: spaceId, } return } diff --git a/commonspace/object/tree/synctree/synctree.go b/commonspace/object/tree/synctree/synctree.go index c0f4c462..2b7e27f6 100644 --- a/commonspace/object/tree/synctree/synctree.go +++ b/commonspace/object/tree/synctree/synctree.go @@ -33,7 +33,7 @@ type HeadNotifiable interface { type SyncTree interface { objecttree.ObjectTree synchandler.SyncHandler - Ping() (err error) + Ping(ctx context.Context) (err error) } // SyncTree sends head updates to sync service and also sends new changes to update listener @@ -73,34 +73,24 @@ func newWrappedSyncClient( factory RequestFactory, objectSync objectsync.ObjectSync, configuration nodeconf.Configuration) SyncClient { - syncClient := newSyncClient(spaceId, objectSync.StreamPool(), factory, configuration, objectSync.StreamChecker()) + syncClient := newSyncClient(spaceId, objectSync.MessagePool(), factory, configuration) return newQueuedClient(syncClient, objectSync.ActionQueue()) } func BuildSyncTreeOrGetRemote(ctx context.Context, id string, deps BuildDeps) (t SyncTree, err error) { getTreeRemote := func() (msg *treechangeproto.TreeSyncMessage, err error) { - streamChecker := deps.ObjectSync.StreamChecker() peerId, err := peer.CtxPeerId(ctx) if err != nil { - streamChecker.CheckResponsiblePeers() - peerId, err = streamChecker.FirstResponsiblePeer() - if err != nil { - return - } + peerId = deps.Configuration.NodeIds(deps.SpaceId)[0] } newTreeRequest := GetRequestFactory().CreateNewTreeRequest() - objMsg, err := marshallTreeMessage(newTreeRequest, id, "") + objMsg, err := marshallTreeMessage(newTreeRequest, deps.SpaceId, id, "") if err != nil { return } - err = deps.ObjectSync.StreamChecker().CheckPeerConnection(peerId) - if err != nil { - return - } - - resp, err := deps.ObjectSync.StreamPool().SendSync(peerId, objMsg) + resp, err := deps.ObjectSync.MessagePool().SendSync(ctx, peerId, objMsg) if err != nil { return } @@ -213,7 +203,9 @@ func buildSyncTree(ctx context.Context, isFirstBuild bool, deps BuildDeps) (t Sy if isFirstBuild { headUpdate := syncTree.syncClient.CreateHeadUpdate(t, nil) // send to everybody, because everybody should know that the node or client got new tree - syncTree.syncClient.BroadcastAsync(headUpdate) + if e := syncTree.syncClient.Broadcast(ctx, headUpdate); e != nil { + log.Error("broadcast error", zap.Error(e)) + } } return } @@ -245,7 +237,7 @@ func (s *syncTree) AddContent(ctx context.Context, content objecttree.SignableCh } s.syncStatus.HeadsChange(s.Id(), res.Heads) headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) - err = s.syncClient.BroadcastAsync(headUpdate) + err = s.syncClient.Broadcast(ctx, headUpdate) return } @@ -272,7 +264,7 @@ func (s *syncTree) AddRawChanges(ctx context.Context, changesPayload objecttree. s.notifiable.UpdateHeads(s.Id(), res.Heads) } headUpdate := s.syncClient.CreateHeadUpdate(s, res.Added) - err = s.syncClient.BroadcastAsync(headUpdate) + err = s.syncClient.Broadcast(ctx, headUpdate) } return } @@ -314,11 +306,11 @@ func (s *syncTree) checkAlive() (err error) { return } -func (s *syncTree) Ping() (err error) { +func (s *syncTree) Ping(ctx context.Context) (err error) { s.Lock() defer s.Unlock() headUpdate := s.syncClient.CreateHeadUpdate(s, nil) - return s.syncClient.BroadcastAsyncOrSendResponsible(headUpdate) + return s.syncClient.BroadcastAsyncOrSendResponsible(ctx, headUpdate) } func (s *syncTree) afterBuild() { diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index 1bfa7b34..11dbb235 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -111,7 +111,7 @@ func (s *syncTreeHandler) handleHeadUpdate( return } - return s.syncClient.SendAsync(senderId, fullRequest, replyId) + return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId) } if s.alreadyHasHeads(objTree, update.Heads) { @@ -135,7 +135,7 @@ func (s *syncTreeHandler) handleHeadUpdate( return } - return s.syncClient.SendAsync(senderId, fullRequest, replyId) + return s.syncClient.SendWithReply(ctx, senderId, fullRequest, replyId) } func (s *syncTreeHandler) handleFullSyncRequest( @@ -159,7 +159,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( if err != nil { log.With(zap.Error(err)).Debug("full sync request finished with error") - s.syncClient.SendAsync(senderId, treechangeproto.WrapError(err, header), replyId) + s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId) return } else if fullResponse != nil { log.Debug("full sync response sent") @@ -180,7 +180,7 @@ func (s *syncTreeHandler) handleFullSyncRequest( return } - return s.syncClient.SendAsync(senderId, fullResponse, replyId) + return s.syncClient.SendWithReply(ctx, senderId, fullResponse, replyId) } func (s *syncTreeHandler) handleFullSyncResponse( diff --git a/commonspace/objectsync/msgpool.go b/commonspace/objectsync/msgpool.go new file mode 100644 index 00000000..fc629e5e --- /dev/null +++ b/commonspace/objectsync/msgpool.go @@ -0,0 +1,120 @@ +package objectsync + +import ( + "context" + "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" + "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + "go.uber.org/zap" + "strconv" + "strings" + "sync" + "sync/atomic" +) + +type StreamManager interface { + SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) + SendResponsible(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) + Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) +} + +// MessagePool can be made generic to work with different streams +type MessagePool interface { + synchandler.SyncHandler + StreamManager + SendSync(ctx context.Context, peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) +} + +type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) + +type responseWaiter struct { + ch chan *spacesyncproto.ObjectSyncMessage +} + +type messagePool struct { + sync.Mutex + StreamManager + messageHandler MessageHandler + waiters map[string]responseWaiter + waitersMx sync.Mutex + counter atomic.Uint64 + queue ActionQueue +} + +func newMessagePool(streamManager StreamManager, messageHandler MessageHandler) MessagePool { + s := &messagePool{ + StreamManager: streamManager, + messageHandler: messageHandler, + waiters: make(map[string]responseWaiter), + queue: NewDefaultActionQueue(), + } + return s +} + +func (s *messagePool) SendSync(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { + newCounter := s.counter.Add(1) + msg.ReplyId = genReplyKey(peerId, msg.ObjectId, newCounter) + + s.waitersMx.Lock() + waiter := responseWaiter{ + ch: make(chan *spacesyncproto.ObjectSyncMessage, 1), + } + s.waiters[msg.ReplyId] = waiter + s.waitersMx.Unlock() + + err = s.SendPeer(ctx, peerId, msg) + if err != nil { + return + } + select { + case <-ctx.Done(): + s.waitersMx.Lock() + delete(s.waiters, msg.ReplyId) + s.waitersMx.Unlock() + + log.With(zap.String("replyId", msg.ReplyId)).Info("time elapsed when waiting") + err = ctx.Err() + case reply = <-waiter.ch: + // success + } + return +} + +func (s *messagePool) HandleMessage(ctx context.Context, senderId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + if msg.ReplyId != "" { + // we got reply, send it to waiter + if s.stopWaiter(msg) { + return + } + log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") + return + } + return s.queue.Send(func() error { + if e := s.messageHandler(ctx, senderId, msg); e != nil { + log.Info("handle message error", zap.Error(e)) + } + return nil + }) +} + +func (s *messagePool) stopWaiter(msg *spacesyncproto.ObjectSyncMessage) bool { + s.waitersMx.Lock() + waiter, exists := s.waiters[msg.ReplyId] + if exists { + delete(s.waiters, msg.ReplyId) + s.waitersMx.Unlock() + waiter.ch <- msg + return true + } + s.waitersMx.Unlock() + return false +} + +func genReplyKey(peerId, treeId string, counter uint64) string { + b := &strings.Builder{} + b.WriteString(peerId) + b.WriteString(".") + b.WriteString(treeId) + b.WriteString(".") + b.WriteString(strconv.FormatUint(counter, 36)) + return b.String() +} diff --git a/commonspace/objectsync/objectsync.go b/commonspace/objectsync/objectsync.go index 5a02f8f7..d9c9cc27 100644 --- a/commonspace/objectsync/objectsync.go +++ b/commonspace/objectsync/objectsync.go @@ -5,7 +5,6 @@ import ( "context" "github.com/anytypeio/any-sync/app/logger" "github.com/anytypeio/any-sync/app/ocache" - "github.com/anytypeio/any-sync/commonspace/confconnector" "github.com/anytypeio/any-sync/commonspace/object/syncobjectgetter" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" @@ -18,8 +17,7 @@ var log = logger.NewNamed("commonspace.objectsync") type ObjectSync interface { ocache.ObjectLastUsage synchandler.SyncHandler - StreamPool() StreamPool - StreamChecker() StreamChecker + MessagePool() MessagePool ActionQueue() ActionQueue Init(getter syncobjectgetter.SyncObjectGetter) @@ -29,8 +27,7 @@ type ObjectSync interface { type objectSync struct { spaceId string - streamPool StreamPool - checker StreamChecker + streamPool MessagePool objectGetter syncobjectgetter.SyncObjectGetter actionQueue ActionQueue @@ -38,26 +35,14 @@ type objectSync struct { cancelSync context.CancelFunc } -func NewObjectSync( - spaceId string, - confConnector confconnector.ConfConnector) (objectSync ObjectSync) { - streamPool := newStreamPool(func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { +func NewObjectSync(streamManager StreamManager, spaceId string) (objectSync ObjectSync) { + msgPool := newMessagePool(streamManager, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { return objectSync.HandleMessage(ctx, senderId, message) }) - clientFactory := spacesyncproto.ClientFactoryFunc(spacesyncproto.NewDRPCSpaceSyncClient) - syncLog := log.With(zap.String("id", spaceId)) syncCtx, cancel := context.WithCancel(context.Background()) - checker := NewStreamChecker( - spaceId, - confConnector, - streamPool, - clientFactory, - syncCtx, - syncLog) objectSync = newObjectSync( spaceId, - streamPool, - checker, + msgPool, syncCtx, cancel) return @@ -65,15 +50,13 @@ func NewObjectSync( func newObjectSync( spaceId string, - streamPool StreamPool, - checker StreamChecker, + streamPool MessagePool, syncCtx context.Context, cancel context.CancelFunc, ) *objectSync { return &objectSync{ streamPool: streamPool, spaceId: spaceId, - checker: checker, syncCtx: syncCtx, cancelSync: cancel, actionQueue: NewDefaultActionQueue(), @@ -83,17 +66,17 @@ func newObjectSync( func (s *objectSync) Init(objectGetter syncobjectgetter.SyncObjectGetter) { s.objectGetter = objectGetter s.actionQueue.Run() - go s.checker.CheckResponsiblePeers() } func (s *objectSync) Close() (err error) { s.actionQueue.Close() s.cancelSync() - return s.streamPool.Close() + return } func (s *objectSync) LastUsage() time.Time { - return s.streamPool.LastUsage() + // TODO: [che] + return time.Now() } func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { @@ -105,14 +88,10 @@ func (s *objectSync) HandleMessage(ctx context.Context, senderId string, message return obj.HandleMessage(ctx, senderId, message) } -func (s *objectSync) StreamPool() StreamPool { +func (s *objectSync) MessagePool() MessagePool { return s.streamPool } -func (s *objectSync) StreamChecker() StreamChecker { - return s.checker -} - func (s *objectSync) ActionQueue() ActionQueue { return s.actionQueue } diff --git a/commonspace/objectsync/streamchecker.go b/commonspace/objectsync/streamchecker.go deleted file mode 100644 index 5bf53e5e..00000000 --- a/commonspace/objectsync/streamchecker.go +++ /dev/null @@ -1,146 +0,0 @@ -package objectsync - -import ( - "context" - "fmt" - "github.com/anytypeio/any-sync/commonspace/confconnector" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/net/peer" - "github.com/anytypeio/any-sync/net/rpc/rpcerr" - "go.uber.org/atomic" - "go.uber.org/zap" - "golang.org/x/exp/slices" - "time" -) - -type StreamChecker interface { - CheckResponsiblePeers() - CheckPeerConnection(peerId string) (err error) - FirstResponsiblePeer() (peerId string, err error) -} - -type streamChecker struct { - spaceId string - connector confconnector.ConfConnector - streamPool StreamPool - clientFactory spacesyncproto.ClientFactory - log *zap.Logger - syncCtx context.Context - lastCheck *atomic.Time -} - -const streamCheckerInterval = time.Second * 5 - -func NewStreamChecker( - spaceId string, - connector confconnector.ConfConnector, - streamPool StreamPool, - clientFactory spacesyncproto.ClientFactory, - syncCtx context.Context, - log *zap.Logger) StreamChecker { - return &streamChecker{ - spaceId: spaceId, - connector: connector, - streamPool: streamPool, - clientFactory: clientFactory, - log: log, - syncCtx: syncCtx, - lastCheck: atomic.NewTime(time.Time{}), - } -} - -func (s *streamChecker) CheckResponsiblePeers() { - lastCheck := s.lastCheck.Load() - now := time.Now() - if lastCheck.Add(streamCheckerInterval).After(now) { - return - } - s.lastCheck.Store(now) - - var ( - activeNodeIds []string - configuration = s.connector.Configuration() - ) - nodeIds := configuration.NodeIds(s.spaceId) - for _, nodeId := range nodeIds { - if s.streamPool.HasActiveStream(nodeId) { - s.log.Debug("has active stream for", zap.String("id", nodeId)) - activeNodeIds = append(activeNodeIds, nodeId) - continue - } - } - s.log.Debug("total streams", zap.Int("total", len(activeNodeIds))) - newPeers, err := s.connector.DialInactiveResponsiblePeers(s.syncCtx, s.spaceId, activeNodeIds) - if err != nil { - s.log.Error("failed to dial peers", zap.Error(err)) - return - } - - for _, p := range newPeers { - err := s.createStream(p) - if err != nil { - log.With(zap.Error(err)).Error("failed to create stream") - continue - } - s.log.Debug("reading stream for", zap.String("id", p.Id())) - } - return -} - -func (s *streamChecker) CheckPeerConnection(peerId string) (err error) { - if s.streamPool.HasActiveStream(peerId) { - return - } - - var ( - configuration = s.connector.Configuration() - pool = s.connector.Pool() - ) - nodeIds := configuration.NodeIds(s.spaceId) - // we don't know the address of the peer - if !slices.Contains(nodeIds, peerId) { - err = fmt.Errorf("don't know the address of peer %s", peerId) - return - } - - newPeer, err := pool.Dial(s.syncCtx, peerId) - if err != nil { - return - } - return s.createStream(newPeer) -} - -func (s *streamChecker) createStream(p peer.Peer) (err error) { - stream, err := s.clientFactory.Client(p).ObjectSyncStream(s.syncCtx) - if err != nil { - // so here probably the request is failed because there is no such space, - // but diffService should handle such cases by sending pushSpace - err = fmt.Errorf("failed to open stream: %w", rpcerr.Unwrap(err)) - return - } - - // sending empty message for the server to understand from which space is it coming - err = stream.Send(&spacesyncproto.ObjectSyncMessage{SpaceId: s.spaceId}) - if err != nil { - err = fmt.Errorf("failed to send first message to stream: %w", rpcerr.Unwrap(err)) - return - } - err = s.streamPool.AddAndReadStreamAsync(stream) - if err != nil { - err = fmt.Errorf("failed to read from stream async: %w", err) - return - } - return -} - -func (s *streamChecker) FirstResponsiblePeer() (peerId string, err error) { - nodeIds := s.connector.Configuration().NodeIds(s.spaceId) - for _, nodeId := range nodeIds { - if s.streamPool.HasActiveStream(nodeId) { - peerId = nodeId - return - } - } - err = fmt.Errorf("no responsible peers are connected") - return -} diff --git a/commonspace/objectsync/streampool.go b/commonspace/objectsync/streampool.go deleted file mode 100644 index 2948dee8..00000000 --- a/commonspace/objectsync/streampool.go +++ /dev/null @@ -1,332 +0,0 @@ -package objectsync - -import ( - "context" - "errors" - "fmt" - "github.com/anytypeio/any-sync/app/ocache" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/net/peer" - "go.uber.org/zap" - "sync" - "sync/atomic" - "time" -) - -var ErrEmptyPeer = errors.New("don't have such a peer") -var ErrStreamClosed = errors.New("stream is already closed") - -var maxStreamReaders = 10 -var syncWaitPeriod = 2 * time.Second - -var ErrSyncTimeout = errors.New("too long wait on sync receive") - -// StreamPool can be made generic to work with different streams -type StreamPool interface { - ocache.ObjectLastUsage - AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) - AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) - - SendSync(peerId string, message *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) - SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) - BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) - - HasActiveStream(peerId string) bool - Close() (err error) -} - -type MessageHandler func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) - -type responseWaiter struct { - ch chan *spacesyncproto.ObjectSyncMessage -} - -type streamPool struct { - sync.Mutex - peerStreams map[string]spacesyncproto.ObjectSyncStream - messageHandler MessageHandler - wg *sync.WaitGroup - waiters map[string]responseWaiter - waitersMx sync.Mutex - counter atomic.Uint64 - lastUsage atomic.Int64 -} - -func newStreamPool(messageHandler MessageHandler) StreamPool { - s := &streamPool{ - peerStreams: make(map[string]spacesyncproto.ObjectSyncStream), - messageHandler: messageHandler, - waiters: make(map[string]responseWaiter), - wg: &sync.WaitGroup{}, - } - s.lastUsage.Store(time.Now().Unix()) - return s -} - -func (s *streamPool) LastUsage() time.Time { - return time.Unix(s.lastUsage.Load(), 0) -} - -func (s *streamPool) HasActiveStream(peerId string) (res bool) { - s.Lock() - defer s.Unlock() - _, err := s.getOrDeleteStream(peerId) - return err == nil -} - -func (s *streamPool) SendSync( - peerId string, - msg *spacesyncproto.ObjectSyncMessage) (reply *spacesyncproto.ObjectSyncMessage, err error) { - newCounter := s.counter.Add(1) - msg.ReplyId = genStreamPoolKey(peerId, msg.ObjectId, newCounter) - - s.waitersMx.Lock() - waiter := responseWaiter{ - ch: make(chan *spacesyncproto.ObjectSyncMessage, 1), - } - s.waiters[msg.ReplyId] = waiter - s.waitersMx.Unlock() - - err = s.SendAsync([]string{peerId}, msg) - if err != nil { - return - } - delay := time.NewTimer(syncWaitPeriod) - select { - case <-delay.C: - s.waitersMx.Lock() - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - - log.With(zap.String("replyId", msg.ReplyId)).Error("time elapsed when waiting") - err = ErrSyncTimeout - case reply = <-waiter.ch: - if !delay.Stop() { - <-delay.C - } - } - return -} - -func (s *streamPool) SendAsync(peers []string, message *spacesyncproto.ObjectSyncMessage) (err error) { - s.lastUsage.Store(time.Now().Unix()) - getStreams := func() (streams []spacesyncproto.ObjectSyncStream) { - for _, pId := range peers { - stream, err := s.getOrDeleteStream(pId) - if err != nil { - continue - } - streams = append(streams, stream) - } - return streams - } - - s.Lock() - streams := getStreams() - s.Unlock() - - log.With(zap.String("objectId", message.ObjectId), zap.Int("existing peers len", len(streams)), zap.Strings("wanted peers", peers)). - Debug("sending message to peers") - for _, stream := range streams { - err = stream.Send(message) - if err != nil { - log.Debug("error sending message to stream", zap.Error(err)) - } - } - if len(peers) != 1 { - err = nil - } - return err -} - -func (s *streamPool) getOrDeleteStream(id string) (stream spacesyncproto.ObjectSyncStream, err error) { - stream, exists := s.peerStreams[id] - if !exists { - err = ErrEmptyPeer - return - } - - select { - case <-stream.Context().Done(): - delete(s.peerStreams, id) - err = ErrStreamClosed - default: - } - - return -} - -func (s *streamPool) getAllStreams() (streams []spacesyncproto.ObjectSyncStream) { - s.Lock() - defer s.Unlock() -Loop: - for id, stream := range s.peerStreams { - select { - case <-stream.Context().Done(): - delete(s.peerStreams, id) - continue Loop - default: - break - } - log.With(zap.String("id", id)).Debug("getting peer stream") - streams = append(streams, stream) - } - - return -} - -func (s *streamPool) BroadcastAsync(message *spacesyncproto.ObjectSyncMessage) (err error) { - streams := s.getAllStreams() - log.With(zap.String("objectId", message.ObjectId), zap.Int("peers", len(streams))). - Debug("broadcasting message to peers") - for _, stream := range streams { - if err = stream.Send(message); err != nil { - log.Debug("error sending message to stream", zap.Error(err)) - } - } - - return nil -} - -func (s *streamPool) AddAndReadStreamAsync(stream spacesyncproto.ObjectSyncStream) (err error) { - peerId, err := s.addStream(stream) - if err != nil { - return - } - go s.readPeerLoop(peerId, stream) - return -} - -func (s *streamPool) AddAndReadStreamSync(stream spacesyncproto.ObjectSyncStream) (err error) { - peerId, err := s.addStream(stream) - if err != nil { - return - } - return s.readPeerLoop(peerId, stream) -} - -func (s *streamPool) addStream(stream spacesyncproto.ObjectSyncStream) (peerId string, err error) { - s.Lock() - peerId, err = peer.CtxPeerId(stream.Context()) - if err != nil { - s.Unlock() - return - } - log.With(zap.String("peer id", peerId)).Debug("adding stream") - - if oldStream, ok := s.peerStreams[peerId]; ok { - s.Unlock() - oldStream.Close() - s.Lock() - log.With(zap.String("peer id", peerId)).Debug("closed old stream before adding") - } - - s.peerStreams[peerId] = stream - s.wg.Add(1) - s.Unlock() - return -} - -func (s *streamPool) Close() (err error) { - s.Lock() - wg := s.wg - s.Unlock() - streams := s.getAllStreams() - - log.Debug("closing streams on lock") - for _, stream := range streams { - stream.Close() - } - log.Debug("closed streams") - - if wg != nil { - wg.Wait() - } - return nil -} - -func (s *streamPool) readPeerLoop(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { - var ( - log = log.With(zap.String("peerId", peerId)) - queue = NewDefaultActionQueue() - ) - queue.Run() - - defer func() { - log.Debug("stopped reading stream from peer") - s.removePeer(peerId, stream) - queue.Close() - s.wg.Done() - }() - - log.Debug("started reading stream from peer") - - stopWaiter := func(msg *spacesyncproto.ObjectSyncMessage) bool { - s.waitersMx.Lock() - waiter, exists := s.waiters[msg.ReplyId] - if exists { - delete(s.waiters, msg.ReplyId) - s.waitersMx.Unlock() - waiter.ch <- msg - return true - } - s.waitersMx.Unlock() - return false - } - - process := func(msg *spacesyncproto.ObjectSyncMessage) error { - log := log.With(zap.String("replyId", msg.ReplyId), zap.String("object id", msg.ObjectId)) - log.Debug("getting message with reply id") - err = s.messageHandler(stream.Context(), peerId, msg) - if err != nil { - log.With(zap.Error(err)).Debug("message handling failed") - } - return nil - } - - for { - select { - case <-stream.Context().Done(): - return - default: - } - var msg *spacesyncproto.ObjectSyncMessage - msg, err = stream.Recv() - s.lastUsage.Store(time.Now().Unix()) - if err != nil { - stream.Close() - return - } - - if msg.ReplyId != "" { - // then we can send it directly to waiters without adding to queue or starting a reader - if stopWaiter(msg) { - continue - } - log.With(zap.String("replyId", msg.ReplyId)).Debug("reply id does not exist") - } - - queue.Send(func() error { - return process(msg) - }) - } -} - -func (s *streamPool) removePeer(peerId string, stream spacesyncproto.ObjectSyncStream) (err error) { - s.Lock() - defer s.Unlock() - mapStream, ok := s.peerStreams[peerId] - if !ok { - return ErrEmptyPeer - } - - // it can be the case that the stream was already replaced - if mapStream == stream { - delete(s.peerStreams, peerId) - } - return -} - -func genStreamPoolKey(peerId, treeId string, counter uint64) string { - return fmt.Sprintf("%s.%s.%d", peerId, treeId, counter) -} diff --git a/commonspace/objectsync/streampool_test.go b/commonspace/objectsync/streampool_test.go deleted file mode 100644 index 056d11de..00000000 --- a/commonspace/objectsync/streampool_test.go +++ /dev/null @@ -1,299 +0,0 @@ -package objectsync - -import ( - "context" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/net/peer" - "github.com/anytypeio/any-sync/net/rpc/rpctest" - "github.com/stretchr/testify/require" - "testing" - "time" -) - -type testServer struct { - stream chan spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream - releaseStream chan error - watchErrOnce bool -} - -func (t *testServer) HeadSync(ctx context.Context, request *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { - panic("implement me") -} - -func (t *testServer) SpacePush(ctx context.Context, request *spacesyncproto.SpacePushRequest) (*spacesyncproto.SpacePushResponse, error) { - panic("implement me") -} - -func (t *testServer) SpacePull(ctx context.Context, request *spacesyncproto.SpacePullRequest) (*spacesyncproto.SpacePullResponse, error) { - panic("implement me") -} - -func (t *testServer) ObjectSyncStream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error { - t.stream <- stream - return <-t.releaseStream -} - -func (t *testServer) waitStream(test *testing.T) spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream { - select { - case <-time.After(time.Second * 5): - test.Fatalf("waiteStream timeout") - case st := <-t.stream: - return st - } - return nil -} - -type fixture struct { - testServer *testServer - drpcTS *rpctest.TesServer - client spacesyncproto.DRPCSpaceSyncClient - clientStream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream - serverStream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream - pool *streamPool - clientId string - serverId string -} - -func newFixture(t *testing.T, clientId, serverId string, handler MessageHandler) *fixture { - fx := &fixture{ - testServer: &testServer{}, - drpcTS: rpctest.NewTestServer(), - clientId: clientId, - serverId: serverId, - } - fx.testServer.stream = make(chan spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream, 1) - require.NoError(t, spacesyncproto.DRPCRegisterSpaceSync(fx.drpcTS.Mux, fx.testServer)) - fx.client = spacesyncproto.NewDRPCSpaceSyncClient(fx.drpcTS.Dial(peer.CtxWithPeerId(context.Background(), clientId))) - - var err error - fx.clientStream, err = fx.client.ObjectSyncStream(peer.CtxWithPeerId(context.Background(), serverId)) - require.NoError(t, err) - fx.serverStream = fx.testServer.waitStream(t) - fx.pool = newStreamPool(handler).(*streamPool) - - return fx -} - -func (fx *fixture) run(t *testing.T) chan error { - waitCh := make(chan error) - go func() { - err := fx.pool.AddAndReadStreamSync(fx.clientStream) - waitCh <- err - }() - - time.Sleep(time.Millisecond * 10) - fx.pool.Lock() - require.Equal(t, fx.pool.peerStreams[fx.serverId], fx.clientStream) - fx.pool.Unlock() - - return waitCh -} - -func TestStreamPool_AddAndReadStreamAsync(t *testing.T) { - remId := "remoteId" - - t.Run("client close", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - waitCh := fx.run(t) - - err := fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) - t.Run("server close", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - waitCh := fx.run(t) - - err := fx.serverStream.Close() - require.NoError(t, err) - - err = <-waitCh - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_Close(t *testing.T) { - remId := "remoteId" - - t.Run("close", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - fx.run(t) - fx.pool.Close() - select { - case <-fx.clientStream.Context().Done(): - break - case <-time.After(time.Millisecond * 100): - t.Fatal("context should be closed") - } - }) -} - -func TestStreamPool_ReceiveMessage(t *testing.T) { - remId := "remoteId" - t.Run("pool receive message from server", func(t *testing.T) { - objectId := "objectId" - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - recvChan := make(chan struct{}) - fx := newFixture(t, "", remId, func(ctx context.Context, senderId string, message *spacesyncproto.ObjectSyncMessage) (err error) { - require.Equal(t, msg, message) - recvChan <- struct{}{} - return nil - }) - waitCh := fx.run(t) - - err := fx.serverStream.Send(msg) - require.NoError(t, err) - <-recvChan - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_HasActiveStream(t *testing.T) { - remId := "remoteId" - t.Run("pool has active stream", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - waitCh := fx.run(t) - require.True(t, fx.pool.HasActiveStream(remId)) - - err := fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) - t.Run("pool has no active stream", func(t *testing.T) { - fx := newFixture(t, "", remId, nil) - waitCh := fx.run(t) - err := fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - require.Error(t, err) - require.False(t, fx.pool.HasActiveStream(remId)) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_SendAsync(t *testing.T) { - remId := "remoteId" - t.Run("pool send async to server", func(t *testing.T) { - objectId := "objectId" - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - fx := newFixture(t, "", remId, nil) - recvChan := make(chan struct{}) - go func() { - message, err := fx.serverStream.Recv() - require.NoError(t, err) - require.Equal(t, msg, message) - recvChan <- struct{}{} - }() - waitCh := fx.run(t) - - err := fx.pool.SendAsync([]string{remId}, msg) - require.NoError(t, err) - <-recvChan - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_SendSync(t *testing.T) { - remId := "remoteId" - t.Run("pool send sync to server", func(t *testing.T) { - objectId := "objectId" - payload := []byte("payload") - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - fx := newFixture(t, "", remId, nil) - go func() { - message, err := fx.serverStream.Recv() - require.NoError(t, err) - require.Equal(t, msg.ObjectId, message.ObjectId) - require.NotEmpty(t, message.ReplyId) - message.Payload = payload - err = fx.serverStream.Send(message) - require.NoError(t, err) - }() - waitCh := fx.run(t) - res, err := fx.pool.SendSync(remId, msg) - require.NoError(t, err) - require.Equal(t, payload, res.Payload) - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) - - t.Run("pool send sync timeout", func(t *testing.T) { - objectId := "objectId" - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - fx := newFixture(t, "", remId, nil) - syncWaitPeriod = time.Millisecond * 30 - go func() { - message, err := fx.serverStream.Recv() - require.NoError(t, err) - require.Equal(t, msg.ObjectId, message.ObjectId) - require.NotEmpty(t, message.ReplyId) - }() - waitCh := fx.run(t) - _, err := fx.pool.SendSync(remId, msg) - require.Equal(t, ErrSyncTimeout, err) - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} - -func TestStreamPool_BroadcastAsync(t *testing.T) { - remId := "remoteId" - t.Run("pool broadcast async to server", func(t *testing.T) { - objectId := "objectId" - msg := &spacesyncproto.ObjectSyncMessage{ - ObjectId: objectId, - } - fx := newFixture(t, "", remId, nil) - recvChan := make(chan struct{}) - go func() { - message, err := fx.serverStream.Recv() - require.NoError(t, err) - require.Equal(t, msg, message) - recvChan <- struct{}{} - }() - waitCh := fx.run(t) - - err := fx.pool.BroadcastAsync(msg) - require.NoError(t, err) - <-recvChan - err = fx.clientStream.Close() - require.NoError(t, err) - err = <-waitCh - - require.Error(t, err) - require.Nil(t, fx.pool.peerStreams[remId]) - }) -} diff --git a/commonspace/rpchandler.go b/commonspace/rpchandler.go deleted file mode 100644 index 4bb33cd9..00000000 --- a/commonspace/rpchandler.go +++ /dev/null @@ -1,24 +0,0 @@ -package commonspace - -import ( - "context" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" -) - -type RpcHandler interface { - HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) - Stream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) error -} - -type rpcHandler struct { - s *space -} - -func (r *rpcHandler) HeadSync(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) { - return r.s.HeadSync().HandleRangeRequest(ctx, req) -} - -func (r *rpcHandler) Stream(stream spacesyncproto.DRPCSpaceSync_ObjectSyncStreamStream) (err error) { - // TODO: if needed we can launch full sync here - return r.s.ObjectSync().StreamPool().AddAndReadStreamSync(stream) -} diff --git a/commonspace/space.go b/commonspace/space.go index ec944fab..9dda2ece 100644 --- a/commonspace/space.go +++ b/commonspace/space.go @@ -81,8 +81,6 @@ type Space interface { DebugAllHeads() []headsync.TreeHeads Description() (SpaceDescription, error) - SpaceSyncRpc() RpcHandler - DeriveTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) CreateTree(ctx context.Context, payload objecttree.ObjectTreeCreatePayload) (res treestorage.TreeStorageCreatePayload, err error) PutTree(ctx context.Context, payload treestorage.TreeStorageCreatePayload, listener updatelistener.UpdateListener) (t objecttree.ObjectTree, err error) @@ -90,6 +88,7 @@ type Space interface { DeleteTree(ctx context.Context, id string) (err error) HeadSync() headsync.HeadSync + ObjectSync() objectsync.ObjectSync SyncStatus() syncstatus.StatusUpdater Storage() spacestorage.SpaceStorage @@ -101,8 +100,6 @@ type space struct { mu sync.RWMutex header *spacesyncproto.RawSpaceHeaderWithId - rpc *rpcHandler - objectSync objectsync.ObjectSync headSync headsync.HeadSync syncStatus syncstatus.StatusUpdater @@ -161,7 +158,6 @@ func (s *space) Init(ctx context.Context) (err error) { return } s.header = header - s.rpc = &rpcHandler{s: s} initialIds, err := s.storage.StoredIds() if err != nil { return @@ -174,7 +170,7 @@ func (s *space) Init(ctx context.Context) (err error) { if err != nil { return } - s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.StreamPool()) + s.aclList = syncacl.NewSyncAcl(aclList, s.objectSync.MessagePool()) deletionState := deletionstate.NewDeletionState(s.storage) deps := settings.Deps{ @@ -208,10 +204,6 @@ func (s *space) Init(ctx context.Context) (err error) { return nil } -func (s *space) SpaceSyncRpc() RpcHandler { - return s.rpc -} - func (s *space) ObjectSync() objectsync.ObjectSync { return s.objectSync } diff --git a/commonspace/spaceservice.go b/commonspace/spaceservice.go index 664b3e1e..52e44a4a 100644 --- a/commonspace/spaceservice.go +++ b/commonspace/spaceservice.go @@ -13,6 +13,7 @@ import ( "github.com/anytypeio/any-sync/commonspace/objectsync" "github.com/anytypeio/any-sync/commonspace/spacestorage" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + "github.com/anytypeio/any-sync/commonspace/streammanager" "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" "github.com/anytypeio/any-sync/net/pool" @@ -39,12 +40,13 @@ type SpaceService interface { } type spaceService struct { - config Config - account accountservice.Service - configurationService nodeconf.Service - storageProvider spacestorage.SpaceStorageProvider - treeGetter treegetter.TreeGetter - pool pool.Pool + config Config + account accountservice.Service + configurationService nodeconf.Service + storageProvider spacestorage.SpaceStorageProvider + streamManagerProvider streammanager.StreamManagerProvider + treeGetter treegetter.TreeGetter + pool pool.Pool } func (s *spaceService) Init(a *app.App) (err error) { @@ -53,6 +55,7 @@ func (s *spaceService) Init(a *app.App) (err error) { s.storageProvider = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorageProvider) s.configurationService = a.MustComponent(nodeconf.CName).(nodeconf.Service) s.treeGetter = a.MustComponent(treegetter.CName).(treegetter.TreeGetter) + s.streamManagerProvider = a.MustComponent(streammanager.CName).(streammanager.StreamManagerProvider) s.pool = a.MustComponent(pool.CName).(pool.Pool) return nil } @@ -123,8 +126,15 @@ func (s *spaceService) NewSpace(ctx context.Context, id string) (Space, error) { syncStatus = syncstatus.NewSyncStatusProvider(st.Id(), syncstatus.DefaultDeps(lastConfiguration, st)) } - headSync := headsync.NewHeadSync(id, s.config.SyncPeriod, st, confConnector, s.treeGetter, syncStatus, log) - objectSync := objectsync.NewObjectSync(id, confConnector) + // TODO: [che] remove *5 + headSync := headsync.NewHeadSync(id, s.config.SyncPeriod*5, st, confConnector, s.treeGetter, syncStatus, log) + + streamManager, err := s.streamManagerProvider.NewStreamManager(ctx, id) + if err != nil { + return nil, err + } + + objectSync := objectsync.NewObjectSync(streamManager, id) sp := &space{ id: id, objectSync: objectSync, diff --git a/commonspace/streammanager/streammanager.go b/commonspace/streammanager/streammanager.go new file mode 100644 index 00000000..5fb15b4c --- /dev/null +++ b/commonspace/streammanager/streammanager.go @@ -0,0 +1,14 @@ +package streammanager + +import ( + "context" + "github.com/anytypeio/any-sync/app" + "github.com/anytypeio/any-sync/commonspace/objectsync" +) + +const CName = "common.commonspace.streammanager" + +type StreamManagerProvider interface { + app.Component + NewStreamManager(ctx context.Context, spaceId string) (sm objectsync.StreamManager, err error) +} diff --git a/net/streampool/stream.go b/net/streampool/stream.go index 4b129949..41e9307a 100644 --- a/net/streampool/stream.go +++ b/net/streampool/stream.go @@ -1,6 +1,7 @@ package streampool import ( + "fmt" "go.uber.org/zap" "storj.io/drpc" "sync/atomic" @@ -17,6 +18,9 @@ type stream struct { } func (sr *stream) write(msg drpc.Message) (err error) { + defer func() { + sr.l.Debug("write", zap.String("msg", msg.(fmt.Stringer).String()), zap.Error(err)) + }() if err = sr.stream.MsgSend(msg, EncodingProto); err != nil { sr.l.Info("stream write error", zap.Error(err)) sr.streamClose() @@ -24,7 +28,7 @@ func (sr *stream) write(msg drpc.Message) (err error) { return err } -func (sr *stream) readLoop() { +func (sr *stream) readLoop() error { defer func() { sr.streamClose() }() @@ -32,7 +36,12 @@ func (sr *stream) readLoop() { msg := sr.pool.handler.NewReadMessage() if err := sr.stream.MsgRecv(msg, EncodingProto); err != nil { sr.l.Info("msg receive error", zap.Error(err)) - return + return err + } + sr.l.Debug("read msg", zap.String("msg", msg.(fmt.Stringer).String())) + if err := sr.pool.handler.HandleMessage(sr.stream.Context(), sr.peerId, msg); err != nil { + sr.l.Info("msg handle error", zap.Error(err)) + return err } } } diff --git a/net/streampool/streampool.go b/net/streampool/streampool.go index 572c29c3..5957e056 100644 --- a/net/streampool/streampool.go +++ b/net/streampool/streampool.go @@ -1,7 +1,9 @@ package streampool import ( + "fmt" "github.com/anytypeio/any-sync/net/peer" + "github.com/anytypeio/any-sync/net/pool" "go.uber.org/zap" "golang.org/x/exp/slices" "golang.org/x/net/context" @@ -21,10 +23,14 @@ type StreamHandler interface { // StreamPool keeps and read streams type StreamPool interface { - // AddStream adds new incoming stream into the pool + // AddStream adds new outgoing stream into the pool AddStream(peerId string, stream drpc.Stream, tags ...string) + // ReadStream adds new incoming stream and synchronously read it + ReadStream(peerId string, stream drpc.Stream, tags ...string) (err error) // Send sends a message to given peers. A stream will be opened if it is not cached before. Works async. Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) + // SendById sends a message to given peerIds. Works only if stream exists + SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) // Broadcast sends a message to all peers with given tags. Works async. Broadcast(ctx context.Context, msg drpc.Message, tags ...string) (err error) // Close closes all streams @@ -42,7 +48,19 @@ type streamPool struct { lastStreamId uint32 } +func (s *streamPool) ReadStream(peerId string, drpcStream drpc.Stream, tags ...string) error { + st := s.addStream(peerId, drpcStream, tags...) + return st.readLoop() +} + func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...string) { + st := s.addStream(peerId, drpcStream, tags...) + go func() { + _ = st.readLoop() + }() +} + +func (s *streamPool) addStream(peerId string, drpcStream drpc.Stream, tags ...string) *stream { s.mu.Lock() defer s.mu.Unlock() s.lastStreamId++ @@ -60,7 +78,8 @@ func (s *streamPool) AddStream(peerId string, drpcStream drpc.Stream, tags ...st for _, tag := range tags { s.streamIdsByTag[tag] = append(s.streamIdsByTag[tag], streamId) } - go st.readLoop() + st.l.Debug("stream added", zap.Strings("tags", st.tags)) + return st } func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.Peer) (err error) { @@ -75,6 +94,30 @@ func (s *streamPool) Send(ctx context.Context, msg drpc.Message, peers ...peer.P return s.exec.Add(ctx, funcs...) } +func (s *streamPool) SendById(ctx context.Context, msg drpc.Message, peerIds ...string) (err error) { + s.mu.Lock() + var streams []*stream + for _, peerId := range peerIds { + for _, streamId := range s.streamIdsByPeer[peerId] { + streams = append(streams, s.streams[streamId]) + } + } + s.mu.Unlock() + log.Debug("sendById", zap.String("msg", msg.(fmt.Stringer).String()), zap.Int("streams", len(streams))) + var funcs []func() + for _, st := range streams { + funcs = append(funcs, func() { + if e := st.write(msg); e != nil { + log.Debug("sendById write error", zap.Error(e)) + } + }) + } + if len(funcs) == 0 { + return pool.ErrUnableToConnect + } + return s.exec.Add(ctx, funcs...) +} + func (s *streamPool) sendOne(ctx context.Context, p peer.Peer, msg drpc.Message) (err error) { // get all streams relates to peer streams, err := s.getStreams(ctx, p) @@ -164,6 +207,9 @@ func (s *streamPool) Broadcast(ctx context.Context, msg drpc.Message, tags ...st } }) } + if len(funcs) == 0 { + return + } return s.exec.Add(ctx, funcs...) } @@ -195,6 +241,7 @@ func (s *streamPool) removeStream(streamId uint32) { } delete(s.streams, streamId) + st.l.Debug("stream removed", zap.Strings("tags", st.tags)) } func (s *streamPool) Close() (err error) {