From 125d8b46269fd16dae53dac63d62bbef604026a1 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 17 Apr 2023 21:49:51 +0200 Subject: [PATCH] Add test with random trees --- .../object/tree/synctree/syncclient_test.go | 441 +++-------------- .../object/tree/synctree/synctreehandler.go | 22 +- .../object/tree/synctree/utils_test.go | 446 ++++++++++++++++++ 3 files changed, 518 insertions(+), 391 deletions(-) create mode 100644 commonspace/object/tree/synctree/utils_test.go diff --git a/commonspace/object/tree/synctree/syncclient_test.go b/commonspace/object/tree/synctree/syncclient_test.go index 73db3f42..ed6dd710 100644 --- a/commonspace/object/tree/synctree/syncclient_test.go +++ b/commonspace/object/tree/synctree/syncclient_test.go @@ -8,325 +8,14 @@ import ( "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" - "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" - "github.com/anytypeio/any-sync/commonspace/spacesyncproto" - "github.com/anytypeio/any-sync/commonspace/syncstatus" - "github.com/anytypeio/any-sync/net/peer" - "github.com/cheggaaa/mb/v3" - "github.com/gogo/protobuf/proto" + "github.com/anytypeio/any-sync/util/slice" "github.com/stretchr/testify/require" - "golang.org/x/exp/slices" - "sync" + "math/rand" "testing" "time" ) -type processMsg struct { - msg *spacesyncproto.ObjectSyncMessage - senderId string - receiverId string - userMsg *objecttree.RawChangesPayload -} - -type msgDescription struct { - name string - from string - to string - heads []string - changes []*treechangeproto.RawTreeChangeWithId -} - -func (p *processMsg) description() (descr msgDescription) { - unmarshalled := &treechangeproto.TreeSyncMessage{} - err := proto.Unmarshal(p.msg.Payload, unmarshalled) - if err != nil { - panic(err) - } - descr = msgDescription{ - from: p.senderId, - to: p.receiverId, - } - switch { - case unmarshalled.GetContent().GetHeadUpdate() != nil: - cnt := unmarshalled.GetContent().GetHeadUpdate() - descr.name = "HeadUpdate" - descr.heads = cnt.Heads - descr.changes = unmarshalled.GetContent().GetHeadUpdate().Changes - case unmarshalled.GetContent().GetFullSyncRequest() != nil: - cnt := unmarshalled.GetContent().GetFullSyncRequest() - descr.name = "FullSyncRequest" - descr.heads = cnt.Heads - descr.changes = unmarshalled.GetContent().GetFullSyncRequest().Changes - case unmarshalled.GetContent().GetFullSyncResponse() != nil: - cnt := unmarshalled.GetContent().GetFullSyncResponse() - descr.name = "FullSyncResponse" - descr.heads = cnt.Heads - descr.changes = unmarshalled.GetContent().GetFullSyncResponse().Changes - } - return -} - -type messageLog struct { - batcher *mb.MB[processMsg] -} - -func newMessageLog() *messageLog { - return &messageLog{batcher: mb.New[processMsg](0)} -} - -func (m *messageLog) addMessage(msg processMsg) { - m.batcher.Add(context.Background(), msg) -} - -type processSyncHandler struct { - synchandler.SyncHandler - batcher *mb.MB[processMsg] - peerId string - aclList list.AclList - log *messageLog - syncClient SyncClient -} - -func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { - if p.SyncHandler != nil { - return p.SyncHandler.HandleMessage(ctx, senderId, request) - } - unmarshalled := &treechangeproto.TreeSyncMessage{} - err = proto.Unmarshal(request.Payload, unmarshalled) - if err != nil { - return - } - if unmarshalled.Content.GetFullSyncResponse() == nil { - newTreeRequest := GetRequestFactory().CreateNewTreeRequest() - var objMsg *spacesyncproto.ObjectSyncMessage - objMsg, err = marshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") - if err != nil { - return - } - return p.manager().SendPeer(context.Background(), senderId, objMsg) - } - fullSyncResponse := unmarshalled.Content.GetFullSyncResponse() - treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, fullSyncResponse.Heads, fullSyncResponse.Changes) - tree, err := createTestTree(p.aclList, treeStorage) - if err != nil { - return - } - netTree := &broadcastTree{ - ObjectTree: tree, - SyncClient: p.syncClient, - } - p.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, p.syncClient, syncstatus.NewNoOpSyncStatus()) - return -} - -func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler { - batcher := mb.New[processMsg](0) - return &processSyncHandler{ - SyncHandler: syncHandler, - batcher: batcher, - peerId: peerId, - } -} - -func (p *processSyncHandler) manager() *processPeerManager { - if p.SyncHandler != nil { - return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager) - } - return p.syncClient.(*syncClient).PeerManager.(*processPeerManager) -} - -func (p *processSyncHandler) tree() *broadcastTree { - return p.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree) -} - -func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) { - return p.batcher.Add(ctx, msg) -} - -func (p *processSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) { - p.batcher.Add(ctx, processMsg{userMsg: &changes}) -} - -func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) { - wg.Add(1) - go func() { - defer wg.Done() - for { - res, err := p.batcher.WaitOne(ctx) - if err != nil { - return - } - if res.userMsg != nil { - p.tree().Lock() - userRes, err := p.tree().AddRawChanges(ctx, *res.userMsg) - require.NoError(t, err) - fmt.Println("user add result", userRes.Heads) - p.tree().Unlock() - continue - } - err = p.HandleMessage(ctx, res.senderId, res.msg) - if err != nil { - fmt.Println("error handling message", err.Error()) - continue - } - } - }() -} - -type processPeerManager struct { - peerId string - handlers map[string]*processSyncHandler - log *messageLog -} - -func newProcessPeerManager(peerId string, log *messageLog) *processPeerManager { - return &processPeerManager{handlers: map[string]*processSyncHandler{}, peerId: peerId, log: log} -} - -func (m *processPeerManager) addHandler(peerId string, handler *processSyncHandler) { - m.handlers[peerId] = handler -} - -func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - pMsg := processMsg{ - msg: msg, - senderId: m.peerId, - receiverId: peerId, - } - m.log.addMessage(pMsg) - return m.handlers[peerId].send(context.Background(), pMsg) -} - -func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { - for _, handler := range m.handlers { - pMsg := processMsg{ - msg: msg, - senderId: m.peerId, - receiverId: handler.peerId, - } - m.log.addMessage(pMsg) - handler.send(context.Background(), pMsg) - } - return -} - -func (m *processPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { - panic("should not be called") -} - -type broadcastTree struct { - objecttree.ObjectTree - SyncClient -} - -func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) { - res, err := b.ObjectTree.AddRawChanges(ctx, changes) - if err != nil { - return objecttree.AddResult{}, err - } - upd := b.SyncClient.CreateHeadUpdate(b.ObjectTree, res.Added) - b.SyncClient.Broadcast(ctx, upd) - return res, nil -} - -func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler { - factory := GetRequestFactory() - syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory) - netTree := &broadcastTree{ - ObjectTree: objTree, - SyncClient: syncClient, - } - handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus()) - return newProcessSyncHandler(peerId, handler) -} - -func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler { - factory := GetRequestFactory() - syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory) - - batcher := mb.New[processMsg](0) - return &processSyncHandler{ - batcher: batcher, - peerId: peerId, - aclList: aclList, - log: log, - syncClient: syncClient, - } -} - -func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage { - changeCreator := objecttree.NewMockChangeCreator() - st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id) - return st -} - -func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (objecttree.ObjectTree, error) { - return objecttree.BuildTestableTree(aclList, storage) -} - -type fixtureDeps struct { - aclList list.AclList - initStorage *treestorage.InMemoryTreeStorage - connectionMap map[string][]string - emptyTrees []string -} - -type processFixture struct { - handlers map[string]*processSyncHandler - log *messageLog - wg *sync.WaitGroup - ctx context.Context - cancel context.CancelFunc -} - -func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processFixture { - var ( - handlers = map[string]*processSyncHandler{} - log = newMessageLog() - wg = sync.WaitGroup{} - ctx, cancel = context.WithCancel(context.Background()) - ) - - for peerId := range deps.connectionMap { - var handler *processSyncHandler - if slices.Contains(deps.emptyTrees, peerId) { - handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log) - } else { - stCopy := deps.initStorage.Copy() - testTree, err := createTestTree(deps.aclList, stCopy) - require.NoError(t, err) - handler = createSyncHandler(peerId, spaceId, testTree, log) - } - handlers[peerId] = handler - } - for peerId, connectionMap := range deps.connectionMap { - handler := handlers[peerId] - manager := handler.manager() - for _, connectionId := range connectionMap { - manager.addHandler(connectionId, handlers[connectionId]) - } - } - return &processFixture{ - handlers: handlers, - log: log, - wg: &wg, - ctx: ctx, - cancel: cancel, - } -} - -func (p *processFixture) run(t *testing.T) { - for _, handler := range p.handlers { - handler.run(p.ctx, t, p.wg) - } -} - -func (p *processFixture) stop() { - p.cancel() - p.wg.Wait() -} - -func TestSend_EmptyClientGetsFullHistory(t *testing.T) { +func TestEmptyClientGetsFullHistory(t *testing.T) { treeId := "treeId" spaceId := "spaceId" keys, err := accountdata.NewRandom() @@ -356,70 +45,22 @@ func TestSend_EmptyClientGetsFullHistory(t *testing.T) { fx.stop() firstHeads := fx.handlers["peer1"].tree().Heads() secondHeads := fx.handlers["peer2"].tree().Heads() - slices.Sort(firstHeads) - slices.Sort(secondHeads) - require.Equal(t, firstHeads, secondHeads) + require.True(t, slice.SortedEquals(firstHeads, secondHeads)) require.Equal(t, []string{"1"}, firstHeads) logMsgs := fx.log.batcher.GetAll() - var fullResponseMsg *processMsg + var fullResponseMsg msgDescription for _, msg := range logMsgs { descr := msg.description() if descr.name == "FullSyncResponse" { - fullResponseMsg = &msg + fullResponseMsg = descr } } - require.NotNil(t, fullResponseMsg) // that means that we got not only the last snapshot, but also the first one - require.Len(t, fullResponseMsg.description().changes, 2) + require.Len(t, fullResponseMsg.changes, 2) } -func TestSimple_TwoPeers(t *testing.T) { - treeId := "treeId" - spaceId := "spaceId" - keys, err := accountdata.NewRandom() - require.NoError(t, err) - aclList, err := list.NewTestDerivedAcl(spaceId, keys) - require.NoError(t, err) - storage := createStorage(treeId, aclList) - changeCreator := objecttree.NewMockChangeCreator() - deps := fixtureDeps{ - aclList: aclList, - initStorage: storage.(*treestorage.InMemoryTreeStorage), - connectionMap: map[string][]string{ - "peer1": []string{"peer2"}, - "peer2": []string{"peer1"}, - }, - } - fx := newProcessFixture(t, spaceId, deps) - fx.run(t) - fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ - NewHeads: nil, - RawChanges: []*treechangeproto.RawTreeChangeWithId{ - changeCreator.CreateRaw("1", aclList.Id(), treeId, false, treeId), - }, - }) - fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ - NewHeads: nil, - RawChanges: []*treechangeproto.RawTreeChangeWithId{ - changeCreator.CreateRaw("2", aclList.Id(), treeId, false, treeId), - }, - }) - time.Sleep(100 * time.Millisecond) - fx.stop() - firstHeads := fx.handlers["peer1"].tree().Heads() - secondHeads := fx.handlers["peer2"].tree().Heads() - slices.Sort(firstHeads) - slices.Sort(secondHeads) - require.Equal(t, firstHeads, secondHeads) - require.Equal(t, []string{"1", "2"}, firstHeads) - logMsgs := fx.log.batcher.GetAll() - for _, msg := range logMsgs { - fmt.Println(msg.description()) - } -} - -func TestSimple_ThreePeers(t *testing.T) { +func TestRandomTreeMerge(t *testing.T) { treeId := "treeId" spaceId := "spaceId" keys, err := accountdata.NewRandom() @@ -427,6 +68,21 @@ func TestSimple_ThreePeers(t *testing.T) { aclList, err := list.NewTestDerivedAcl(spaceId, keys) storage := createStorage(treeId, aclList) changeCreator := objecttree.NewMockChangeCreator() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + params := genParams{ + prefix: "peer1", + aclId: aclList.Id(), + startIdx: 0, + levels: 10, + snapshotId: treeId, + prevHeads: []string{treeId}, + isSnapshot: func() bool { + return rnd.Intn(100) > 80 + }, + } + initialRes := genChanges(changeCreator, params) + err = storage.TransactionAdd(initialRes.changes, initialRes.heads) + require.NoError(t, err) deps := fixtureDeps{ aclList: aclList, initStorage: storage.(*treestorage.InMemoryTreeStorage), @@ -435,31 +91,44 @@ func TestSimple_ThreePeers(t *testing.T) { "peer2": []string{"node1"}, "node1": []string{"peer1", "peer2"}, }, + emptyTrees: []string{"peer2", "node1"}, } fx := newProcessFixture(t, spaceId, deps) fx.run(t) fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ - NewHeads: nil, - RawChanges: []*treechangeproto.RawTreeChangeWithId{ - changeCreator.CreateRaw("1", aclList.Id(), treeId, false, treeId), - }, + NewHeads: initialRes.heads, + RawChanges: initialRes.changes, }) - fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ - NewHeads: nil, - RawChanges: []*treechangeproto.RawTreeChangeWithId{ - changeCreator.CreateRaw("2", aclList.Id(), treeId, false, treeId), - }, - }) - time.Sleep(100 * time.Millisecond) - fx.stop() + time.Sleep(1000 * time.Millisecond) firstHeads := fx.handlers["peer1"].tree().Heads() secondHeads := fx.handlers["peer2"].tree().Heads() - slices.Sort(firstHeads) - slices.Sort(secondHeads) - require.Equal(t, firstHeads, secondHeads) - require.Equal(t, []string{"1", "2"}, firstHeads) - logMsgs := fx.log.batcher.GetAll() - for _, msg := range logMsgs { - fmt.Println(msg.description()) + require.True(t, slice.UnsortedEquals(firstHeads, secondHeads)) + params = genParams{ + prefix: "peer1", + aclId: aclList.Id(), + startIdx: 11, + levels: 10, + snapshotId: initialRes.snapshotId, + prevHeads: initialRes.heads, + isSnapshot: func() bool { + return rnd.Intn(100) > 80 + }, } + peer1Res := genChanges(changeCreator, params) + params.prefix = "peer2" + peer2Res := genChanges(changeCreator, params) + fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ + NewHeads: peer1Res.heads, + RawChanges: peer1Res.changes, + }) + fx.handlers["peer2"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ + NewHeads: peer2Res.heads, + RawChanges: peer2Res.changes, + }) + time.Sleep(1000 * time.Millisecond) + fx.stop() + firstHeads = fx.handlers["peer1"].tree().Heads() + secondHeads = fx.handlers["peer2"].tree().Heads() + fmt.Println(firstHeads) + fmt.Println(secondHeads) } diff --git a/commonspace/object/tree/synctree/synctreehandler.go b/commonspace/object/tree/synctree/synctreehandler.go index de79d0de..a41a2bad 100644 --- a/commonspace/object/tree/synctree/synctreehandler.go +++ b/commonspace/object/tree/synctree/synctreehandler.go @@ -83,13 +83,19 @@ func (s *syncTreeHandler) handleHeadUpdate( objTree = s.objTree ) - log := log.With(zap.Strings("heads", objTree.Heads()), zap.String("treeId", objTree.Id()), zap.String("spaceId", s.spaceId)) + log := log.With( + zap.Strings("update heads", update.Heads), + zap.String("treeId", objTree.Id()), + zap.String("spaceId", s.spaceId), + zap.Int("len(update changes)", len(update.Changes))) log.DebugCtx(ctx, "received head update message") defer func() { if err != nil { log.With(zap.Error(err)).Debug("head update finished with error") } else if fullRequest != nil { + cnt := fullRequest.Content.GetFullSyncRequest() + log = log.With(zap.Strings("request heads", cnt.Heads), zap.Int("len(request changes)", len(cnt.Changes))) log.DebugCtx(ctx, "sending full sync request") } else { if !isEmptyUpdate { @@ -151,19 +157,21 @@ func (s *syncTreeHandler) handleFullSyncRequest( ) log := log.With(zap.String("senderId", senderId), - zap.Strings("heads", request.Heads), + zap.Strings("request heads", request.Heads), zap.String("treeId", s.objTree.Id()), zap.String("replyId", replyId), - zap.String("spaceId", s.spaceId)) + zap.String("spaceId", s.spaceId), + zap.Int("len(request changes)", len(request.Changes))) log.DebugCtx(ctx, "received full sync request message") defer func() { if err != nil { log.With(zap.Error(err)).DebugCtx(ctx, "full sync request finished with error") - s.syncClient.SendWithReply(ctx, senderId, treechangeproto.WrapError(err, header), replyId) return } else if fullResponse != nil { + cnt := fullResponse.Content.GetFullSyncResponse() + log = log.With(zap.Strings("response heads", cnt.Heads), zap.Int("len(response changes)", len(cnt.Changes))) log.DebugCtx(ctx, "full sync response sent") } }() @@ -192,7 +200,11 @@ func (s *syncTreeHandler) handleFullSyncResponse( var ( objTree = s.objTree ) - log := log.With(zap.Strings("heads", response.Heads), zap.String("treeId", s.objTree.Id()), zap.String("spaceId", s.spaceId)) + log := log.With( + zap.Strings("heads", response.Heads), + zap.String("treeId", s.objTree.Id()), + zap.String("spaceId", s.spaceId), + zap.Int("len(changes)", len(response.Changes))) log.DebugCtx(ctx, "received full sync response message") defer func() { diff --git a/commonspace/object/tree/synctree/utils_test.go b/commonspace/object/tree/synctree/utils_test.go new file mode 100644 index 00000000..0399d239 --- /dev/null +++ b/commonspace/object/tree/synctree/utils_test.go @@ -0,0 +1,446 @@ +package synctree + +import ( + "context" + "fmt" + "github.com/anytypeio/any-sync/commonspace/object/accountdata" + "github.com/anytypeio/any-sync/commonspace/object/acl/list" + "github.com/anytypeio/any-sync/commonspace/object/tree/objecttree" + "github.com/anytypeio/any-sync/commonspace/object/tree/treechangeproto" + "github.com/anytypeio/any-sync/commonspace/object/tree/treestorage" + "github.com/anytypeio/any-sync/commonspace/objectsync/synchandler" + "github.com/anytypeio/any-sync/commonspace/spacesyncproto" + "github.com/anytypeio/any-sync/commonspace/syncstatus" + "github.com/anytypeio/any-sync/net/peer" + "github.com/cheggaaa/mb/v3" + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" + "math/rand" + "sync" + "testing" + "time" +) + +type processMsg struct { + msg *spacesyncproto.ObjectSyncMessage + senderId string + receiverId string + userMsg *objecttree.RawChangesPayload +} + +type msgDescription struct { + name string + from string + to string + heads []string + changes []*treechangeproto.RawTreeChangeWithId +} + +func (p *processMsg) description() (descr msgDescription) { + unmarshalled := &treechangeproto.TreeSyncMessage{} + err := proto.Unmarshal(p.msg.Payload, unmarshalled) + if err != nil { + panic(err) + } + descr = msgDescription{ + from: p.senderId, + to: p.receiverId, + } + switch { + case unmarshalled.GetContent().GetHeadUpdate() != nil: + cnt := unmarshalled.GetContent().GetHeadUpdate() + descr.name = "HeadUpdate" + descr.heads = cnt.Heads + descr.changes = unmarshalled.GetContent().GetHeadUpdate().Changes + case unmarshalled.GetContent().GetFullSyncRequest() != nil: + cnt := unmarshalled.GetContent().GetFullSyncRequest() + descr.name = "FullSyncRequest" + descr.heads = cnt.Heads + descr.changes = unmarshalled.GetContent().GetFullSyncRequest().Changes + case unmarshalled.GetContent().GetFullSyncResponse() != nil: + cnt := unmarshalled.GetContent().GetFullSyncResponse() + descr.name = "FullSyncResponse" + descr.heads = cnt.Heads + descr.changes = unmarshalled.GetContent().GetFullSyncResponse().Changes + } + return +} + +type messageLog struct { + batcher *mb.MB[processMsg] +} + +func newMessageLog() *messageLog { + return &messageLog{batcher: mb.New[processMsg](0)} +} + +func (m *messageLog) addMessage(msg processMsg) { + m.batcher.Add(context.Background(), msg) +} + +type processSyncHandler struct { + synchandler.SyncHandler + batcher *mb.MB[processMsg] + peerId string + aclList list.AclList + log *messageLog + syncClient SyncClient +} + +func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { + if p.SyncHandler != nil { + return p.SyncHandler.HandleMessage(ctx, senderId, request) + } + unmarshalled := &treechangeproto.TreeSyncMessage{} + err = proto.Unmarshal(request.Payload, unmarshalled) + if err != nil { + return + } + if unmarshalled.Content.GetFullSyncResponse() == nil { + newTreeRequest := GetRequestFactory().CreateNewTreeRequest() + var objMsg *spacesyncproto.ObjectSyncMessage + objMsg, err = marshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") + if err != nil { + return + } + return p.manager().SendPeer(context.Background(), senderId, objMsg) + } + fullSyncResponse := unmarshalled.Content.GetFullSyncResponse() + treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil) + tree, err := createTestTree(p.aclList, treeStorage) + if err != nil { + return + } + netTree := &broadcastTree{ + ObjectTree: tree, + SyncClient: p.syncClient, + } + res, err := netTree.AddRawChanges(context.Background(), objecttree.RawChangesPayload{ + NewHeads: fullSyncResponse.Heads, + RawChanges: fullSyncResponse.Changes, + }) + if err != nil { + return + } + p.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, p.syncClient, syncstatus.NewNoOpSyncStatus()) + var objMsg *spacesyncproto.ObjectSyncMessage + newTreeRequest := GetRequestFactory().CreateHeadUpdate(netTree, res.Added) + objMsg, err = marshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") + if err != nil { + return + } + return p.manager().Broadcast(context.Background(), objMsg) +} + +func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler { + batcher := mb.New[processMsg](0) + return &processSyncHandler{ + SyncHandler: syncHandler, + batcher: batcher, + peerId: peerId, + } +} + +func (p *processSyncHandler) manager() *processPeerManager { + if p.SyncHandler != nil { + return p.SyncHandler.(*syncTreeHandler).syncClient.(*syncClient).PeerManager.(*processPeerManager) + } + return p.syncClient.(*syncClient).PeerManager.(*processPeerManager) +} + +func (p *processSyncHandler) tree() *broadcastTree { + return p.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree) +} + +func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) { + return p.batcher.Add(ctx, msg) +} + +func (p *processSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) { + p.batcher.Add(ctx, processMsg{userMsg: &changes}) +} + +func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) { + wg.Add(1) + go func() { + defer wg.Done() + for { + res, err := p.batcher.WaitOne(ctx) + if err != nil { + return + } + if res.userMsg != nil { + p.tree().Lock() + userRes, err := p.tree().AddRawChanges(ctx, *res.userMsg) + require.NoError(t, err) + fmt.Println("user add result", userRes.Heads) + p.tree().Unlock() + continue + } + err = p.HandleMessage(ctx, res.senderId, res.msg) + if err != nil { + fmt.Println("error handling message", err.Error()) + continue + } + } + }() +} + +type processPeerManager struct { + peerId string + handlers map[string]*processSyncHandler + log *messageLog +} + +func newProcessPeerManager(peerId string, log *messageLog) *processPeerManager { + return &processPeerManager{handlers: map[string]*processSyncHandler{}, peerId: peerId, log: log} +} + +func (m *processPeerManager) addHandler(peerId string, handler *processSyncHandler) { + m.handlers[peerId] = handler +} + +func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + pMsg := processMsg{ + msg: msg, + senderId: m.peerId, + receiverId: peerId, + } + m.log.addMessage(pMsg) + return m.handlers[peerId].send(context.Background(), pMsg) +} + +func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { + for _, handler := range m.handlers { + pMsg := processMsg{ + msg: msg, + senderId: m.peerId, + receiverId: handler.peerId, + } + m.log.addMessage(pMsg) + handler.send(context.Background(), pMsg) + } + return +} + +func (m *processPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { + panic("should not be called") +} + +type broadcastTree struct { + objecttree.ObjectTree + SyncClient +} + +func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) (objecttree.AddResult, error) { + res, err := b.ObjectTree.AddRawChanges(ctx, changes) + if err != nil { + return objecttree.AddResult{}, err + } + upd := b.SyncClient.CreateHeadUpdate(b.ObjectTree, res.Added) + b.SyncClient.Broadcast(ctx, upd) + return res, nil +} + +func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler { + factory := GetRequestFactory() + syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory) + netTree := &broadcastTree{ + ObjectTree: objTree, + SyncClient: syncClient, + } + handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus()) + return newProcessSyncHandler(peerId, handler) +} + +func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler { + factory := GetRequestFactory() + syncClient := newSyncClient(spaceId, newProcessPeerManager(peerId, log), factory) + + batcher := mb.New[processMsg](0) + return &processSyncHandler{ + batcher: batcher, + peerId: peerId, + aclList: aclList, + log: log, + syncClient: syncClient, + } +} + +func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage { + changeCreator := objecttree.NewMockChangeCreator() + st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id) + return st +} + +func createTestTree(aclList list.AclList, storage treestorage.TreeStorage) (objecttree.ObjectTree, error) { + return objecttree.BuildTestableTree(aclList, storage) +} + +type fixtureDeps struct { + aclList list.AclList + initStorage *treestorage.InMemoryTreeStorage + connectionMap map[string][]string + emptyTrees []string +} + +type processFixture struct { + handlers map[string]*processSyncHandler + log *messageLog + wg *sync.WaitGroup + ctx context.Context + cancel context.CancelFunc +} + +func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processFixture { + var ( + handlers = map[string]*processSyncHandler{} + log = newMessageLog() + wg = sync.WaitGroup{} + ctx, cancel = context.WithCancel(context.Background()) + ) + + for peerId := range deps.connectionMap { + var handler *processSyncHandler + if slices.Contains(deps.emptyTrees, peerId) { + handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log) + } else { + stCopy := deps.initStorage.Copy() + testTree, err := createTestTree(deps.aclList, stCopy) + require.NoError(t, err) + handler = createSyncHandler(peerId, spaceId, testTree, log) + } + handlers[peerId] = handler + } + for peerId, connectionMap := range deps.connectionMap { + handler := handlers[peerId] + manager := handler.manager() + for _, connectionId := range connectionMap { + manager.addHandler(connectionId, handlers[connectionId]) + } + } + return &processFixture{ + handlers: handlers, + log: log, + wg: &wg, + ctx: ctx, + cancel: cancel, + } +} + +func (p *processFixture) run(t *testing.T) { + for _, handler := range p.handlers { + handler.run(p.ctx, t, p.wg) + } +} + +func (p *processFixture) stop() { + p.cancel() + p.wg.Wait() +} + +type genParams struct { + prefix string + aclId string + startIdx int + levels int + snapshotId string + prevHeads []string + isSnapshot func() bool +} + +type genResult struct { + changes []*treechangeproto.RawTreeChangeWithId + heads []string + snapshotId string +} + +func genChanges(creator *objecttree.MockChangeCreator, params genParams) (res genResult) { + src := rand.NewSource(time.Now().Unix()) + rnd := rand.New(src) + var ( + prevHeads []string + snapshotId = params.snapshotId + ) + prevHeads = append(prevHeads, params.prevHeads...) + + for i := 0; i < params.levels; i++ { + if params.isSnapshot() { + newId := fmt.Sprintf("%s.%d.%d", params.prefix, params.startIdx+i, 0) + newCh := creator.CreateRaw(newId, params.aclId, snapshotId, true, prevHeads...) + res.changes = append(res.changes, newCh) + prevHeads = []string{newId} + snapshotId = newId + continue + } + perLevel := rnd.Intn(10) + if perLevel == 0 { + perLevel = 1 + } + var ( + newHeads []string + usedIds = map[string]struct{}{} + ) + for j := 0; j < perLevel; j++ { + // if we didn't connect with all prev ones + prevConns := rnd.Intn(len(prevHeads)) + if prevConns == 0 { + prevConns = 1 + } + rnd.Shuffle(len(prevHeads), func(i, j int) { + prevHeads[i], prevHeads[j] = prevHeads[j], prevHeads[i] + }) + if j == perLevel-1 && len(usedIds) != len(prevHeads) { + var unusedIds []string + for _, id := range prevHeads { + if _, exists := usedIds[id]; !exists { + unusedIds = append(unusedIds, id) + } + } + prevHeads = unusedIds + prevConns = len(prevHeads) + } + var prevChId []string + for k := 0; k < prevConns; k++ { + prevChId = append(prevChId, prevHeads[k]) + usedIds[prevHeads[k]] = struct{}{} + } + newId := fmt.Sprintf("%s.%d.%d", params.prefix, params.startIdx+i, j) + newCh := creator.CreateRaw(newId, params.aclId, snapshotId, false, prevChId...) + res.changes = append(res.changes, newCh) + newHeads = append(newHeads, newId) + } + prevHeads = newHeads + } + res.heads = prevHeads + res.snapshotId = snapshotId + return +} + +func TestGenChanges(t *testing.T) { + treeId := "treeId" + spaceId := "spaceId" + keys, err := accountdata.NewRandom() + require.NoError(t, err) + aclList, err := list.NewTestDerivedAcl(spaceId, keys) + storage := createStorage(treeId, aclList) + creator := objecttree.NewMockChangeCreator() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + params := genParams{ + prefix: "peerId", + aclId: aclList.Id(), + startIdx: 0, + levels: 10, + snapshotId: treeId, + prevHeads: []string{treeId}, + isSnapshot: func() bool { + return rnd.Intn(100) > 80 + }, + } + res := genChanges(creator, params) + storage.TransactionAdd(res.changes, res.heads) + tr, err := createTestTree(aclList, storage) + require.NoError(t, err) + fmt.Println(tr.Debug(objecttree.NoOpDescriptionParser)) +}