Skip to content

Instantly share code, notes, and snippets.

@engalar
Last active August 31, 2025 06:18
Show Gist options
  • Save engalar/e147001ce0dc46cc40be6b3bc0df59d6 to your computer and use it in GitHub Desktop.
Save engalar/e147001ce0dc46cc40be6b3bc0df59d6 to your computer and use it in GitHub Desktop.
Mendix Navigation Visualizer
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Mendix Traceability Analyzer (with React Flow)</title>
<script src="assets/react.development.js"></script>
<script src="assets/react-dom.development.js"></script>
<script src="assets/tailwindcss.js"></script>
<script src="assets/babel.min.js"></script>
<script src="assets/vconsole.min.js"></script>
<script src="assets/awilix.umd.js"></script>
<script>new VConsole();</script>
<!-- React Flow CDN assets -->
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/reactflow@11/dist/style.min.css" />
<script src="https://cdn.jsdelivr.net/npm/reactflow@11/dist/umd/index.min.js"></script>
</head>
<body class="bg-gray-100 font-sans">
<div id="app"></div>
<script type="text/babel">
const { useState, useMemo, useEffect, useCallback } = React;
const { createContainer, asClass, asValue, InjectionMode } = Awilix;
// Destructure React Flow from the global window object provided by the CDN
const {
default: ReactFlow,
Background,
Controls,
useNodesState,
useEdgesState,
MarkerType
} = window.ReactFlow;
// == 1. LOW-LEVEL MODULES (DETAILS) - [UNCHANGED] ==
/**
 * Request/response client for the Mendix Studio Pro backend.
 *
 * Requests are sent through `window.parent.sendMessage` on the
 * "frontend:message" channel; responses arrive as window `message` events whose
 * payload has `type === 'backendResponse'`. Each call gets a numeric id that the
 * backend echoes back as `requestId`, which is how a response is matched to the
 * promise that is waiting for it.
 */
class MendixRpcClient {
  constructor() {
    this.requestId = 0;
    // requestId -> { resolve, reject } of the promise returned by call()
    this.pendingRequests = new Map();
    window.addEventListener('message', (event) => {
      // Any script can post to this window; a message with a null/undefined
      // payload must be ignored instead of crashing the listener.
      const message = event.data;
      if (message && message.type === 'backendResponse') {
        this.handleResponse(message.data);
      }
    });
  }

  /**
   * Settles the pending promise that matches `response.requestId`.
   * @param {string|object} response - Backend reply, either a JSON string or an
   *   already-parsed object with `requestId` and `result` or `error`.
   */
  handleResponse(response) {
    try {
      const data = typeof response === 'string' ? JSON.parse(response) : response;
      const requestId = data.requestId;
      const pendingRequest = this.pendingRequests.get(requestId);
      if (pendingRequest) {
        this.pendingRequests.delete(requestId);
        if (data.error) {
          pendingRequest.reject(new Error(data.error));
        } else {
          pendingRequest.resolve(data.result);
        }
      }
    } catch (error) {
      // Malformed payloads are logged and dropped; no pending request is touched.
      console.error('Error handling response:', error);
    }
  }

  /**
   * Sends one RPC request to the backend.
   * @param {string} method - Backend method name.
   * @param {object} [params={}] - Named parameters for the method.
   * @returns {Promise<*>} Resolves with the backend's `result`, rejects with an
   *   Error built from the backend's `error` or thrown while sending.
   */
  call(method, params = {}) {
    const requestId = ++this.requestId;
    return new Promise((resolve, reject) => {
      this.pendingRequests.set(requestId, { resolve, reject });
      try {
        window.parent.sendMessage('frontend:message', { jsonrpc: '2.0', method, params, id: requestId });
      } catch (error) {
        // The request never left the page, so no response will ever clear the
        // entry; drop it here to avoid leaking it.
        this.pendingRequests.delete(requestId);
        reject(error);
      }
    });
  }

  getTraceabilityGraph() { return this.call('getTraceabilityGraph'); }

  // Analysis RPC calls
  findPaths(startNodeId, endNodeId) { return this.call('findPaths', { startNodeId, endNodeId }); }
  findCommonUpstream(nodeIds) { return this.call('findCommonUpstream', { nodeIds }); }
  findCommonDownstream(nodeIds) { return this.call('findCommonDownstream', { nodeIds }); }
  getSubgraph(nodeIds) { return this.call('getSubgraph', { nodeIds }); }
}
// == 2. APPLICATION CORE (ABSTRACTIONS & LOGIC) - [EXTENDED] ==
/**
 * Minimal observable store for the UI.
 *
 * Holds the current state snapshot in `state` and hands every new snapshot to
 * the `onStateChange` callback supplied at construction time. Snapshots are
 * replaced, never mutated in place.
 */
class AppState {
  constructor(onStateChange) {
    this._onStateChange = onStateChange;
    this.state = {
      graphData: { nodes: [], edges: [] },
      selectedNodeIds: new Set(),
      // Shape when set: { type: 'graph', data: any, isPath: boolean }
      analysisResult: null,
      loading: true,
      analysisLoading: false,
    };
  }

  /** Merges `newState` over the current snapshot and notifies the listener. */
  _update(newState) {
    const merged = Object.assign({}, this.state, newState);
    this.state = merged;
    this._onStateChange(merged);
  }

  /** Stores a freshly fetched graph and ends the global loading phase. */
  setGraphData(graphData) {
    this._update({ graphData, loading: false });
  }

  /** Adds `nodeId` to the selection, or removes it when already selected. */
  toggleSelection(nodeId) {
    const nextSelection = new Set(this.state.selectedNodeIds);
    // Set#delete reports whether the id was present; add it only when it was not.
    const wasSelected = nextSelection.delete(nodeId);
    if (!wasSelected) {
      nextSelection.add(nodeId);
    }
    this._update({ selectedNodeIds: nextSelection });
  }

  /** Empties the selection and discards any displayed analysis result. */
  clearSelection() {
    this._update({ selectedNodeIds: new Set(), analysisResult: null });
  }

  /** Stores an analysis result and switches the analysis loading flag off. */
  setAnalysisResult(result) {
    this._update({ analysisResult: result, analysisLoading: false });
  }

  setLoading(loading) {
    this._update({ loading });
  }

  setAnalysisLoading(loading) {
    this._update({ analysisLoading: loading });
  }
}
/**
 * Application service that connects UI intents to the RPC client and the store.
 *
 * All public operations are arrow-function class fields so they keep their
 * `this` binding when handed to React as bare callbacks
 * (e.g. `onRefresh={service.fetchGraph}`).
 */
class TraceabilityService {
  /**
   * @param {object} deps
   * @param {object} deps.rpcClient - Backend client (getTraceabilityGraph, findPaths, ...).
   * @param {object} deps.appState - Store exposing `state` and the set/clear/toggle methods.
   */
  constructor({ rpcClient, appState }) {
    this.rpcClient = rpcClient;
    this.appState = appState;
  }

  /** Reloads the full graph from the backend; the current selection is cleared first. */
  fetchGraph = async () => {
    this.appState.setLoading(true);
    this.appState.clearSelection();
    try {
      const response = await this.rpcClient.getTraceabilityGraph();
      this.appState.setGraphData(response);
    } catch (error) {
      alert('Error fetching graph: ' + error.message);
      this.appState.setLoading(false);
    }
  };

  toggleNodeSelection = (nodeId) => this.appState.toggleSelection(nodeId);

  clearNodeSelection = () => this.appState.clearSelection();

  /**
   * Runs one analysis over the currently selected nodes and stores the result.
   * @param {'PATH'|'UPSTREAM'|'DOWNSTREAM'|'SUBGRAPH'} type - Analysis to run.
   *   'PATH' needs exactly two selected nodes, the others at least one;
   *   anything else only resets the loading flag.
   */
  performAnalysis = async (type) => {
    // Order matters: setAnalysisResult() also switches the loading flag off, so
    // the previous result has to be cleared BEFORE loading is switched on.
    this.appState.setAnalysisResult(null);
    this.appState.setAnalysisLoading(true);
    const selectedIds = Array.from(this.appState.state.selectedNodeIds);
    try {
      let resultData;
      let isPath = false;
      if (type === 'PATH' && selectedIds.length === 2) {
        const paths = await this.rpcClient.findPaths(selectedIds[0], selectedIds[1]);
        // Only the first returned path is shown; tolerate a null/empty reply.
        const firstPath = (paths && paths[0]) || [];
        const nodesMap = new Map();
        firstPath.forEach((node) => nodesMap.set(node.id, node));
        const edges = [];
        for (let i = 0; i < firstPath.length - 1; i++) {
          edges.push({ source: firstPath[i].id, target: firstPath[i + 1].id, type: 'FOLLOWS' });
        }
        resultData = { nodes: Array.from(nodesMap.values()), edges };
        isPath = true;
      } else if (type === 'UPSTREAM' && selectedIds.length > 0) {
        resultData = await this.rpcClient.findCommonUpstream(selectedIds);
      } else if (type === 'DOWNSTREAM' && selectedIds.length > 0) {
        resultData = await this.rpcClient.findCommonDownstream(selectedIds);
      } else if (type === 'SUBGRAPH' && selectedIds.length > 0) {
        resultData = await this.rpcClient.getSubgraph(selectedIds);
      }
      if (resultData) {
        this.appState.setAnalysisResult({ type: 'graph', data: resultData, isPath });
      } else {
        // Nothing returned or invalid input: just leave the loading state.
        this.appState.setAnalysisLoading(false);
      }
    } catch (error) {
      alert('Analysis Error: ' + error.message);
      this.appState.setAnalysisLoading(false);
    }
  };
}
// == 3. UI LAYER (DUMB COMPONENTS) ==
// Per-node-type presentation: `icon` is rendered next to the node name, while
// `color` and `borderColor` are applied as background/border of the node box in
// the graph view. UNKNOWN is the fallback for node types without an entry here.
const NODE_TYPE_CONFIG = {
  PAGE: { icon: '📄', color: '#dbfde9', borderColor: '#6ee7b7' },
  MICROFLOW: { icon: '⚙️', color: '#dbeafe', borderColor: '#93c5fd' },
  NAVIGATION_ITEM: { icon: '🧭', color: '#e0e7ff', borderColor: '#a5b4fc' },
  ENTITY: { icon: '📦', color: '#fef9c3', borderColor: '#fde047' },
  UNKNOWN: { icon: '❓', color: '#e5e7eb', borderColor: '#9ca3af' },
};
// Component for individual nodes in the explorer list
function CompactNodeItem({ node, onSelect, isSelected }) {
const config = NODE_TYPE_CONFIG[node.type] || NODE_TYPE_CONFIG.UNKNOWN;
return (
<div onClick={() => onSelect(node.id)} className={`flex items-center p-1 mb-1 rounded cursor-pointer transition-all text-xs ${isSelected ? 'bg-blue-100 ring-1 ring-blue-400' : 'hover:bg-gray-200'}`}>
<input type="checkbox" checked={isSelected} readOnly className="mr-2 pointer-events-none" />
<span className="mr-2">{config.icon}</span>
<span className="truncate flex-grow">{node.name}</span>
</div>
);
}
// The left-hand panel for exploring and selecting nodes
function ExplorerPanel({ nodes, onSelect, onRefresh, loading, selectedNodeIds }) {
const [searchTerm, setSearchTerm] = useState('');
const groupedNodes = useMemo(() => {
const filtered = nodes.filter(n =>
n.name.toLowerCase().includes(searchTerm.toLowerCase()) ||
n.module.toLowerCase().includes(searchTerm.toLowerCase())
);
return filtered.reduce((acc, node) => {
(acc[node.module] = acc[node.module] || []).push(node);
return acc;
}, {});
}, [nodes, searchTerm]);
return (
<div className="h-full flex flex-col bg-white border-r">
<div className="p-1 border-b flex items-center space-x-1">
<button onClick={onRefresh} disabled={loading} className="p-1 rounded hover:bg-gray-100 disabled:opacity-50">
{loading ? '...' : '🔄'}
</button>
<input type="text" placeholder="Search..." className="w-full p-1 border rounded text-sm" value={searchTerm} onChange={e => setSearchTerm(e.target.value)} />
</div>
<div className="overflow-y-auto flex-grow p-1">
{Object.entries(groupedNodes).sort().map(([moduleName, nodesInModule]) => (
<details key={moduleName} open>
<summary className="font-bold text-sm cursor-pointer py-1 text-gray-700">{moduleName}</summary>
<div className="pl-2 ml-1">
{nodesInModule.sort((a,b) => a.name.localeCompare(b.name)).map(node => <CompactNodeItem key={node.id} node={node} onSelect={onSelect} isSelected={selectedNodeIds.has(node.id)} />)}
</div>
</details>
))}
</div>
</div>
);
}
// Component to display the graph visualization using React Flow
function GraphViewer({ graphData, isPath }) {
const { nodes: rawNodes, edges: rawEdges } = graphData;
const layoutedElements = useMemo(() => {
if (!rawNodes || rawNodes.length === 0) {
return { nodes: [], edges: [] };
}
const nodes = rawNodes.map((node, index) => {
const config = NODE_TYPE_CONFIG[node.type] || NODE_TYPE_CONFIG.UNKNOWN;
const nodeWidth = 170;
const nodeHeight = 40;
const nodesPerRow = 4; // Number of nodes per row for basic layout
return {
id: node.id,
position: {
x: (index % nodesPerRow) * (nodeWidth + 50), // Basic horizontal spacing
y: Math.floor(index / nodesPerRow) * (nodeHeight + 40), // Basic vertical spacing
},
data: {
label: (
<div className="flex items-center">
<span className="mr-2 text-base">{config.icon}</span>
<div className="flex flex-col">
<span className="font-bold text-xs">{node.name}</span>
<span className="text-gray-500" style={{ fontSize: '10px' }}>{node.module}</span>
</div>
</div>
),
},
style: {
backgroundColor: config.color,
borderColor: config.borderColor,
borderWidth: '1px',
width: nodeWidth,
height: nodeHeight,
fontSize: '12px',
padding: '5px',
borderRadius: '8px', // Slightly rounded corners
boxShadow: '0 1px 3px rgba(0,0,0,0.1)',
},
};
});
const edges = rawEdges.map(edge => ({
id: `e-${edge.source}-${edge.target}`,
source: edge.source,
target: edge.target,
type: 'smoothstep', // Use smooth curves for edges
animated: isPath, // Animate edges if it's a path analysis
markerEnd: {
type: MarkerType.ArrowClosed, // Add arrow marker to indicate direction
},
style: {
strokeWidth: 2,
stroke: '#9ca3af', // Default edge color
},
}));
return { nodes, edges };
}, [rawNodes, rawEdges, isPath]);
const [nodes, setNodes, onNodesChange] = useNodesState(layoutedElements.nodes);
const [edges, setEdges, onEdgesChange] = useEdgesState(layoutedElements.edges);
// Update internal state when graphData changes
useEffect(() => {
setNodes(layoutedElements.nodes);
setEdges(layoutedElements.edges);
}, [layoutedElements, setNodes, setEdges]);
if (rawNodes.length === 0) {
return <div className="text-center text-gray-400 mt-8 text-sm">No results to display. Run an analysis to see the graph.</div>;
}
return (
<div style={{ height: '100%', width: '100%' }}>
<ReactFlow
nodes={nodes}
edges={edges}
onNodesChange={onNodesChange}
onEdgesChange={onEdgesChange}
fitView // Automatically zooms to fit all elements
fitViewOptions={{ padding: 0.1 }} // Add some padding around elements
proOptions={{ hideAttribution: true }} // Hides the "React Flow" attribution
>
<Controls /> {/* Adds zoom and pan controls */}
<Background variant="dots" gap={12} size={1} color="#e5e7eb" /> {/* Light grey dots background */}
</ReactFlow>
</div>
);
}
// The main right-hand panel that hosts the analysis controls and results
function AnalysisCanvas({ allNodes, selectedNodeIds, analysisResult, onAnalyze, onClear, analysisLoading }) {
const selectedNodes = useMemo(() => allNodes.filter(n => selectedNodeIds.has(n.id)), [allNodes, selectedNodeIds]);
// Reusable Button component with disabled state tied to analysis loading
const Button = ({ onClick, disabled, children }) => (
<button
onClick={onClick}
disabled={disabled || analysisLoading}
className="text-xs px-2 py-1 bg-gray-200 rounded hover:bg-gray-300 disabled:opacity-50 disabled:cursor-not-allowed transition-colors
focus:outline-none focus:ring-2 focus:ring-blue-400 focus:ring-opacity-75"
>
{children}
</button>
);
return (
<div className="h-full flex flex-col p-2 bg-gray-50">
{/* -- Top Control Panel: Selection and Actions -- */}
<div className="flex-shrink-0 border rounded-lg p-2 bg-white mb-2 shadow-sm">
<div className="flex justify-between items-center mb-1">
<h3 className="font-semibold text-sm">Analysis Set ({selectedNodes.length})</h3>
<button onClick={onClear} disabled={selectedNodes.length === 0 || analysisLoading} className="text-xs text-red-500 hover:underline disabled:opacity-50">Clear</button>
</div>
{/* Display selected nodes */}
<div className="max-h-24 overflow-y-auto text-xs text-gray-600 mb-2 border-t pt-1">
{selectedNodes.length > 0 ? selectedNodes.map(n => <div key={n.id}>- {n.name}</div>) : <div className="italic text-gray-400">Select nodes from the explorer.</div>}
</div>
{/* Analysis Action Buttons */}
<div className="flex flex-wrap space-x-2 space-y-1 border-t pt-2">
<Button onClick={() => onAnalyze('PATH')} disabled={selectedNodes.length !== 2}>Find Path</Button>
<Button onClick={() => onAnalyze('SUBGRAPH')} disabled={selectedNodes.length < 1}>Show Relations</Button>
<Button onClick={() => onAnalyze('UPSTREAM')} disabled={selectedNodes.length < 1}>Common Deps</Button>
<Button onClick={() => onAnalyze('DOWNSTREAM')} disabled={selectedNodes.length < 1}>Common Impacts</Button>
</div>
</div>
{/* -- Bottom Results Panel: Graph Visualization -- */}
<div className="flex-grow border rounded-lg bg-white shadow-sm relative overflow-hidden">
{/* Loading overlay */}
{analysisLoading && <div className="absolute inset-0 bg-white bg-opacity-75 flex items-center justify-center z-10 text-lg font-semibold">
Analyzing...
</div>}
{/* Display analysis results */}
{analysisResult && analysisResult.type === 'graph' && (
<GraphViewer graphData={analysisResult.data} isPath={analysisResult.isPath} />
)}
{/* Placeholder when no analysis has been run */}
{!analysisResult && !analysisLoading && <div className="text-center text-gray-400 mt-8 text-sm">Run an analysis to see the results here.</div>}
</div>
</div>
);
}
// Main App component
function App({ appState, traceabilityService }) {
useEffect(() => { traceabilityService.fetchGraph(); }, [traceabilityService]); // Fetch graph on mount
const [leftWidth, setLeftWidth] = useState(280); // Initial width for the explorer panel
// Logic for resizing the explorer panel
const handleMouseDown = (e) => {
const startX = e.clientX;
const startWidth = leftWidth;
// Handler for moving the mouse while dragging
const handleMouseMove = (me) => setLeftWidth(Math.max(200, startWidth + me.clientX - startX)); // Min width 200px
// Handler for releasing the mouse button
const handleMouseUp = () => {
document.removeEventListener('mousemove', handleMouseMove);
document.removeEventListener('mouseup', handleMouseUp);
};
document.addEventListener('mousemove', handleMouseMove);
document.addEventListener('mouseup', handleMouseUp, { once: true }); // Use once to automatically clean up
};
return (
<div className="flex h-screen w-screen bg-white text-gray-800">
{/* Explorer Panel */}
<div style={{ width: `${leftWidth}px` }} className="flex-shrink-0 h-full border-r">
<ExplorerPanel
nodes={appState.graphData.nodes}
onSelect={traceabilityService.toggleNodeSelection}
onRefresh={traceabilityService.fetchGraph}
loading={appState.loading}
selectedNodeIds={appState.selectedNodeIds}
/>
</div>
{/* Resizer Handle */}
<div onMouseDown={handleMouseDown} className="w-1 cursor-col-resize bg-gray-200 hover:bg-blue-500 transition-colors" />
{/* Analysis Canvas Panel */}
<div className="flex-grow h-full overflow-hidden">
<AnalysisCanvas
allNodes={appState.graphData.nodes}
selectedNodeIds={appState.selectedNodeIds}
analysisResult={appState.analysisResult}
onAnalyze={traceabilityService.performAnalysis}
onClear={traceabilityService.clearNodeSelection}
analysisLoading={appState.analysisLoading}
/>
</div>
</div>
);
}
// == 4. COMPOSITION ROOT ==
function AppContainer() {
// Setup dependency injection container
const container = useMemo(() => createContainer({ injectionMode: InjectionMode.PROXY }), []);
// Initialize application state
const [appState, setAppState] = useState({
graphData: { nodes: [], edges: [] },
selectedNodeIds: new Set(),
analysisResult: null,
loading: true,
analysisLoading: false,
});
// Initialize and register services
const traceabilityService = useMemo(() => {
const appStateInstance = new AppState(setAppState);
container.register({
rpcClient: asClass(MendixRpcClient).singleton(),
appState: asValue(appStateInstance),
traceabilityService: asClass(TraceabilityService).singleton(),
});
return container.resolve('traceabilityService');
}, [container]); // Dependency on container ensures it's created once
// Render the main App component
return <App appState={appState} traceabilityService={traceabilityService} />;
}
// Render the React application
const root = ReactDOM.createRoot(document.getElementById('app'));
root.render(<AppContainer />);
</script>
</body>
</html>
import json
from typing import Any, Dict, List, Type, Set
import traceback
from collections import deque
PostMessage("backend:clear",'')
# ShowDevTools()
# --- 1. LIBRARY IMPORTS ---
from dependency_injector import containers, providers
import clr
clr.AddReference("Mendix.StudioPro.ExtensionsAPI")
from Mendix.StudioPro.ExtensionsAPI.Model.UntypedModel import PropertyType
from Mendix.StudioPro.ExtensionsAPI.Model.Microflows import IMicroflow
from Mendix.StudioPro.ExtensionsAPI.Model.Pages import IPage
from abc import ABC, abstractmethod
# --- Core Utilities (Unchanged) ---
def serialize_json_object(json_object: Any) -> str:
    """Serialize ``json_object`` to a JSON string with .NET's ``System.Text.Json``.

    The serializer is the .NET one rather than Python's ``json`` module; the
    caller in this file passes it a host message object.
    """
    # Imported inside the function, so the .NET namespace is only resolved on first use.
    import System.Text.Json
    return System.Text.Json.JsonSerializer.Serialize(json_object)
def deserialize_json_string(json_string: str) -> Any:
    """Parse a JSON document and return the corresponding Python value."""
    parsed = json.loads(json_string)
    return parsed
def post_message(channel: str, message: str):
    """Send ``message`` on ``channel`` through the ``PostMessage`` function.

    ``PostMessage`` is not defined in this file.
    NOTE(review): presumably injected into the script's globals by the Studio Pro host — confirm.
    """
    PostMessage(channel, message)
# === 2. APPLICATION COMPONENTS (Interfaces and Implementations) ===
#region Abstractions (Interfaces)
class IElementMapper(ABC):
    """Contract for turning model objects into plain dictionaries."""

    @abstractmethod
    def map_summary_from_unit(self, unit: Any, module_name: str) -> Dict[str, Any]:
        """Build the summary dictionary for ``unit`` inside module ``module_name``."""
        ...

    @abstractmethod
    def map_summary_from_module(self, module: Any) -> Dict[str, Any]:
        """Build the summary dictionary for ``module``."""
        ...

    @abstractmethod
    def map_details_from_element(self, element: Any) -> Dict[str, Any]:
        """Build the detail dictionary for ``element``."""
        ...
class IElementRetriever(ABC):
    """Contract for looking up elements of the model."""

    @abstractmethod
    def get_all_elements(self) -> List[Dict[str, Any]]:
        """Return the element summaries as a list of dictionaries."""
        ...

    @abstractmethod
    def get_element_by_id_and_type(self, element_id: str, element_type: str) -> Any:
        """Return the element identified by ``element_id`` and ``element_type``."""
        ...
class IEditorActions(ABC):
    """Contract for actions performed in the editor."""

    @abstractmethod
    def locate_element(self, qualified_name: str, element_type: str) -> Dict[str, Any]:
        """Locate the element named ``qualified_name`` of kind ``element_type``; returns a result dictionary."""
        ...
# --- [NEW] Traceability Analyzer Abstraction ---
# --- [MODIFIED] Traceability Analyzer Abstraction ---
class ITraceabilityAnalyzer(ABC):
    """Contract for building and querying the traceability graph."""

    @abstractmethod
    def get_full_graph(self) -> Dict[str, Any]:
        """Return the whole traceability graph."""
        ...

    @abstractmethod
    def find_paths(self, start_node_id: str, end_node_id: str) -> List[List[Dict[str, Any]]]:
        """Return the paths leading from ``start_node_id`` to ``end_node_id``, each as a list of nodes."""
        ...

    @abstractmethod
    def find_common_upstream(self, node_ids: List[str]) -> Dict[str, Any]:
        """Return the ancestors (dependencies) shared by all of ``node_ids``."""
        ...

    @abstractmethod
    def find_common_downstream(self, node_ids: List[str]) -> Dict[str, Any]:
        """Return the descendants (impacts) shared by all of ``node_ids``."""
        ...

    @abstractmethod
    def get_subgraph(self, node_ids: List[str]) -> Dict[str, Any]:
        """Return the graph restricted to ``node_ids`` and the edges directly connecting them."""
        ...
#endregion
#region Concrete Implementations ---
class ElementMapper(IElementMapper):
    """Default mapper producing the dictionaries sent to the front end."""

    def map_summary_from_unit(self, unit: Any, module_name: str) -> Dict[str, Any]:
        """Summarise ``unit``; ``qualifiedName`` is ``None`` when the unit has no such attribute."""
        # The type string is cut at the last "$" and only the trailing part is kept.
        short_type = unit.Type.split("$")[-1]
        qualified_name = getattr(unit, "QualifiedName", None)
        summary = {
            "id": str(unit.ID),
            "name": f"{module_name}.{unit.Name}",
            "type": short_type,
            "qualifiedName": qualified_name,
        }
        return summary

    def map_summary_from_module(self, module: Any) -> Dict[str, Any]:
        """Summarise ``module`` with the fixed type ``"module"``."""
        summary = {"id": str(module.ID)}
        summary["name"] = f"{module.Name}"
        summary["type"] = "module"
        return summary

    def map_details_from_element(self, element: Any) -> Dict[str, Any]:
        """Describe ``element``; ``properties`` and ``children`` are always empty lists here."""
        details = {
            "name": element.Name,
            "type": element.Type,
            "qualifiedName": element.QualifiedName,
        }
        details["properties"] = []
        details["children"] = []
        return details
class MendixElementRetriever(IElementRetriever):
    """Retriever backed by the model root handed in at construction."""

    def __init__(self, root: Any, mapper: IElementMapper):
        self._root = root
        self._mapper = mapper
        # Short type name -> type identifier; not read by the methods shown in this class.
        self._unit_type_map = {"Microflow": "Microflows$Microflow", "Page": "Pages$Page"}

    def get_all_elements(self) -> List[Dict[str, Any]]:
        """Return one module summary per ``Projects$Module`` unit under the root."""
        summaries = []
        for module in self._root.GetUnitsOfType("Projects$Module"):
            summaries.append(self._mapper.map_summary_from_module(module))
        return summaries

    def get_element_by_id_and_type(self, element_id: str, element_type: str) -> Any:
        """Lookup is elided in this listing; always returns ``None``."""
        return None
class MendixEditorActions(IEditorActions):
    """Editor actions implementation."""

    def locate_element(self, qualifiedName: str, elementType: str) -> Dict[str, Any]:
        """Report the outcome of locating an element.

        The real body is elided in this listing: both arguments are ignored and
        a success result is always returned.
        """
        outcome = {"success": True}
        return outcome
# --- [HEAVILY MODIFIED] Traceability Analyzer Implementation ---
class MendixTraceabilityAnalyzer(ITraceabilityAnalyzer):
    """
    Builds the traceability graph of a model and answers queries on it.

    The graph is built lazily on first use and cached for the lifetime of the
    instance. It is seeded from the home pages of the navigation profiles and
    grown breadth-first by following page -> microflow ("CALLS"),
    microflow -> microflow ("CALLS") and microflow -> page ("SHOWS") references.
    Node ids are qualified names; a node is a dict with ``id``, ``type``,
    ``name`` and ``module``; an edge is a dict with ``source``, ``target`` and
    ``type``.
    """

    def __init__(self, root: Any):
        self._root = root
        self._full_graph_cache = None
        # Lookup structures, filled by _build_graph_if_needed().
        self._nodes_by_id = {}
        self._adj = {}      # node id -> ids it points to
        self._rev_adj = {}  # node id -> ids pointing to it

    def _build_graph_if_needed(self):
        """Build the graph and its lookup tables unless that has been done already."""
        if self._full_graph_cache is not None:
            return

        nodes_map = {}
        edges_list = []
        modules = list(self._root.GetUnitsOfType('Projects$Module'))
        page_lookup = {p.QualifiedName: p for m in modules for p in m.GetUnitsOfType('Pages$Page')}
        microflow_lookup = {mf.QualifiedName: mf for m in modules for mf in m.GetUnitsOfType('Microflows$Microflow')}

        def _get_or_create_node(q_name, type_override=None, name=None):
            # First registration wins; later calls for the same id are no-ops.
            if not q_name or q_name in nodes_map:
                return
            parts = q_name.split('.')
            node = {"id": q_name, "type": type_override or "UNKNOWN", "name": name or parts[-1], "module": parts[0]}
            if not type_override:
                if page_lookup.get(q_name):
                    node["type"] = "PAGE"
                elif microflow_lookup.get(q_name):
                    node["type"] = "MICROFLOW"
            nodes_map[q_name] = node

        def _add_edge(source, target, type):
            if source and target:
                edges_list.append({"source": source, "target": target, "type": type})

        def _get_property_value(el, prop):
            # Accept el=None so lookups can be chained ("settings" -> "page")
            # without crashing when an intermediate property is unset.
            if el is None:
                return None
            p = el.GetProperty(prop)
            return p.Value if p and p.Value else None

        def _get_references_from_unit(unit, is_page):
            refs = []
            if is_page:
                for source in unit.GetElementsOfType('Pages$MicroflowSource'):
                    settings = _get_property_value(source, 'microflowSettings')
                    if mf := _get_property_value(settings, 'microflow'):
                        refs.append((mf, "CALLS"))
            else:  # Is Microflow
                for call in unit.GetElementsOfType('Microflows$MicroflowCall'):
                    if mf := _get_property_value(call, 'microflow'):
                        refs.append((mf, "CALLS"))
                for act in unit.GetElementsOfType('Microflows$ActionActivity'):
                    action = _get_property_value(act, 'action')
                    if action and action.Type == 'Microflows$ShowPageAction':
                        if page := _get_property_value(_get_property_value(action, 'pageSettings'), 'page'):
                            refs.append((page, "SHOWS"))
            # De-duplicate while keeping discovery order, so the edge order is
            # the same on every run.
            return list(dict.fromkeys(refs))

        queue = deque()
        processed_items = set()

        def _enqueue(item_id):
            if item_id not in processed_items:
                processed_items.add(item_id)
                queue.append(item_id)

        # Seed the traversal with every navigation profile's home page.
        for nav_doc in self._root.GetUnitsOfType('Navigation$NavigationDocument'):
            for profile in nav_doc.GetElementsOfType('Navigation$NavigationProfile'):
                home_page_name = _get_property_value(_get_property_value(profile, 'homePage'), 'page')
                if not home_page_name:
                    continue  # Profile without a home page: nothing to seed.
                nav_id = f"Navigation.{profile.Name}"
                _get_or_create_node(nav_id, "NAVIGATION_ITEM", name=f"Home Page ({profile.Name})")
                _get_or_create_node(home_page_name, "PAGE")
                _add_edge(nav_id, home_page_name, "SHOWS")
                _enqueue(home_page_name)

        # Breadth-first expansion over everything reachable from the seeds.
        while queue:
            current_id = queue.popleft()
            unit = page_lookup.get(current_id) or microflow_lookup.get(current_id)
            if not unit:
                continue  # Reference to something that is neither a known page nor microflow.
            for ref_id, edge_type in _get_references_from_unit(unit, current_id in page_lookup):
                if ref_id:
                    _get_or_create_node(ref_id)
                    _add_edge(current_id, ref_id, edge_type)
                    _enqueue(ref_id)

        # Build the lookup tables first and publish the cache last, so a set
        # cache always implies that the tables exist.
        nodes = list(nodes_map.values())
        nodes_by_id = {node['id']: node for node in nodes}
        adj = {node['id']: [] for node in nodes}
        rev_adj = {node['id']: [] for node in nodes}
        for edge in edges_list:
            if edge['source'] in adj:
                adj[edge['source']].append(edge['target'])
            if edge['target'] in rev_adj:
                rev_adj[edge['target']].append(edge['source'])
        self._nodes_by_id = nodes_by_id
        self._adj = adj
        self._rev_adj = rev_adj
        self._full_graph_cache = {"nodes": nodes, "edges": edges_list}

    # --- Public API Methods ---
    def get_full_graph(self) -> Dict[str, Any]:
        """Return the cached graph as ``{"nodes": [...], "edges": [...]}``."""
        self._build_graph_if_needed()
        return self._full_graph_cache

    def find_paths(self, start_node_id: str, end_node_id: str) -> List[List[Dict[str, Any]]]:
        """Return the shortest directed path from start to end.

        Only ONE path is computed (breadth-first search). The result is a list
        holding that single path as a list of node dicts, or an empty list when
        either id is unknown or the end is unreachable.
        """
        self._build_graph_if_needed()
        if start_node_id not in self._adj or end_node_id not in self._adj:
            return []
        queue = deque([(start_node_id, [start_node_id])])
        visited = {start_node_id}
        while queue:
            current_node, path = queue.popleft()
            if current_node == end_node_id:
                return [[self._nodes_by_id.get(node_id) for node_id in path if self._nodes_by_id.get(node_id)]]
            for neighbor in self._adj.get(current_node, []):
                if neighbor not in visited:
                    visited.add(neighbor)
                    queue.append((neighbor, path + [neighbor]))
        return []  # No path found

    def _traverse(self, start_nodes: List[str], forward: bool = True) -> Set[str]:
        """Return every id reachable from ``start_nodes``, the start ids included.

        ``forward=True`` follows edges source -> target, ``False`` the reverse.
        """
        adj_list = self._adj if forward else self._rev_adj
        all_related = set()
        for start_node in start_nodes:
            q = deque([start_node])
            visited = {start_node}
            while q:
                curr = q.popleft()
                for neighbor in adj_list.get(curr, []):
                    if neighbor not in visited:
                        visited.add(neighbor)
                        q.append(neighbor)
            all_related.update(visited)
        return all_related

    def _find_common(self, node_ids: List[str], forward: bool) -> Dict[str, Any]:
        """Subgraph of the nodes reachable from EVERY id in ``node_ids``, excluding those ids."""
        self._build_graph_if_needed()
        if not node_ids:
            return {"nodes": [], "edges": []}
        reachable_sets = [self._traverse([node_id], forward=forward) for node_id in node_ids]
        # The selected nodes themselves are not part of the answer.
        common_ids = set.intersection(*reachable_sets) - set(node_ids)
        return self.get_subgraph(list(common_ids))

    def find_common_upstream(self, node_ids: List[str]) -> Dict[str, Any]:
        """Return the subgraph of ancestors shared by all ``node_ids``."""
        return self._find_common(node_ids, forward=False)

    def find_common_downstream(self, node_ids: List[str]) -> Dict[str, Any]:
        """Return the subgraph of descendants shared by all ``node_ids``."""
        return self._find_common(node_ids, forward=True)

    def get_subgraph(self, node_ids: List[str]) -> Dict[str, Any]:
        """Return the given nodes plus the edges whose both ends are among them."""
        self._build_graph_if_needed()
        node_id_set = set(node_ids)
        subgraph_nodes = [node for node in self._full_graph_cache['nodes'] if node['id'] in node_id_set]
        subgraph_edges = [edge for edge in self._full_graph_cache['edges'] if edge['source'] in node_id_set and edge['target'] in node_id_set]
        return {"nodes": subgraph_nodes, "edges": subgraph_edges}
#endregion
#region rpc
class RpcHandler:
    """Facade exposing the backend operations callable over RPC.

    Parameter names are camelCase because the dispatcher passes the request's
    ``params`` object as keyword arguments.
    """

    def __init__(self, retriever: IElementRetriever, editor: IEditorActions, mapper: IElementMapper, analyzer: ITraceabilityAnalyzer):
        self._retriever = retriever
        self._editor = editor
        self._mapper = mapper
        self._analyzer = analyzer

    def get_all_elements(self) -> List[Dict[str, Any]]:
        """Return the retriever's element summaries."""
        return self._retriever.get_all_elements()

    def get_element_details(self, elementId: str, elementType: str) -> Dict[str, Any]:
        """Elided in this listing; always returns an empty dict."""
        return {}

    def locate_element(self, qualifiedName: str, elementType: str) -> Dict[str, Any]:
        """Delegate to the editor actions."""
        return self._editor.locate_element(qualifiedName, elementType)

    # --- Traceability ---
    # An earlier duplicate definition of get_traceability_graph() called
    # self._analyzer.analyze(), which ITraceabilityAnalyzer does not declare.
    # It was dead code (shadowed by the definition below) and has been removed.
    def get_traceability_graph(self) -> Dict[str, Any]:
        """Return the full traceability graph."""
        return self._analyzer.get_full_graph()

    def find_paths(self, startNodeId: str, endNodeId: str) -> List[List[Dict[str, Any]]]:
        """Delegate to the analyzer's ``find_paths``."""
        return self._analyzer.find_paths(startNodeId, endNodeId)

    def find_common_upstream(self, nodeIds: List[str]) -> Dict[str, Any]:
        """Delegate to the analyzer's ``find_common_upstream``."""
        return self._analyzer.find_common_upstream(nodeIds)

    def find_common_downstream(self, nodeIds: List[str]) -> Dict[str, Any]:
        """Delegate to the analyzer's ``find_common_downstream``."""
        return self._analyzer.find_common_downstream(nodeIds)

    def get_subgraph(self, nodeIds: List[str]) -> Dict[str, Any]:
        """Delegate to the analyzer's ``get_subgraph``."""
        return self._analyzer.get_subgraph(nodeIds)
class RpcDispatcher:
    """Maps RPC method names to callables and executes incoming requests."""

    def __init__(self):
        self._methods = {}

    def register_method(self, name: str, func):
        """Register ``func`` under ``name``; a later registration replaces an earlier one."""
        self._methods[name] = func

    def handle_request(self, request: Dict[str, Any]):
        """Execute one request and return the response dictionary.

        ``request`` is expected to hold ``method``, optional ``params`` (passed
        to the handler as keyword arguments) and ``id``. The response always
        carries ``jsonrpc`` and ``requestId`` plus either ``result`` or
        ``error``. This method never raises for a failing handler: the error
        text with traceback is posted on "backend:info" and returned as ``error``.
        """
        # Resolved outside the try block so the except branch cannot fail again
        # on a request that is not a dict.
        request_id = request.get('id') if isinstance(request, dict) else None
        try:
            if not isinstance(request, dict):
                raise TypeError(f"RPC request must be an object, got {type(request).__name__}")
            method_name = request.get('method')
            handler = self._methods.get(method_name)
            if handler is None:
                raise LookupError(f"Unknown RPC method: {method_name!r}")
            # An explicit `"params": null` is treated like absent params.
            params = request.get('params') or {}
            result = handler(**params)
            return {'jsonrpc': '2.0', 'result': result, 'requestId': request_id}
        except Exception as e:
            error_message = f"An error occurred: {e}\n{traceback.format_exc()}"
            PostMessage("backend:info", error_message)
            return {'jsonrpc': '2.0', 'error': error_message, 'requestId': request_id}
#endregion
#region IoC CONTAINER CONFIGURATION ===
class AppContainer(containers.DeclarativeContainer):
    """Dependency-injection container wiring the backend services together.

    Every provider below is a ``Singleton``. ``config.mendix_root`` is passed
    as ``root`` to both the element retriever and the traceability analyzer.
    Providers must be declared before the providers that reference them.
    """
    config = providers.Configuration()
    # Data Mapping Layer
    element_mapper: providers.Provider[IElementMapper] = providers.Singleton(ElementMapper)
    # Platform Layer
    editor_actions: providers.Provider[IEditorActions] = providers.Singleton(MendixEditorActions)
    element_retriever: providers.Provider[IElementRetriever] = providers.Singleton(
        MendixElementRetriever, root=config.mendix_root, mapper=element_mapper
    )
    # Analysis Layer
    traceability_analyzer: providers.Provider[ITraceabilityAnalyzer] = providers.Singleton(
        MendixTraceabilityAnalyzer, root=config.mendix_root
    )
    # Application Layer: the RPC facade receives all of the services above
    rpc_handler = providers.Singleton(
        RpcHandler,
        retriever=element_retriever,
        editor=editor_actions,
        mapper=element_mapper,
        analyzer=traceability_analyzer,
    )
    # Dispatcher Layer
    dispatcher = providers.Singleton(RpcDispatcher)
#endregion
#region COMPOSITION ROOT & EVENT HANDLING ===
# Build the container and hand it the model root.
# NOTE(review): `root` is not defined in this file — presumably provided as a
# global by the Studio Pro host; confirm.
container = AppContainer()
container.config.mendix_root.from_value(root)
rpc_handler_instance = container.rpc_handler()
dispatcher_instance = container.dispatcher()
# Public RPC surface: the keys are the method names the front end sends, the
# values are the handler methods that serve them.
rpc_methods = {
    'getAllElements': rpc_handler_instance.get_all_elements,
    'getElementDetails': rpc_handler_instance.get_element_details,
    'locateElement': rpc_handler_instance.locate_element,
    'getTraceabilityGraph': rpc_handler_instance.get_traceability_graph,
    'findPaths': rpc_handler_instance.find_paths,
    'findCommonUpstream': rpc_handler_instance.find_common_upstream,
    'findCommonDownstream': rpc_handler_instance.find_common_downstream,
    'getSubgraph': rpc_handler_instance.get_subgraph,
}
for name, method in rpc_methods.items():
    dispatcher_instance.register_method(name, method)
def onMessage(e):
    """Handle one host message; only "frontend:message" events are processed.

    The event object is converted to a plain dict by serialising it to JSON and
    parsing it back. Its ``Data`` entry is dispatched as an RPC request and the
    response is posted as a JSON string on "backend:response".
    """
    if e.Message != "frontend:message":
        return
    envelope = deserialize_json_string(serialize_json_object(e))
    reply = dispatcher_instance.handle_request(envelope["Data"])
    post_message("backend:response", json.dumps(reply))
#endregion
{
"name": "Mendix Navigation Visualizer",
"author": "wengao",
"email": "[email protected]",
"ui": "index.html",
"plugin": "main.py"
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment