Last active
January 15, 2024 04:37
-
-
Save luopio/cce6b813ecb5b64bfe526ee266e185cc to your computer and use it in GitHub Desktop.
Listen for incoming webhooks and other requests to run shell commands. No external dependencies other than python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 -u | |
#coding=utf-8 | |
""" | |
Listens for incoming requests on given port (default 8080). Acts on POST requests | |
by looking for a matching path in the config file (default webhooker.config). | |
Example config: | |
deploy_my_cool_project: /home/dude/deploy_the_stuff.sh | |
To invoke deploy_the_stuff.sh: | |
$ curl -X POST 127.0.0.1:8080/deploy_my_cool_project | |
For some simple access restriction you can set the --key argument which | |
is then expected to match the POST body for incoming requests to work. | |
Also it might be a good idea to use SSL with --key and --cert params. If you | |
don't have a private key and cert, you can use openssl to generate one: | |
$ openssl req -new -x509 -keyout server.pem -out server.pem -days 365 -nodes | |
Finally you can provide the command some arguments with GET-parameters. Running | |
the above config with | |
$ curl -X POST "127.0.0.1:8080/deploy_my_cool_project?ref=xyz123" | |
will call "deploy_the_stuff.sh ref xyz123". You can also use parameters w/o values, | |
e.g. "?foo&faa&baz" | |
""" | |
import subprocess | |
from http.server import BaseHTTPRequestHandler, HTTPServer | |
from urllib.parse import urlparse | |
VERSION="v004" | |
COMMANDS_FILE = None | |
SECRET_KEY = None | |
class S(BaseHTTPRequestHandler): | |
def _set_headers(self): | |
self.send_response(200) | |
self.send_header('Content-type', 'text/plain') | |
self.end_headers() | |
def do_GET(self): | |
self._set_headers() | |
self.wfile.write("I don't GET it".encode("utf-8")) | |
def do_HEAD(self): | |
self._set_headers() | |
def do_POST(self): | |
global SECRET_KEY | |
if SECRET_KEY: | |
if not self._check_key(): | |
self.send_response(403) | |
self.send_header('Content-type', 'text/plain') | |
self.end_headers() | |
self.wfile.write("Ken sent you?".encode('utf-8')) | |
return | |
self._set_headers() | |
cmd = self.path[1:].split('?')[0] | |
query = urlparse(self.path).query | |
params = [qc.split("=") for qc in query.split("&") if qc] | |
self.wfile.write("On the job".encode("utf-8")) | |
self.wfile.write(("\nCommand: %s" % cmd).encode("utf-8")) | |
code, output = self.handle_cmd(cmd, params) | |
self.wfile.write(("\nResult code: %s, output: %s" % (code, output)).encode("utf-8")) | |
def _check_key(self): | |
global SECRET_KEY | |
content_len = int(self.headers.getheader('content-length', 0)) | |
key = self.rfile.read(min(content_len, 48000)) | |
return SECRET_KEY == key.strip() | |
def handle_cmd(self, cmd, params=[]): | |
commands = read_commands(COMMANDS_FILE) | |
if cmd in commands: | |
command = commands[cmd].split(' ') | |
for p_pair in params: | |
for p in p_pair: | |
command.append(p) | |
print(">> Running the command: %s" % command) | |
try: | |
return 0, subprocess.check_output(command, shell=False) | |
except subprocess.CalledProcessError as e: | |
return e.returncode, e.output | |
else: | |
return -1, "!! Unknown command" | |
def read_commands(cmd_file): | |
with open(cmd_file, 'rt') as fh: | |
commands = dict([[x.strip() for x in line.split(':')] for line in fh.readlines()]) | |
return commands | |
def run(port=8080, host="", command_file='webhooker.config', ssl_key=None, ssl_cert=None): | |
global COMMANDS_FILE | |
server_address = (host, port) | |
print("Starting server in %s:%s" % server_address) | |
COMMANDS_FILE = command_file | |
httpd = HTTPServer(server_address, S) | |
if ssl_cert: | |
import ssl | |
httpd.socket = ssl.wrap_socket(httpd.socket, certfile=ssl_cert, keyfile=ssl_key, server_side=True) | |
print('Listening...') | |
httpd.serve_forever() | |
if __name__ == "__main__": | |
import argparse | |
parser = argparse.ArgumentParser(description=__doc__) | |
parser.add_argument('--host', dest='host', default='', | |
help='Custom host instead of 0.0.0.0') | |
parser.add_argument('--port', dest='port', default=8080, type=int, | |
help='Custom port instead of 8080') | |
parser.add_argument('--config', dest='config_file', default='webhooker.config', | |
help='Read config from this file instead of ./webhooker.config') | |
parser.add_argument('--version', action='version', version='%s' % VERSION) | |
parser.add_argument('--ssl-key', dest='ssl_key', default=None, | |
help='Path to an SSL key that is used for encryption') | |
parser.add_argument('--ssl-cert', dest='ssl_cert', default=None, | |
help='Path to an SSL certificate that is used for authentication') | |
parser.add_argument('--key', dest='key', default='', | |
help='Set a secret key that needs to match to POST body for requests to work') | |
args = parser.parse_args() | |
if args.key: | |
SECRET_KEY = args.key | |
run(port=args.port, host=args.host, command_file=args.config_file, | |
ssl_key=args.ssl_key, ssl_cert=args.ssl_cert) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment