Python script that gets book genres from Goodreads for a list of book IDs and saves them to a JSONL file.
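The script is driven by argparse: it takes an input file of numeric Goodreads book IDs (one per line), an optional per-request sleep (-s/--sleep, default 1 second), an output path (-o/--output, default book_genres.jsonl), and a --debug flag that saves failed pages to a debug/ folder. Assuming the file is saved locally as get_genres.py (the gist does not give a filename, so that name is only illustrative), a typical run might look like:

python get_genres.py book_ids.txt -s 2 -o book_genres.jsonl --debug

Each successfully scraped book is appended to the output file as one JSON object per line, e.g. {"book_id": "12345", "genres": ["Fiction", "Fantasy"]}, and already-processed IDs are skipped on subsequent runs.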
import asyncio
import argparse
import json
import sys
from pathlib import Path
from typing import List, Tuple, Set

from bs4 import BeautifulSoup
from curl_cffi.requests import AsyncSession
from pathvalidate import sanitize_filename
from tqdm import tqdm


def save_debug_file(method: str, url: str, content: str) -> None:
    """Saves the content of a failed page to the debug directory using pathvalidate."""
    if not content:
        return
    try:
        debug_dir = Path("debug")
        debug_dir.mkdir(exist_ok=True)
        safe_filename = sanitize_filename(url)
        filepath = debug_dir / f"{method}_{safe_filename}.html"
        filepath.write_text(content, encoding="utf-8")
        tqdm.write(f"Debug file saved: {filepath}")
    except Exception as e:
        tqdm.write(f"Could not save debug file for {url}: {e}")


def parse_genres_from_html(html_content: str) -> Tuple[str, List[str]]:
    """
    Parses HTML to find genres and determines the status of the parse.

    Returns:
        A tuple of (status, genres).
        - status: "SUCCESS" if the genre section was found (even if empty).
        - status: "FAILURE" if the page structure is unrecognizable.
    """
    # Method 1: Try the modern __NEXT_DATA__ JSON blob.
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        if script_tag := soup.find('script', {'id': '__NEXT_DATA__', 'type': 'application/json'}):
            json_data = json.loads(script_tag.string)
            apollo_state = json_data.get('props', {}).get('pageProps', {}).get('apolloState', {})

            # New, more reliable way to find the main book data
            root_query = apollo_state.get("ROOT_QUERY", {})
            ref_key = next((k for k in root_query if k.startswith("getBookByLegacyId")), None)

            if ref_key:
                book_key_ref = root_query.get(ref_key, {})
                book_key = book_key_ref.get("__ref")
                if book_key and book_key in apollo_state:
                    book_data = apollo_state[book_key]
                    if 'bookGenres' in book_data:
                        book_genres = book_data.get('bookGenres', [])
                        genres = [entry['genre']['name'] for entry in book_genres if entry.get('genre', {}).get('name')]
                        return "SUCCESS", genres
    except Exception:
        pass  # Fall through to the next method

    # Method 2: Fallback to scraping legacy HTML tags.
    try:
        soup = BeautifulSoup(html_content, 'html.parser')
        if genres_container := soup.find('div', attrs={'data-testid': 'genresList'}):
            genres = []
            genre_links = genres_container.select('a.Button--tag span.Button__labelItem')
            for link in genre_links:
                genre_name = link.get_text(strip=True)
                if genre_name and 'show all' not in genre_name.lower():
                    genres.append(genre_name)
            return "SUCCESS", genres
    except Exception:
        pass

    return "FAILURE", []


async def get_book_genres(url: str, session: AsyncSession) -> Tuple[str, List[str], str]:
    """
    Fetches page content using curl_cffi and parses it for genres.

    Returns:
        A tuple containing (status, genres, html_content).
    """
    html_content = ""
    try:
        response = await session.get(url, impersonate="chrome110")
        response.raise_for_status()
        html_content = response.content.decode('utf-8', errors='ignore')
        status, genres = parse_genres_from_html(html_content)
        return status, genres, html_content
    except Exception as e:
        tqdm.write(f"Request error for {url}: {e}")
        return "FAILURE", [], html_content


def load_processed_ids(output_file: Path) -> Set[str]:
    """Reads a .jsonl file and returns a set of all processed book_ids."""
    if not output_file.exists():
        return set()
    processed_ids = set()
    with output_file.open('r', encoding='utf-8') as f:
        for line in f:
            try:
                data = json.loads(line)
                if 'book_id' in data:
                    processed_ids.add(data['book_id'])
            except json.JSONDecodeError:
                continue
    return processed_ids


async def process_books(input_file: Path, output_file: Path, sleep_time: float, debug: bool) -> None:
    """The main asynchronous logic for processing book IDs with resume support."""
    try:
        all_book_ids = [line.strip() for line in input_file.read_text(encoding="utf-8-sig").splitlines() if line.strip().isdigit()]
    except FileNotFoundError:
        print(f"Error: Input file not found at '{input_file}'")
        sys.exit(1)

    if not all_book_ids:
        print("No valid book IDs found in the input file.")
        return

    processed_ids = load_processed_ids(output_file)
    book_ids_to_process = [bid for bid in all_book_ids if bid not in processed_ids]

    if not book_ids_to_process:
        print("All book IDs have already been processed. Nothing to do.")
        return

    print(f"Found {len(all_book_ids)} total books, {len(processed_ids)} already processed. Resuming with {len(book_ids_to_process)} books.")

    stats = {"processed": 0, "failed": 0}
    base_url = "https://www.goodreads.com/book/show/"

    with output_file.open('a', encoding='utf-8') as f:
        async with AsyncSession() as session:
            for book_id in tqdm(book_ids_to_process, desc="Processing Books"):
                url = f"{base_url}{book_id}"
                status, genres, html_content = await get_book_genres(url, session)

                if status == "SUCCESS":
                    stats["processed"] += 1
                    book_data = {"book_id": book_id, "genres": genres}
                    f.write(json.dumps(book_data) + '\n')
                else:  # status == "FAILURE"
                    stats["failed"] += 1
                    tqdm.write(f"\nHard failure for book {book_id}. Will retry on next run.")
                    if debug:
                        save_debug_file("curl_cffi", url, html_content)

                await asyncio.sleep(sleep_time)

    # --- Statistics for this run ---
    print("\n--------------------------")
    print("📊 This Run's Statistics")
    print("--------------------------")
    print(f"Total Books Attempted in this run: {len(book_ids_to_process)}")
    print(f"  ✅ Processed (won't be retried): {stats['processed']}")
    print(f"  ❌ Failed (will be retried): {stats['failed']}")
    print("--------------------------\n")
    print(f"Finished! Results are in '{output_file}'.")


def main() -> None:
    """Sets up arguments and runs the asynchronous book processing task."""
    parser = argparse.ArgumentParser(
        description="Scrape book genres from Goodreads with resume support.",
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("input_file", type=Path, help="The input file name containing book IDs, one per line.")
    parser.add_argument("-s", "--sleep", type=float, default=1, help="The sleep time in seconds between requests. Defaults to 1.")
    parser.add_argument("-o", "--output", type=Path, default="book_genres.jsonl", help="The output JSON Lines file name. Defaults to 'book_genres.jsonl'.")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode to save failed pages to a 'debug' folder.")
    args = parser.parse_args()

    try:
        asyncio.run(process_books(args.input_file, args.output, args.sleep, args.debug))
    except KeyboardInterrupt:
        print("\n\nProcess interrupted by user. Shutting down gracefully.")
        sys.exit(130)


if __name__ == "__main__":
    main()
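Once a run (or several resumed runs) has finished, the JSONL output can be loaded back with a few lines of standard-library Python. This is a minimal sketch, not part of the gist; it assumes the default output filename book_genres.jsonl:

import json
from pathlib import Path

# Build a {book_id: [genres]} mapping from the scraper's output file.
genres_by_book = {}
for line in Path("book_genres.jsonl").read_text(encoding="utf-8").splitlines():
    record = json.loads(line)
    genres_by_book[record["book_id"]] = record["genres"]

print(len(genres_by_book), "books loaded")

Because the scraper only appends one JSON object per line and never rewrites earlier lines, this kind of reader also works on a partially completed output file.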