Created
March 28, 2026 21:09
-
-
Save mrexodia/7b5555b9022d029cde2cd91b3fed865c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """Minimal MCP server with pi-like tools for workshops. | |
| This version is intentionally small and easy to read. | |
| It is not a full behavioral match for pi's native tools. | |
| Each tool includes comments describing what a production version would need. | |
| """ | |
| import argparse | |
| import datetime | |
| import os | |
| import subprocess | |
| from typing import Annotated, Optional | |
| from urllib.parse import urlparse | |
| from zeromcp import McpServer, McpToolError | |
# Server object; the prompt and tool functions below attach to it through the
# @mcp.prompt / @mcp.tool decorators.
mcp = McpServer("pi-minimal")
# The system prompt is read from "system-prompt.md" located in the same
# directory as this script.
SYSTEM_PROMPT_PATH = os.path.join(os.path.dirname(__file__), "system-prompt.md")
# Base directory used to resolve relative paths and as the cwd for shell
# commands. It is an empty string until the __main__ block overwrites it with
# the directory given on the command line.
WORKING_DIRECTORY = ""
def resolve_path(path: str) -> str:
    """Turn a caller-supplied path into the path the tools should open.

    A single leading "@" is dropped, "~" is expanded, and a path that is still
    relative afterwards is joined onto WORKING_DIRECTORY and made absolute.
    A path that is already absolute after expansion is returned unchanged
    (it is not normalized).
    """
    # Workshop simplification:
    # - strips a leading @ because models often include it for file references
    # - expands ~ for convenience
    # Missing compared to pi:
    # - no special handling for macOS screenshot filename variants
    # - no extra normalization for Unicode spaces / quote variants
    candidate = path
    if candidate.startswith("@"):
        candidate = candidate[1:]
    candidate = os.path.expanduser(candidate)
    if not os.path.isabs(candidate):
        candidate = os.path.abspath(os.path.join(WORKING_DIRECTORY, candidate))
    return candidate
@mcp.prompt
def pi(request: Annotated[str, "User request to generate a prompt for"]) -> str:
    """System prompt from pi."""
    # Result layout: the stripped contents of the system prompt file, followed
    # by the current date, the configured working directory, and finally the
    # caller's request, each introduced by a newline.
    # NOTE(review): the separators below are reproduced from the source as it
    # was visible; confirm that no blank lines were intended between sections.
    current_date = datetime.date.today().isoformat()
    with open(SYSTEM_PROMPT_PATH, "r", encoding="utf-8") as prompt_file:
        base_prompt = prompt_file.read().strip()
    trailer = (
        f"\nCurrent date: {current_date}"
        f"\nCurrent working directory: {WORKING_DIRECTORY}"
        f"\n{request}"
    )
    return base_prompt + trailer
@mcp.tool
def read(
    path: Annotated[str, "Path to the file to read (relative or absolute)"],
    offset: Annotated[
        Optional[int], "Line number to start reading from (1-indexed)"
    ] = None,
    limit: Annotated[Optional[int], "Maximum number of lines to read"] = None,
) -> str:
    """Read the contents of a text file."""
    # The docstring above is printed as the tool description at startup, so it
    # is kept short; details live in these comments.
    #
    # Returns the selected lines joined with "\n" (line terminators are
    # dropped by splitlines(), so a trailing newline is not reproduced).
    # Raises McpToolError when the file cannot be read, when `offset` points
    # past the last line, or when `limit` is negative.
    try:
        if limit is not None and limit < 0:
            # A negative limit would otherwise silently slice lines off the
            # end of the file instead of limiting the count.
            raise McpToolError(f"Limit must be non-negative, got {limit}")

        abs_path = resolve_path(path)
        with open(abs_path, "r", encoding="utf-8") as file_handle:
            lines = file_handle.read().splitlines()

        # offset is 1-indexed; None, 0 and negative values all mean "from the
        # first line".
        start = max(0, (offset or 1) - 1)
        if start >= len(lines):
            if start == 0:
                # Empty file read from the beginning: that is valid and simply
                # has no content ("".splitlines() is an empty list).
                return ""
            raise McpToolError(
                f"Offset {offset} is beyond end of file ({len(lines)} lines total)"
            )

        end = None if limit is None else start + limit
        # Workshop simplification:
        # - no 2000-line / 50KB truncation
        # - no image support
        # - no continuation hints when output is large
        # - reads whole file into memory
        return "\n".join(lines[start:end])
    except McpToolError:
        raise
    except Exception as error:
        raise McpToolError(f"Failed to read file {path}: {error}") from error
@mcp.tool
def bash(
    command: Annotated[str, "Bash command to execute"],
    timeout: Annotated[
        Optional[int], "Timeout in seconds (optional, no default timeout)"
    ] = None,
) -> str:
    """Execute a bash command in the current working directory."""
    # Runs `command` through the system shell with cwd=WORKING_DIRECTORY and
    # returns stdout and stderr combined into one string.
    # Raises McpToolError when the command exits non-zero (the message carries
    # the output and exit code), when it exceeds `timeout`, or when it cannot
    # be started.
    # NOTE: shell=True is deliberate here, the tool's purpose is to run an
    # arbitrary shell command line supplied by the client.
    try:
        result = subprocess.run(
            command,
            shell=True,
            cwd=WORKING_DIRECTORY,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding="utf-8",
            # Commands may emit bytes that are not valid UTF-8; substitute them
            # rather than failing an otherwise successful command while
            # decoding its output.
            errors="replace",
            timeout=timeout,
            check=False,
        )
        output = result.stdout
        # Workshop simplification:
        # - no streaming partial output to the client/model
        # - no temp-file fallback for very large output
        # - no byte-aware truncation like pi's native bash tool
        # If you use this in production, you should truncate large outputs.
        if result.returncode != 0:
            raise McpToolError(
                f"{output}\n\nCommand exited with code {result.returncode}"
            )
        return output
    except McpToolError:
        raise
    except subprocess.TimeoutExpired:
        raise McpToolError(f"Command timed out after {timeout} seconds")
    except Exception as error:
        raise McpToolError(
            f"Failed to execute bash command '{command}': {error}"
        ) from error
@mcp.tool
def edit(
    path: Annotated[str, "Path to the file to edit (relative or absolute)"],
    oldText: Annotated[str, "Exact text to find and replace (must match exactly)"],
    newText: Annotated[str, "New text to replace the old text with"],
) -> str:
    """Edit a file by replacing one exact text match."""
    # Contract: `oldText` must occur exactly once in the file, and the
    # replacement must actually change the content; otherwise a McpToolError
    # is raised and the file is left untouched. Any other failure (missing
    # file, decode error, ...) is wrapped in a McpToolError as well.
    try:
        target = resolve_path(path)
        with open(target, "r", encoding="utf-8") as source:
            original = source.read()

        occurrences = original.count(oldText)
        if occurrences > 1:
            raise McpToolError(
                f"Found {occurrences} occurrences of the text in {path}. Provide more context to make it unique."
            )
        if occurrences == 0:
            raise McpToolError(
                f"Could not find the exact text in {path}. The old text must match exactly."
            )

        updated = original.replace(oldText, newText, 1)
        if updated == original:
            raise McpToolError(
                f"No changes made to {path}. The replacement produced identical content."
            )

        with open(target, "w", encoding="utf-8") as sink:
            sink.write(updated)
        # Workshop simplification:
        # - no fuzzy matching fallback
        # - no BOM handling
        # - no line-ending preservation logic
        # - no diff/details in the result
        return f"Successfully replaced text in {path}."
    except McpToolError:
        raise
    except Exception as error:
        raise McpToolError(f"Failed to edit file {path}: {error}")
@mcp.tool
def write(
    path: Annotated[str, "Path to the file to write (relative or absolute)"],
    content: Annotated[str, "Content to write to the file"],
) -> str:
    """Write content to a file, creating parent directories if needed."""
    # Overwrites the target if it already exists. Returns a confirmation
    # message containing the size of the written content.
    # Raises McpToolError on any failure (path resolution, directory creation,
    # encoding, or the write itself).
    try:
        abs_path = resolve_path(path)
        parent = os.path.dirname(abs_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        # len(content) counts characters, not bytes; report the UTF-8 encoded
        # size so the message is accurate for non-ASCII content. On platforms
        # that translate "\n" when writing text, the on-disk size can still
        # differ from this figure.
        encoded_size = len(content.encode("utf-8"))
        with open(abs_path, "w", encoding="utf-8") as file_handle:
            file_handle.write(content)
        # Workshop simplification:
        # - no cancellation/abort support
        # - no extra metadata/details returned to the model
        return f"Successfully wrote {encoded_size} bytes to {path}"
    except Exception as error:
        raise McpToolError(f"Failed to write file {path}: {error}") from error
if __name__ == "__main__":
    # Command-line entry point: parse arguments, fix the working directory,
    # then run the server over stdio or HTTP.
    parser = argparse.ArgumentParser(description="Minimal pi MCP")
    parser.add_argument(
        "--transport",
        help="Transport (stdio or http://host:port)",
        default="http://127.0.0.1:5001",
    )
    parser.add_argument(
        "working_directory",
        help="Initial working directory for the server",
    )
    args = parser.parse_args()

    # Module-level assignment: this replaces the empty default that the tool
    # functions read when resolving paths and running commands.
    WORKING_DIRECTORY = os.path.abspath(os.path.expanduser(args.working_directory))

    if args.transport == "stdio":
        mcp.stdio()
    else:
        url = urlparse(args.transport)
        try:
            host, port = url.hostname, url.port
        except ValueError as error:
            # Reading .port raises ValueError for a malformed port (for
            # example "http://host:abc"); report it like any other bad URL.
            raise Exception(f"Invalid transport URL: {args.transport}") from error
        if host is None or port is None:
            raise Exception(f"Invalid transport URL: {args.transport}")

        print("Starting Minimal MCP Server...")
        print("\nAvailable tools:")
        for name in mcp.tools.methods.keys():
            func = mcp.tools.methods[name]
            print(f" - {name}: {func.__doc__}")
        print()

        mcp.serve(host, port)
        try:
            input("\nServer is running, press Enter or Ctrl+C to stop...")
        except (KeyboardInterrupt, EOFError):
            print("\n\nStopping server...")
        # NOTE(review): the source's indentation was lost, so it is unclear
        # whether stop() originally ran only after an interrupt. It is placed
        # here so the server is stopped on both Enter and Ctrl+C; confirm.
        mcp.stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment