diff --git a/commonspace/object/tree/synctree/syncprotocol_test.go b/commonspace/object/tree/synctree/syncprotocol_test.go index 4b81b166..a87c0055 100644 --- a/commonspace/object/tree/synctree/syncprotocol_test.go +++ b/commonspace/object/tree/synctree/syncprotocol_test.go @@ -32,7 +32,7 @@ func TestEmptyClientGetsFullHistory(t *testing.T) { }, emptyTrees: []string{"peer2"}, } - fx := newProcessFixture(t, spaceId, deps) + fx := newProtocolFixture(t, spaceId, deps) fx.run(t) fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ NewHeads: nil, @@ -107,7 +107,7 @@ func testTreeMerge(t *testing.T, levels, perlevel int, isSnapshot func() bool) { }, emptyTrees: []string{"peer2", "node1"}, } - fx := newProcessFixture(t, spaceId, deps) + fx := newProtocolFixture(t, spaceId, deps) fx.run(t) fx.handlers["peer1"].sendRawChanges(context.Background(), objecttree.RawChangesPayload{ NewHeads: initialRes.heads, diff --git a/commonspace/object/tree/synctree/utils_test.go b/commonspace/object/tree/synctree/utils_test.go index 8e2ffbe9..9d2c4f68 100644 --- a/commonspace/object/tree/synctree/utils_test.go +++ b/commonspace/object/tree/synctree/utils_test.go @@ -22,13 +22,15 @@ import ( "time" ) -type processMsg struct { +// protocolMsg is a message used in sync protocol tests +type protocolMsg struct { msg *spacesyncproto.ObjectSyncMessage senderId string receiverId string userMsg *objecttree.RawChangesPayload } +// msgDescription is a representation of message used for checking the results of the test type msgDescription struct { name string from string @@ -37,7 +39,7 @@ type msgDescription struct { changes []*treechangeproto.RawTreeChangeWithId } -func (p *processMsg) description() (descr msgDescription) { +func (p *protocolMsg) description() (descr msgDescription) { unmarshalled := &treechangeproto.TreeSyncMessage{} err := proto.Unmarshal(p.msg.Payload, unmarshalled) if err != nil { @@ -67,30 +69,68 @@ func (p *processMsg) description() (descr msgDescription) { return } +// messageLog saves all messages that were sent during sync test type messageLog struct { - batcher *mb.MB[processMsg] + batcher *mb.MB[protocolMsg] } func newMessageLog() *messageLog { - return &messageLog{batcher: mb.New[processMsg](0)} + return &messageLog{batcher: mb.New[protocolMsg](0)} } -func (m *messageLog) addMessage(msg processMsg) { +func (m *messageLog) addMessage(msg protocolMsg) { m.batcher.Add(context.Background(), msg) } -type processSyncHandler struct { +// testSyncHandler is the wrapper around individual tree to test sync protocol +type testSyncHandler struct { synchandler.SyncHandler - batcher *mb.MB[processMsg] + batcher *mb.MB[protocolMsg] peerId string aclList list.AclList log *messageLog syncClient objectsync.SyncClient } -func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { - if p.SyncHandler != nil { - return p.SyncHandler.HandleMessage(ctx, senderId, request) +// createSyncHandler creates a sync handler when a tree is already created +func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *testSyncHandler { + factory := objectsync.GetRequestFactory() + syncClient := objectsync.NewSyncClient(spaceId, newTestPeerManager(peerId, log), factory) + netTree := &broadcastTree{ + ObjectTree: objTree, + SyncClient: syncClient, + } + handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus()) + return newTestSyncHandler(peerId, handler) +} + +// createEmptySyncHandler creates a sync handler when the tree will be provided later (this emulates the situation when we have no tree) +func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *testSyncHandler { + factory := objectsync.GetRequestFactory() + syncClient := objectsync.NewSyncClient(spaceId, newTestPeerManager(peerId, log), factory) + + batcher := mb.New[protocolMsg](0) + return &testSyncHandler{ + batcher: batcher, + peerId: peerId, + aclList: aclList, + log: log, + syncClient: syncClient, + } +} + +func newTestSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *testSyncHandler { + batcher := mb.New[protocolMsg](0) + return &testSyncHandler{ + SyncHandler: syncHandler, + batcher: batcher, + peerId: peerId, + } +} + +func (h *testSyncHandler) HandleMessage(ctx context.Context, senderId string, request *spacesyncproto.ObjectSyncMessage) (err error) { + if h.SyncHandler != nil { + return h.SyncHandler.HandleMessage(ctx, senderId, request) } unmarshalled := &treechangeproto.TreeSyncMessage{} err = proto.Unmarshal(request.Payload, unmarshalled) @@ -104,17 +144,17 @@ func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, if err != nil { return } - return p.manager().SendPeer(context.Background(), senderId, objMsg) + return h.manager().SendPeer(context.Background(), senderId, objMsg) } fullSyncResponse := unmarshalled.Content.GetFullSyncResponse() treeStorage, _ := treestorage.NewInMemoryTreeStorage(unmarshalled.RootChange, []string{unmarshalled.RootChange.Id}, nil) - tree, err := createTestTree(p.aclList, treeStorage) + tree, err := createTestTree(h.aclList, treeStorage) if err != nil { return } netTree := &broadcastTree{ ObjectTree: tree, - SyncClient: p.syncClient, + SyncClient: h.syncClient, } res, err := netTree.AddRawChanges(context.Background(), objecttree.RawChangesPayload{ NewHeads: fullSyncResponse.Heads, @@ -123,62 +163,53 @@ func (p *processSyncHandler) HandleMessage(ctx context.Context, senderId string, if err != nil { return } - p.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, p.syncClient, syncstatus.NewNoOpSyncStatus()) + h.SyncHandler = newSyncTreeHandler(request.SpaceId, netTree, h.syncClient, syncstatus.NewNoOpSyncStatus()) var objMsg *spacesyncproto.ObjectSyncMessage newTreeRequest := objectsync.GetRequestFactory().CreateHeadUpdate(netTree, res.Added) objMsg, err = objectsync.MarshallTreeMessage(newTreeRequest, request.SpaceId, request.ObjectId, "") if err != nil { return } - return p.manager().Broadcast(context.Background(), objMsg) + return h.manager().Broadcast(context.Background(), objMsg) } -func newProcessSyncHandler(peerId string, syncHandler synchandler.SyncHandler) *processSyncHandler { - batcher := mb.New[processMsg](0) - return &processSyncHandler{ - SyncHandler: syncHandler, - batcher: batcher, - peerId: peerId, +func (h *testSyncHandler) manager() *testPeerManager { + if h.SyncHandler != nil { + return h.SyncHandler.(*syncTreeHandler).syncClient.PeerManager().(*testPeerManager) } + return h.syncClient.PeerManager().(*testPeerManager) } -func (p *processSyncHandler) manager() *processPeerManager { - if p.SyncHandler != nil { - return p.SyncHandler.(*syncTreeHandler).syncClient.PeerManager().(*processPeerManager) - } - return p.syncClient.PeerManager().(*processPeerManager) +func (h *testSyncHandler) tree() *broadcastTree { + return h.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree) } -func (p *processSyncHandler) tree() *broadcastTree { - return p.SyncHandler.(*syncTreeHandler).objTree.(*broadcastTree) +func (h *testSyncHandler) send(ctx context.Context, msg protocolMsg) (err error) { + return h.batcher.Add(ctx, msg) } -func (p *processSyncHandler) send(ctx context.Context, msg processMsg) (err error) { - return p.batcher.Add(ctx, msg) +func (h *testSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) { + h.batcher.Add(ctx, protocolMsg{userMsg: &changes}) } -func (p *processSyncHandler) sendRawChanges(ctx context.Context, changes objecttree.RawChangesPayload) { - p.batcher.Add(ctx, processMsg{userMsg: &changes}) -} - -func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) { +func (h *testSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.WaitGroup) { wg.Add(1) go func() { defer wg.Done() for { - res, err := p.batcher.WaitOne(ctx) + res, err := h.batcher.WaitOne(ctx) if err != nil { return } if res.userMsg != nil { - p.tree().Lock() - userRes, err := p.tree().AddRawChanges(ctx, *res.userMsg) + h.tree().Lock() + userRes, err := h.tree().AddRawChanges(ctx, *res.userMsg) require.NoError(t, err) fmt.Println("user add result", userRes.Heads) - p.tree().Unlock() + h.tree().Unlock() continue } - err = p.HandleMessage(ctx, res.senderId, res.msg) + err = h.HandleMessage(ctx, res.senderId, res.msg) if err != nil { fmt.Println("error handling message", err.Error()) continue @@ -187,22 +218,23 @@ func (p *processSyncHandler) run(ctx context.Context, t *testing.T, wg *sync.Wai }() } -type processPeerManager struct { +// testPeerManager captures all other handlers and sends messages to them +type testPeerManager struct { peerId string - handlers map[string]*processSyncHandler + handlers map[string]*testSyncHandler log *messageLog } -func newProcessPeerManager(peerId string, log *messageLog) *processPeerManager { - return &processPeerManager{handlers: map[string]*processSyncHandler{}, peerId: peerId, log: log} +func newTestPeerManager(peerId string, log *messageLog) *testPeerManager { + return &testPeerManager{handlers: map[string]*testSyncHandler{}, peerId: peerId, log: log} } -func (m *processPeerManager) addHandler(peerId string, handler *processSyncHandler) { +func (m *testPeerManager) addHandler(peerId string, handler *testSyncHandler) { m.handlers[peerId] = handler } -func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { - pMsg := processMsg{ +func (m *testPeerManager) SendPeer(ctx context.Context, peerId string, msg *spacesyncproto.ObjectSyncMessage) (err error) { + pMsg := protocolMsg{ msg: msg, senderId: m.peerId, receiverId: peerId, @@ -211,9 +243,9 @@ func (m *processPeerManager) SendPeer(ctx context.Context, peerId string, msg *s return m.handlers[peerId].send(context.Background(), pMsg) } -func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { +func (m *testPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto.ObjectSyncMessage) (err error) { for _, handler := range m.handlers { - pMsg := processMsg{ + pMsg := protocolMsg{ msg: msg, senderId: m.peerId, receiverId: handler.peerId, @@ -224,10 +256,12 @@ func (m *processPeerManager) Broadcast(ctx context.Context, msg *spacesyncproto. return } -func (m *processPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { +func (m *testPeerManager) GetResponsiblePeers(ctx context.Context) (peers []peer.Peer, err error) { panic("should not be called") } +// broadcastTree is the tree that broadcasts changes to everyone when changes are added +// it is a simplified version of SyncTree which is easier to use in the test environment type broadcastTree struct { objecttree.ObjectTree objectsync.SyncClient @@ -243,31 +277,6 @@ func (b *broadcastTree) AddRawChanges(ctx context.Context, changes objecttree.Ra return res, nil } -func createSyncHandler(peerId, spaceId string, objTree objecttree.ObjectTree, log *messageLog) *processSyncHandler { - factory := objectsync.GetRequestFactory() - syncClient := objectsync.NewSyncClient(spaceId, newProcessPeerManager(peerId, log), factory) - netTree := &broadcastTree{ - ObjectTree: objTree, - SyncClient: syncClient, - } - handler := newSyncTreeHandler(spaceId, netTree, syncClient, syncstatus.NewNoOpSyncStatus()) - return newProcessSyncHandler(peerId, handler) -} - -func createEmptySyncHandler(peerId, spaceId string, aclList list.AclList, log *messageLog) *processSyncHandler { - factory := objectsync.GetRequestFactory() - syncClient := objectsync.NewSyncClient(spaceId, newProcessPeerManager(peerId, log), factory) - - batcher := mb.New[processMsg](0) - return &processSyncHandler{ - batcher: batcher, - peerId: peerId, - aclList: aclList, - log: log, - syncClient: syncClient, - } -} - func createStorage(treeId string, aclList list.AclList) treestorage.TreeStorage { changeCreator := objecttree.NewMockChangeCreator() st := changeCreator.CreateNewTreeStorage(treeId, aclList.Head().Id) @@ -285,24 +294,25 @@ type fixtureDeps struct { emptyTrees []string } -type processFixture struct { - handlers map[string]*processSyncHandler +// protocolFixture is the test environment for sync protocol tests +type protocolFixture struct { + handlers map[string]*testSyncHandler log *messageLog wg *sync.WaitGroup ctx context.Context cancel context.CancelFunc } -func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processFixture { +func newProtocolFixture(t *testing.T, spaceId string, deps fixtureDeps) *protocolFixture { var ( - handlers = map[string]*processSyncHandler{} + handlers = map[string]*testSyncHandler{} log = newMessageLog() wg = sync.WaitGroup{} ctx, cancel = context.WithCancel(context.Background()) ) for peerId := range deps.connectionMap { - var handler *processSyncHandler + var handler *testSyncHandler if slices.Contains(deps.emptyTrees, peerId) { handler = createEmptySyncHandler(peerId, spaceId, deps.aclList, log) } else { @@ -320,7 +330,7 @@ func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processF manager.addHandler(connectionId, handlers[connectionId]) } } - return &processFixture{ + return &protocolFixture{ handlers: handlers, log: log, wg: &wg, @@ -329,18 +339,20 @@ func newProcessFixture(t *testing.T, spaceId string, deps fixtureDeps) *processF } } -func (p *processFixture) run(t *testing.T) { +func (p *protocolFixture) run(t *testing.T) { for _, handler := range p.handlers { handler.run(p.ctx, t, p.wg) } } -func (p *processFixture) stop() { +func (p *protocolFixture) stop() { p.cancel() p.wg.Wait() } +// genParams is the parameters for genChanges type genParams struct { + // prefix is the prefix which is added to change id prefix string aclId string startIdx int @@ -351,12 +363,14 @@ type genParams struct { isSnapshot func() bool } +// genResult is the result of genChanges type genResult struct { changes []*treechangeproto.RawTreeChangeWithId heads []string snapshotId string } +// genChanges generates several levels of tree changes where each level is connected with only previous one func genChanges(creator *objecttree.MockChangeCreator, params genParams) (res genResult) { src := rand.NewSource(time.Now().Unix()) rnd := rand.New(src)