@keithchambers
Created February 2, 2025 00:12
MVSep.com Data Downloader
import argparse
import json
import re
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
import requests
from bs4 import BeautifulSoup
import yaml
BASE_URL = "https://mvsep.com/quality_checker/queue"
# Increase concurrency for both steps to 200.
MAX_DETAIL_WORKERS = 200
MAX_PAGE_WORKERS = 200
# Global mapping for worker IDs based on thread identity.
worker_ids = {}
worker_id_lock = threading.Lock()
next_worker_id = 1

def get_worker_id():
    """Assign and retrieve a unique worker ID for the current thread."""
    global next_worker_id
    tid = threading.get_ident()
    with worker_id_lock:
        if tid not in worker_ids:
            worker_ids[tid] = next_worker_id
            next_worker_id += 1
    return worker_ids[tid]

def fetch_url(url):
    """Fetch the content at the given URL or raise an exception."""
    # A request timeout (30 s here) keeps a stalled connection from hanging a worker thread.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    return response.text

def parse_metrics(text):
    """
    Parse metrics from a text string and return a dictionary mapping metric names to float values.
    Expected pattern: 'SDR <Metric Name>: <number>'
    """
    metrics = {}
    pattern = r"(SDR [A-Za-z0-9 ]+):\s*([\d\.]+)"
    for key, value in re.findall(pattern, text):
        try:
            metrics[key.strip()] = float(value.strip())
        except ValueError:
            pass
    return metrics
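
# Illustrative example (not from the gist): parse_metrics("SDR Vocals: 9.51 SDR Instrumental: 15.23")
# would return {"SDR Vocals": 9.51, "SDR Instrumental": 15.23}.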

def parse_row(row):
    """
    Parse a table row element representing a run.
    Extracts the run's id, date (YYYY-MM-DD), description, status, metrics, and a temporary detail URL.
    Returns None if the run should be skipped based on filtering criteria.
    """
    run = {}
    row_text = row.get_text(" ", strip=True)
    tokens = row_text.split()
    if len(tokens) < 4:
        return None
    # Assume the first token is the ID and the second token is the date (YYYY-MM-DD);
    # tokens[2] (presumably a time component) is skipped, so the description starts at tokens[3].
    run_id = tokens[0]
    date_str = tokens[1]  # Expected to be in YYYY-MM-DD format.
    run["id"] = run_id
    run["date"] = date_str
    # Locate the dash ("-") that separates the description from the status/metrics.
    try:
        dash_index = tokens.index("-")
    except ValueError:
        dash_index = -1
    if dash_index == -1:
        run["description"] = " ".join(tokens[3:])
        run["status"] = ""
        run["metrics"] = {}
    else:
        run["description"] = " ".join(tokens[3:dash_index])
        status_tokens = []
        metrics_tokens = []
        for token in tokens[dash_index + 1:]:
            if token.startswith("SDR"):
                metrics_tokens.append(token)
            elif metrics_tokens:
                metrics_tokens.append(token)
            else:
                status_tokens.append(token)
        run["status"] = " ".join(status_tokens)
        metrics_text = " ".join(metrics_tokens)
        run["metrics"] = parse_metrics(metrics_text)
    # Filtering: skip runs that contain an "SDR Restored" metric.
    if "SDR Restored" in run["metrics"]:
        return None
    # Filtering: for runs on 2023-02-11 or 2023-02-13, skip if any metric value exceeds 20.
    if run["date"] in ("2023-02-11", "2023-02-13"):
        if any(value > 20 for value in run["metrics"].values()):
            return None
    # Extract the detail URL (temporarily stored for detail fetching).
    a_tag = row.find("a")
    if a_tag and a_tag.get("href", "").strip():
        href = a_tag.get("href").strip()
        if href.startswith("/"):
            run["detail_url"] = "https://mvsep.com" + href
        else:
            run["detail_url"] = href
    else:
        run["detail_url"] = f"https://mvsep.com/quality_checker/queue/{run_id}"
    run["details"] = ""
    return run
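
# Illustrative row text this parser expects (hypothetical values, inferred only from the parsing
# logic above): "12345 2023-03-01 12:00:00 Demucs4 vocals - Sucessfully processed SDR Vocals: 9.51"
# would yield id "12345", date "2023-03-01", description "Demucs4 vocals",
# status "Sucessfully processed", and metrics {"SDR Vocals": 9.51}.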

def trim_details(details_text):
    """
    Trim redundant content from the details text.
    If "Algorithm info:" is present, extract from there up to "Date added:" (if found);
    otherwise return the first 200 characters. Extra whitespace is removed.
    """
    details_text = details_text.strip()
    if "Algorithm info:" in details_text:
        start = details_text.find("Algorithm info:")
        if "Date added:" in details_text[start:]:
            end = details_text.find("Date added:", start)
            details_text = details_text[start:end].strip()
        else:
            details_text = details_text[start:].strip()
    else:
        details_text = details_text[:200].strip()
    return " ".join(details_text.split())

def fetch_detail(url):
    """
    Fetch and parse the detail page from the given URL.
    Returns the trimmed details text.
    """
    html = fetch_url(url)
    soup = BeautifulSoup(html, "html.parser")
    details_container = soup.find("div", id="run-details")
    if details_container:
        raw_details = details_container.get_text(" ", strip=True)
    else:
        raw_details = soup.get_text(" ", strip=True)
    return trim_details(raw_details)

def process_run(run):
    """
    Log that details are being fetched for the given ID.
    Attempt to fetch and process the detail page for a run up to 3 times.
    Returns True if successful; otherwise, returns False.
    """
    worker_id = get_worker_id()
    print(f"[worker {worker_id}] Fetching details data for ID {run['id']}...")
    for attempt in range(3):
        try:
            detail_content = fetch_detail(run["detail_url"])
            if detail_content.strip():
                run["details"] = detail_content
                return True
        except Exception:
            pass  # Silently ignore errors and retry.
        time.sleep(0.5)
    return False

def fetch_page(page_num):
    """
    Fetch a page by its page number.
    Returns the HTML content if successful, else returns None.
    Does not log errors if the page does not exist.
    """
    page_url = f"{BASE_URL}?page={page_num}"
    try:
        return fetch_url(page_url)
    except Exception:
        return None

def main():
    parser = argparse.ArgumentParser(description="MVSEP Queue Scraper")
    parser.add_argument("-f", "--format", default="yaml", choices=["yaml", "json"],
                        help='Output format: "yaml" (default) or "json"')
    args = parser.parse_args()
    # Fetch pages 1 to 100 concurrently with up to 200 workers.
    pages_html = {}
    with ThreadPoolExecutor(max_workers=MAX_PAGE_WORKERS) as page_executor:
        future_to_page = {page_executor.submit(fetch_page, page_num): page_num for page_num in range(1, 101)}
        for future in as_completed(future_to_page):
            page_num = future_to_page[future]
            page_html = future.result()
            if page_html:
                pages_html[page_num] = page_html
    if not pages_html:
        print("No pages fetched. Exiting.")
        sys.exit(0)
    runs = []
    # Parse each fetched page.
    for page_num, html in pages_html.items():
        soup = BeautifulSoup(html, "html.parser")
        table = soup.find("table")
        if not table:
            continue  # Skip pages without a table.
        rows = table.find_all("tr")
        if not rows or len(rows) <= 1:
            continue  # Skip pages without data rows.
        page_runs_count = 0
        for row in rows[1:]:
            run = parse_row(row)
            if run:
                runs.append(run)
                page_runs_count += 1
        print(f"Page {page_num}: Found {page_runs_count} runs.")
    if not runs:
        print("No runs found after filtering. Exiting.")
        sys.exit(0)
    print(f"Total runs collected: {len(runs)}")
    # Use up to 200 concurrent workers for fetching details.
    skipped_ids = set()
    with ThreadPoolExecutor(max_workers=MAX_DETAIL_WORKERS) as detail_executor:
        future_to_run = {detail_executor.submit(process_run, run): run for run in runs}
        for future in as_completed(future_to_run):
            run = future_to_run[future]
            try:
                success = future.result()
                if not success:
                    skipped_ids.add(run["id"])
            except Exception:
                skipped_ids.add(run["id"])
    # Remove the temporary detail_url field from each run.
    for run in runs:
        run.pop("detail_url", None)
    # Merge the status field into the description field, remove any "Successfully processed"
    # phrase (either spelling), and then omit the status field. Also omit runs with no metrics.
    final_runs = []
    for run in runs:
        # Omit runs with no metric data.
        if not run.get("metrics"):
            skipped_ids.add(run["id"])
            continue
        status_field = run.pop("status", "").strip()
        # Remove the "Successfully processed" phrase, case-insensitively; the pattern also
        # matches the misspelling "Sucessfully processed".
        status_clean = re.sub(r'[\+ ]*succ?essfully processed', '', status_field, flags=re.IGNORECASE).strip()
        # Merge status into description.
        if run.get("description", "").strip():
            if status_clean:
                merged = run["description"].strip() + " " + status_clean
            else:
                merged = run["description"].strip()
        else:
            merged = status_clean
        # Also remove the phrase from the merged description.
        merged = re.sub(r'\s*succ?essfully processed', '', merged, flags=re.IGNORECASE).strip()
        run["description"] = merged
        # Omit the run if the final description still contains "error" (case-insensitive).
        if "error" in run["description"].lower():
            skipped_ids.add(run["id"])
            continue
        final_runs.append(run)
    # Exclude any runs that were marked as skipped.
    filtered_runs = [run for run in final_runs if run["id"] not in skipped_ids]
    if not filtered_runs:
        print("No successful runs to write out after filtering errors. Exiting.")
        sys.exit(0)
    current_date = datetime.now().strftime("%Y-%m-%d")
    file_ext = "json" if args.format.lower() == "json" else "yaml"
    file_name = f"MVSep-queue-{current_date}.{file_ext}"
    output_data = {"runs": filtered_runs}
    try:
        with open(file_name, "w", encoding="utf-8") as f:
            if args.format.lower() == "json":
                json.dump(output_data, f, indent=2)
            else:
                yaml.dump(output_data, f, sort_keys=False, allow_unicode=True)
        print(f"Output written to {file_name}")
    except Exception as e:
        print(f"Error writing output file: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
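
Usage note: the gist does not name the script file, so "mvsep_scraper.py" below is only a placeholder. The script depends on the third-party packages requests, beautifulsoup4, and PyYAML.

    python mvsep_scraper.py            # writes MVSep-queue-<YYYY-MM-DD>.yaml
    python mvsep_scraper.py -f json    # writes MVSep-queue-<YYYY-MM-DD>.json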