@ashwath007
Created December 4, 2024 05:50
GoogleScraper.py
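"""Scrape ad creatives from a Google Ads Transparency Center advertiser page.

The script loads the advertiser page with Selenium, scrolls until no new
content appears, appends every discovered ad image URL to ad_image_sources.txt,
then revisits each ad's detail page and writes the combined results to a
per-run JSON file (ad_data_run_<n>.json).
"""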
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from webdriver_manager.chrome import ChromeDriverManager
from selenium.common.exceptions import WebDriverException, TimeoutException
import json
import time
import traceback
from bs4 import BeautifulSoup

MAX_RETRIES = 5
TIMEOUT = 300  # page-load timeout: 5 minutes
FILE_PATH = 'ad_image_sources.txt'
SCROLL_DELAY = 3  # seconds between scrolls
PAGE_LOAD_DELAY = 5  # seconds after page load
CARD_INSPECTION_DELAY = 1  # seconds between inspecting cards

def extract_image_src(html_content):
    """Return the src of the first <img> tag in the given HTML, or None."""
    soup = BeautifulSoup(html_content, 'html.parser')
    img_tag = soup.find('img')
    if img_tag and 'src' in img_tag.attrs:
        return img_tag['src']
    return None
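
# Illustrative call (hypothetical markup, not a real ad card):
#   extract_image_src('<creative><img src="https://example.com/ad.png"></creative>')
#   -> 'https://example.com/ad.png'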

def extract_ad_data(driver):
    """Scrape advertiser metadata and per-card ad details from the loaded page."""
    ad_data = []
    try:
        ad_cards = WebDriverWait(driver, 10).until(
            EC.presence_of_all_elements_located((By.TAG_NAME, 'creative'))
        )
        print(f"Number of ad cards found: {len(ad_cards)}")

        # Extract header information (only needs to be done once)
        try:
            # Total ads count
            ads_count_element = driver.find_element(By.CLASS_NAME, 'ads-count-searchable')
            total_ads = ads_count_element.text.replace('~', '').replace(' ads', '').strip() if ads_count_element else None
            print(f"Total ads found: {total_ads}")

            # Legal name
            legal_name_element = driver.find_element(By.XPATH, "//*[contains(text(), 'Legal name:')]/..")
            legal_name = legal_name_element.text.replace('Legal name:', '').strip() if legal_name_element else None
            print(f"Legal name found: {legal_name}")

            # Region / "Based in"
            region_element = driver.find_element(By.XPATH, "//*[contains(text(), 'Based in:')]/..")
            region = region_element.text.replace('Based in:', '').strip() if region_element else None
            print(f"Region found: {region}")
        except Exception as e:
            print(f"Error extracting header information: {str(e)}")
            total_ads = None
            legal_name = None
            region = None

        # Metadata dictionary for the overall advertiser information
        metadata = {
            'total_ads': total_ads,
            'legal_name': legal_name,
            'region': region
        }

        with open(FILE_PATH, 'a', encoding='utf-8') as f:
            for index, card in enumerate(ad_cards):
                try:
                    print(f"\nInspecting ad card {index + 1}:")
                    time.sleep(CARD_INSPECTION_DELAY)

                    # Extract advertiser name
                    try:
                        advertiser_element = card.find_element(By.CLASS_NAME, 'advertiser-name')
                        advertiser_name = advertiser_element.text if advertiser_element else None
                    except Exception:
                        print("Could not find advertiser name element")
                        advertiser_name = None

                    # Extract ad link - try multiple methods
                    try:
                        # First try: look for a creative-bounding-box element
                        ad_link = None
                        bounding_box = card.find_elements(By.CLASS_NAME, 'creative-bounding-box')
                        if bounding_box:
                            ad_link = bounding_box[0].get_attribute('href')

                        # Second try: look for the nearest ancestor anchor tag
                        if not ad_link:
                            parent_a = card.find_elements(By.XPATH, './/ancestor::a[1]')
                            if parent_a:
                                ad_link = parent_a[0].get_attribute('href')

                        # Third try: look for any div with an href containing '/creative/'
                        if not ad_link:
                            creative_divs = card.find_elements(By.XPATH, './/div[contains(@href, "/creative/")]')
                            if creative_divs:
                                ad_link = creative_divs[0].get_attribute('href')

                        # Convert a relative URL to an absolute one
                        if ad_link and ad_link.startswith('/'):
                            ad_link = f"https://adstransparency.google.com{ad_link}"
                        if ad_link:
                            print(f"Ad link found: {ad_link}")
                        else:
                            print("No ad link found using any method")
                    except Exception as e:
                        print(f"Error extracting ad link: {str(e)}")
                        ad_link = None

                    html_content = card.get_attribute('outerHTML')
                    img_src = extract_image_src(html_content)

                    ad_details = {
                        'advertiser': advertiser_name,
                        'legal_name': legal_name,
                        'region': region,
                        'image_src': img_src,
                        'ad_link': ad_link
                    }

                    if img_src:
                        print(f"Image source found: {img_src}")
                        print(f"Advertiser: {advertiser_name}")
                        f.write(f"{img_src}\n")
                        ad_data.append(ad_details)
                    else:
                        print("No image source found in this ad card")
                except Exception as e:
                    print(f"Error extracting data from ad card {index + 1}: {str(e)}")

        # Return both the advertiser metadata and the per-ad data
        return {'metadata': metadata, 'ads': ad_data}
    except TimeoutException:
        print("Timeout waiting for ad cards to load")
    except Exception as e:
        print(f"Unexpected error during ad data extraction: {str(e)}")
        traceback.print_exc()
    return {'metadata': {}, 'ads': []}
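
# Shape of the dict returned above (illustrative values, not real scrape output):
#   {'metadata': {'total_ads': '120', 'legal_name': 'Acme Inc', 'region': 'India'},
#    'ads': [{'advertiser': 'Acme', 'legal_name': 'Acme Inc', 'region': 'India',
#             'image_src': 'https://...', 'ad_link': 'https://adstransparency.google.com/...'}]}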

def scroll_to_bottom(driver, max_attempts=100):
    """
    Enhanced scroll function to ensure complete scrolling to the bottom.
    """
    def get_page_height():
        # Take the tallest of the common height properties, since browsers disagree
        return driver.execute_script(
            "return Math.max(document.body.scrollHeight, document.body.offsetHeight, "
            "document.documentElement.clientHeight, document.documentElement.scrollHeight, "
            "document.documentElement.offsetHeight);"
        )

    print("Starting to scroll the page...")
    last_height = 0
    same_height_count = 0
    scroll_attempt = 0
    while scroll_attempt < max_attempts:
        try:
            # Scroll in smaller increments for better reliability
            current_height = get_page_height()
            driver.execute_script(f"window.scrollTo(0, {current_height / 2});")
            time.sleep(1)
            driver.execute_script(f"window.scrollTo(0, {current_height});")
            time.sleep(SCROLL_DELAY)

            # Check if we've reached the bottom
            new_height = get_page_height()
            if new_height == last_height:
                same_height_count += 1
                print(f"Same height detected {same_height_count} times")
                if same_height_count >= 5:  # require several identical readings for certainty
                    print("Reached the bottom of the page - no new content loading")
                    # Final check - try one more aggressive scroll
                    driver.execute_script("window.scrollTo(0, document.body.scrollHeight * 2);")
                    time.sleep(SCROLL_DELAY * 2)
                    if get_page_height() == new_height:
                        print("Confirmed bottom of page reached")
                        break
                    print("Found more content after final check")
                    same_height_count = 0
            else:
                same_height_count = 0

            last_height = new_height
            scroll_attempt += 1
            print(f"Scroll attempt {scroll_attempt}, Current height: {new_height}")

            # Nudge lazy-loading listeners with a synthetic scroll event
            try:
                driver.execute_script("window.dispatchEvent(new Event('scroll'));")
            except Exception:
                pass
        except WebDriverException as e:
            print(f"WebDriverException during scrolling: {str(e)}")
            time.sleep(SCROLL_DELAY * 2)  # Double the delay on error
        except Exception as e:
            print(f"Unexpected error during scrolling: {str(e)}")
            traceback.print_exc()
            time.sleep(SCROLL_DELAY)

    if scroll_attempt >= max_attempts:
        print(f"Reached maximum scroll attempts ({max_attempts})")
    print(f"Finished scrolling after {scroll_attempt} attempts")
    # Final pause to ensure everything is loaded
    time.sleep(SCROLL_DELAY * 2)

def create_driver():
    """Build a Chrome WebDriver; headless mode is left commented out."""
    chrome_options = Options()
    # chrome_options.add_argument("--headless")
    chrome_options.add_argument("--no-sandbox")
    chrome_options.add_argument("--disable-dev-shm-usage")
    chrome_options.add_argument("--window-size=1920,1080")
    service = Service(ChromeDriverManager().install())
    return webdriver.Chrome(service=service, options=chrome_options)

def scrape_with_retry(url, retries=0):
    """Scrape the given URL, retrying up to MAX_RETRIES times on failure."""
    driver = None
    try:
        print(f"Attempt {retries + 1} to scrape {url}")
        driver = create_driver()
        driver.set_page_load_timeout(TIMEOUT)

        print('Navigating to page...')
        driver.get(url)
        time.sleep(PAGE_LOAD_DELAY)  # let dynamic content settle after page load
        print(f"Page loaded with status: {driver.execute_script('return document.readyState')}")

        print('Scrolling to bottom of page...')
        scroll_to_bottom(driver)

        print('Extracting ad data...')
        ad_data = extract_ad_data(driver)
        return ad_data
    except Exception as error:
        print(f"Error during scraping attempt {retries + 1}:")
        traceback.print_exc()
        if retries < MAX_RETRIES - 1:
            print(f"Retrying... ({retries + 2}/{MAX_RETRIES})")
            return scrape_with_retry(url, retries + 1)
        raise error
    finally:
        if driver:
            try:
                driver.quit()
            except Exception:
                pass

def extract_creative_sub_container(creative_sub):
    """Extract details from a creative-sub-container element."""
    sub_container_info = {}
    try:
        # Get creative element details
        creative = creative_sub.find_element(By.TAG_NAME, 'creative')
        if creative:
            sub_container_info['creative'] = {
                'type': 'has-variation' if 'has-variation' in creative.get_attribute('class') else 'standard',
                'interactive': creative.get_attribute('interactive') == '',
                'classes': creative.get_attribute('class')
            }

            # Get container dimensions and scale
            container = creative.find_element(By.CLASS_NAME, 'creative-container')
            if container:
                style = container.get_attribute('style')
                sub_container_info['dimensions'] = {
                    'width': style.split('width: ')[1].split('px')[0] if 'width:' in style else None,
                    'height': style.split('height: ')[1].split('px')[0] if 'height:' in style else None
                }

                # Get transform scale
                transform_div = container.find_element(By.CSS_SELECTOR, 'div[style*="transform"]')
                if transform_div:
                    scale_style = transform_div.get_attribute('style')
                    scale = scale_style.split('scale(')[1].split(')')[0] if 'scale(' in scale_style else None
                    sub_container_info['scale'] = scale

            # Get iframe details
            try:
                iframe = creative.find_element(By.TAG_NAME, 'iframe')
                if iframe:
                    sub_container_info['iframe'] = {
                        'src': iframe.get_attribute('src'),
                        'width': iframe.get_attribute('width'),
                        'height': iframe.get_attribute('height'),
                        'sandbox': iframe.get_attribute('sandbox'),
                        'scrolling': iframe.get_attribute('scrolling')
                    }
            except Exception:
                pass

        # Check for policy violations
        try:
            policy_violation = creative_sub.find_element(By.CLASS_NAME, 'policy-violation-banner')
            if policy_violation:
                sub_container_info['policy_violation'] = {
                    'status': True,
                    'message': policy_violation.text
                }

                # Get visibility section details
                visibility_section = creative_sub.find_element(By.CLASS_NAME, 'visibility-section')
                if visibility_section:
                    visibility_text = visibility_section.find_element(By.CLASS_NAME, 'visibility-text')
                    sub_container_info['policy_violation']['details'] = visibility_text.text if visibility_text else None

                    # Check for ad policies button
                    policy_button = visibility_section.find_element(By.CLASS_NAME, 'policy-button')
                    if policy_button:
                        sub_container_info['policy_violation']['has_policy_button'] = True
        except Exception:
            sub_container_info['policy_violation'] = {'status': False}

        return sub_container_info
    except Exception as e:
        print(f"Error extracting creative sub-container details: {str(e)}")
        return {}

def detailed_google_ads(driver, ad):
    """Extract detailed information from an individual Google ad detail page."""
    detailed_info = {}
    try:
        # Navigate to the ad detail page
        ad_link = ad.get('ad_link')
        if not ad_link:
            return detailed_info
        print(f"\nFetching detailed information for ad: {ad_link}")
        driver.get(ad_link)
        time.sleep(PAGE_LOAD_DELAY)

        try:
            # Extract advertiser name and link
            advertiser_header = driver.find_element(By.CLASS_NAME, 'advertiser-header-link')
            if advertiser_header:
                detailed_info['advertiser_name'] = advertiser_header.text.strip()
                detailed_info['advertiser_link'] = advertiser_header.get_attribute('href')
                if detailed_info['advertiser_link'].startswith('/'):
                    detailed_info['advertiser_link'] = f"https://adstransparency.google.com{detailed_info['advertiser_link']}"

            # Extract last shown date
            try:
                last_shown_div = driver.find_element(By.CLASS_NAME, 'last-shown')
                if last_shown_div:
                    # Remove the "Last shown:" label and strip whitespace
                    last_shown_text = last_shown_div.text.replace('Last shown:', '').strip()
                    detailed_info['last_shown'] = last_shown_text
                    print(f"Last shown date found: {last_shown_text}")
            except Exception as e:
                print(f"Error extracting last shown date: {str(e)}")

            # Extract ad format
            try:
                format_div = driver.find_element(By.XPATH, "//div[contains(@class, 'property') and .//strong[text()='Format:']]")
                if format_div:
                    # Remove the "Format:" label and strip whitespace
                    format_text = format_div.text.replace('Format:', '').strip()
                    detailed_info['format'] = format_text
                    print(f"Ad format found: {format_text}")
            except Exception as e:
                print(f"Error extracting format: {str(e)}")
                detailed_info['format'] = None

            # Extract creative sub-containers
            creative_subs = driver.find_elements(By.CLASS_NAME, 'creative-sub-container')
            creative_details = []
            for sub in creative_subs:
                sub_details = extract_creative_sub_container(sub)
                if sub_details:
                    creative_details.append(sub_details)
            detailed_info['creative_details'] = creative_details
            print(f"Successfully extracted detailed information including {len(creative_details)} creative sub-containers")

            return detailed_info
        except Exception as e:
            print(f"Error extracting specific elements: {str(e)}")
            return detailed_info
    except Exception as e:
        print(f"Error accessing ad detail page: {str(e)}")
        return detailed_info

def main():
    """Run the scraper end-to-end and save results to per-run JSON files."""
    num_runs = 1  # number of test runs to execute
    for test_run in range(num_runs):
        try:
            print(f"\n=== Starting Test Run {test_run + 1} of {num_runs} ===\n")
            url = 'https://adstransparency.google.com/advertiser/AR03077012839706132481?region=IN'
            result = scrape_with_retry(url)
            if not result['ads']:
                print(f"Test Run {test_run + 1}: Failed to retrieve ad data")
                continue
            print(f"Test Run {test_run + 1}: Ad data saved successfully to {FILE_PATH}")
            print(f"Test Run {test_run + 1}: Number of ads with images scraped: {len(result['ads'])}")

            # Create a new driver for the detailed per-ad scraping
            driver = create_driver()
            try:
                # Get detailed information for each ad and merge it into the original record
                detailed_ads = []
                for ad in result['ads']:
                    detailed_info = detailed_google_ads(driver, ad)
                    detailed_ads.append({**ad, **detailed_info})
                result['ads'] = detailed_ads

                # Write the complete data to a JSON file tagged with the run number
                output_file = f'ad_data_run_{test_run + 1}.json'
                with open(output_file, 'w') as f:
                    json.dump(result, f, indent=2)
                print(f"Test Run {test_run + 1}: Detailed ad data saved to {output_file}")
            finally:
                driver.quit()

            # Pause between test runs (skipped after the last run)
            if test_run < num_runs - 1:
                print("\nWaiting 10 seconds before starting next test run...\n")
                time.sleep(10)
        except Exception as error:
            print(f'Error occurred in Test Run {test_run + 1}:', str(error))
            traceback.print_exc()

    print("\n=== All Test Runs Completed ===")


if __name__ == "__main__":
    main()
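
# Dependencies inferred from the imports above (versions not pinned in the gist):
#   pip install selenium webdriver-manager beautifulsoup4
# Chrome must be installed; webdriver-manager downloads a matching chromedriver.
# Run with:
#   python GoogleScraper.py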