Last active
March 6, 2021 01:20
-
-
Save xkortex/9cb6237085aefea8ddf66e6c3c74c656 to your computer and use it in GitHub Desktop.
A server which runs subprocesses in response to JSON requests sent over TCP or Unix sockets and dumps their stdout/stderr to the server's terminal
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
"""WORK IN PROGRESS!! USE AT YOUR OWN RISK!!""" | |
import os | |
import sys | |
from typing import Dict, List, Optional | |
from urllib.parse import urlparse, ParseResult | |
from functools import partial | |
import asyncio | |
from loguru import logger | |
import pydantic | |
class ArgsRequest(pydantic.BaseModel):
    """Request message: which command to run and where to send the request.

    The client serialises this model to JSON and writes it to the server,
    which parses it back with ``ArgsRequest.parse_raw``.
    """

    # Path of the command to run; also the value checked against the
    # server's allow-list.
    path: str
    # Arguments passed to the command, in order.
    args: List[str] = []
    # URI the client connects to (``unix://...`` or ``scheme://host:port``).
    addr: str
    # Free-form text; not interpreted by any code in this file.
    message: Optional[str] = None
    # Caller-chosen identifier for the request.
    id: Optional[str] = None
    # Maximum number of bytes the client reads for the server's reply.
    bufsize: int = 4096

    # NOTE(review): the fields below are carried in the request but nothing in
    # this file reads them when the subprocess is launched -- confirm whether
    # they are meant to be honoured by the server.
    shell: bool = False
    cwd: Optional[str] = None
    env: Dict[str, str] = {}
    timeout: Optional[float] = None
class ArgsResponse(pydantic.BaseModel):
    """Reply message the server sends back for one ``ArgsRequest``."""

    # Numeric outcome code; the server in this file uses HTTP-like values
    # (200 for a completed run, 405 for a command that is not allowed).
    status: int
    # Human-readable description of the outcome.
    message: Optional[str] = None
    # Identifier slot for correlating the reply with a request.
    id: Optional[str] = None
class CliArgs(ArgsRequest):
    """Validated command-line arguments shared by the client and server verbs.

    Extends ``ArgsRequest`` so a client invocation can be converted directly
    into a request. ``path`` becomes optional here because the server verb
    has no command path of its own.
    """

    path: Optional[str] = None
    # Which sub-command was chosen: "client" or "server".
    verb: str
    # Main address to listen on (server) or connect to (client).
    addr: str
    # Additional listen addresses. Declared Optional explicitly: the default
    # is None, so a bare ``List[str]`` annotation misdescribed the field.
    addrs: Optional[List[str]] = None
    # Command paths the server is willing to run; only set by the server verb.
    allowed_commands: Optional[List[str]] = None
    message: Optional[str] = None

    class Config:
        # argparse may hand over attributes this model does not declare;
        # drop them instead of failing validation.
        extra = pydantic.Extra.ignore
def arg_parser():
    """Build the command-line parser.

    Global options (``-a/--addr``, ``-A/--addrs``) must precede the
    sub-command. Two sub-commands exist:

    * ``c PATH [ARGS...] [-m MESSAGE]`` -- client; sets ``verb="client"``.
    * ``s [ALLOWED_COMMANDS...]`` -- server; sets ``verb="server"``.

    A sub-command is mandatory: omitting it makes argparse print a usage
    error and exit with status 2, instead of returning a namespace that
    lacks ``verb``.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="""Subprocess as-a-service. Very basic, only mirrors output to stdout/stderr"""
    )
    parser.add_argument(
        "-a",
        "--addr",
        default='http://localhost:7701',
        action="store",
        type=str,
        help="Main address to listen/talk on ",
    )
    parser.add_argument(
        "-A", "--addrs", default=[], action="append", help="list multiple addresses "
    )

    # ``dest`` gives argparse a name to use in the "arguments are required"
    # error; the resulting ``command`` attribute ("c"/"s") is informational,
    # the rest of the program keys off ``verb``.
    subparsers = parser.add_subparsers(
        dest="command", required=True, help="sub-commands"
    )

    parser_c = subparsers.add_parser("c", help="client")
    parser_c.set_defaults(verb="client")
    parser_c.add_argument(
        "path", type=str, help="Path of the command to be issued"
    )
    parser_c.add_argument(
        "args", nargs="*", type=str, default=[], help="Arguments to be passed to the command"
    )
    parser_c.add_argument(
        "-m",
        "--message",
        default="Hello, world!",
        action="store",
        type=str,
        help="message to send",
    )

    parser_s = subparsers.add_parser("s", help="server")
    parser_s.set_defaults(verb="server")
    parser_s.add_argument(
        "allowed_commands",
        nargs="*",
        type=str,
        help="Only run commands that match one of these filters",
    )
    return parser
def get_server_func(addr: str):
    """Return a server factory for ``addr``, ready to be given a client handler.

    Args:
        addr: Either ``unix://<path>`` for a Unix-domain socket or any
            ``scheme://host:port`` URI for a TCP listener (the scheme itself
            is ignored for TCP).

    Returns:
        A ``functools.partial`` around ``asyncio.start_unix_server`` or
        ``asyncio.start_server`` with the address already bound; calling it
        with a ``client_connected_cb`` yields the awaitable that starts the
        server.

    Raises:
        ValueError: If ``addr`` has neither the ``unix`` scheme nor both a
            hostname and a port (``urlparse`` itself also raises ValueError
            for an out-of-range or non-numeric port).
    """
    parsed: ParseResult = urlparse(addr)
    if parsed.scheme == "unix":
        return partial(asyncio.start_unix_server, path=parsed.path)
    # Test the port against None, not truthiness: port 0 is a valid request
    # for an OS-assigned port and must not be rejected.
    if parsed.port is not None and parsed.hostname:
        return partial(asyncio.start_server, host=parsed.hostname, port=parsed.port)
    raise ValueError(f"cannot parse URI addr: {addr}")
def get_connection_func(addr):
    """Return a zero-argument connection opener for ``addr``.

    ``unix://<path>`` maps to ``asyncio.open_unix_connection``; any other URI
    that carries both a port and a hostname maps to ``asyncio.open_connection``.
    The parsed URI is logged at debug level.

    Raises:
        ValueError: If ``addr`` matches neither form.
    """
    parsed = urlparse(addr)
    logger.debug(parsed)

    if parsed.scheme == "unix":
        return partial(asyncio.open_unix_connection, path=parsed.path)

    if not (parsed.port and parsed.hostname):
        raise ValueError(f"cannot parse URI addr: {addr}")

    return partial(asyncio.open_connection, host=parsed.hostname, port=parsed.port)
async def tcp_subproc_client(args: ArgsRequest):
    """Send one request to the server at ``args.addr`` and log the reply.

    The request is the JSON form of ``args``. At most ``args.bufsize`` bytes
    of reply are read. The logged ``ok`` flag is True only when the reply
    parses as an ``ArgsResponse`` with status 200.

    Args:
        args: The request to send; ``args.addr`` selects the server.

    Raises:
        ValueError: If ``args.addr`` cannot be parsed into a connection target.
        OSError: If the connection cannot be established or breaks mid-way.
    """
    logger.info(args)
    message = args.json()
    open_connection = get_connection_func(args.addr)
    reader, writer = await open_connection()
    try:
        logger.info(f"Send: {message!r}")
        writer.write(message.encode())
        await writer.drain()

        data = await reader.read(args.bufsize)
        # Judge success by the server's status code. Comparing the reply to
        # the request (as an echo client would) can never be true here.
        try:
            ok = ArgsResponse.parse_raw(data).status == 200
        except ValueError:
            # Empty or non-JSON reply (pydantic's ValidationError is a ValueError).
            ok = False
        logger.info(f"Received {ok}: {data.decode()!r}")
    finally:
        # Always release the connection, even if sending or reading failed.
        logger.debug("Close the connection")
        writer.close()
        await writer.wait_closed()
class SubprocRunner(object):
    """Connection handler that runs allow-listed commands as subprocesses.

    The instance holds only the parsed CLI arguments: ``bufsize`` bounds how
    much of a request is read, and ``allowed_commands`` is the allow-list of
    command paths.
    """

    def __init__(self, args: CliArgs):
        self.args = args

    async def handle_run_subproc(self, reader, writer):
        """Serve one connection: read a request, run it, reply, then close.

        Replies are JSON-encoded ``ArgsResponse`` objects:

        * 400 -- the request could not be parsed as an ``ArgsRequest``;
        * 405 -- ``path`` is not in the allow-list;
        * 500 -- the command could not be started or exited non-zero;
        * 200 -- the command ran and exited with code 0.

        The connection is closed on every path, including errors.
        """
        try:
            data = await reader.read(self.args.bufsize)
            try:
                peer = writer.get_extra_info("peername")
            except Exception as exc:
                # Best effort only: the peer name is used purely for logging.
                peer = "{}: {}".format(exc.__class__.__name__, exc)

            try:
                payload = ArgsRequest.parse_raw(data)
            except ValueError as exc:
                # pydantic's ValidationError is a ValueError; answer the
                # client instead of letting the handler die without a reply.
                logger.warning(f"Malformed request from {peer!r}: {exc}")
                await self._send_response(
                    writer, ArgsResponse(status=400, message="malformed request")
                )
                return
            logger.info(f"Received {payload!r} from {peer!r}")

            # Treat a missing allow-list as "nothing is allowed".
            allowed = self.args.allowed_commands or []
            if payload.path not in allowed:
                await self._send_response(
                    writer,
                    ArgsResponse(
                        status=405,
                        message=f"'{payload.path}' is a forbidden command",
                        id=payload.id,
                    ),
                )
                return

            # for simplicity, we are just gonna wait for the response
            try:
                returncode = await self.run_subprocess(payload)
            except OSError as exc:
                # e.g. the executable does not exist or is not executable.
                resp = ArgsResponse(
                    status=500,
                    message=f"'{payload.path}' failed to start: {exc}",
                    id=payload.id,
                )
            else:
                if returncode == 0:
                    resp = ArgsResponse(
                        status=200, message=f"'{payload.path}' success", id=payload.id
                    )
                else:
                    resp = ArgsResponse(
                        status=500,
                        message=f"'{payload.path}' exited with code {returncode}",
                        id=payload.id,
                    )
            await self._send_response(writer, resp)
        finally:
            logger.debug("Close the connection")
            writer.close()

    async def _send_response(self, writer, resp: ArgsResponse):
        """Write ``resp`` as JSON to ``writer`` and flush it."""
        data = resp.json().encode()
        logger.debug(f"Send: {data!r}")
        writer.write(data)
        await writer.drain()

    async def run_subprocess(self, cmd: ArgsRequest):
        """Run ``cmd.path`` with ``cmd.args`` and return its exit code.

        No stdout/stderr redirection is requested, so the child's output goes
        wherever the server's own output goes.

        NOTE(review): ``cmd.cwd``, ``cmd.env``, ``cmd.timeout`` and
        ``cmd.shell`` are not applied here -- confirm whether they should be.
        """
        logger.info("start: \n{} {}".format(cmd, " ".join(cmd.args)))
        process = await asyncio.create_subprocess_exec(
            cmd.path,
            *cmd.args,
        )
        return await process.wait()
async def spawn_server(addr, args: CliArgs):
    """Start a listener on ``addr`` and serve connections indefinitely.

    Each connection is handled by a ``SubprocRunner`` built from ``args``.
    The address of the first listening socket is logged once the server is up.
    """
    make_server = get_server_func(addr)
    handler = SubprocRunner(args).handle_run_subproc
    server = await make_server(handler)

    bound = server.sockets[0].getsockname()
    logger.info(f"Serving on {bound}")

    async with server:
        await server.serve_forever()
async def main_server(args: "CliArgs"):
    """Start one server task per configured address and return the tasks.

    The addresses are ``args.addrs`` followed by ``args.addr`` (when it is
    not None). Must be awaited from a running event loop, because the tasks
    are scheduled on it.

    Args:
        args: Parsed CLI arguments; ``addrs`` may be None or a list.

    Returns:
        A list of ``asyncio.Task`` objects, one per address, each running
        ``spawn_server``. Empty when no address is configured.
    """
    # Work on a copy so the caller's ``args.addrs`` is not mutated, and
    # tolerate ``addrs`` being None.
    addrs = list(args.addrs or [])
    if args.addr is not None:
        addrs.append(args.addr)
    return [asyncio.create_task(spawn_server(addr, args)) for addr in addrs]
def main():
    """Command-line entry point: run the client once or the server until stopped.

    Parses ``sys.argv``, validates it into ``CliArgs`` and dispatches on the
    verb. The client sends a single request and returns. The server starts
    one listener per address and blocks; if any listener fails (for example
    the address cannot be bound) the error is raised instead of being left
    unobserved in a background task.

    Raises:
        ValueError: If the verb is neither ``"client"`` nor ``"server"``.
    """
    parser = arg_parser()
    args = CliArgs(**vars(parser.parse_args()))
    logger.info(args)

    verb = args.verb
    if verb == "client":
        c_args = ArgsRequest.parse_obj(args)
        asyncio.run(tcp_subproc_client(c_args))
        return
    if verb != "server":
        raise ValueError(f"unknown command: {verb}")

    async def serve():
        tasks = await main_server(args)
        logger.debug(f"created {tasks}")
        # Wait on the server tasks so their exceptions propagate here.
        await asyncio.gather(*tasks)

    # asyncio.run creates and tears down the loop itself, avoiding the
    # deprecated get_event_loop() call outside a running loop.
    asyncio.run(serve())
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment