From e101389af072038743936e96afebd35eec4f1ba1 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Thu, 13 Apr 2023 21:32:39 +0200 Subject: [PATCH] Add empty client logic --- .../object/tree/synctree/syncclient_test.go | 120 ++++++++++++++++-- 1 file changed, 111 insertions(+), 9 deletions(-) diff --git a/commonspace/object/tree/synctree/syncclient_test.go b/commonspace/object/tree/synctree/syncclient_test.go index 7feb15d5..086cb883 100644 --- a/commonspace/object/tree/synctree/syncclient_test.go +++ b/commonspace/object/tree/synctree/syncclient_test.go @@ -76,17 +76,59 @@ func (m *messageLog) addMessage(msg processMsg) { type processSyncHandler struct { synchandler.SyncHandler - batcher *mb.MB[processMsg] - peerId string + batcher *mb.MB[processMsg] + peerId string + aclList list.AclList + log *messageLog + syncClient SyncClient +} + +func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { + if p.SyncHandler != nil { + return p.SyncHandler.HandleMessage(ctx, senderId, request) + } + unmarshalled := &treechangeproto.TreeSyncMessage{} + err = proto.Unmarshal(request.Payload, unmarshalled) + if err != nil { + return + } + if unmarshalled.Content.GetFullSyncResponse() == nil { + newTreeRequest := GetRequestFactory().CreateNewTreeRequest() + var objMsg *spacesyncproto.ObjectSyncMessage + objMsg, err = marshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") + if err != nil { + return + } + return p.manager().SendPeer(context.Background(), senderId, objMsg) + } + fullSyncResponse := unmarshalled.Content.GetFullSyncResponse() + treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, fullSyncResponse.Heads, fullSyncResponse.Changes) + tree, err := createTestTree(p.aclList, treeStorage) + if err != nil { + return + } + netTree := &broadcastTree{ + ObjectTree: tree, + SyncClient: p.syncClient, + } + p.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, p.syncClient, syncstatus.NewNoOpSyncStatus()) + return } func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler { batcher := mb.New[processMsg](0) - return &processSyncHandler{syncHandler, batcher, peerId} + return &processSyncHandler{ + SyncHandler: syncHandler, + batcher: batcher, + peerId: peerId, + } } func (p *processSyncHandler) manager() *processPeerManager { - return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager) + if p.SyncHandler != nil { + return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager) + } + return p.syncClient.(*syncClient).PeerManager.(*processPeerManager) } func (p *processSyncHandler) tree() *broadcastTree { @@ -118,7 +160,7 @@ func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.Wai p.tree().Unlock() continue } - err = p.SyncHandler.HandleMessage(ctx, res.senderId, res.msg) + err = p.HandleMessage(ctx, res.senderId, res.msg) if err != nil { fmt.Println("error handling message", err.Error()) continue @@ -194,6 +236,20 @@ func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, lo return newProcessSyncHandler(peerId, handler) } +func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler { + factory := GetRequestFactory() + syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory) + + batcher := mb.New[processMsg](0) + return &processSyncHandler{ + batcher: batcher, + peerId: peerId, + aclList: aclList, + log: log, + syncClient: syncClient, + } +} + func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage { changeCreator := objecttree.NewMockChangeCreator() st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id) @@ -208,6 +264,7 @@ type fixtureDeps struct { aclList list.AclList initStorage *treestorage.InMemoryTreeStorage connectionMap map[string][]string + emptyTrees []string } type processFixture struct { @@ -227,10 +284,15 @@ func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processF ) for peerId := range deps.connectionMap { - stCopy := deps.initStorage.Copy() - testTree, err := createTestTree(deps.aclList, stCopy) - require.NoError(t, err) - handler := createSyncHandler(peerId, spaceId, testTree, log) + var handler *processSyncHandler + if slices.Contains(deps.emptyTrees, peerId) { + handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log) + } else { + stCopy := deps.initStorage.Copy() + testTree, err := createTestTree(deps.aclList, stCopy) + require.NoError(t, err) + handler = createSyncHandler(peerId, spaceId, testTree, log) + } handlers[peerId] = handler } for peerId, connectionMap := range deps.connectionMap { @@ -260,6 +322,46 @@ func (p *processFixture) stop() { p.wg.Wait() } +func TestSend_EmptyClient(t *testing.T) { + treeId := "treeId" + spaceId := "spaceId" + keys, err := accountdata.NewRandom() + require.NoError(t, err) + aclList, err := list.NewTestDerivedAcl(spaceId, keys) + require.NoError(t, err) + storage := createStorage(treeId, aclList) + changeCreator := objecttree.NewMockChangeCreator() + deps := fixtureDeps{ + aclList: aclList, + initStorage: storage.(*treestorage.InMemoryTreeStorage), + connectionMap: map[string][]string{ + "peer1": []string{"peer2"}, + "peer2": []string{"peer1"}, + }, + emptyTrees: []string{"peer2"}, + } + fx := newProcessFixture(t, spaceId, deps) + fx.run(t) + fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ + NewHeads: nil, + RawChanges: []*treechangeproto.RawTreeChangeWithId{ + changeCreator.CreateRaw("1", aclList.Id(), treeId, false, treeId), + }, + }) + time.Sleep(100 * time.Millisecond) + fx.stop() + firstHeads := fx.handlers["peer1"].tree().Heads() + secondHeads := fx.handlers["peer2"].tree().Heads() + slices.Sort(firstHeads) + slices.Sort(secondHeads) + require.Equal(t, firstHeads, secondHeads) + require.Equal(t, []string{"1"}, firstHeads) + logMsgs := fx.log.batcher.GetAll() + for _, msg := range logMsgs { + fmt.Println(msg.description()) + } +} + func TestSimple_TwoPeers(t *testing.T) { treeId := "treeId" spaceId := "spaceId"