Last active
September 6, 2019 04:51
-
-
Save jtarang/8b4fd94ec85905924ce3d723b1093e1c to your computer and use it in GitHub Desktop.
Populate Cache on CDN for EpgReady
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aiohttp import ( ClientSession, TCPConnector) | |
from aiohttp.client_exceptions import ContentTypeError | |
from ssl import create_default_context | |
# allows us to customize the session with cert files | |
def custom_session(path_to_cert=None, custom_headers=None): | |
if path_to_cert: | |
try: | |
# with open | no need for importing `os` to do path.exists | |
with open(path_to_cert) as cert: | |
cert.read() | |
sslcontext = create_default_context(cafile=path_to_cert) | |
conn = TCPConnector(ssl_context=sslcontext) | |
except: | |
return 'Cert file not Found!' | |
else: | |
conn = TCPConnector(verify_ssl=False) | |
session = ClientSession(connector=conn, headers=custom_headers) | |
return session | |
class Requests: | |
def __init__(self, ca_cert=None, proxy='', headers=None, session=None): | |
self.proxy = proxy | |
self.cert = ca_cert | |
self.headers = headers | |
self.session = self.init_session() | |
def init_session(self): | |
return custom_session(path_to_cert=self.cert, custom_headers=self.headers) | |
async def __aenter__(self): | |
return self | |
async def __aexit__(self, *args): | |
await self.session.close() | |
# takes in a request obj to extract the response | text or json | |
@staticmethod | |
async def get_response_from_request(request=None): | |
if request is not None: | |
try: | |
# https://docs.aiohttp.org/en/stable/client_advanced.html#disabling-content-type-validation-for-json-responses | |
# resp = await request.json(content_type='text/html; charset=utf-8') | |
resp = await request.json() | |
# ValueError thrown if no JSON decoded | |
except ContentTypeError: | |
resp = request.text | |
return resp | |
raise Exception('No request object passed!') | |
async def do_delete(self, url): | |
async with self.session as session: | |
async with session.delete(url, proxy=self.proxy) as response: | |
j_resp = await self.get_response_from_request(request=response) | |
return j_resp | |
async def do_get(self, url, data=None): | |
async with self.session as session: | |
async with session.get(url, proxy=self.proxy, params=data) as response: | |
j_resp = await self.get_response_from_request(request=response) | |
return j_resp | |
async def do_patch(self, url, data=None): | |
async with self.sesison as session: | |
async with session.patch(url, proxy=self.proxy, json=data) as response: | |
j_resp = await self.get_response_from_request(request=response) | |
return j_resp | |
async def do_post(self, url, data=None): | |
async with self.session as session: | |
async with session.post(url, proxy=self.proxy, json=data) as response: | |
j_resp = await self.get_response_from_request(request=response) | |
return j_resp | |
async def do_put(self, url, data=None): | |
async with self.session as session: | |
async with session.put(url, proxy=self.proxy, json=data) as response: | |
j_resp = await self.get_response_from_request(request=response) | |
return j_resp | |
79,1 All |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from async_client import Requests | |
from syncer import sync | |
class CacheReactor: | |
def __init__(self): | |
self.host = "https://api.epgready.com" | |
async def get_countries(self): | |
async with Requests() as self.requests: | |
countries_dict = await self.requests.do_get(self.host + '/api/countries') | |
return countries_dict | |
async def populate_cache(self): | |
countries_dict = await self.get_countries() | |
for country in countries_dict['countries']: | |
async with Requests() as self.requests: | |
await self.requests.do_get(self.host + '/api/channel-mappings', | |
data=dict(country=country['country_value'])) | |
return None | |
if __name__ == "__main__": | |
cache_reactor = CacheReactor() | |
sync(cache_reactor.populate_cache()) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment