Add comments to test

This commit is contained in:
mcrakhman 2023-04-19 11:49:04 +02:00 committed by Mikhail Iudin
parent a2e6fccac6
commit bb47f0aa21
No known key found for this signature in database
GPG Key ID: FAAAA8BAABDFF1C0
2 changed files with 99 additions and 85 deletions

View File

@ -32,7 +32,7 @@ func TestEmptyClientGetsFullHistory(t *testing.T) {
}, },
emptyTrees: []string{"peer2"}, emptyTrees: []string{"peer2"},
} }
fx := newProcessFixture(t, spaceId, deps) fx := newProtocolFixture(t, spaceId, deps)
fx.run(t) fx.run(t)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: nil, NewHeads: nil,
@ -107,7 +107,7 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) {
}, },
emptyTrees: []string{"peer2", "node1"}, emptyTrees: []string{"peer2", "node1"},
} }
fx := newProcessFixture(t, spaceId, deps) fx := newProtocolFixture(t, spaceId, deps)
fx.run(t) fx.run(t)
fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: initialRes.heads, NewHeads: initialRes.heads,

View File

@ -22,13 +22,15 @@ import (
"time" "time"
) )
type processMsg struct { // protocolMsg is a message used in sync protocol tests
type protocolMsg struct {
msg *spacesyncproto.ObjectSyncMessage msg *spacesyncproto.ObjectSyncMessage
senderId string senderId string
receiverId string receiverId string
userMsg *objecttree.RawChangesPayload userMsg *objecttree.RawChangesPayload
} }
// msgDescription is a representation of message used for checking the results of the test
type msgDescription struct { type msgDescription struct {
name string name string
from string from string
@ -37,7 +39,7 @@ type msgDescription struct {
changes []*treechangeproto.RawTreeChangeWithId changes []*treechangeproto.RawTreeChangeWithId
} }
func (p *processMsg) description() (descr msgDescription) { func (p *protocolMsg) description() (descr msgDescription) {
unmarshalled := &treechangeproto.TreeSyncMessage{} unmarshalled := &treechangeproto.TreeSyncMessage{}
err := proto.Unmarshal(p.msg.Payload, unmarshalled) err := proto.Unmarshal(p.msg.Payload, unmarshalled)
if err != nil { if err != nil {
@ -67,30 +69,68 @@ func (p *processMsg) description() (descr msgDescription) {
return return
} }
// messageLog saves all messages that were sent during sync test
type messageLog struct { type messageLog struct {
batcher *mb.MB[processMsg] batcher *mb.MB[protocolMsg]
} }
func newMessageLog() *messageLog { func newMessageLog() *messageLog {
return &messageLog{batcher: mb.New[processMsg](0)} return &messageLog{batcher: mb.New[protocolMsg](0)}
} }
func (m *messageLog) addMessage(msg processMsg) { func (m *messageLog) addMessage(msg protocolMsg) {
m.batcher.Add(context.Background(), msg) m.batcher.Add(context.Background(), msg)
} }
type processSyncHandler struct { // testSyncHandler is the wrapper around individual tree to test sync protocol
type testSyncHandler struct {
synchandler.SyncHandler synchandler.SyncHandler
batcher *mb.MB[processMsg] batcher *mb.MB[protocolMsg]
peerId string peerId string
aclList list.AclList aclList list.AclList
log *messageLog log *messageLog
syncClient objectsync.SyncClient syncClient objectsync.SyncClient
} }
func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { // createSyncHandler creates a sync handler when a tree is already created
if p.SyncHandler != nil { func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler {
return p.SyncHandler.HandleMessage(ctx, senderId, request) factory := objectsync.GetRequestFactory()
syncClient := objectsync.NewSyncClient(spaceId, newTestPeerManager(peerId, log), factory)
netTree := &broadcastTree{
ObjectTree: objTree,
SyncClient: syncClient,
}
handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus())
return newTestSyncHandler(peerId, handler)
}
// createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree)
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *testSyncHandler {
factory := objectsync.GetRequestFactory()
syncClient := objectsync.NewSyncClient(spaceId, newTestPeerManager(peerId, log), factory)
batcher := mb.New[protocolMsg](0)
return &testSyncHandler{
batcher: batcher,
peerId: peerId,
aclList: aclList,
log: log,
syncClient: syncClient,
}
}
func newTestSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *testSyncHandler {
batcher := mb.New[protocolMsg](0)
return &testSyncHandler{
SyncHandler: syncHandler,
batcher: batcher,
peerId: peerId,
}
}
func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) {
if h.SyncHandler != nil {
return h.SyncHandler.HandleMessage(ctx, senderId, request)
} }
unmarshalled := &treechangeproto.TreeSyncMessage{} unmarshalled := &treechangeproto.TreeSyncMessage{}
err = proto.Unmarshal(request.Payload, unmarshalled) err = proto.Unmarshal(request.Payload, unmarshalled)
@ -104,17 +144,17 @@ func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string,
if err != nil { if err != nil {
return return
} }
return p.manager().SendPeer(context.Background(), senderId, objMsg) return h.manager().SendPeer(context.Background(), senderId, objMsg)
} }
fullSyncResponse := unmarshalled.Content.GetFullSyncResponse() fullSyncResponse := unmarshalled.Content.GetFullSyncResponse()
treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil) treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil)
tree, err := createTestTree(p.aclList, treeStorage) tree, err := createTestTree(h.aclList, treeStorage)
if err != nil { if err != nil {
return return
} }
netTree := &broadcastTree{ netTree := &broadcastTree{
ObjectTree: tree, ObjectTree: tree,
SyncClient: p.syncClient, SyncClient: h.syncClient,
} }
res, err := netTree.AddRawChanges(context.Background(), objecttree.RawChangesPayload{ res, err := netTree.AddRawChanges(context.Background(), objecttree.RawChangesPayload{
NewHeads: fullSyncResponse.Heads, NewHeads: fullSyncResponse.Heads,
@ -123,62 +163,53 @@ func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string,
if err != nil { if err != nil {
return return
} }
p.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, p.syncClient, syncstatus.NewNoOpSyncStatus()) h.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, h.syncClient, syncstatus.NewNoOpSyncStatus())
var objMsg *spacesyncproto.ObjectSyncMessage var objMsg *spacesyncproto.ObjectSyncMessage
newTreeRequest := objectsync.GetRequestFactory().CreateHeadUpdate(netTree, res.Added) newTreeRequest := objectsync.GetRequestFactory().CreateHeadUpdate(netTree, res.Added)
objMsg, err = objectsync.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") objMsg, err = objectsync.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "")
if err != nil { if err != nil {
return return
} }
return p.manager().Broadcast(context.Background(), objMsg) return h.manager().Broadcast(context.Background(), objMsg)
} }
func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler { func (h *testSyncHandler) manager() *testPeerManager {
batcher := mb.New[processMsg](0) if h.SyncHandler != nil {
return &processSyncHandler{ return h.SyncHandler.(*syncTreeHandler).syncClient.PeerManager().(*testPeerManager)
SyncHandler: syncHandler,
batcher: batcher,
peerId: peerId,
} }
return h.syncClient.PeerManager().(*testPeerManager)
} }
func (p *processSyncHandler) manager() *processPeerManager { func (h *testSyncHandler) tree() *broadcastTree {
if p.SyncHandler != nil { return h.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree)
return p.SyncHandler.(*syncTreeHandler).syncClient.PeerManager().(*processPeerManager)
}
return p.syncClient.PeerManager().(*processPeerManager)
} }
func (p *processSyncHandler) tree() *broadcastTree { func (h *testSyncHandler) send(ctx context.Context, msg protocolMsg) (err error) {
return p.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree) return h.batcher.Add(ctx, msg)
} }
func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) { func (h *testSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) {
return p.batcher.Add(ctx, msg) h.batcher.Add(ctx, protocolMsg{userMsg: &changes})
} }
func (p *processSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) { func (h *testSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) {
p.batcher.Add(ctx, processMsg{userMsg: &changes})
}
func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
for { for {
res, err := p.batcher.WaitOne(ctx) res, err := h.batcher.WaitOne(ctx)
if err != nil { if err != nil {
return return
} }
if res.userMsg != nil { if res.userMsg != nil {
p.tree().Lock() h.tree().Lock()
userRes, err := p.tree().AddRawChanges(ctx, *res.userMsg) userRes, err := h.tree().AddRawChanges(ctx, *res.userMsg)
require.NoError(t, err) require.NoError(t, err)
fmt.Println("user add result", userRes.Heads) fmt.Println("user add result", userRes.Heads)
p.tree().Unlock() h.tree().Unlock()
continue continue
} }
err = p.HandleMessage(ctx, res.senderId, res.msg) err = h.HandleMessage(ctx, res.senderId, res.msg)
if err != nil { if err != nil {
fmt.Println("error handling message", err.Error()) fmt.Println("error handling message", err.Error())
continue continue
@ -187,22 +218,23 @@ func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.Wai
}() }()
} }
type processPeerManager struct { // testPeerManager captures all other handlers and sends messages to them
type testPeerManager struct {
peerId string peerId string
handlers map[string]*processSyncHandler handlers map[string]*testSyncHandler
log *messageLog log *messageLog
} }
func newProcessPeerManager(peerId string, log *messageLog) *processPeerManager { func newTestPeerManager(peerId string, log *messageLog) *testPeerManager {
return &processPeerManager{handlers: map[string]*processSyncHandler{}, peerId: peerId, log: log} return &testPeerManager{handlers: map[string]*testSyncHandler{}, peerId: peerId, log: log}
} }
func (m *processPeerManager) addHandler(peerId string, handler *processSyncHandler) { func (m *testPeerManager) addHandler(peerId string, handler *testSyncHandler) {
m.handlers[peerId] = handler m.handlers[peerId] = handler
} }
func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (m *testPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) {
pMsg := processMsg{ pMsg := protocolMsg{
msg: msg, msg: msg,
senderId: m.peerId, senderId: m.peerId,
receiverId: peerId, receiverId: peerId,
@ -211,9 +243,9 @@ func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *s
return m.handlers[peerId].send(context.Background(), pMsg) return m.handlers[peerId].send(context.Background(), pMsg)
} }
func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { func (m *testPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) {
for _, handler := range m.handlers { for _, handler := range m.handlers {
pMsg := processMsg{ pMsg := protocolMsg{
msg: msg, msg: msg,
senderId: m.peerId, senderId: m.peerId,
receiverId: handler.peerId, receiverId: handler.peerId,
@ -224,10 +256,12 @@ func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.
return return
} }
func (m *processPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { func (m *testPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) {
panic("should not be called") panic("should not be called")
} }
// broadcastTree is the tree that broadcasts changes to everyone when changes are added
// it is a simplified version of SyncTree which is easier to use in the test environment
type broadcastTree struct { type broadcastTree struct {
objecttree.ObjectTree objecttree.ObjectTree
objectsync.SyncClient objectsync.SyncClient
@ -243,31 +277,6 @@ func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.Ra
return res, nil return res, nil
} }
func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler {
factory := objectsync.GetRequestFactory()
syncClient := objectsync.NewSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
netTree := &broadcastTree{
ObjectTree: objTree,
SyncClient: syncClient,
}
handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus())
return newProcessSyncHandler(peerId, handler)
}
func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler {
factory := objectsync.GetRequestFactory()
syncClient := objectsync.NewSyncClient(spaceId, newProcessPeerManager(peerId, log), factory)
batcher := mb.New[processMsg](0)
return &processSyncHandler{
batcher: batcher,
peerId: peerId,
aclList: aclList,
log: log,
syncClient: syncClient,
}
}
func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage { func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage {
changeCreator := objecttree.NewMockChangeCreator() changeCreator := objecttree.NewMockChangeCreator()
st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id) st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id)
@ -285,24 +294,25 @@ type fixtureDeps struct {
emptyTrees []string emptyTrees []string
} }
type processFixture struct { // protocolFixture is the test environment for sync protocol tests
handlers map[string]*processSyncHandler type protocolFixture struct {
handlers map[string]*testSyncHandler
log *messageLog log *messageLog
wg *sync.WaitGroup wg *sync.WaitGroup
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
} }
func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processFixture { func newProtocolFixture(t *testing.T, spaceId string, deps fixtureDeps) *protocolFixture {
var ( var (
handlers = map[string]*processSyncHandler{} handlers = map[string]*testSyncHandler{}
log = newMessageLog() log = newMessageLog()
wg = sync.WaitGroup{} wg = sync.WaitGroup{}
ctx, cancel = context.WithCancel(context.Background()) ctx, cancel = context.WithCancel(context.Background())
) )
for peerId := range deps.connectionMap { for peerId := range deps.connectionMap {
var handler *processSyncHandler var handler *testSyncHandler
if slices.Contains(deps.emptyTrees, peerId) { if slices.Contains(deps.emptyTrees, peerId) {
handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log) handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log)
} else { } else {
@ -320,7 +330,7 @@ func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processF
manager.addHandler(connectionId, handlers[connectionId]) manager.addHandler(connectionId, handlers[connectionId])
} }
} }
return &processFixture{ return &protocolFixture{
handlers: handlers, handlers: handlers,
log: log, log: log,
wg: &wg, wg: &wg,
@ -329,18 +339,20 @@ func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processF
} }
} }
func (p *processFixture) run(t *testing.T) { func (p *protocolFixture) run(t *testing.T) {
for _, handler := range p.handlers { for _, handler := range p.handlers {
handler.run(p.ctx, t, p.wg) handler.run(p.ctx, t, p.wg)
} }
} }
func (p *processFixture) stop() { func (p *protocolFixture) stop() {
p.cancel() p.cancel()
p.wg.Wait() p.wg.Wait()
} }
// genParams is the parameters for genChanges
type genParams struct { type genParams struct {
// prefix is the prefix which is added to change id
prefix string prefix string
aclId string aclId string
startIdx int startIdx int
@ -351,12 +363,14 @@ type genParams struct {
isSnapshot func() bool isSnapshot func() bool
} }
// genResult is the result of genChanges
type genResult struct { type genResult struct {
changes []*treechangeproto.RawTreeChangeWithId changes []*treechangeproto.RawTreeChangeWithId
heads []string heads []string
snapshotId string snapshotId string
} }
// genChanges generates several levels of tree changes where each level is connected with only previous one
func genChanges(creator *objecttree.MockChangeCreator, params genParams) (res genResult) { func genChanges(creator *objecttree.MockChangeCreator, params genParams) (res genResult) {
src := rand.NewSource(time.Now().Unix()) src := rand.NewSource(time.Now().Unix())
rnd := rand.New(src) rnd := rand.New(src)