@sweetmoniker
Created October 13, 2017 19:56
This gist updates my previous YouTube scraper to function on the new YouTube layout. The nice thing about the new layout is that the data for all the videos is stored in one JSON block, so parsing it is fairly easy. This code functions as of 13 October 2017.
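For reference, the JSON block mentioned above can also be pulled straight out of the page source. The sketch below is illustrative only: it assumes the 2017 layout still embeds the data as window["ytInitialData"] (the marker may change), it is not part of the scraper that follows (which walks the rendered video divs instead), and without scrolling it only covers the first batch of videos.

import json
import re
import urllib.request

def extract_initial_data(page_url):
    # Illustrative sketch, not the scraper's method: fetch the raw HTML and pull out
    # the embedded JSON block. The marker below matches the 2017 polymer layout and
    # may need adjusting if YouTube changes it.
    html = urllib.request.urlopen(page_url).read().decode('utf-8')
    match = re.search(r'window\["ytInitialData"\]\s*=\s*(\{.*?\});', html, re.DOTALL)
    return json.loads(match.group(1)) if match else None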
from selenium import webdriver
#from selenium.common.exceptions import NoSuchElementException
#from selenium.common.exceptions import TimeoutException
#from selenium.webdriver.common.by import By
#from selenium.webdriver.support import expected_conditions as EC
#from selenium.webdriver.support.ui import WebDriverWait
from bs4 import BeautifulSoup
from collections import namedtuple
import csv
import time
import datetime
import urllib.request
import json
### This script runs on Selenium with Chrome. Follow the instructions here to install
### the webdriver: http://selenium-python.readthedocs.io/installation.html#drivers
### You probably have to change your PATH. Google it. ###
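### Alternative (untested here): instead of editing PATH, the chromedriver location
### can be passed directly when the driver is created, e.g.
###   driver = webdriver.Chrome(executable_path='C:/path/to/chromedriver.exe')
### The path above is a placeholder; point it at your own chromedriver binary.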
page_url = "https://www.youtube.com/user/GODMODEINTERNET/videos"
Video = namedtuple("Video", "video_id link title duration views age")
def parse_video_div(div):
    try:
        video_id = div.find("a", "yt-simple-endpoint inline-block style-scope ytd-thumbnail")['href'].replace("/watch?v=", "")
        link = "https://www.youtube.com/watch?v=" + video_id
        title = div.find("a", "yt-simple-endpoint style-scope ytd-grid-video-renderer")['title']
        #in case a stream comes through without a duration tag
        if hasattr(div.find("span", "style-scope ytd-thumbnail-overlay-time-status-renderer"), 'contents'):
            duration = div.find("span", "style-scope ytd-thumbnail-overlay-time-status-renderer").text.replace('\n', '').replace(' ', '')
        else:
            duration = '00:00'
        views = 0
        full_label = div.find("a", "yt-simple-endpoint style-scope ytd-grid-video-renderer")['aria-label']
        #grab the trailing "<count> views" portion off the end of the aria-label
        views = full_label[len(full_label)-(full_label[::-1].find(' ', full_label.find('sweiv ')+7)):]
        age = 0
        age = div.find_all("span", "style-scope ytd-grid-video-renderer")[1].text
    except Exception:
        print("Something got skipped. Don't worry about it. It probably wasn't important anyway.")
        pass
    return Video(video_id, link, title, duration, views, age)
def parse_videos_page(page):
    video_divs = page.find_all("div", "style-scope ytd-grid-video-renderer")
    #video_divs has lots of 'partial' video components when searching this way.
    #every 11th div is the full content of a video.
    num_video_divs = len(video_divs)
    full_video_divs = []
    for i in range(num_video_divs):
        if i % 11 == 0:
            full_video_divs.append(video_divs[i])
    return [parse_video_div(div) for div in full_video_divs]
def load_page(page_url):
    try:
        global driver
        driver = webdriver.Chrome()
        time.sleep(1)
        #wait = WebDriverWait(driver, 8)
        driver.get(page_url)
        time.sleep(3)
        #keep scrolling until the page height stops growing, i.e. all videos have loaded
        lastHeight = driver.execute_script("return document.documentElement.scrollHeight")
        while True:
            driver.execute_script("window.scrollTo(0, document.documentElement.scrollHeight);")
            time.sleep(2)
            newHeight = driver.execute_script("return document.documentElement.scrollHeight")
            if newHeight == lastHeight:
                print("Page fully developed.")
                break
            lastHeight = newHeight
    except (KeyboardInterrupt, SystemExit):
        print("Program Stopped")
        raise
    except Exception as e:
        print(e)
        print("Some kind of exception occurred. You should probably try again.")
        pass
    return driver.page_source.encode('utf-8')
def get_videos(page_url):
    page = BeautifulSoup(load_page(page_url), "html.parser")
    print("Source code retrieved.")
    driver.quit()
    videos = parse_videos_page(page)
    print("Videos parsed.")
    return videos
def request_until_succeed(url):
    req = urllib.request.Request(url)
    success = False
    while success is False:
        try:
            response = urllib.request.urlopen(req)
            if response.getcode() == 200:
                success = True
        except Exception as e:
            print(e)
            print("Error for URL {}: {}".format(url, datetime.datetime.now()))
            print("retrying.")
            time.sleep(5)
    return response.read().decode('utf-8')
def scrapeYoutubeVideos(page_url):
    if page_url.find("channel") > 0:
        youtube_name = page_url[page_url.find("channel")+8:page_url.find("videos")-1]
    else:
        youtube_name = page_url.replace('https://www.youtube.com/user/', '').replace('/videos', '')
    #output path is hard-coded; change it to suit your machine
    with open('C:\\Users\\xhargrav\\Desktop\\{}_YouTube.csv'.format(youtube_name), 'w', newline='', encoding='utf-8') as file:
        csv.writer(file).writerow(["id", "link", "title", "duration", "views", "age", "likes", "dislikes", "published"])
        scrape_starttime = datetime.datetime.now()
        print("Scraping {} YouTube: {} \n Pay attention to the messages below.".format(youtube_name, scrape_starttime))
        videos = get_videos(page_url)
        num_processed = 0
        num_errors = 0
        for video in videos:
            while True:
                try:
                    video_url = video[1]
                    page_source = request_until_succeed(video_url)
                    page = BeautifulSoup(page_source, 'html.parser')
                    likes = json.loads(page.find("button", title="I like this").text.replace(",", ""))
                    dislikes = json.loads(page.find("button", title="I dislike this").text.replace(",", ""))
                    published = page.find("strong", "watch-time-text").text
                    video = video + (likes,) + (dislikes,) + (published,)
                    csv.writer(file).writerow(video)
                    num_processed += 1
                    if num_processed % 100 == 0:
                        print("{} videos processed: {}".format(num_processed, datetime.datetime.now()))
                except (KeyboardInterrupt, SystemExit):
                    print("Program Stopped")
                    raise
                except Exception as e:
                    print(e)
                    print(video[1])
                    print("Error retrieving data for this video. Retrying")
                    num_errors += 1
                    continue
                break
        file.close()
    print("Done! {} videos scraped in {}".format(len(videos), datetime.datetime.now() - scrape_starttime))
    print("{} errors.".format(num_errors))
scrapeYoutubeVideos(page_url)