|
import csv
import getpass
import re
from time import localtime, strftime
from time import sleep
from urllib.parse import quote
from urllib.parse import unquote

import requests
|
|
|
# --- Local files ------------------------------------------------------------
LOG_FILE = "log.csv"     # timestamped record of every reset attempt (appended)
WORK_FILE = "reset.csv"  # jobs to run, one "username,password" per line
DONE_FILE = "done.csv"   # jobs already completed, same format as WORK_FILE

# Pause between jobs (seconds), in case the server rate-limits requests
REQUEST_SLEEP_TIME = 1.0

# --- Outbound proxy ---------------------------------------------------------
PROXY = "proxy.det.nsw.edu.au"
PROXY_PORT = "8080"

# --- EMU web application ----------------------------------------------------
EMU_PROTOCOL = "https"
EMU_DOMAIN = "online.det.nsw.edu.au"
EMU_LANDING_PATH = "/emu"
EMU_LIST_SCHOOLS_PATH = "/emu/?ssosource=login"
EMU_SET_SCHOOL_PATH = "/emu/student/selectLocation"
# {{SCHOOL_CODE}} and {{USERNAME}} are substituted with str.replace() at call time
EMU_USER_INFO_PATH = "/emu/{{SCHOOL_CODE}}/students/{{USERNAME}}"
EMU_PW_RESET_PATH = "/emu/{{SCHOOL_CODE}}/students/{{USERNAME}}/password"

# --- Marker strings searched for in response bodies --------------------------
SCRAPE_RESET_SUCCESS = "Password reset request has been submitted for the user"
SCRAPE_UNAUTHORISED = "Unauthorized!"
SCRAPE_USER_NOT_FOUND = "No details found for this user"

# --- Operator credentials (prompted once at start-up) ------------------------
det_un = input("Enter DET username: ")
det_pw = getpass.getpass("Enter DET password: ")

# The credentials are embedded in the proxy URL, so they must be
# percent-encoded: a raw '@', ':', '/' or '#' in the password would otherwise
# be parsed as URL structure and yield a wrong host or wrong credentials.
_PROXY_URL = "http://%s:%s@%s:%s" % (
    quote(det_un, safe=""),
    quote(det_pw, safe=""),
    PROXY,
    PROXY_PORT,
)

# The same authenticating proxy is used for both plain and TLS traffic
PROXIES = {
    "https": _PROXY_URL,
    "http": _PROXY_URL,
}

# Headers sent with every request made through the shared session
HTTP_HEADERS = {
    "Accept-Language": "en-GB,en;q=0.5",
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1; x64; rv:44.0) Gecko/20100101 Firefox/44.0",
    "Connection": "keep-alive",
}
|
|
|
def logwrite(username, operation, result, comment=""):
    """Append one timestamped row to the open log file.

    The row has five columns: local timestamp (``YYYY-MM-DD HH:MM:SS``),
    username, operation, result and comment. It is written to the
    module-level ``log`` file object, which must already be open for writing.

    Fields are emitted through ``csv.writer`` so that a value containing a
    comma, quote or newline is quoted instead of silently adding columns.
    For values without such characters the output is unchanged.
    """
    timestamp = strftime("%Y-%m-%d %H:%M:%S", localtime())

    # lineterminator="\n" keeps the line ending the hand-built rows used
    writer = csv.writer(log, lineterminator="\n")
    writer.writerow([timestamp, username, operation, result, comment])

    # Flush after every entry so the log survives an abrupt termination
    log.flush()
|
|
|
def _read_pairs(path):
    """Parse a ``username,password`` file into a dict (raises FileNotFoundError)."""
    pairs = {}
    # Plain "r" already gives universal-newline handling on Python 3; the old
    # "rU" mode is rejected outright by Python 3.11 and later.
    with open(path, "r") as handle:
        for lineno, raw in enumerate(handle, start=1):
            line = raw.rstrip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at end of file)
                continue

            # Split on the FIRST comma only, so passwords may contain commas
            username, sep, password = line.partition(",")
            if not sep:
                # Report by line number; the content itself may be sensitive
                print("Skipping malformed line %d in %s" % (lineno, path))
                continue

            pairs[username] = password
    return pairs


def getjobs():
    """Return the jobs that still need to be run.

    Reads WORK_FILE and DONE_FILE, both two-column CSV files in the form
    ``username,password``, and returns a dict mapping username to new password
    for every WORK_FILE entry whose username is not listed in DONE_FILE.

    A missing DONE_FILE is treated as "nothing done yet". If WORK_FILE is
    missing, a message is printed and None is returned.
    """
    # Jobs that are already complete
    try:
        jobs_done = _read_pairs(DONE_FILE)
    except FileNotFoundError:
        jobs_done = {}

    # All requested jobs
    try:
        requested = _read_pairs(WORK_FILE)
    except FileNotFoundError:
        print("Jobs file not found")
        return None

    return {un: pw for un, pw in requested.items() if un not in jobs_done}
|
|
|
def detauth(session):
    """Authenticate *session* with the DET SSO server.

    Requests the EMU landing page, scrapes the login form's target URL and
    the hidden ``ssoDetails`` value from the response, asks the operator to
    confirm the target URL, then posts the module-level ``det_un`` /
    ``det_pw`` credentials to it. Cookies set along the way stay on *session*.

    If the login form cannot be found in the response, an error is printed
    and the script exits with status 1. The response to the login POST itself
    is not inspected here.
    """
    landing_url = "%s://%s%s" % (EMU_PROTOCOL, EMU_DOMAIN, EMU_LANDING_PATH)
    response = session.get(landing_url, verify=True, proxies=PROXIES)

    form_match = re.search(r"name='loginform'.+action='(.+)'", response.text)
    token_match = re.search(r'name="ssoDetails" value="([^"]+)"', response.text)

    # Fail with a readable message rather than an IndexError when the page is
    # not the expected login form (e.g. a proxy error page)
    if form_match is None or token_match is None:
        print("Error: SSO login form not found at %s" % landing_url)
        raise SystemExit(1)

    # The form action is URL-encoded in the page source
    formurl = unquote(form_match.group(1))
    ssocreds = token_match.group(1)

    # Let the operator check where the credentials are about to be sent
    input("Press ENTER to authenticate to the following URL with your DET details:\n%s\n" % formurl)

    credentials = {
        "ssoDetails": ssocreds,
        "site2pstoretoken": "",
        "ssousername": det_un,
        "password": det_pw,
        "IDToken1": det_un,
        "IDToken2": det_pw,
    }
    session.post(formurl, data=credentials, verify=True, proxies=PROXIES)
|
|
|
def emu_setschool(session):
    """Select the school to operate on.

    If exactly one school is offered it is selected automatically. Otherwise
    the offered schools (if any) are printed and the operator is prompted to
    enter a school code.

    Returns the school code as a string. If the server's reply to the
    selection contains "Unauthorized", an error is printed and the script
    exits with status 1.
    """
    base_url = "%s://%s" % (EMU_PROTOCOL, EMU_DOMAIN)

    response = session.get(
        base_url + EMU_LIST_SCHOOLS_PATH,
        verify=True,
        proxies=PROXIES)

    # Each match is a (code, name) tuple taken from the <option> elements
    schools = re.findall(r'<option value="([0-9]+)">([^<>]+)</option>', response.text)

    if len(schools) == 1:
        school = schools[0][0]
    else:
        if schools:
            print("Multiple schools available:")
            for s in schools:
                print(" %s: %s" % s)
        else:
            # Don't announce "multiple schools" above an empty list
            print("No schools were found in the server response.")

        school = input("\nEnter the code of the school you wish to use: ")

    response = session.post(
        base_url + EMU_SET_SCHOOL_PATH,
        data={"locationId": school, "select": "Select"},
        verify=True,
        proxies=PROXIES)

    if "Unauthorized" in response.text:
        print("Error setting school: %s" % school)
        # exit() is injected by the site module and may be absent;
        # raising SystemExit is the always-available equivalent
        raise SystemExit(1)

    print("Using school code: %s" % school)
    return school
|
|
|
def emu_getuserinfo(session, username):
    """Fetch a student's details page and scrape its contents.

    The school code is read from the module-level ``school``.

    Returns None when the page contains the "user not found" marker.
    Otherwise returns a list of ``(label, value)`` tuples scraped from the
    page, followed by a final ``("Group Memberships", [...])`` entry whose
    value is the list of tuples scraped from the membership table rows.
    """
    user_path = EMU_USER_INFO_PATH.replace("{{SCHOOL_CODE}}", school)
    user_path = user_path.replace("{{USERNAME}}", username)
    url = "%s://%s%s" % (EMU_PROTOCOL, EMU_DOMAIN, user_path)

    page = session.get(url, verify=True, proxies=PROXIES).text

    # Username does not exist
    if SCRAPE_USER_NOT_FOUND in page:
        return None

    details = re.findall(
        r'<label>([^<]+) :</label>\s*<p class="text">([^<]+)</p>',
        page,
        re.MULTILINE)
    memberships = re.findall(
        r'<tr>\s*<td>([^<]+)</td>\s*<td>(CN=[^<]+)</td>\s*</tr>',
        page,
        re.MULTILINE)

    details.append(("Group Memberships", memberships))
    return details
|
|
|
def emu_userinfo(session, username):
    """Return a user's scraped details as a dictionary.

    Calls emu_getuserinfo() and maps the first element of each scraped item
    to its second element. Returns None if emu_getuserinfo() returned None
    (the user was not found).
    """
    scrapeinfo = emu_getuserinfo(session, username)

    # Identity check: None is a singleton, and "== None" can be overridden
    if scrapeinfo is None:
        return None

    return {item[0]: item[1] for item in scrapeinfo}
|
|
|
def emu_reset(session, username, password):
    """Submit a password reset for *username*.

    The school code is read from the module-level ``school``. The new
    password is posted as both the password and its confirmation.

    Returns True if the response contains the reset-success marker text,
    False otherwise.
    """
    reset_path = EMU_PW_RESET_PATH.replace("{{SCHOOL_CODE}}", school)
    reset_path = reset_path.replace("{{USERNAME}}", username)
    url = "%s://%s%s" % (EMU_PROTOCOL, EMU_DOMAIN, reset_path)

    form = {
        "password": password,
        "confirmPwd": password,
        "reset": "Reset",
    }
    response = session.post(url, data=form, verify=True, proxies=PROXIES)

    # The reset worked only if the page shows the success message
    return SCRAPE_RESET_SUCCESS in response.text
|
|
|
# Main script: run every outstanding job, recording the outcome of each.
#
# Both output files are opened in append mode and managed by ``with`` so they
# are closed even if a request raises part-way through the batch.
# ``log`` must remain a module-level name because logwrite() writes to it.
with open(LOG_FILE, "a") as log, open(DONE_FILE, "a") as done:
    jobs = getjobs()

    if jobs:
        print("\n%d jobs todo\n" % len(jobs))

        # Create a requests session to store cookie information between
        # requests; the ``with`` releases its connections when finished
        with requests.Session() as session:
            session.headers.update(HTTP_HEADERS)

            detauth(session)
            # emu_reset() and emu_getuserinfo() read ``school`` as a global
            school = emu_setschool(session)

            input("\nPress ENTER to start jobs...")

            for job, new_password in jobs.items():
                print("Resetting: '%s' to '%s'" % (job, new_password))

                if emu_reset(session, job, new_password):
                    logwrite(job, "reset", "OK")
                    print("OK\n")

                    # Mark as done so a re-run skips this job
                    done.write("%s,%s\n" % (job, new_password))
                    done.flush()
                else:
                    # Look the user up to tell the two failure causes apart
                    userinfo = emu_userinfo(session, job)

                    if userinfo is None:
                        comment = SCRAPE_USER_NOT_FOUND
                    else:
                        comment = "Student not allocated to this school"

                    logwrite(job, "reset", "FAIL", comment)
                    print("FAIL: %s\n" % (comment))

                # Slow down requests in case of rate limiting
                sleep(REQUEST_SLEEP_TIME)
    else:
        print("No jobs todo or all jobs complete")

# Keep the console window open until the operator has read the output
input("\nPress ENTER to close...")