Last active
March 13, 2024 19:39
-
-
Save dieu/17ffbcace6024dcf5b75dbbee1961977 to your computer and use it in GitHub Desktop.
SleepHQ Uploader
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import binascii | |
from lxml.html import parse as parse_url | |
from parse import parse | |
import requests | |
from tqdm import tqdm | |
from os import path | |
import os | |
import time | |
class ezshare():
    """Client for the ez Share WiFi SD card's embedded HTTP file server.

    The card serves directory listings as HTML under
    ``http://ezshare.card/dir?dir=A:`` and individual files through
    ``.../download?file=...`` links; this class walks those listings and
    mirrors files locally with size/CRC-based skip logic.
    """

    def __init__(self, url="http://ezshare.card/dir?dir=A:", num_retries=5):
        # Base listing URL; directory paths are appended to it verbatim.
        self.base = url
        # Number of attempts for each file download before giving up.
        self.num_retries = num_retries

    def is_dir(self, href):
        """Return True when *href* is a directory link (no download?file= part)."""
        r = parse("{}/download?file={name}", href)
        return r is None

    def is_file(self, href):
        """Return True when *href* is a direct file download link."""
        return not self.is_dir(href)

    def _get(self, url):
        """Fetch *url* and return the parsed lxml HTML tree."""
        #print(f"GET {url}")
        return parse_url(url)

    def ping(self):
        """Return True when the card answers the base URL, False otherwise."""
        try:
            self._get(self.base)
            return True
        except Exception:  # any network/parse failure counts as "unreachable"
            return False

    def listdir(self, dir, recursive=False, shift=0):
        """List directory *dir* on the card.

        Returns a dict mapping entry names to download hrefs (files) or
        nested dicts (directories; filled in only when *recursive*).
        Returns None when *dir* does not appear to be a real directory.
        *shift* is unused and kept for backward compatibility.
        """
        ret = {}
        is_root = False
        if dir == "/":
            dir = ""
            is_root = True
        # The card expects backslash-separated paths in the dir= parameter.
        dir = dir.replace("/", "\\")
        soup = self._get(f"{self.base}{dir}")
        has_dotfiles = False
        for a in soup.xpath("//pre/a"):
            href = a.get("href")
            name = a.text.strip()
            if name == "." or name == ".." or name == "ezshare.cfg":
                has_dotfiles = True
                continue
            if self.is_dir(href):
                ret[name] = {}
                if recursive:
                    dir_content = self.listdir(f"{dir}/{name}", recursive=recursive)
                    if dir_content is not None:
                        ret[name] = dir_content
            else:
                ret[name] = href
        # No dotfiles or ezshare.cfg and is not root? This must not be a directory
        if not has_dotfiles and not is_root:
            return None
        return ret

    def print_list(self, dirlist, shift=0):
        """Pretty-print a (possibly nested) listing produced by listdir()."""
        shiftstr = " " * shift
        for k, v in dirlist.items():
            if type(v) is dict:
                print(f"{shiftstr} {k}/")
                self.print_list(v, shift=shift + 1)
            else:
                print(f"{shiftstr} {k}")

    def stream_size(self, stream):
        """Return the number of bytes between the stream's current position
        and its end, leaving the position unchanged."""
        pos = stream.tell()
        stream.seek(0, 2)
        ln = stream.tell()
        stream.seek(pos)
        return ln - pos

    def _dload(self, link, file_name, crc):
        """Download *link* into *file_name*.

        Skips the transfer when the local file already matches by size
        (or by CRC-32 of the full content when *crc* is true).
        Returns True on success.
        """
        with open(file_name, "ab") as f:
            f.seek(0)
            response = requests.head(link)
            curlength = self.stream_size(f)
            # BUGFIX: the original applied int() to the header before the
            # None check, so a missing Content-Length crashed with TypeError
            # and the "no content length header" branch was unreachable.
            total_length = response.headers.get('content-length')
            if total_length is not None:
                total_length = int(total_length)
                if not crc and curlength == total_length:
                    print(f"Skipping file {file_name} (same size)")
                    return True
            response = requests.get(link, stream=True)
            if total_length is None:  # no content length header
                f.truncate()
                f.write(response.content)
                return True
            elif crc and curlength == total_length:
                f_crc32_hash = binascii.crc32(f.read())
                r_crc32_hash = binascii.crc32(response.content)
                if f_crc32_hash == r_crc32_hash:
                    print(f"Skipping file {file_name} (same crc32 sum)")
                    return True
            with tqdm(desc=file_name, total=total_length, unit='B', unit_scale=True, unit_divisor=1024, miniters=1) as pbar:
                # Restart the file from scratch before streaming new content.
                f.truncate(0)
                for data in response.iter_content(chunk_size=4096):
                    f.write(data)
                    pbar.update(len(data))
            return True

    def _dload_with_retry(self, link, file_name, crc):
        """Call _dload, retrying transient connection errors up to
        self.num_retries times. Returns False when all attempts fail."""
        last_exception = None
        for retries in range(self.num_retries):
            try:
                return self._dload(link, file_name, crc)
            except (requests.exceptions.ConnectionError, requests.exceptions.ChunkedEncodingError) as err:
                print(f"Retrying link: {link} ({retries} times)")
                last_exception = err
        if last_exception is not None:
            print(f"Failed to download {link}: {last_exception}")
        return False

    def download(self, remote_file, local_file=None, recursive=False, crc=False):
        """Download a single *remote_file*; defaults to its basename locally.

        A *local_file* ending in "/" is treated as a target directory.
        """
        if local_file is None:
            local_file = path.basename(remote_file)
        # endswith() is safe on the empty string, unlike local_file[-1].
        if local_file.endswith("/"):
            os.makedirs(local_file, exist_ok=True)
        if path.isdir(local_file):
            local_file = path.join(local_file, path.basename(remote_file))
        remote_dir = path.dirname(remote_file)
        basename = path.basename(remote_file)
        link = self.listdir(remote_dir, recursive)[basename]
        self._dload_with_retry(link, local_file, crc)

    def sync(self, remote_dir, local_dir=".", recursive=False, crc=False):
        """Mirror *remote_dir* into *local_dir*; falls back to a single-file
        download when *remote_dir* turns out not to be a directory."""
        if local_dir is None:
            local_dir = "."
        todo = self.listdir(remote_dir, recursive=recursive)
        if todo is None:
            self.download(remote_dir, local_dir, crc=crc)
        else:
            self._sync_list(todo, local_dir, crc)

    def _sync_list(self, todo, local_dir, crc):
        """Recursively download every entry of a listdir() result into
        *local_dir*, creating directories as needed."""
        os.makedirs(local_dir, exist_ok=True)
        for k, v in todo.items():
            if type(v) is dict:
                self._sync_list(v, path.join(local_dir, k), crc)
            elif v:
                self._dload_with_retry(v, path.join(local_dir, k), crc)
            else:
                print(f"Skipping sync of {k} because link is {v}")
import binascii | |
import os | |
import time | |
import zipfile | |
import logging | |
from selenium import webdriver | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.chrome.service import Service | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
# Location of the chromedriver binary handed to Selenium's Service below.
# NOTE(review): this path matches chromium packages on some Linux distros —
# confirm it exists on the target host.
chromedriver_path = "/usr/lib/chromium/chromedriver"
# Setup logging
logging.basicConfig(level=logging.INFO)
def remove_path(path):
    """Delete *path* if present, ignoring filesystem errors.

    Best-effort cleanup: a missing file, a permission problem, or *path*
    being a directory must not abort the sync, so every OSError is
    swallowed. (Narrowed from the original blanket ``except Exception``,
    which could also hide programming errors.)
    """
    try:
        print(f"rm {path}")
        os.remove(path)
    except OSError:
        pass
def cleanup_data(path):
    """Best-effort removal of card bookkeeping files under *path* before sync."""
    for entry in (
        "Identification.crc",
        "Identification.tgt",
        "Journal.dat",
        "STR.edf",
        "SETTINGS",
    ):
        remove_path(f"{path}/{entry}")
def download_data(to_path):
    """Recursively mirror the card's remote root ("/") into *to_path*."""
    remote_root = "/"
    card = ezshare(num_retries=3)
    print(f"Synchronizing remote {remote_root} -> {to_path}")
    card.sync(remote_root, to_path, recursive=True)
def file_modified_since_zip(file_path, relative_path, zipf):
    """Return True when *file_path* differs from entry *relative_path* in *zipf*.

    Compares the CRC-32 stored in the zip's central directory against a
    freshly computed CRC-32 of the on-disk file, so nothing has to be
    decompressed. A file absent from the archive counts as modified.
    """
    try:
        z_crc32_hash = zipf.getinfo(relative_path).CRC
    except KeyError:
        # Entry not present in the archive at all.
        print(f"{file_path} as {relative_path} does not exist in archive")
        return True
    f_crc32_hash = 0
    with open(file_path, "rb") as f:
        while True:
            chunk = f.read(4096)
            if not chunk:
                break
            # Running CRC; the mask keeps it unsigned, matching ZipInfo.CRC.
            f_crc32_hash = binascii.crc32(chunk, f_crc32_hash) & 0xffffffff
    if f_crc32_hash == z_crc32_hash:
        print(f"{file_path} already in archive")
        return False
    print(f"{file_path} ({f_crc32_hash}) != {relative_path} ({z_crc32_hash})")
    return True
def create_zip(folder_path, zip_path):
    """(Re)build *zip_path* from the tree under *folder_path* if needed.

    Returns True when the archive was (re)written because at least one
    file is new or changed (or the archive did not exist yet); False when
    every file's CRC already matches the existing archive. Entries are
    named relative to ``os.path.dirname(folder_path)``, matching the
    original layout.
    """
    if os.path.isfile(zip_path):
        has_something_new = _zip_is_stale(folder_path, zip_path)
    else:
        has_something_new = True
    if has_something_new:
        # Rewrite the archive from scratch so stale entries can't linger.
        with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipf:
            for file_path, relative_path in _walk_with_arcnames(folder_path):
                print(f"adding {file_path} as {relative_path}")
                zipf.write(file_path, relative_path)
    return has_something_new


def _walk_with_arcnames(folder_path):
    """Yield (file_path, arcname) pairs for every file under *folder_path*."""
    base = os.path.dirname(folder_path)
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            file_path = os.path.join(root, file)
            yield file_path, os.path.relpath(file_path, base)


def _zip_is_stale(folder_path, zip_path):
    """True when any file under *folder_path* is new or changed vs *zip_path*.

    Opens the archive read-only ("r"): the original used append mode "a"
    here even though this pass never writes.
    """
    with zipfile.ZipFile(zip_path, "r", zipfile.ZIP_DEFLATED) as zipf:
        for file_path, relative_path in _walk_with_arcnames(folder_path):
            if file_modified_since_zip(file_path, relative_path, zipf):
                return True
    return False
def upload_file(file_path, username, password):
    """Sign in to sleephq.com with headless Chrome and upload *file_path*."""
    chrome_opts = Options()
    for flag in (
        "--disable-extensions",
        "--disable-gpu",
        "--disable-dev-shm-usage",
        "--no-sandbox",
        "--headless",
    ):
        chrome_opts.add_argument(flag)
    with webdriver.Chrome(service=Service(chromedriver_path), options=chrome_opts) as driver:
        driver.get('https://sleephq.com/users/sign_in')
        WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.ID, "user_email")))
        # Only fill the form if we actually landed on the login page
        # (an existing session may redirect straight to the dashboard).
        if driver.current_url == 'https://sleephq.com/users/sign_in':
            logging.info("logging in...")
            driver.find_element(By.ID, 'user_email').send_keys(username)
            driver.find_element(By.ID, 'user_password').send_keys(password)
            driver.find_element(By.TAG_NAME, 'button').click()
            WebDriverWait(driver, 10).until(EC.url_changes(driver.current_url))
        # The post-login URL ends with the team id; derive the imports page.
        team_id = driver.current_url.split('/')[-1].split("?")[0]
        import_url = f'https://sleephq.com/account/teams/{team_id}/imports'
        dropzone_input = '//input[@type="file" and @class="dz-hidden-input"]'
        logging.info(f"import page {import_url}")
        driver.get(import_url)
        WebDriverWait(driver, 10).until(EC.presence_of_element_located((By.XPATH, dropzone_input)))
        driver.find_element(By.XPATH, dropzone_input).send_keys(file_path)
        time.sleep(60)  # let the dropzone widget ingest the file
        driver.find_element(By.ID, 'start-upload-button').click()
        logging.info("uploading...")
        time.sleep(60)  # wait for the upload before the browser quits
        logging.info("upload completed.")
# Script entry point: mirror the SD card, rebuild the archive, and upload
# it to SleepHQ only when something actually changed.
ezshare_path = "/share/ezshare/"
# BUGFIX: dropped the spurious f-string prefix (no placeholders in literal).
ezshare_archive_path = "/share/ezshare_archive.zip"
cleanup_data(ezshare_path)
download_data(ezshare_path)
if create_zip(ezshare_path, ezshare_archive_path):
    # Credentials come from the environment; None here will make the
    # selenium login fail — set both variables before running.
    sleep_hq_username = os.environ.get("SLEEP_HQ_USERNAME")
    sleep_hq_password = os.environ.get("SLEEP_HQ_PASSWORD")
    upload_file(ezshare_archive_path, sleep_hq_username, sleep_hq_password)
else:
    logging.info("nothing new in archive, nothing to upload")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment