From 829159690fc4c91c30c5c405afdf66f4fa1c45c2 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 14 Mar 2023 10:33:59 +0100 Subject: [PATCH] Add more components for async communication --- .../object/tree/synctree/syncclient_test.go | 136 +++++++++++++++--- 1 file changed, 113 insertions(+), 23 deletions(-) diff --git a/commonspace/object/tree/synctree/syncclient_test.go b/commonspace/object/tree/synctree/syncclient_test.go index 14e8cacc..d7617987 100644 --- a/commonspace/object/tree/synctree/syncclient_test.go +++ b/commonspace/object/tree/synctree/syncclient_test.go @@ -2,32 +2,113 @@ package synctree import ( "context" + "fmt" "github.com/anytypeio/any-sync/commonspace/object/acl/list" "github.com/anytypeio/any-sync/commonspace/object/acl/testutils/acllistbuilder" "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" - "github.com/anytypeio/any-sync/commonspace/peermanager" "github.com/anytypeio/any-sync/commonspace/spacesyncproto" "github.com/anytypeio/any-sync/commonspace/syncstatus" "github.com/anytypeio/any-sync/net/peer" - "github.com/stretchr/testify/require" - "testing" + "github.com/cheggaaa/mb/v3" + "sync" ) -type mockPeerManager struct { +type processMsg struct { + msg *spacesyncproto.ObjectSyncMessage + senderId string + receiverId string } -func (m *mockPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - panic("implement me") +type messageLog struct { + batcher *mb.MB[processMsg] } -func (m *mockPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { - panic("implement me") +func newMessageLog() *messageLog { + return &messageLog{batcher: mb.New[processMsg](0)} } -func (m *mockPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { - panic("implement me") +func (m *messageLog) addMessage(msg processMsg) { + m.batcher.Add(context.Background(), msg) +} + +type processSyncHandler struct { + synchandler.SyncHandler + batcher *mb.MB[processMsg] + peerId string +} + +func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler { + batcher := mb.New[processMsg](0) + return &processSyncHandler{syncHandler, batcher, peerId} +} + +func (p *processSyncHandler) manager() *processPeerManager { + return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager) +} + +func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) { + return p.batcher.Add(ctx, msg) +} + +func (p *processSyncHandler) run(ctx context.Context, wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + for { + res, err := p.batcher.WaitOne(ctx) + if err != nil { + return + } + err = p.SyncHandler.HandleMessage(ctx, res.senderId, res.msg) + if err != nil { + fmt.Println("error handling message", err.Error()) + continue + } + } + }() +} + +type processPeerManager struct { + peerId string + handlers map[string]*processSyncHandler + log *messageLog +} + +func newProcessPeerManager(peerId string, log *messageLog) *processPeerManager { + return &processPeerManager{handlers: map[string]*processSyncHandler{}, peerId: peerId, log: log} +} + +func (m *processPeerManager) addHandler(peerId string, handler *processSyncHandler) { + m.handlers[peerId] = handler +} + +func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + pMsg := processMsg{ + msg: msg, + senderId: m.peerId, + receiverId: peerId, + } + m.log.addMessage(pMsg) + return m.handlers[peerId].send(context.Background(), pMsg) +} + +func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { + for _, handler := range m.handlers { + pMsg := processMsg{ + msg: msg, + senderId: m.peerId, + receiverId: handler.peerId, + } + m.log.addMessage(pMsg) + handler.send(context.Background(), pMsg) + } + return +} + +func (m *processPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { + panic("should not be called") } type broadcastTree struct { @@ -45,14 +126,15 @@ func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.Ra return res, nil } -func createSyncHandler(spaceId string, objTree objecttree.ObjectTree, peerManager peermanager.PeerManager) synchandler.SyncHandler { +func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler { factory := GetRequestFactory() - syncClient := newSyncClient(spaceId, peerManager, factory) + syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory) netTree := &broadcastTree{ ObjectTree: objTree, SyncClient: syncClient, } - return newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus()) + handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus()) + return newProcessSyncHandler(peerId, handler) } func createAclList() (list.AclList, error) { @@ -73,14 +155,22 @@ func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (obje return objecttree.BuildTestableTree(aclList, storage) } -func TestSyncProtocol(t *testing.T) { - aclList, err := createAclList() - require.NoError(t, err) - treeId := "treeId" - spaceId := "spaceId" - storage := createStorage(treeId, aclList) - testTree, err := createTestTree(aclList, storage) - require.NoError(t, err) - peerManager := &mockPeerManager{} - _ = createSyncHandler(spaceId, testTree, peerManager) +type processFixture struct { + handlers []*processSyncHandler + log *messageLog + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc } + +//func TestSyncProtocol(t *testing.T) { +// aclList, err := createAclList() +// require.NoError(t, err) +// treeId := "treeId" +// spaceId := "spaceId" +// storage := createStorage(treeId, aclList) +// testTree, err := createTestTree(aclList, storage) +// require.NoError(t, err) +// peerManager := &processPeerManager{} +// _ = createSyncHandler(spaceId, testTree, peerManager) +//}