Scrape jobs from LinkedIn and save them to Google Sheets
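In brief (a summary inferred from the script itself, not the author's documentation): the `LinkedInJobs` class drives a Chrome session through Selenium, loads the LinkedIn job search results for Kenya, parses each posting with BeautifulSoup, and appends the rows to a Google Sheet named "WebResults" via pygsheets. The dependency note below is an assumption based on the imports; exact package names and versions may differ.

# Assumed setup (inferred from the imports; adjust to your environment):
#   pip install selenium beautifulsoup4 pygsheets pandas requests-html
# A ChromeDriver binary matching your local Chrome and a Google service-account
# JSON with access to the target spreadsheet are also required.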
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from bs4 import BeautifulSoup as bs
from datetime import datetime
from random import randint
import pygsheets
import pandas as pd
from time import sleep
from requests_html import HTML, HTMLSession

# Shared HTML session, used to resolve the external "Apply" redirect URLs.
session = HTMLSession()


class LinkedInJobs():

    def __init__(self, headless, html_session, gc, webdriver):
        self.headless = headless
        self.html_session = html_session
        # gc is the path to a Google service-account JSON file
        self.gc = pygsheets.authorize(service_file=gc)
        # webdriver is the path to the ChromeDriver executable
        self.webdriver = webdriver

    def google_sheets(self, sheet, write=False, data=None):
        gc = self.gc
        sh = gc.open("WebResults")
        wks = sh[sheet]
        if write:
            # append the scraped records below the existing ones
            if data is not None:
                existing = wks.get_as_df(start='H1').shape[0]
                wks.set_dataframe(data, (existing, 1))
                print(f"{data.shape[0]} new records added!")
            else:
                print('No data')
        else:
            # read the job links already stored, so they are not scraped twice
            read = wks.get_as_df(start='H1')
            existing_links = [i for i in read.job_link]
            return existing_links

    def initialize_selenium(self):
        try:
            # configure and start the Chrome driver
            service = Service(executable_path=self.webdriver)
            options = Options()
            options.headless = self.headless
            driver = webdriver.Chrome(service=service, options=options)
            # open the LinkedIn jobs search page
            search_url = "https://www.linkedin.com/jobs"
            driver.get(search_url)
            # fill in the search terms
            kwsearch = driver.find_element(By.NAME, 'keywords')
            kwsearch.clear()
            # kwsearch.send_keys(searchterm)
            location = driver.find_element(By.NAME, 'location')
            location.clear()
            location.send_keys('Kenya')
            driver.find_element(
                By.XPATH, '//*[@id="main-content"]/section[1]/div/section/div[2]/button[2]').click()
            # page = driver.page_source
            return driver
        except Exception as e:
            print(e)
            return None

    def scroll_page(self, driver, scroll=True):
        # keep clicking "See more jobs" and scrolling until the page height stops growing
        last_height = driver.execute_script(
            "return document.body.scrollHeight")
        while True:
            try:
                driver.find_element(
                    By.XPATH, '//*[@id="main-content"]/section/button').click()
                sleep(3)
            except Exception:
                pass
            if not scroll:
                # no infinite scroll requested: return the page as it stands
                return driver.page_source
            # scroll down to the bottom and wait for new results to load
            driver.execute_script(
                "window.scrollTo(0, document.body.scrollHeight);")
            sleep(2)
            # compare the new scroll height with the last one
            new_height = driver.execute_script(
                "return document.body.scrollHeight")
            if new_height == last_height:
                page = driver.page_source
                print("Full page height:", new_height)
                return page
            last_height = new_height

    def get_job_links(self, page):
        try:
            existing_links = self.google_sheets(1)
            soup = bs(page, 'html.parser')
            job_lists = soup.find(class_='jobs-search__results-list')
            jobs = [i.get('href') for i in job_lists.find_all('a')]
            # drop company-profile links and jobs that were scraped before
            for i in jobs[:]:
                if i.endswith('trk=public_jobs_jserp-result_job-search-card-subtitle') or i.split("?")[0] in existing_links:
                    jobs.remove(i)
            return jobs
        except Exception as e:
            return None

    def get_external_url(self, driver):
        try:
            # follow the "Apply" button and resolve the redirect to the employer's site
            url = driver.find_element(
                By.PARTIAL_LINK_TEXT, "Apply").get_attribute('href')
            redirect_url = session.get(url).url
            return redirect_url
        except Exception as e:
            return None

    def job_details(self, details_soup, driver):
        try:
            details = details_soup.find(class_='show-more-less-html__markup')
            extlink = self.get_external_url(driver)
            out = {
                'job_link': [details_soup.link.get('href')],
                'parent_company': [extlink],
                'extracted_time': [datetime.now()],
                'details': [str(details)]
            }
            return pd.DataFrame(out).reset_index(drop=True), extlink
        except Exception as e:
            # the caller unpacks two values, so return a pair on failure as well
            return None, None

    def job_heading(self, details_soup):
        try:
            top_card = details_soup.find(
                class_='top-card-layout__entity-info flex-grow flex-shrink-0 basis-0 babybear:flex-none babybear:w-full babybear:flex-none babybear:w-full')
            # first five non-empty lines of the card: title, company, location, posted date, applicants
            card_content = [i.strip() for i in [i.strip()
                            for i in top_card.text.split("\n")] if i != ''][:5]
            job_title = card_content[0]
            if "remote" in job_title.lower().split(" "):
                site = "Remote"
            elif "hybrid" in job_title.lower().split(" "):
                site = "Hybrid"
            else:
                site = "Onsite"
            sleep(5)
            company_name_details = top_card.find_all('span')
            top = pd.DataFrame({
                "job_title": [job_title],
                "company_name": [card_content[1]],
                "location": [card_content[2]],
                "posted_date": [card_content[3]],
                "applicants": [[i for i in card_content[4].split(" ") if i.isdigit()]],
                "site": [site]
            })
            return top
        except Exception as e:
            # fall back to an empty frame so downstream concatenation still works
            top_dummy = pd.DataFrame({
                "company_name": [],
                "location": [],
                "posted_date": [],
                "applicants": []
            })
            return top_dummy

    def job_meta(self, details_soup):
        try:
            # the criteria list holds seniority, employment type, job function and industries
            job_meta = details_soup.find(
                class_='description__job-criteria-list')
            job_meta = pd.DataFrame([{
                "title": i.find("h3", class_="description__job-criteria-subheader").text.strip(),
                "description": i.find("span").text.strip()
            } for i in job_meta.find_all("li")]).T.reset_index(drop=True)
            # promote the criteria titles to column names
            job_meta.rename(columns=job_meta.iloc[0], inplace=True)
            job_meta.drop([0], axis=0, inplace=True)
            return job_meta
        except Exception as e:
            # fall back to empty criteria columns
            dummy = pd.DataFrame({
                "Seniority level": [''],
                "Employment type": [''],
                "Job function": [''],
                "Industries": [''],
            })
            print(e)
            return dummy

    def fetch_update_data(self, scroll=False, show=True):
        try:
            driver = self.initialize_selenium()
            page = self.scroll_page(driver, scroll)
            links = self.get_job_links(page)
            combined_fetch = []
            for i in links:
                driver.get(i)
                details_soup = bs(driver.page_source, "html.parser")
                heading = self.job_heading(details_soup).reset_index(drop=True)
                details, extlink = self.job_details(details_soup, driver)
                meta = self.job_meta(details_soup).reset_index(drop=True)
                combined_fetch.append(pd.concat([heading, details, meta], axis=1))
                if extlink is None:
                    # back off for a random interval before the next request
                    sleep(randint(1, 7))
                print("page", links.index(i) + 1)
            data = pd.concat(combined_fetch, axis=0).reset_index()
            self.google_sheets(sheet=1, write=True, data=data)
            return data
        except Exception as e:
            return None


# Local paths to the Google service-account JSON and the ChromeDriver binary;
# adjust these to your own environment.
service_file = 'G:/python3/dist/personal/kivy/websearch/searchconsole-364317-b59ba153d6ed.json'
webdriver_ = "G:/python3/dist/chromedriver_win32/chromedriver_v2.exe"
init = LinkedInJobs(False, session, service_file, webdriver_)
data = init.fetch_update_data(scroll=True)
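
A small optional follow-up sketch, assuming the return value of `fetch_update_data` is captured as `data` (as done above): the method returns the combined DataFrame it wrote to the sheet, so the scraped rows can also be inspected locally. The column names used here (`job_title`, `site`, `company_name`, `location`) are the ones built in `job_heading`.

# Optional quick check on the scraped frame (columns as built in job_heading).
if data is not None:
    remote = data[data["site"] == "Remote"]
    print(remote[["job_title", "company_name", "location"]].head())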