Created
April 1, 2025 03:09
-
-
Save jacobparis/5c6f5c2c6a64cf2953b814e5ca71b772 to your computer and use it in GitHub Desktop.
React Router MCP Server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js" | |
import { SSEServerTransport } from "@modelcontextprotocol/sdk/server/sse.js" | |
import { Socket } from "net" | |
import { Readable } from "stream" | |
import { IncomingMessage, type ServerResponse } from "http" | |
import { | |
subscribeToChannel, | |
unsubscribeFromChannel, | |
publishMessage, | |
} from "../redis.server.js" | |
import { eventStream } from "remix-utils/sse/server" | |
let servers: McpServer[] = [] | |
class McpTransport extends SSEServerTransport { | |
send: (message: any) => Promise<void> | |
private response: ServerResponse | |
constructor(path: string, send: (message: any) => void) { | |
const response = { | |
setHeader: () => {}, | |
getHeader: () => undefined, | |
writeHead: () => response, | |
write: () => true, | |
end: () => {}, | |
once: () => response, | |
on: () => response, | |
off: () => response, | |
emit: () => true, | |
removeListener: () => response, | |
addListener: () => response, | |
statusCode: 200, | |
headersSent: false, | |
finished: false, | |
} as unknown as ServerResponse | |
super(path, response) | |
this.send = async (message) => { | |
send(message) | |
} | |
this.response = response | |
} | |
protected emitMessage(message: any) { | |
return this.send(message) | |
} | |
} | |
export function mcpStream( | |
serverFactory: () => McpServer, | |
options: { signal: AbortSignal }, | |
) { | |
const server = serverFactory() | |
servers.push(server) | |
server.server.onclose = () => { | |
servers = servers.filter((s) => s !== server) | |
} | |
const logs: { type: "log" | "error"; messages: string[] }[] = [] | |
const logInterval = setInterval(() => { | |
for (const log of logs) { | |
console[log.type].call(console, ...log.messages) | |
} | |
logs.length = 0 | |
}, 100) | |
return eventStream(options.signal, function setup(send) { | |
const cleanupController = new AbortController() | |
const transport = new McpTransport("/mcp/message", (message) => { | |
send({ | |
event: "message", | |
data: JSON.stringify(message), | |
}) | |
}) | |
const handleMessage = async (message: string) => { | |
logs.push({ | |
type: "log", | |
messages: ["Received message from Redis", message], | |
}) | |
const { requestId, url, method, headers, body } = JSON.parse(message) | |
let status = 100 | |
let responseBody = "" | |
try { | |
await transport.handlePostMessage( | |
createRequest({ method, url, headers, body }), | |
createResponse((r) => { | |
status = r.status | |
responseBody = r.body | |
}), | |
) | |
await publishMessage( | |
`responses:${transport.sessionId}:${requestId}`, | |
JSON.stringify({ | |
jsonrpc: "2.0", | |
id: requestId, | |
result: { | |
status, | |
body: responseBody, | |
}, | |
}), | |
) | |
logs.push({ | |
type: status >= 200 && status < 300 ? "log" : "error", | |
messages: [ | |
`Request ${transport.sessionId}:${requestId} ${ | |
status >= 200 && status < 300 ? "succeeded" : "failed" | |
}: ${responseBody}`, | |
], | |
}) | |
} catch (error) { | |
await publishMessage( | |
`responses:${transport.sessionId}:${requestId}`, | |
JSON.stringify({ | |
jsonrpc: "2.0", | |
id: requestId, | |
error: { | |
code: -32000, | |
message: | |
error instanceof Error ? error.message : "Internal error", | |
}, | |
}), | |
) | |
logs.push({ | |
type: "error", | |
messages: [ | |
`Request ${transport.sessionId}:${requestId} failed with error: ${error}`, | |
], | |
}) | |
} | |
} | |
const cleanup = async () => { | |
clearInterval(logInterval) | |
await unsubscribeFromChannel( | |
`requests:${transport.sessionId}`, | |
handleMessage, | |
) | |
await server.close() | |
servers = servers.filter((s) => s !== server) | |
cleanupController.abort() | |
} | |
// Set up the abort listener before any async operations | |
options.signal.addEventListener("abort", cleanup, { once: true }) | |
cleanupController.signal.addEventListener("abort", cleanup, { once: true }) | |
// Start the subscription and server connection | |
subscribeToChannel(`requests:${transport.sessionId}`, handleMessage).then( | |
() => { | |
return server.connect(transport) | |
}, | |
) | |
return cleanup | |
}) | |
} | |
function createRequest({ | |
method = "GET", | |
url = "/", | |
headers = {}, | |
body = null, | |
} = {}) { | |
const readable = new Readable() | |
readable._read = () => {} | |
if (body) { | |
readable.push(typeof body === "string" ? body : JSON.stringify(body)) | |
readable.push(null) | |
} | |
const req = new IncomingMessage(new Socket()) | |
req.method = method | |
req.url = url | |
req.headers = headers | |
req.push = readable.push.bind(readable) | |
req.read = readable.read.bind(readable) | |
req.on = readable.on.bind(readable) | |
req.pipe = readable.pipe.bind(readable) | |
return req | |
} | |
function createResponse( | |
callback: ({ status, body }: { status: number; body: string }) => void, | |
) { | |
return { | |
writeHead(code: number) { | |
callback({ status: code, body: "" }) | |
return this | |
}, | |
end(body: unknown) { | |
callback({ status: 200, body: body as string }) | |
return this | |
}, | |
} as unknown as ServerResponse | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { createClient } from "redis" | |
import { invariant } from "@epic-web/invariant" | |
import { remember } from "@epic-web/remember" | |
export const redis = remember("redis", createRedisClient) | |
export const redisPublisher = remember("redisPublisher", createRedisPublisher) | |
// Maximum duration for SSE connections in seconds | |
export const maxDuration = 60 | |
export function createRedisClient() { | |
const redisUrl = process.env.REDIS_URL || process.env.KV_URL | |
invariant(redisUrl, "REDIS_URL or KV_URL environment variable is not set") | |
const client = createClient({ url: redisUrl }) | |
client.on("error", (err) => { | |
console.error(`Redis error ${redisUrl}`, err) | |
}) | |
// Connect immediately | |
void client.connect().catch((err) => { | |
console.error(`Failed to connect to ${redisUrl}`, err) | |
}) | |
return client | |
} | |
export function createRedisPublisher() { | |
const redisUrl = process.env.REDIS_URL || process.env.KV_URL | |
invariant(redisUrl, "REDIS_URL or KV_URL environment variable is not set") | |
const client = createClient({ url: redisUrl }) | |
client.on("error", (err) => { | |
console.error(`Redis publisher error ${redisUrl}`, err) | |
}) | |
// Connect immediately | |
void client.connect().catch((err) => { | |
console.error(`Failed to connect to ${redisUrl}`, err) | |
}) | |
return client | |
} | |
// Helper function to ensure Redis clients are connected | |
export async function ensureRedisConnected() { | |
const redisConnected = redis.isOpen ? Promise.resolve() : redis.connect() | |
const publisherConnected = redisPublisher.isOpen | |
? Promise.resolve() | |
: redisPublisher.connect() | |
try { | |
await Promise.race([ | |
Promise.all([redisConnected, publisherConnected]), | |
new Promise((_, reject) => | |
setTimeout(() => reject(new Error("Redis connection timeout")), 5000), | |
), | |
]) | |
} catch (error) { | |
// Clean up connections if they were started | |
if (!redis.isOpen) { | |
await redis.disconnect() | |
} | |
if (!redisPublisher.isOpen) { | |
await redisPublisher.disconnect() | |
} | |
throw error | |
} | |
} | |
// Helper for publishing messages | |
export async function publishMessage(channel: string, message: unknown) { | |
await ensureRedisConnected() | |
return redisPublisher.publish( | |
channel, | |
typeof message === "string" ? message : JSON.stringify(message), | |
) | |
} | |
// Helper for subscribing to channels | |
export async function subscribeToChannel( | |
channel: string, | |
callback: (message: string) => void, | |
) { | |
await ensureRedisConnected() | |
return redis.subscribe(channel, callback) | |
} | |
// Helper for unsubscribing from channels | |
export async function unsubscribeFromChannel( | |
channel: string, | |
callback: (message: string) => void, | |
) { | |
if (redis.isOpen) { | |
return redis.unsubscribe(channel, callback) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { ActionFunctionArgs } from "react-router" | |
import type { IncomingHttpHeaders } from "http" | |
import { | |
ensureRedisConnected, | |
subscribeToChannel, | |
unsubscribeFromChannel, | |
publishMessage, | |
} from "../redis.server.js" | |
interface SerializedRequest { | |
requestId: string | |
url: string | |
method: string | |
body: string | |
headers: IncomingHttpHeaders | |
} | |
export async function action({ request }: ActionFunctionArgs) { | |
await ensureRedisConnected() | |
// Get the request body | |
const body = await request.text() | |
// Get the URL and extract sessionId | |
const url = new URL(request.url) | |
const sessionId = url.searchParams.get("sessionId") || "" | |
if (!sessionId) { | |
return new Response("No sessionId provided", { status: 400 }) | |
} | |
const requestId = crypto.randomUUID() | |
const serializedRequest: SerializedRequest = { | |
requestId, | |
url: request.url, | |
method: request.method, | |
body: body, | |
headers: Object.fromEntries( | |
request.headers.entries(), | |
) as IncomingHttpHeaders, | |
} | |
// Create a controller to handle timeouts and aborts | |
const controller = new AbortController() | |
// Create a promise that resolves when we get a response | |
const responsePromise = new Promise<Response>((resolve, reject) => { | |
// Handles responses from the /sse endpoint | |
subscribeToChannel(`responses:${sessionId}:${requestId}`, (message) => { | |
controller.abort() | |
const response = JSON.parse(message) as { | |
status: number | |
body: string | |
} | |
resolve(new Response(response.body, { status: response.status })) | |
}).catch(reject) | |
// Set a timeout | |
setTimeout(() => { | |
controller.abort() | |
reject(new Response("Request timed out", { status: 408 })) | |
}, 10 * 1000) | |
}) | |
// Queue the request in Redis so that a subscriber can pick it up | |
await publishMessage(`requests:${sessionId}`, serializedRequest) | |
// Handle cleanup when the request is aborted | |
const abortHandler = async () => { | |
controller.abort() | |
await unsubscribeFromChannel( | |
`responses:${sessionId}:${requestId}`, | |
() => {}, | |
) | |
} | |
request.signal.addEventListener("abort", abortHandler) | |
try { | |
// Wait for the response or timeout | |
const response = await responsePromise | |
await unsubscribeFromChannel( | |
`responses:${sessionId}:${requestId}`, | |
() => {}, | |
) | |
request.signal.removeEventListener("abort", abortHandler) | |
return response | |
} catch (error) { | |
await unsubscribeFromChannel( | |
`responses:${sessionId}:${requestId}`, | |
() => {}, | |
) | |
request.signal.removeEventListener("abort", abortHandler) | |
if (error instanceof Response) { | |
return error | |
} | |
console.error("Error handling message:", error) | |
return new Response("Internal server error", { status: 500 }) | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js" | |
import { z } from "zod" | |
import type { LoaderFunctionArgs } from "react-router" | |
import { ensureRedisConnected } from "../redis.server.js" | |
import { mcpStream } from "#app/utils/mcp.server.js" | |
import { matchSorter } from "match-sorter" | |
import { cachified, lru } from "#app/cache.server.ts" | |
import { remember } from "@epic-web/remember" | |
import { getAllPkgConfigs } from "#app/db/queries/get-all-pkg-configs.ts" | |
const mcpServer = remember("mcp-server", () => { | |
const server = new McpServer( | |
{ | |
name: "pkgless", | |
version: "0.1.0", | |
}, | |
{ | |
capabilities: { | |
tools: { | |
"templates-search": { | |
description: "Search for project templates based on criteria", | |
}, | |
}, | |
}, | |
}, | |
) | |
server.tool( | |
"templates-search", | |
{ | |
query: z.string().optional(), | |
}, | |
async ({ query }) => { | |
console.log(`[MCP] templates-search`, { query }) | |
const templates = await cachified({ | |
key: "pkg-configs", | |
cache: lru, | |
ttl: 60 * 10 * 1000, // 10 minutes | |
swr: Infinity, | |
async getFreshValue() { | |
console.log(`[Cache] Getting fresh value for pkg-configs in MCP`) | |
return getAllPkgConfigs() | |
}, | |
}) | |
let filteredTemplates = templates | |
if (query?.trim()) { | |
filteredTemplates = matchSorter(filteredTemplates, query.trim(), { | |
keys: [ | |
"name", | |
"repo", | |
"environments", | |
"config.description", | |
"config.technologies", | |
], | |
}) | |
} | |
return { | |
content: [ | |
{ | |
type: "text", | |
text: `Found ${filteredTemplates.length} matching templates`, | |
}, | |
], | |
templates: filteredTemplates.map((template) => ({ | |
repo: template.repo, | |
path: template.path, | |
installCommand: `npx degit ${template.repo}${template.path ? `/${template.path}` : ""}`, | |
})), | |
} | |
}, | |
) | |
return server | |
}) | |
export async function loader({ request }: LoaderFunctionArgs) { | |
await ensureRedisConnected() | |
const timeoutSignal = AbortSignal.any([ | |
request.signal, | |
AbortSignal.timeout(30000), | |
]) | |
return mcpStream(() => mcpServer, { signal: timeoutSignal }) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment