import requests
import json
import time
import base64
import os
from pydantic import BaseModel
from datetime import datetime


class ExtractSchema(BaseModel):
    company_name: str
    company_mission: str
    company_description: str
    company_industry: str
    pricing_model: bool
    company_icp: str
    company_website: str
    team_size: int
    team_description: str


def extract_data_from_website(api_key, urls):
    """
    Extract data from websites using the Firecrawl API directly.

    Args:
        api_key (str): Your Firecrawl API key
        urls (list): List of URLs to crawl

    Returns:
        dict: JSON response from the API containing the extracted data
    """
    # API endpoint for initiating extraction
    extract_url = "https://api.firecrawl.dev/v1/extract"

    # Headers
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }

    # Request payload with a full-page screenshot action
    payload = {
        "urls": urls,
        "prompt": "Extract the following information from the website according to the provided schema.",
        "schema": ExtractSchema.model_json_schema(),
        "scrapeOptions": {
            "formats": ["markdown", "html", "screenshot@fullPage"],
            # "formats": ["markdown", "html", "screenshot"],
            "actions": [
                {
                    "type": "screenshot",
                    "fullPage": True
                }
            ]
        }
    }

    # Make the API request to start the extraction
    response = requests.post(extract_url, headers=headers, json=payload)

    # Check if the request was successful
    if response.status_code == 200:
        job_data = response.json()
        job_id = job_data.get('id')
        if not job_id:
            raise Exception("No job ID returned in the response")
        print(f"Extraction job started with ID: {job_id}")
        # Poll for results
        return poll_for_results(api_key, job_id)
    else:
        raise Exception(f"API request failed with status code {response.status_code}: {response.text}")


def poll_for_results(api_key, job_id, max_attempts=30, delay=5):
    """
    Poll the API for the results of a specific job.

    Args:
        api_key (str): Your Firecrawl API key
        job_id (str): The job ID to poll for
        max_attempts (int): Maximum number of polling attempts
        delay (int): Delay between polling attempts in seconds

    Returns:
        dict: The final results of the extraction job
    """
    # API endpoint for checking job status
    status_url = f"https://api.firecrawl.dev/v1/extract/{job_id}"

    # Headers
    headers = {
        "Authorization": f"Bearer {api_key}"
    }

    for attempt in range(max_attempts):
        print(f"Polling for results (attempt {attempt + 1}/{max_attempts})...")
        response = requests.get(status_url, headers=headers)

        if response.status_code == 200:
            result = response.json()
            status = result.get('status')

            if status == 'completed':
                print("Extraction completed successfully!")
                return result
            elif status == 'failed':
                raise Exception(f"Extraction job failed: {result.get('error', 'Unknown error')}")
            else:
                print(f"Job status: {status}. Waiting {delay} seconds before next check...")
                time.sleep(delay)
        else:
            raise Exception(f"Failed to check job status: {response.status_code}: {response.text}")

    raise Exception(f"Maximum polling attempts ({max_attempts}) reached without job completion")


def save_screenshots(result, output_dir="screenshots"):
    """
    Save screenshots from the extraction result.

    Args:
        result (dict): The extraction result containing screenshots
        output_dir (str): Directory to save screenshots

    Returns:
        list: Paths to saved screenshot files
    """
    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    saved_files = []

    # Check if screenshots are in the result
    screenshots = result.get('screenshots', [])
    if not screenshots:
        print("No screenshots found in the extraction result")
        return saved_files

    # Save each screenshot
    for i, screenshot in enumerate(screenshots):
        # Get the URL and base64 data
        url = screenshot.get('url', f'unknown_{i}')
        base64_data = screenshot.get('data')

        if not base64_data:
            print(f"No screenshot data found for {url}")
            continue

        # Clean the URL to create a valid filename
        filename = url.replace('://', '_').replace('/', '_').replace('*', 'all')
        if len(filename) > 100:  # Limit filename length
            filename = filename[:100]

        # Add a timestamp to ensure uniqueness
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filepath = os.path.join(output_dir, f"{filename}_{timestamp}.png")

        # Decode and save the image
        try:
            # Remove the data:image/png;base64, prefix if present
            if ',' in base64_data:
                base64_data = base64_data.split(',', 1)[1]

            image_data = base64.b64decode(base64_data)
            with open(filepath, 'wb') as f:
                f.write(image_data)

            print(f"Screenshot saved: {filepath}")
            saved_files.append(filepath)
        except Exception as e:
            print(f"Error saving screenshot for {url}: {str(e)}")

    return saved_files


if __name__ == "__main__":
    # Your API key
    api_key = "fc-"

    # URLs to crawl
    urls = ["https://buildrappo.com/*"]

    # Extract data
    result = extract_data_from_website(api_key, urls)

    # Print the full API response
    print("\nFull API Response:")
    print(json.dumps(result, indent=2))

    # Save screenshots
    screenshot_files = save_screenshots(result)

    # Print screenshot file paths
    if screenshot_files:
        print("\nScreenshots saved to:")
        for file in screenshot_files:
            print(f"- {file}")