Created
July 23, 2023 16:10
-
-
Save msacar/b47bc8493244a48e00fc24bed185f5bf to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import type { Server, ServerWebSocket } from "bun"; | |
import { AwsClient } from "aws4fetch"; | |
/**
 * Handler surface the runtime works with once `init()` has normalised the
 * user's module.
 */
type Lambda = {
  /**
   * Handles one invocation. `LambdaServer.fetch()` accepts an `undefined`
   * result only while an upgrade response is pending; otherwise it treats
   * anything that is not a `Response` as an error.
   */
  fetch: (request: Request, server: Server) => Promise<Response | undefined>;
  /** Optional fallback invoked by `LambdaServer.fetch()` when `fetch` fails. */
  error?: (error: unknown) => Promise<Response>;
  /** Optional WebSocket callbacks, dispatched by `LambdaServer` per event type. */
  websocket?: {
    /** Called for a `CONNECT` event. */
    open?: (ws: ServerWebSocket) => Promise<void>;
    /** Called for a `MESSAGE` event that carries a body. */
    message?: (ws: ServerWebSocket, message: string) => Promise<void>;
    /** Called for a `DISCONNECT` event with the reported status code and reason. */
    close?: (ws: ServerWebSocket, code: number, reason: string) => Promise<void>;
  };
};
// Per-invocation state. `receiveRequest()` fills these in; `reset()` clears
// `requestId` and `traceId` once the invocation has been answered.
let requestId: string | undefined;
let traceId: string | undefined;
let functionArn: string | undefined;

// Signing client, created on first use by `fetchAws()`.
let aws: AwsClient | undefined;

// Reference to the real `console.log`, captured BEFORE the console methods
// are replaced further down, so that `log()` does not call itself.
let logger = console.log;
/**
 * Writes one log record through the captured original `console.log`.
 *
 * @param level Severity label placed first on the line (e.g. "INFO").
 * @param args  Values to render with `Bun.inspect`; nothing is written when empty.
 */
function log(level: string, ...args: any[]): void {
  if (args.length === 0) {
    return;
  }
  // Newlines become carriage returns — presumably so a multi-line message
  // stays a single log entry downstream (TODO confirm against CloudWatch).
  const message = Bun.inspect(...args).replace(/\n/g, "\r");
  if (requestId === undefined) {
    logger(level, message);
    return;
  }
  logger(level, `RequestId: ${requestId}`, message);
}
// Route every console method through `log()` so each record carries a level
// and, while an invocation is active, its request ID.
// Note that `console.log` and `console.info` both map to "INFO".
console.log = (...args: any[]) => log("INFO", ...args);
console.info = (...args: any[]) => log("INFO", ...args);
console.warn = (...args: any[]) => log("WARN", ...args);
console.error = (...args: any[]) => log("ERROR", ...args);
console.debug = (...args: any[]) => log("DEBUG", ...args);
console.trace = (...args: any[]) => log("TRACE", ...args);
// Messages already emitted by `warnOnce()`; created lazily and dropped by `reset()`.
let warnings: Set<string> | undefined;

/**
 * Emits a warning through `console.warn` at most once per distinct `message`
 * for as long as the `warnings` set lives.
 *
 * @param message Text of the warning; also the de-duplication key.
 * @param args    Extra values forwarded to `console.warn` on first emission.
 */
function warnOnce(message: string, ...args: any[]): void {
  warnings ??= new Set();
  if (warnings.has(message)) {
    return;
  }
  warnings.add(message);
  console.warn(message, ...args);
}
/**
 * Clears the state tied to the invocation that just finished: the request ID,
 * the trace ID and the set of warnings already shown by `warnOnce()`.
 */
function reset(): void {
  requestId = undefined;
  traceId = undefined;
  warnings = undefined;
}
/**
 * Logs `cause` at error level and terminates the process with exit code 1.
 * Never returns. With no arguments nothing is logged, because `log()` skips
 * empty argument lists.
 */
function exit(...cause: any[]): never {
  console.error(...cause);
  process.exit(1);
}
/**
 * Reads an environment variable.
 *
 * @param name     Variable to look up in `process.env`.
 * @param fallback Value used when the variable is not set.
 * @returns The variable's value, or `fallback`. When neither exists the
 *          runtime logs the problem and exits via `exit()`.
 */
function env(name: string, fallback?: string): string {
  const value = process.env[name] ?? fallback;
  if (value === undefined) {
    exit(`Runtime failed to find the '${name}' environment variable`);
  }
  return value;
}
// Base URL of the Lambda Runtime API; relative paths passed to `fetch()` below
// are resolved against it.
const runtimeUrl = new URL(`http://${env("AWS_LAMBDA_RUNTIME_API")}/2018-06-01/`);

/**
 * Sends a request to the Lambda Runtime API. This module-level function
 * shadows the global `fetch`; the global one is reached via `globalThis.fetch`.
 *
 * @param url     Path relative to `runtimeUrl` (absolute URLs pass through `new URL` unchanged).
 * @param options Standard request options, forwarded as-is.
 * @returns The response. A non-OK status terminates the runtime via `exit()`.
 */
async function fetch(url: string, options?: RequestInit): Promise<Response> {
  const { href } = new URL(url, runtimeUrl);
  const response = await globalThis.fetch(href, {
    ...options,
    // Bun-specific option — presumably disables the request timeout so the
    // long poll for the next invocation can wait indefinitely (TODO confirm).
    timeout: false,
  });
  if (!response.ok) {
    exit(`Runtime failed to send request to Lambda [status: ${response.status}]`);
  }
  return response;
}
/**
 * Performs a request through the shared `AwsClient`, creating the client on
 * first use from the `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`,
 * `AWS_SESSION_TOKEN` and `AWS_REGION` environment variables. A missing
 * variable terminates the runtime (see `env()`).
 *
 * @param url     Target URL.
 * @param options Request options forwarded to `AwsClient.fetch`.
 */
async function fetchAws(url: string, options?: RequestInit): Promise<Response> {
  aws ??= new AwsClient({
    accessKeyId: env("AWS_ACCESS_KEY_ID"),
    secretAccessKey: env("AWS_SECRET_ACCESS_KEY"),
    sessionToken: env("AWS_SESSION_TOKEN"),
    region: env("AWS_REGION"),
  });
  return aws.fetch(url, options);
}
/** JSON error document that `sendError()` posts to the Runtime API. */
type LambdaError = {
  /** Error class name, or "Error" for non-`Error` values. */
  readonly errorType: string;
  /** Human-readable description of the failure. */
  readonly errorMessage: string;
  /** Stack lines, present only when the cause was an `Error` with a stack. */
  readonly stackTrace?: string[];
};
/**
 * Converts an arbitrary thrown value into the `LambdaError` document.
 *
 * @param error The thrown value.
 * @returns For an `Error`: its name, message and stack lines, with lines that
 *          mention the runtime's own file removed. For anything else: type
 *          "Error" and the `Bun.inspect` rendering as the message.
 */
function formatError(error: unknown): LambdaError {
  if (!(error instanceof Error)) {
    return {
      errorType: "Error",
      errorMessage: Bun.inspect(error),
    };
  }
  // Hide frames that belong to this runtime so the trace points at user code.
  const stackTrace = error.stack?.split("\n").filter(line => !line.includes(" /opt/runtime.ts"));
  return {
    errorType: error.name,
    errorMessage: error.message,
    stackTrace,
  };
}
/**
 * Logs `cause` and reports it to the Runtime API.
 *
 * The init-error endpoint is used while no invocation is active
 * (`requestId` is undefined); otherwise the error is attached to the current
 * invocation.
 *
 * @param type  Short error category; sent as `Bun.<type>` in the error-type header.
 * @param cause The failure to report; serialised with `formatError()`.
 */
async function sendError(type: string, cause: unknown): Promise<void> {
  console.error(cause);
  const path = requestId === undefined ? "runtime/init/error" : `runtime/invocation/${requestId}/error`;
  await fetch(path, {
    method: "POST",
    headers: {
      "Content-Type": "application/vnd.aws.lambda.error+json",
      "Lambda-Runtime-Function-Error-Type": `Bun.${type}`,
    },
    body: JSON.stringify(formatError(cause)),
  });
}
/**
 * Reports a fatal error via `sendError()` and then terminates the process.
 * The returned promise never resolves to a value.
 */
async function throwError(type: string, cause: unknown): Promise<never> {
  await sendError(type, cause);
  exit();
}
/**
 * Loads the user's handler module named by `_HANDLER` and normalises it into
 * a `Lambda` object.
 *
 * `_HANDLER` is read as `<file>.<export>`; the part after the last "." names
 * the export and defaults to "fetch". A value without any "." is treated as a
 * bare file name with the default export name. (Previously `lastIndexOf`
 * returned -1 in that case, which produced an empty file name.)
 *
 * Resolution of the handler object: the module's `default` export wins over
 * the named export. A function becomes `{ fetch: fn }`; an object whose
 * requested name is not "fetch" gets that member copied to `fetch`.
 *
 * Any failure is reported to the Runtime API and terminates the process via
 * `throwError()`.
 */
async function init(): Promise<Lambda> {
  const handlerName = env("_HANDLER");
  const index = handlerName.lastIndexOf(".");
  const hasSeparator = index !== -1;
  const fileName = hasSeparator ? handlerName.substring(0, index) : handlerName;
  const moduleName = (hasSeparator ? handlerName.substring(index + 1) : "") || "fetch";
  const filePath = `${env("LAMBDA_TASK_ROOT")}/${fileName}`;

  let file;
  try {
    file = await import(filePath);
  } catch (cause) {
    if (cause instanceof Error && cause.message.startsWith("Cannot find module")) {
      return throwError("FileDoesNotExist", `Did not find a file named '${fileName}'`);
    }
    return throwError("InitError", cause);
  }

  let module = file["default"] ?? file[moduleName] ?? {};
  if (typeof module === "function") {
    module = {
      fetch: module,
    };
  } else if (typeof module === "object" && moduleName !== "fetch") {
    module = {
      ...module,
      fetch: module[moduleName],
    };
  }

  // NOTE: this local `fetch` shadows the module-level Runtime API `fetch`
  // inside this function only.
  const { fetch, websocket } = module;
  if (typeof fetch !== "function") {
    return throwError(
      fetch === undefined ? "MethodDoesNotExist" : "MethodIsNotAFunction",
      `${fileName} does not have a default export with a function named '${moduleName}'`,
    );
  }
  if (websocket === undefined) {
    return module;
  }

  // Each WebSocket callback is optional, but when present it must be callable.
  for (const name of ["open", "message", "close"]) {
    const method = websocket[name];
    if (method !== undefined && typeof method !== "function") {
      return throwError(
        "MethodIsNotAFunction",
        `${fileName} does not have a function named '${name}' on the default 'websocket' property`,
      );
    }
  }
  return module;
}
/** One invocation as returned by `receiveRequest()`. */
type LambdaRequest<E = any> = {
  /** Value of the `Lambda-Runtime-Aws-Request-Id` header. */
  readonly requestId: string;
  /** Value of the `Lambda-Runtime-Trace-Id` header. */
  readonly traceId: string;
  /** Value of the `Lambda-Runtime-Invoked-Function-Arn` header. */
  readonly functionArn: string;
  /** Parsed `Lambda-Runtime-Deadline-Ms` header, or `null` when absent or zero. */
  readonly deadlineMs: number | null;
  /** JSON-decoded invocation payload. */
  readonly event: E;
};
/**
 * Fetches the next invocation from the Runtime API (`runtime/invocation/next`).
 *
 * Side effects: stores the request ID, trace ID and function ARN in the
 * module-level variables and exports the trace ID as `_X_AMZN_TRACE_ID`.
 * A missing header or a body that is not valid JSON terminates the runtime
 * via `exit()`.
 */
async function receiveRequest(): Promise<LambdaRequest> {
  const response = await fetch("runtime/invocation/next");
  const { headers } = response;

  requestId = headers.get("Lambda-Runtime-Aws-Request-Id") ?? undefined;
  if (requestId === undefined) {
    exit("Runtime received a request without a request ID");
  }

  traceId = headers.get("Lambda-Runtime-Trace-Id") ?? undefined;
  if (traceId === undefined) {
    exit("Runtime received a request without a trace ID");
  }
  process.env["_X_AMZN_TRACE_ID"] = traceId;

  functionArn = headers.get("Lambda-Runtime-Invoked-Function-Arn") ?? undefined;
  if (functionArn === undefined) {
    exit("Runtime received a request without a function ARN");
  }

  // `|| null` is deliberate: a missing, zero or unparsable deadline all mean
  // "no deadline known".
  const deadlineMs = parseInt(headers.get("Lambda-Runtime-Deadline-Ms") ?? "0", 10) || null;

  let event;
  try {
    event = await response.json();
  } catch (cause) {
    exit("Runtime received a request with invalid JSON", cause);
  }

  return {
    requestId,
    traceId,
    functionArn,
    deadlineMs,
    event,
  };
}
/** Proxy-integration style response object produced by `formatResponse()`. */
type LambdaResponse = {
  readonly statusCode: number;
  /** Single-valued response headers. */
  readonly headers?: Record<string, string>;
  /** True when `body` holds base64 text rather than plain text. */
  readonly isBase64Encoded?: boolean;
  readonly body?: string;
  /** Headers that may repeat; `formatResponse()` uses it for `Set-Cookie`. */
  readonly multiValueHeaders?: Record<string, string[]>;
  /** `Set-Cookie` values, one entry per cookie. */
  readonly cookies?: string[];
};
/**
 * Converts a `Response` into a `LambdaResponse`.
 *
 * - Status 101 is reported as 200, carrying only `Sec-WebSocket-Protocol`
 *   when the response set one.
 * - The body is sent as text when the content type starts with `text/` or
 *   `application/json`; every other (or missing) content type is base64-encoded.
 * - `Set-Cookie` is removed from the plain headers and, when any cookie exists,
 *   returned through both `cookies` and `multiValueHeaders`.
 *
 * Relies on `Headers.toJSON()` and `Headers.getAll()`, which are Bun extensions.
 */
async function formatResponse(response: Response): Promise<LambdaResponse> {
  const statusCode = response.status;
  const headers = response.headers.toJSON();

  if (statusCode === 101) {
    const protocol = headers["sec-websocket-protocol"];
    if (protocol === undefined) {
      return {
        statusCode: 200,
      };
    }
    return {
      statusCode: 200,
      headers: {
        "Sec-WebSocket-Protocol": protocol,
      },
    };
  }

  const mime = headers["content-type"];
  const isText = Boolean(mime) && (mime.startsWith("text/") || mime.startsWith("application/json"));
  const isBase64Encoded = !isText;
  const body = isBase64Encoded ? Buffer.from(await response.arrayBuffer()).toString("base64") : await response.text();

  delete headers["set-cookie"];
  const cookies = response.headers.getAll("Set-Cookie");
  if (cookies.length === 0) {
    return {
      statusCode,
      headers,
      isBase64Encoded,
      body,
    };
  }
  return {
    statusCode,
    headers,
    cookies,
    multiValueHeaders: {
      "Set-Cookie": cookies,
    },
    isBase64Encoded,
    body,
  };
}
/**
 * Posts the invocation result to the Runtime API for the current request.
 *
 * @param response Result value; serialised with `JSON.stringify`, except that
 *                 `null` is sent as an empty body.
 *
 * Terminates the runtime via `exit()` when no invocation is active.
 */
async function sendResponse(response: unknown): Promise<void> {
  if (requestId === undefined) {
    exit("Runtime attempted to send a response without a request ID");
  }
  const body = response === null ? null : JSON.stringify(response);
  await fetch(`runtime/invocation/${requestId}/response`, {
    method: "POST",
    body,
  });
}
/**
 * Returns the request body of an event as text.
 *
 * @param body            Body string from the event, if any.
 * @param isBase64Encoded True when `body` is base64 text, as flagged by the event.
 * @returns `null` when there is no body; the body itself when it is plain
 *          text; otherwise the base64-DECODED body.
 *
 * The previous implementation base64-encoded the already-encoded body a second
 * time, so handlers received doubly encoded data.
 *
 * NOTE(review): the decoded bytes are interpreted as UTF-8 because the return
 * type is a string; payloads that are not valid UTF-8 lose information.
 */
function formatBody(body?: string, isBase64Encoded?: boolean): string | null {
  if (body === undefined) {
    return null;
  }
  if (!isBase64Encoded) {
    return body;
  }
  return Buffer.from(body, "base64").toString("utf8");
}
/** HTTP event tagged `version: "1.0"`, limited to the fields this runtime reads. */
type HttpEventV1 = {
  readonly version: "1.0";
  readonly requestContext: {
    readonly requestId: string;
    /** Host name used when the event carries no `Host` header. */
    readonly domainName: string;
    readonly httpMethod: string;
    readonly path: string;
  };
  /** Single-valued view of the request headers. */
  readonly headers: Record<string, string>;
  /** Multi-valued view of the request headers. */
  readonly multiValueHeaders?: Record<string, string[]>;
  /** Single-valued view of the query string. */
  readonly queryStringParameters?: Record<string, string>;
  /** Multi-valued view of the query string. */
  readonly multiValueQueryStringParameters?: Record<string, string[]>;
  /** True when `body` is base64 text. */
  readonly isBase64Encoded: boolean;
  readonly body?: string;
};
/**
 * Type guard for `HttpEventV1`: an object tagged `version: "1.0"` that has a
 * non-null `requestContext` object.
 *
 * Safe for any JSON payload, including `null` and primitives. The earlier
 * form threw on a `null` payload and accepted `requestContext: null`.
 */
function isHttpEventV1(event: any): event is HttpEventV1 {
  if (typeof event !== "object" || event === null) {
    return false;
  }
  const context = event.requestContext;
  return event.version === "1.0" && typeof context === "object" && context !== null;
}
/**
 * Builds a `Request` from a version 1.0 HTTP event.
 *
 * Version 1.0 events can carry the same headers and query parameters twice:
 * once in the single-valued map and once in the multi-valued map. The
 * multi-valued map is applied first and the single-valued map only fills in
 * names that are still missing. Appending both, as before, duplicated every
 * value — `Host` became "a, a", which made the URL construction fail.
 * Maps that are `null` or absent are treated as empty.
 *
 * The URL is `<X-Forwarded-Proto or "http">://<Host or domainName><path>`.
 */
function formatHttpEventV1(event: HttpEventV1): Request {
  const request = event.requestContext;

  const headers = new Headers();
  for (const [name, values] of Object.entries(event.multiValueHeaders ?? {})) {
    for (const value of values ?? []) {
      headers.append(name, value);
    }
  }
  for (const [name, value] of Object.entries(event.headers ?? {})) {
    if (value != null && !headers.has(name)) {
      headers.append(name, value);
    }
  }

  const hostname = headers.get("Host") ?? request.domainName;
  const proto = headers.get("X-Forwarded-Proto") ?? "http";
  const url = new URL(request.path, `${proto}://${hostname}/`);
  for (const [name, values] of Object.entries(event.multiValueQueryStringParameters ?? {})) {
    for (const value of values ?? []) {
      url.searchParams.append(name, value);
    }
  }
  for (const [name, value] of Object.entries(event.queryStringParameters ?? {})) {
    if (value != null && !url.searchParams.has(name)) {
      url.searchParams.append(name, value);
    }
  }

  return new Request(url.toString(), {
    method: request.httpMethod,
    headers,
    body: formatBody(event.body, event.isBase64Encoded),
  });
}
/** HTTP event tagged `version: "2.0"`, limited to the fields this runtime reads. */
type HttpEventV2 = {
  readonly version: "2.0";
  readonly requestContext: {
    readonly requestId: string;
    /** Host name used when the event carries no `Host` header. */
    readonly domainName: string;
    readonly http: {
      readonly method: string;
      readonly path: string;
    };
  };
  /** Request headers; `formatHttpEventV2()` splits each value on ",". */
  readonly headers: Record<string, string>;
  /** Query parameters; `formatHttpEventV2()` splits each value on ",". */
  readonly queryStringParameters?: Record<string, string>;
  /** Request cookies, one `name=value` entry each. */
  readonly cookies?: string[];
  /** True when `body` is base64 text. */
  readonly isBase64Encoded: boolean;
  readonly body?: string;
};
/**
 * Type guard for `HttpEventV2`: an object tagged `version: "2.0"` that has a
 * non-null `requestContext` object.
 *
 * Safe for any JSON payload, including `null` and primitives. The earlier
 * form threw on a `null` payload and accepted `requestContext: null`.
 */
function isHttpEventV2(event: any): event is HttpEventV2 {
  if (typeof event !== "object" || event === null) {
    return false;
  }
  const context = event.requestContext;
  return event.version === "2.0" && typeof context === "object" && context !== null;
}
/**
 * Builds a `Request` from a version 2.0 HTTP event.
 *
 * Two defects of the earlier form are fixed:
 * - query-string parameters were appended to the request HEADERS, so the
 *   resulting URL never carried a query string; they now go to the URL;
 * - request cookies were written to `Set-Cookie` (a response header); they
 *   are now joined with "; " into the `Cookie` request header.
 *
 * Header and query values are split on "," because the 2.0 payload format
 * combines repeated values with commas; a literal comma inside a single value
 * cannot be told apart from a separator.
 *
 * The URL is `<X-Forwarded-Proto or "http">://<Host or domainName><path>`.
 */
function formatHttpEventV2(event: HttpEventV2): Request {
  const request = event.requestContext;

  const headers = new Headers();
  for (const [name, values] of Object.entries(event.headers ?? {})) {
    for (const value of values.split(",")) {
      headers.append(name, value);
    }
  }
  const cookies = event.cookies ?? [];
  if (cookies.length > 0) {
    const existing = headers.get("Cookie");
    const parts = existing === null ? cookies : [existing, ...cookies];
    headers.set("Cookie", parts.join("; "));
  }

  const hostname = headers.get("Host") ?? request.domainName;
  const proto = headers.get("X-Forwarded-Proto") ?? "http";
  const url = new URL(request.http.path, `${proto}://${hostname}/`);
  for (const [name, values] of Object.entries(event.queryStringParameters ?? {})) {
    for (const value of values.split(",")) {
      url.searchParams.append(name, value);
    }
  }

  return new Request(url.toString(), {
    method: request.http.method,
    headers,
    body: formatBody(event.body, event.isBase64Encoded),
  });
}
/**
 * WebSocket event, limited to the fields this runtime reads. The
 * `requestContext.eventType` tag selects the variant; only `DISCONNECT`
 * carries a status code and reason.
 */
type WebSocketEvent = {
  readonly headers: Record<string, string>;
  readonly multiValueHeaders: Record<string, string[]>;
  /** True when `body` is base64 text. */
  readonly isBase64Encoded: boolean;
  readonly body?: string;
  readonly requestContext: {
    readonly apiId: string;
    readonly requestId: string;
    /** Identifies the connection; used as the key of `LambdaServer`'s socket map. */
    readonly connectionId: string;
    readonly domainName: string;
    readonly stage: string;
    readonly identity: {
      readonly sourceIp: string;
    };
  } & (
    | {
        readonly eventType: "CONNECT";
      }
    | {
        readonly eventType: "MESSAGE";
      }
    | {
        readonly eventType: "DISCONNECT";
        readonly disconnectStatusCode: number;
        readonly disconnectReason: string;
      }
  );
};
/**
 * Type guard for `WebSocketEvent`: true when `requestContext.connectionId`
 * is a string.
 *
 * Optional chaining keeps the check safe when the event or its
 * `requestContext` is `null`/`undefined`; the earlier form threw in both cases.
 */
function isWebSocketEvent(event: any): event is WebSocketEvent {
  return typeof event?.requestContext?.connectionId === "string";
}
/**
 * True when `event` is a WebSocket event whose type is `CONNECT`, i.e. the
 * event that `formatRequest()` turns into an upgrade `Request`.
 */
function isWebSocketUpgrade(event: any): event is WebSocketEvent {
  if (!isWebSocketEvent(event)) {
    return false;
  }
  return event.requestContext.eventType === "CONNECT";
}
/**
 * Builds the upgrade `Request` for a WebSocket `CONNECT` event.
 *
 * The request carries `Upgrade: websocket`, the connection ID in
 * `x-amzn-connection-id`, and every value from `multiValueHeaders`. A missing
 * or `null` header map is treated as empty (it previously made
 * `Object.entries` throw). The URL is
 * `<X-Forwarded-Proto or "http">://<Host or domainName>/<stage>`.
 */
function formatWebSocketUpgrade(event: WebSocketEvent): Request {
  const request = event.requestContext;

  const headers = new Headers();
  headers.set("Upgrade", "websocket");
  headers.set("x-amzn-connection-id", request.connectionId);
  for (const [name, values] of Object.entries(event.multiValueHeaders ?? {})) {
    for (const value of values ?? []) {
      headers.append(name, value);
    }
  }

  const hostname = headers.get("Host") ?? request.domainName;
  const proto = headers.get("X-Forwarded-Proto") ?? "http";
  const url = new URL(`${proto}://${hostname}/${request.stage}`);

  return new Request(url.toString(), {
    headers,
    body: formatBody(event.body, event.isBase64Encoded),
  });
}
/**
 * Wraps a payload of unrecognised shape in a JSON `POST` request to the
 * placeholder URL `https://lambda/`.
 *
 * @param event Value to serialise as the request body.
 */
function formatUnknownEvent(event: unknown): Request {
  const headers = {
    "Content-Type": "application/json;charset=utf-8",
  };
  return new Request("https://lambda/", {
    method: "POST",
    body: JSON.stringify(event),
    headers,
  });
}
/**
 * Converts an invocation into a `Request` for the user's `fetch` handler.
 *
 * Event shapes are tried in order: HTTP v2, HTTP v1, WebSocket, then a
 * generic JSON fallback. The invocation metadata is added as `x-amzn-*`
 * headers and the original event is attached as `request.aws`.
 *
 * @returns The request, or `undefined` for a WebSocket event that is not a
 *          `CONNECT` (those are dispatched to the WebSocket callbacks instead).
 */
function formatRequest(input: LambdaRequest): Request | undefined {
  const { event, requestId, traceId, functionArn, deadlineMs } = input;

  let request: Request;
  if (isHttpEventV2(event)) {
    request = formatHttpEventV2(event);
  } else if (isHttpEventV1(event)) {
    request = formatHttpEventV1(event);
  } else if (isWebSocketEvent(event)) {
    if (!isWebSocketUpgrade(event)) {
      return undefined;
    }
    request = formatWebSocketUpgrade(event);
  } else {
    // NOTE(review): the whole `input` (metadata plus event) is serialised
    // here, not just `event` — confirm this is what handlers expect.
    request = formatUnknownEvent(input);
  }

  request.headers.set("x-amzn-requestid", requestId);
  request.headers.set("x-amzn-trace-id", traceId);
  request.headers.set("x-amzn-function-arn", functionArn);
  if (deadlineMs !== null) {
    request.headers.set("x-amzn-deadline-ms", `${deadlineMs}`);
  }
  // @ts-ignore: Attach the original event to the Request
  request.aws = event;
  return request;
}
/**
 * Stand-in for Bun's `Server` that feeds Lambda invocations to the user's
 * handler instead of listening on a socket.
 *
 * NOTE(review): nothing in this class adds entries to `#webSockets`, assigns
 * a non-null `#upgrade`, or increments `pendingWebSockets`; as written the
 * WebSocket callbacks are therefore never reached. `LambdaWebSocket` is not
 * defined in the visible source either — presumably the upgrade support lives
 * elsewhere or is unfinished.
 */
class LambdaServer implements Server {
  #lambda: Lambda;
  #webSockets: Map<string, LambdaWebSocket>;
  #upgrade: Response | null;
  pendingRequests: number;
  pendingWebSockets: number;
  port: number;
  hostname: string;
  development: boolean;

  constructor(lambda: Lambda) {
    this.#lambda = lambda;
    this.#webSockets = new Map();
    this.#upgrade = null;
    this.pendingRequests = 0;
    this.pendingWebSockets = 0;
    this.port = 80;
    this.hostname = "lambda";
    this.development = false;
  }

  /**
   * Processes one invocation, racing the handler against the invocation
   * deadline (60 s from now when the request carries none).
   *
   * @returns The value to send back to the Runtime API, or `undefined` when
   *          an error (handler failure or timeout) has already been reported
   *          through `sendError()`.
   */
  async accept(request: LambdaRequest): Promise<unknown> {
    const deadlineMs = request.deadlineMs === null ? Date.now() + 60_000 : request.deadlineMs;
    const durationMs = Math.max(1, deadlineMs - Date.now());
    let timer: ReturnType<typeof setTimeout> | undefined;
    let response: unknown;
    try {
      response = await Promise.race([
        new Promise<undefined>(resolve => {
          timer = setTimeout(resolve, durationMs);
        }),
        this.#acceptRequest(request),
      ]);
    } catch (cause) {
      await sendError("RequestError", cause);
      return;
    } finally {
      // Cancel the deadline timer once the race is settled; otherwise every
      // invocation leaves a live timer behind until its deadline passes.
      clearTimeout(timer);
    }
    // Only the timer branch resolves to `undefined`.
    if (response === undefined) {
      await sendError("TimeoutError", "Function timed out");
      return;
    }
    return response;
  }

  /**
   * Runs the handler for one invocation and shapes the result:
   * `{ statusCode: 200 }` when no `Response` was produced, the plain response
   * text when the request has no `Host` header, otherwise a `LambdaResponse`.
   */
  async #acceptRequest(event: LambdaRequest): Promise<unknown> {
    const request = formatRequest(event);
    let response: Response | undefined;
    if (request === undefined) {
      // Non-CONNECT WebSocket event: no HTTP request to serve.
      await this.#acceptWebSocket(event.event);
    } else {
      response = await this.fetch(request);
      if (response.status === 101) {
        await this.#acceptWebSocket(event.event);
      }
    }
    if (response === undefined) {
      return {
        statusCode: 200,
      };
    }
    if (!request?.headers.has("Host")) {
      return response.text();
    }
    return formatResponse(response);
  }

  /**
   * Dispatches a WebSocket event to the matching user callback. Does nothing
   * when the connection is unknown or no `websocket` handlers are configured.
   */
  async #acceptWebSocket(event: WebSocketEvent): Promise<void> {
    const request = event.requestContext;
    const { connectionId } = request;
    const webSocket = this.#webSockets.get(connectionId);
    if (webSocket === undefined || this.#lambda.websocket === undefined) {
      return;
    }
    const { open, message, close } = this.#lambda.websocket;
    // Switching on the property (not a destructured copy) lets the compiler
    // narrow `request` to the DISCONNECT variant below.
    switch (request.eventType) {
      case "CONNECT": {
        if (open) {
          await open(webSocket);
        }
        break;
      }
      case "MESSAGE": {
        if (message) {
          const body = formatBody(event.body, event.isBase64Encoded);
          if (body !== null) {
            await message(webSocket, body);
          }
        }
        break;
      }
      case "DISCONNECT": {
        try {
          if (close) {
            const { disconnectStatusCode: code, disconnectReason: reason } = request;
            await close(webSocket, code, reason);
          }
        } finally {
          // Forget the connection even when the user's callback throws.
          this.#webSockets.delete(connectionId);
          this.pendingWebSockets--;
        }
        break;
      }
    }
  }

  /** Stopping the server terminates the whole runtime process. */
  stop(): void {
    exit("Runtime exited because Server.stop() was called");
  }

  /**
   * Replaces handlers and settings with the values present in `options`;
   * anything omitted keeps its current value. A string `port` is parsed as an
   * integer.
   */
  reload(options: any): void {
    this.#lambda = {
      fetch: options.fetch ?? this.#lambda.fetch,
      error: options.error ?? this.#lambda.error,
      websocket: options.websocket ?? this.#lambda.websocket,
    };
    this.port =
      typeof options.port === "number"
        ? options.port
        : typeof options.port === "string"
          ? parseInt(options.port)
          : this.port;
    this.hostname = options.hostname ?? this.hostname;
    this.development = options.development ?? this.development;
  }

  /**
   * Invokes the user's `fetch` handler.
   *
   * Failures — including a handler that returns no `Response` without a
   * pending upgrade — are logged and passed to the optional `error` handler.
   * If that is absent or fails too, an empty 500 response is returned.
   */
  async fetch(request: Request): Promise<Response> {
    this.pendingRequests++;
    try {
      const response = await this.#lambda.fetch(request, this);
      if (response instanceof Response) {
        return response;
      }
      if (response === undefined && this.#upgrade !== null) {
        return this.#upgrade;
      }
      throw new Error("fetch() did not return a Response");
    } catch (cause) {
      console.error(cause);
      if (this.#lambda.error !== undefined) {
        try {
          return await this.#lambda.error(cause);
        } catch (handlerCause) {
          console.error(handlerCause);
        }
      }
      return new Response(null, { status: 500 });
    } finally {
      this.pendingRequests--;
      this.#upgrade = null;
    }
  }
}
// Entry point: load the handler once, then serve invocations forever.
const lambda = await init();
const server = new LambdaServer(lambda);
while (true) {
  try {
    const request = await receiveRequest();
    const response = await server.accept(request);
    // `undefined` means `accept()` already reported an error for this invocation.
    if (response !== undefined) {
      await sendResponse(response);
    }
  } finally {
    // Always drop per-invocation state, even when the iteration threw.
    reset();
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment