Skip to content

Instantly share code, notes, and snippets.

@gary23w
Created November 22, 2024 15:52
Show Gist options
  • Select an option

  • Save gary23w/af0addce4f84a6bdf3e3f34218ae715b to your computer and use it in GitHub Desktop.

Select an option

Save gary23w/af0addce4f84a6bdf3e3f34218ae715b to your computer and use it in GitHub Desktop.
python simple webserver with php and hot reload
# installs php-cgi . arch linux goes oom . spends 1702 hours fixing arch linux
# I like working in php sometimes too
import socket
import threading
import logging
import os
import subprocess
import re
import time
import select
import hashlib
import base64
from urllib.parse import unquote, parse_qs
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
logging.basicConfig(level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[logging.FileHandler('server.gary23w'),
logging.StreamHandler()])
HOST = '0.0.0.0'
PORT = 8080
WS_PORT = 8765
MAX_REQUEST_SIZE = 8192
WEB_DIR = 'web'
SERVER_NAME = 'Simple Web Server with PHP Support and Hot Reload'
PHP_EXECUTABLE = '/usr/bin/php-cgi'
HOT_RELOAD = True
ws_clients = []
ws_clients_lock = threading.Lock()
file_change_event = threading.Event()
class ReloadEventHandler(FileSystemEventHandler):
def on_any_event(self, event):
file_change_event.set()
logging.info(f"Detected change in {event.src_path}")
def start_file_watcher():
event_handler = ReloadEventHandler()
observer = Observer()
observer.schedule(event_handler, path=WEB_DIR, recursive=True)
observer.start()
logging.info("Started file watcher for hot reload")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
def start_websocket_server():
ws_server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
ws_server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ws_server_socket.bind((HOST, WS_PORT))
ws_server_socket.listen(5)
logging.info(f"Garys WebSocket server started on {HOST}:{WS_PORT}")
while True:
client_socket, client_address = ws_server_socket.accept()
threading.Thread(target=handle_ws_client, args=(client_socket, client_address), daemon=True).start()
def handle_ws_client(client_socket, client_address):
try:
request = client_socket.recv(1024).decode('utf-8')
headers = parse_ws_headers(request)
if 'Sec-WebSocket-Key' not in headers:
client_socket.close()
return
accept_key = generate_ws_accept_key(headers['Sec-WebSocket-Key'])
response = (
'HTTP/1.1 101 Switching Protocols\r\n'
'Upgrade: websocket\r\n'
'Connection: Upgrade\r\n'
f'Sec-WebSocket-Accept: {accept_key}\r\n\r\n'
)
client_socket.send(response.encode('utf-8'))
with ws_clients_lock:
ws_clients.append(client_socket)
logging.info(f"Garys WebSocket client connected: {client_address}")
while True:
time.sleep(1)
except Exception as e:
logging.exception(f"Garys WebSocket error with {client_address}: {e}")
finally:
with ws_clients_lock:
if client_socket in ws_clients:
ws_clients.remove(client_socket)
client_socket.close()
def parse_ws_headers(request):
headers = {}
for line in request.split('\r\n'):
if ': ' in line:
key, value = line.split(': ', 1)
headers[key] = value
return headers
def generate_ws_accept_key(key):
GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
accept = key + GUID
sha1 = hashlib.sha1(accept.encode('utf-8')).digest()
accept_key = base64.b64encode(sha1).decode('utf-8')
return accept_key
def send_reload_signal():
message = b'\x81\x02' + b'OK'
with ws_clients_lock:
for client_socket in ws_clients:
try:
client_socket.send(message)
logging.info(f"Sent reload signal to Garys WebSocket client {client_socket.getpeername()}")
except Exception as e:
logging.exception(f"Error sending reload signal: {e}")
ws_clients.remove(client_socket)
client_socket.close()
def monitor_file_changes():
file_change_event.clear()
while True:
file_change_event.wait()
if HOT_RELOAD:
send_reload_signal()
file_change_event.clear()
def handle_client(client_socket, client_address):
try:
client_socket.settimeout(10)
request = b""
while True:
chunk = client_socket.recv(2048)
if not chunk:
break
request += chunk
if b'\r\n\r\n' in request:
break
if len(request) > MAX_REQUEST_SIZE:
client_socket.sendall(b"HTTP/1.1 413 Payload Too Large\r\n\r\n")
logging.warning(f"Payload too large from {client_address}")
return
if not request:
return
try:
request_line, headers_alone = request.decode('iso-8859-1').split('\r\n', 1)
except UnicodeDecodeError:
client_socket.sendall(b"HTTP/1.1 400 Bad Request\r\n\r\n")
logging.error(f"Bad request encoding from {client_address}")
return
try:
method, endpoint, http_version = request_line.split()
except ValueError:
client_socket.sendall(b"HTTP/1.1 400 Bad Request\r\n\r\n")
logging.error(f"Bad request line from {client_address}: {request_line}")
return
headers = {}
headers_part, _, body = headers_alone.partition('\r\n\r\n')
for header_line in headers_part.split('\r\n'):
if ': ' in header_line:
key, value = header_line.split(': ', 1)
headers[key.lower()] = value
content_length = int(headers.get('content-length', '0'))
if content_length > MAX_REQUEST_SIZE:
client_socket.sendall(b"HTTP/1.1 413 Payload Too Large\r\n\r\n")
logging.warning(f"Payload too large from {client_address}")
return
while len(body.encode('iso-8859-1')) < content_length:
chunk = client_socket.recv(2048)
if not chunk:
break
body += chunk.decode('iso-8859-1')
if len(body) > MAX_REQUEST_SIZE:
client_socket.sendall(b"HTTP/1.1 413 Payload Too Large\r\n\r\n")
logging.warning(f"Payload too large from {client_address}")
return
response = handle_request(method, endpoint, headers, body, client_address)
client_socket.sendall(response)
except socket.timeout:
logging.info(f"Client socket timeout from {client_address}")
except Exception as e:
logging.exception(f"Error handling client {client_address}: {e}")
finally:
client_socket.close()
def handle_request(method, endpoint, headers, body, client_address):
if method in ['GET', 'POST']:
if endpoint == '/':
index_path_html = os.path.join(WEB_DIR, 'index.html')
index_path_php = os.path.join(WEB_DIR, 'index.php')
if os.path.exists(index_path_html):
with open(index_path_html, 'rb') as f:
content = f.read()
content = inject_hot_reload_script(content)
response = (b"HTTP/1.1 200 OK\r\n"
b"Content-Type: text/html\r\n"
b"\r\n" +
content)
logging.info(f"Served index.html to {client_address}")
return response
elif os.path.exists(index_path_php):
return execute_php_script(index_path_php, method, endpoint, headers, body, client_address)
else:
splash_screen = get_splash_screen()
splash_screen = inject_hot_reload_script(splash_screen.encode('utf-8'))
response = (b"HTTP/1.1 200 OK\r\n"
b"Content-Type: text/html\r\n"
b"\r\n" +
splash_screen)
logging.info(f"Served splash screen to {client_address}")
return response
else:
safe_endpoint = os.path.normpath(unquote(endpoint)).lstrip(os.sep)
file_path = os.path.join(WEB_DIR, safe_endpoint)
if os.path.isfile(file_path):
ext = os.path.splitext(file_path)[1].lower()
if ext == '.php':
return execute_php_script(file_path, method, endpoint, headers, body, client_address)
else:
mime_type = get_mime_type(file_path)
with open(file_path, 'rb') as f:
content = f.read()
if mime_type == 'text/html':
content = inject_hot_reload_script(content)
response = (f"HTTP/1.1 200 OK\r\n"
f"Content-Type: {mime_type}\r\n"
f"\r\n").encode('utf-8') + content
logging.info(f"Served file {file_path} to {client_address}")
return response
else:
response_body = "<h1>404 Not Found</h1>"
response = (f"HTTP/1.1 404 Not Found\r\n"
f"Content-Type: text/html\r\n"
f"\r\n"
f"{response_body}").encode('utf-8')
logging.warning(f"File not found: {file_path} requested by {client_address}")
return response
else:
response = (b"HTTP/1.1 501 Not Implemented\r\n"
b"\r\n")
logging.error(f"Method Not Implemented: {method} from {client_address}")
return response
def execute_php_script(script_path, method, endpoint, headers, body, client_address):
try:
env = os.environ.copy()
env['REDIRECT_STATUS'] = '1'
env['REQUEST_METHOD'] = method
env['SCRIPT_FILENAME'] = os.path.abspath(script_path)
env['SCRIPT_NAME'] = endpoint.split('?', 1)[0]
env['QUERY_STRING'] = unquote(endpoint.partition('?')[2])
env['CONTENT_TYPE'] = headers.get('content-type', '')
env['CONTENT_LENGTH'] = headers.get('content-length', '0')
env['SERVER_SOFTWARE'] = SERVER_NAME
env['SERVER_PROTOCOL'] = 'HTTP/1.1'
env['REMOTE_ADDR'] = client_address[0]
env['REMOTE_PORT'] = str(client_address[1])
env['SERVER_NAME'] = HOST
env['SERVER_PORT'] = str(PORT)
env['DOCUMENT_ROOT'] = os.path.abspath(WEB_DIR)
env['GATEWAY_INTERFACE'] = 'CGI/1.1'
for header, value in headers.items():
header_name = 'HTTP_' + header.upper().replace('-', '_')
env[header_name] = value
php_process = subprocess.Popen(
[PHP_EXECUTABLE],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env
)
if method == 'POST':
stdin_data = body.encode('iso-8859-1')
else:
stdin_data = None
stdout_bytes, stderr_bytes = php_process.communicate(input=stdin_data)
logging.debug(f"PHP stdout bytes:\n{repr(stdout_bytes)}")
logging.debug(f"PHP stderr bytes:\n{repr(stderr_bytes)}")
if php_process.returncode != 0:
stderr_decoded = stderr_bytes.decode('iso-8859-1')
logging.error(f"PHP script error: {stderr_decoded}")
response_body = f"<h1>500 Internal Server Error</h1><pre>{stderr_decoded}</pre>"
response = (b"HTTP/1.1 500 Internal Server Error\r\n"
b"Content-Type: text/html\r\n"
b"\r\n" +
response_body.encode('utf-8'))
return response
if b'\r\n\r\n' in stdout_bytes:
header_bytes, response_body_bytes = stdout_bytes.split(b'\r\n\r\n', 1)
elif b'\n\n' in stdout_bytes:
header_bytes, response_body_bytes = stdout_bytes.split(b'\n\n', 1)
else:
header_bytes = b''
response_body_bytes = stdout_bytes
response_headers = header_bytes.replace(b'\n', b'\r\n')
content_type = 'text/html'
for header_line in response_headers.split(b'\r\n'):
if header_line.lower().startswith(b'content-type:'):
content_type = header_line.split(b':', 1)[1].strip().decode('utf-8')
break
if 'text/html' in content_type:
response_body_bytes = inject_hot_reload_script(response_body_bytes)
response = b'HTTP/1.1 200 OK\r\n' + response_headers + b'\r\n\r\n' + response_body_bytes
logging.info(f"Executed PHP script {script_path} for {client_address}")
return response
except Exception as e:
logging.exception(f"Error executing PHP script {script_path}: {e}")
response_body = f"<h1>500 Internal Server Error</h1><p>{str(e)}</p>"
response = (b"HTTP/1.1 500 Internal Server Error\r\n"
b"Content-Type: text/html\r\n"
b"\r\n" +
response_body.encode('utf-8'))
return response
def inject_hot_reload_script(content):
try:
content_str = content.decode('utf-8')
except UnicodeDecodeError:
content_str = content.decode('iso-8859-1')
script = '''
<script>
var ws_protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
var ws_port = 8765;
var ws = new WebSocket(ws_protocol + window.location.hostname + ':' + ws_port);
ws.onmessage = function(event) {
console.log('Reloading page')
if (event.data === 'OK') {
console.log('Reloading page')
window.location.reload();
}
};
ws.onclose = function() {
setTimeout(function() {
window.location.reload();
}, 1000);
};
</script>
'''
pattern = re.compile(r'(</body>)', re.IGNORECASE)
content_str, count = pattern.subn(script + r'\1', content_str)
if count == 0:
content_str += script
content = content_str.encode('utf-8')
return content
def get_splash_screen():
splash_screen = """
<!doctype html>
<html>
<head>
<title>Welcome to Simple Web Server with PHP Support and Hot Reload</title>
<style>
body { font-family: Arial, sans-serif; margin: 50px; text-align: center; }
h1 { color: #333; }
p { font-size: 18px; }
ul { list-style-type: none; padding: 0; }
li { margin: 10px 0; }
</style>
</head>
<body>
<h1>Welcome!</h1>
<p>This is your custom web server with PHP support and hot reload. Here's how you can use it:</p>
<ul>
<li>Place your website files in the <code>web</code> directory.</li>
<li>If an <code>index.html</code> or <code>index.php</code> file exists, it will be served by default.</li>
<li>You can access other files directly via the URL.</li>
<li>PHP scripts with <code>.php</code> extension will be executed.</li>
<li>Any changes in the <code>web</code> directory will trigger a page reload.</li>
</ul>
<p>Enjoy your web server!</p>
</body>
</html>
"""
return splash_screen
def get_mime_type(file_path):
ext = os.path.splitext(file_path)[1].lower()
if ext == '.html':
return 'text/html'
elif ext == '.css':
return 'text/css'
elif ext == '.js':
return 'application/javascript'
elif ext == '.png':
return 'image/png'
elif ext in ['.jpg', '.jpeg']:
return 'image/jpeg'
elif ext == '.gif':
return 'image/gif'
elif ext == '.txt':
return 'text/plain'
elif ext == '.php':
return 'text/html'
else:
return 'application/octet-stream'
def start_server():
if not os.path.isdir(WEB_DIR):
os.makedirs(WEB_DIR)
logging.info(f"Created web directory at {WEB_DIR}")
threading.Thread(target=start_file_watcher, daemon=True).start()
threading.Thread(target=start_websocket_server, daemon=True).start()
threading.Thread(target=monitor_file_changes, daemon=True).start()
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind((HOST, PORT))
server_socket.listen(100)
logging.info(f"{SERVER_NAME} started on {HOST}:{PORT}")
try:
while True:
client_socket, client_address = server_socket.accept()
logging.info(f"Accepted connection from {client_address}")
client_handler = threading.Thread(
target=handle_client,
args=(client_socket, client_address),
daemon=True
)
client_handler.start()
except KeyboardInterrupt:
logging.info("Server shutting down")
except Exception as e:
logging.exception(f"Error accepting connections: {e}")
finally:
server_socket.close()
if __name__ == '__main__':
start_server()
@builders-toronto
Copy link

This is actually extremely useful, thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment