Created
June 26, 2022 05:18
-
-
Save DanaEpp/02a4f11566fdcd36cb36395dec2ca67f to your computer and use it in GitHub Desktop.
TryHackMe (THM) dump script to find rooms with open tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/env python3 | |
import getpass | |
import time | |
import requests | |
from requests.cookies import create_cookie | |
from requests.adapters import HTTPAdapter | |
from requests.packages.urllib3.util.retry import Retry | |
import re | |
from typing import List | |
####################################################################################### | |
# | |
# Shout out to @szymex73 for his thmapi module. Although it's currently broken,
# it gave me some clarity on interrogating the THM API.
# | |
####################################################################################### | |
# Value sent as the `limit` query parameter when listing rooms.
_MAXROOMS = 600

# Base URL that every API path in this script is appended to.
_APIROOT = 'https://tryhackme.com'

# CSRF token embedded as a hidden form field; captures the 36-character value.
_csrf_input_regex = re.compile(
    r'<input type="hidden" name="_csrf" value="(.{36})">'
)

# CSRF token assigned in an inline script, e.g. `const csrfToken = "<token>"`.
# The token may be wrapped in single or double quotes. The character class
# deliberately contains only the two quote characters: a `|` inside `[...]`
# is a literal pipe, not an alternation.
_csrf_script_regex = re.compile(
    r'''const csrfToken ?= ?["'](.{36})["']'''
)
class THM:
    """Small wrapper around the TryHackMe (THM) web API.

    Authentication uses an existing ``connect.sid`` session cookie. All
    requests go through a single ``requests.Session`` configured with a
    retry strategy.
    """

    def __init__(self, creds=None):
        """Initialize the THM API wrapper.

        Args:
            creds: Optional ``connect.sid`` cookie value. When provided,
                ``login`` is called immediately (and may raise).
        """
        # We need to use a retry strategy to get around the throttling THM has in
        # their CloudFlare rules. The incremental backoff is an added precaution.
        retry_strategy = Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 503, 504],
        )
        self.session = requests.Session()
        self.session.mount("https://", HTTPAdapter(max_retries=retry_strategy))
        self.authenticated = False
        if creds is not None:
            self.login(creds)

    # ------------------------------------------------------------------
    # Pure helpers (no network access)
    # ------------------------------------------------------------------

    @staticmethod
    def _is_json_ok(response) -> bool:
        """Return True when the response is a 200 with a JSON content type."""
        # The header may be absent; fall back to "" so `in` cannot hit None.
        content_type = response.headers.get('Content-Type') or ''
        return response.status_code == 200 and 'application/json' in content_type

    @staticmethod
    def _room_codes_from(data) -> list:
        """Extract the ``code`` of every entry under ``data["rooms"]``.

        Returns an empty list when ``data`` is None or has no "rooms" key.
        """
        if data is None or "rooms" not in data:
            return []
        return [room["code"] for room in data["rooms"]]

    @staticmethod
    def _saw_redirect(response) -> bool:
        """Return True if the response, or any hop before it, was a 302.

        ``requests`` follows redirects by default, so the 302 normally lives
        in ``response.history`` rather than on the final response.
        """
        hops = list(getattr(response, 'history', None) or [])
        hops.append(response)
        return any(hop.status_code == 302 for hop in hops)

    @staticmethod
    def _count_open_tasks(resp) -> int:
        """Count entries whose ``correct`` flag is False in a tasks payload.

        Only the first ``totalTasks`` groups of ``resp["data"]`` are examined;
        a ``totalTasks`` larger than the data length is tolerated.
        """
        if resp is None or "data" not in resp:
            return 0
        total = max(int(resp["totalTasks"]), 0)
        return sum(
            1
            for group in resp["data"][:total]
            for task in group["tasksInfo"]
            if task["correct"] is False
        )

    # ------------------------------------------------------------------
    # Session / transport
    # ------------------------------------------------------------------

    def login(self, con_sid: str) -> None:
        """Set up the login session using a ``connect.sid`` cookie.

        Args:
            con_sid: Value of the ``connect.sid`` cookie.

        Raises:
            Exception: If the HTTP response is not a 200 JSON response, the
                body is malformed, or the API reports failure.
        """
        print("[+] Preparing to login to TryHackMe API")
        cookie = create_cookie('connect.sid', con_sid, domain='tryhackme.com')
        self.session.cookies.set_cookie(cookie)

        # Test if the connection token is valid
        r = self.session.get(f'{_APIROOT}/message/unseen')
        if not self._is_json_ok(r):
            raise Exception('Invalid HTTP response. Valid connect.sid cookie?')

        # Only parsing problems count as "malformed"; an explicit
        # success=false is reported separately below.
        try:
            succeeded = r.json()['success']
        except (ValueError, KeyError, TypeError) as err:
            print("[!] Malformed response during auth.")
            raise Exception("[!] Malformed response during auth.") from err

        if not succeeded:
            print("[!] Failed to authenticate")
            raise Exception('Failed to authenticate')

        self.authenticated = True
        print("[+] Authenticated!")

    def fetch_csrf_token(self, path: str, pattern: str) -> str:
        """Fetch the required CSRF token before a POST.

        Args:
            path: URL of the page that embeds the token.
            pattern: ``'csrf-input'`` (hidden form field) or
                ``'csrf-script'`` (inline script constant).

        Returns:
            The token, or "" when ``pattern`` is not recognised.

        Raises:
            ValueError: If the page does not contain a matching token.
        """
        r = self.session.get(path)
        regex_by_pattern = {
            'csrf-input': _csrf_input_regex,
            'csrf-script': _csrf_script_regex,
        }
        regex = regex_by_pattern.get(pattern)
        if regex is None:
            return ""
        # Match against the decoded text, not the repr of the raw bytes:
        # the repr backslash-escapes quotes, which breaks the patterns.
        found = regex.search(r.text)
        if found is None:
            raise ValueError(f'No CSRF token ({pattern}) found at {path}')
        return found.group(1)

    def http_get(self, path):
        """GET ``path`` and return the decoded JSON body.

        Returns None unless the response is a 200 with a JSON content type.
        """
        r = self.session.get(path, headers={"Accept": "application/json"})
        return r.json() if self._is_json_ok(r) else None

    def http_post(self, path, post_data, header_data):
        """POST ``post_data`` as JSON to ``path`` and return the decoded body.

        Returns None unless the response is a 200 with a JSON content type.
        """
        r = self.session.post(path, json=post_data, headers=header_data)
        return r.json() if self._is_json_ok(r) else None

    # ------------------------------------------------------------------
    # Rooms
    # ------------------------------------------------------------------

    def get_room_codes(self) -> list:
        """Return the codes of the rooms listed by the hacktivities endpoint."""
        # As THM API is weak in checking the limit param, let's take advantage
        # of that and pull it all back in a single query
        data = self.http_get(f'{_APIROOT}/api/hacktivities?limit={_MAXROOMS}')
        return self._room_codes_from(data)

    def get_room_status(self, room_code: str):
        """Print the joined/completed details reported for ``room_code``.

        Prints only the room code when no details are returned for it.
        """
        data = self.http_get(
            f'{_APIROOT}/api/room/details?codes={room_code}&loadUser=true'
        )
        if data is not None and room_code in data:
            details = data[room_code]
            completed = details['userCompleted']
            # NOTE(review): "Joined" is read from the 'code' field, which looks
            # like it echoes the room code rather than a joined flag - confirm
            # the intended field against the API before changing it.
            joined = details['code']
            print(f'{room_code} - Joined: {joined}, Completed: {completed}')
        else:
            print(room_code)

    def join_room(self, room_code: str) -> bool:
        """Request the join URL for ``room_code``.

        Returns:
            True when the request was answered with a 302 redirect (either
            directly or as a followed hop), False otherwise.
        """
        r = self.session.get(f'{_APIROOT}/jr/{room_code}')
        return self._saw_redirect(r)

    def join_rooms(self, room_codes: list) -> None:
        """Attempt to join every room in ``room_codes``, in order."""
        for room in room_codes:
            print(f'[*] Attempting to join {room}...')
            self.join_room(room)

    def joined_rooms(self) -> list:
        """Return the codes of the rooms listed by the my-rooms endpoint."""
        data = self.http_get(f'{_APIROOT}/api/my-rooms?limit={_MAXROOMS}')
        return self._room_codes_from(data)

    def leave_room(self, room_code: str) -> None:
        """Leave ``room_code``; the API response is not inspected."""
        csrf_token = self.fetch_csrf_token(
            f'{_APIROOT}/room/{room_code}', 'csrf-script'
        )
        post_data = {
            "_csrf": csrf_token,
            "code": room_code,
        }
        headers = {"Content-Type": "application/json"}
        self.http_post(f'{_APIROOT}/api/room/leave', post_data, headers)

    def leave_rooms(self, room_codes: list) -> None:
        """Attempt to leave every room in ``room_codes``, in order."""
        for room in room_codes:
            print(f'[*] Attempting to leave {room}...')
            self.leave_room(room)

    def get_open_tasks(self, room_code: str) -> int:
        """Return how many tasks in ``room_code`` are not yet marked correct.

        Returns 0 when the tasks endpoint yields no usable payload.
        """
        print(f'[*] Checking for open tasks in {room_code}')
        resp = self.http_get(f'{_APIROOT}/api/tasks/{room_code}')
        return self._count_open_tasks(resp)
def main():
    """Report which TryHackMe rooms still have open tasks.

    Prompts for a ``connect.sid`` cookie, joins every listed room that is not
    yet joined, counts the open tasks in each joined room, prints a summary,
    and finally leaves the joined rooms again.
    """
    # The session cookie is a credential, so read it without echoing it to
    # the terminal; stray whitespace from pasting is dropped.
    con_sid = getpass.getpass("connect.sid: ").strip()
    thm = THM(con_sid)

    room_codes = thm.get_room_codes()
    print(f"[+] Found {len(room_codes)} room codes")

    joined_rooms = thm.joined_rooms()
    print(f"[+] Found {len(joined_rooms)} rooms are currently joined")

    # Set lookup keeps the filter linear instead of scanning the joined list
    # once per room code.
    already_joined = set(joined_rooms)
    rooms_to_join = [code for code in room_codes if code not in already_joined]
    print(f"[+] Found {len(rooms_to_join)} rooms that can still be joined")

    thm.join_rooms(rooms_to_join)

    joined_rooms = thm.joined_rooms()
    print(f"[+] Found {len(joined_rooms)} rooms are now joined")

    rooms_with_open_tasks = []
    for room in joined_rooms:
        open_tasks = thm.get_open_tasks(room)
        if open_tasks > 0:
            rooms_with_open_tasks.append((room, open_tasks))

    print(f"[+] Found {len(rooms_with_open_tasks)} rooms that have open tasks")
    for room, task_cnt in rooms_with_open_tasks:
        print(f'\t{room} :\tOpen Tasks: {task_cnt}')

    # NOTE(review): this leaves every currently joined room, including rooms
    # that were already joined before the script ran - confirm that is the
    # intended cleanup rather than leaving only `rooms_to_join`.
    thm.leave_rooms(joined_rooms)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment