Skip to content

Instantly share code, notes, and snippets.

@arjunprakash027
Last active October 7, 2023 19:21
Show Gist options
  • Save arjunprakash027/68cf56975dd2efe8e0e759fa0437986c to your computer and use it in GitHub Desktop.
Save arjunprakash027/68cf56975dd2efe8e0e759fa0437986c to your computer and use it in GitHub Desktop.
Python codes to use aiohttp. Try using aiohttp instead of requests!
#simple example
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get('http://python.org') as response:
print(response.status)
print(response.headers)
html = await response.text()
print(html[:60])
#asyncio.run(main())
#running a aiohttp server
from aiohttp import web
async def handle(request):
name = request.match_info.get('name', "Anonymous")
text = "Hello, " + name
return web.Response(text=text)
app = web.Application()
app.add_routes([web.get('/', handle), web.get('/{name}', handle)])
#web.run_app(app)
#passsing parameters in url
async def post_req(session: aiohttp.ClientSession, url:str, data:dict):
async with session.post(url, data=data) as response:
return await response.text()
async def main():
params = {'name':'John', 'age':25}
async with aiohttp.ClientSession() as session:
async with session.get('http://httpbin.org/get',params=params) as response:
print(await response.text())
expected = "http://httpbin.org/get?name=John&age=25"
assert str(response.url) == expected
async with session.post('http://httpbin.org/post',data=b'data') as response:
print(await response.text())
response_post = await post_req(session,'http://httpbin.org/post',params)
print(response_post)
#asyncio.run(main())
#doing everything in true async fashion
async def post_req(session: aiohttp.ClientSession, url:str, data:dict):
async with session.post(url, data=data) as response:
return await response.text()
async def main():
params = {'name':'John', 'age':25}
async with aiohttp.ClientSession() as session:
tasks = []
tasks.append(session.get('http://httpbin.org/get',params=params))
tasks.append(session.post('http://httpbin.org/post',data=b'data'))
tasks.append(post_req(session,'http://httpbin.org/post',params))
results = await asyncio.gather(*tasks)
#print(results)
for result in results:
try:
print(await result.text())
except:
print(result)
print('-' * 80)
asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment