Skip to content

Instantly share code, notes, and snippets.

@carlHandy
Last active November 23, 2024 00:26
Show Gist options
  • Save carlHandy/bfd96a7bcfeb8abac008fac4f613e094 to your computer and use it in GitHub Desktop.
Save carlHandy/bfd96a7bcfeb8abac008fac4f613e094 to your computer and use it in GitHub Desktop.
RTSP to WebRTC with GStreamer
import { WebSocketServer, WebSocketClient } from "https://deno.land/x/[email protected]/mod.ts";
import { load } from "https://deno.land/[email protected]/dotenv/mod.ts";
// Load and validate environment variables
const env = await load();

/**
 * Parses a TCP port from its textual form.
 *
 * Returns `fallback` when the variable is unset or empty, and `NaN` when the
 * text is not a plain decimal integer in the range 0-65535. Unlike a bare
 * `parseInt`, trailing garbage ("8080abc"), negative numbers and out-of-range
 * values are rejected instead of being silently accepted.
 */
const parsePort = (raw: string | undefined, fallback: number): number => {
  if (raw === undefined || raw === "") return fallback;
  const text = raw.trim();
  if (!/^\d+$/.test(text)) return NaN;
  const port = Number(text);
  return port <= 65535 ? port : NaN;
};

const PORT = parsePort(env.PORT, 8000);
const WS_PORT = parsePort(env.WS_PORT, 8080);

// Fail fast if either port is unusable.
if (isNaN(PORT) || isNaN(WS_PORT)) {
  console.error("Invalid PORT or WS_PORT values in environment variables.");
  Deno.exit(1);
}

// Value used for the Access-Control-Allow-Origin response header.
const ALLOWED_ORIGINS = env.ALLOWED_ORIGINS || "*";
const STUN_SERVER = env.STUN_SERVER || "stun:stun.l.google.com:19302";

// ICE server list built from the configured STUN server.
const iceConfig = {
  iceServers: [{ urls: STUN_SERVER }],
};
// Structured logging

/**
 * Serialises one log record as a single JSON line.
 *
 * `meta` is spread last, so its keys are appended after `level`, `message`
 * and `timestamp` (and replace them if the names collide).
 */
const formatLogLine = (level: "info" | "error", message: string, meta: object): string => {
  const timestamp = new Date().toISOString();
  return JSON.stringify({ level, message, timestamp, ...meta });
};

/** JSON-lines logger: `info` writes to stdout, `error` writes to stderr. */
const logger = {
  info(message: string, meta = {}) {
    console.log(formatLogLine("info", message, meta));
  },
  error(message: string, meta = {}) {
    console.error(formatLogLine("error", message, meta));
  },
};
// Registries of live state.
// activeConnections maps each WebSocket client to the id stored for it.
const activeConnections = new Map<WebSocketClient, string>();
// activeProcesses maps a string key to a child process handle.
const activeProcesses = new Map<string, Deno.ChildProcess>();

/**
 * Health check endpoint: builds a JSON response with a fixed "healthy"
 * status and the current number of entries in each registry.
 */
const healthCheck = (): Response => {
  const report = {
    status: "healthy",
    connections: activeConnections.size,
    processes: activeProcesses.size,
  };
  const init = { headers: { "Content-Type": "application/json" } };
  return new Response(JSON.stringify(report), init);
};
// HTTP server handler

// Shape accepted for rtspUrl: rtsp://[user[:password]@]host[:port][/path...]
const RTSP_URL_PATTERN = /^rtsp:\/\/(?:[\w.-]+(?::\w+)?@)?[\w.-]+(:\d+)?(?:\/[\w.-]*)*\/?$/;

/**
 * Builds a plain-text response for the /stream route that also carries the
 * CORS origin header, so browser callers can read the error status and body.
 */
const streamTextResponse = (text: string, status: number): Response =>
  new Response(text, {
    status,
    headers: { "Access-Control-Allow-Origin": ALLOWED_ORIGINS },
  });

/**
 * Routes incoming HTTP requests.
 *
 * - `OPTIONS` (any path): CORS preflight reply.
 * - path ending in `/health`: health report from `healthCheck()`.
 * - `POST` to a path ending in `/stream`: validates the JSON body's `rtspUrl`
 *   and replies with the WebSocket signaling address.
 * - anything else: 404.
 *
 * @param request Incoming request.
 * @returns 200 on success; 400 for a malformed JSON body or an invalid
 *   `rtspUrl`; 500 for unexpected failures; 404 for unknown routes.
 */
const handler = async (request: Request): Promise<Response> => {
  // Handle CORS preflight
  if (request.method === "OPTIONS") {
    return new Response(null, {
      headers: {
        "Access-Control-Allow-Origin": ALLOWED_ORIGINS,
        "Access-Control-Allow-Methods": "POST, OPTIONS",
        "Access-Control-Allow-Headers": "Content-Type",
      },
    });
  }

  // Route on the path alone so a query string ("/health?probe=1") does not
  // defeat the suffix match.
  const { pathname } = new URL(request.url);

  // Health check endpoint
  if (pathname.endsWith("/health")) {
    return healthCheck();
  }

  // Handle RTSP stream requests
  if (request.method === "POST" && pathname.endsWith("/stream")) {
    try {
      let body: unknown;
      try {
        body = await request.json();
      } catch (error) {
        // A body that is not valid JSON is the client's mistake, not a server fault.
        if (error instanceof SyntaxError) {
          return streamTextResponse("Invalid JSON body", 400);
        }
        throw error;
      }

      // The body may be any JSON value (null, number, array, ...); only read
      // rtspUrl from an object and only accept it when it is a string.
      const rtspUrl =
        typeof body === "object" && body !== null
          ? (body as { rtspUrl?: unknown }).rtspUrl
          : undefined;
      if (typeof rtspUrl !== "string" || !RTSP_URL_PATTERN.test(rtspUrl)) {
        return streamTextResponse("Invalid RTSP URL", 400);
      }

      return new Response(
        JSON.stringify({
          message: `WebSocket signaling server available at ws://localhost:${WS_PORT}`,
        }),
        {
          status: 200,
          headers: {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": ALLOWED_ORIGINS,
          },
        },
      );
    } catch (error) {
      const errMessage = error instanceof Error ? error.message : String(error);
      logger.error("Error processing stream request", { error: errMessage });
      return streamTextResponse("Internal Server Error", 500);
    }
  }

  return new Response("Not Found", { status: 404 });
};
// Start HTTP server
// The startup line is emitted from onListen so that it is logged once the
// server is actually listening and reports the port that was really bound
// (relevant when the configured port is 0). Supplying onListen also replaces
// Deno's default plain-text "Listening on ..." message, which would otherwise
// be mixed into the JSON log stream.
Deno.serve(
  {
    port: PORT,
    onListen: ({ port }) => {
      logger.info(`HTTP server running on http://localhost:${port}`);
    },
  },
  handler,
);

// Start WebSocket server
const wss = new WebSocketServer(WS_PORT);
logger.info(`WebSocket server running on ws://localhost:${WS_PORT}`);
// WebSocket connection handler
// Registers each new client under a fresh UUID and wires up its message,
// close and error handlers. The id is captured by the closures below, so every
// log line for this socket carries it even after the registry entry has been
// removed (previously an error reported after close logged no id at all).
wss.on("connection", (ws: WebSocketClient) => {
  const connectionId = crypto.randomUUID();
  activeConnections.set(ws, connectionId);
  logger.info("New WebSocket connection", { connectionId });

  // Signaling messages are JSON objects dispatched on their `type` field.
  ws.on("message", (message) => {
    try {
      const data = JSON.parse(message);
      switch (data.type) {
        case "offer":
          logger.info("Received SDP offer", { connectionId });
          // Send back ICE config with the answer
          ws.send(JSON.stringify({
            type: "config",
            iceConfig,
          }));
          // Echo back the offer for now (in a real implementation,
          // this would be processed by your media server)
          ws.send(JSON.stringify({
            type: "answer",
            sdp: data.sdp,
          }));
          break;
        case "ice-candidate":
          logger.info("Received ICE candidate", { connectionId });
          // Placeholder relay: the candidate is sent back on the same socket
          // it arrived on; no other peer is looked up here.
          ws.send(JSON.stringify({
            type: "ice-candidate",
            candidate: data.candidate,
          }));
          break;
        default:
          logger.info("Unknown message type", { connectionId, type: data.type });
      }
    } catch (error) {
      // Malformed JSON (or any exception thrown while replying) is logged
      // and otherwise ignored; nothing here closes the socket.
      const errMessage = error instanceof Error ? error.message : String(error);
      logger.error("Error processing WebSocket message", { connectionId, error: errMessage });
    }
  });

  ws.on("close", () => {
    activeConnections.delete(ws);
    logger.info("WebSocket connection closed", { connectionId });
  });

  ws.on("error", (error) => {
    const errMessage = error instanceof Error ? error.message : String(error);
    logger.error("WebSocket error", { connectionId, error: errMessage });
  });
});
// Graceful shutdown

/**
 * Signals every tracked child process, closes the WebSocket server and then
 * exits with status 0.
 *
 * Each step is isolated so that one failure (for example `kill()` throwing
 * for a child that has already terminated) cannot skip the remaining cleanup,
 * and the `finally` guarantees the process still exits.
 */
const shutdown = async (): Promise<void> => {
  logger.info("Received SIGTERM, cleaning up...");
  try {
    for (const [processId, child] of activeProcesses) {
      try {
        child.kill("SIGTERM");
      } catch (error) {
        const errMessage = error instanceof Error ? error.message : String(error);
        logger.error("Failed to signal child process", { processId, error: errMessage });
      }
    }
    // Awaited in case close() is asynchronous; a non-promise result passes
    // straight through `await`.
    await wss.close();
  } catch (error) {
    const errMessage = error instanceof Error ? error.message : String(error);
    logger.error("Error during shutdown", { error: errMessage });
  } finally {
    Deno.exit(0);
  }
};

Deno.addSignalListener("SIGTERM", () => {
  void shutdown();
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment