Skip to content

Instantly share code, notes, and snippets.

@slabko
Last active March 18, 2018 05:53
Show Gist options
  • Save slabko/98eccb2ea4f94237a20eafcc4dd0848b to your computer and use it in GitHub Desktop.
Save slabko/98eccb2ea4f94237a20eafcc4dd0848b to your computer and use it in GitHub Desktop.
asyncio with bare hands
from itertools import islice
from collections import deque
from math import sqrt
import time
import asyncio
def lucas():
    """Yield the Lucas numbers 2, 1, 3, 4, 7, 11, ... without end.

    Each term after the first two is the sum of the two terms before it.
    The generator never terminates; callers must bound their consumption.
    """
    previous, current = 2, 1
    yield previous
    while True:
        yield current
        # Slide the two-term window forward by one position.
        previous, current = current, previous + current
async def search(iterable, predicate):
    """Return the first item of ``iterable`` accepted by ``predicate``.

    ``predicate`` is called with each item and its result is awaited. After
    every rejected item control is handed back to the event loop via
    ``asyncio.sleep(0)`` so other tasks can make progress.

    Raises:
        ValueError: if the iterable is exhausted without any match.
    """
    for candidate in iterable:
        accepted = await predicate(candidate)
        if not accepted:
            # Cooperative yield between candidates.
            await asyncio.sleep(0)
            continue
        return candidate
    raise ValueError("Not Found")
async def is_prime(x):
    """Return True if ``x`` is prime, using cooperative trial division.

    Values below 2 are reported as not prime. Every candidate divisor from 2
    up to the square root of ``x`` is tried, and after each unsuccessful
    division control is handed back to the event loop via
    ``asyncio.sleep(0)`` so other tasks can run during a long test.

    The upper bound is checked with integer arithmetic
    (``divisor * divisor <= x``) rather than ``int(sqrt(x))``. The float
    square root is inexact for integers above 2**53 and raises
    ``OverflowError`` for integers too large to convert to a float.
    """
    if x < 2:
        return False
    divisor = 2
    while divisor * divisor <= x:
        if x % divisor == 0:
            return False
        # Cooperative yield between divisor checks.
        await asyncio.sleep(0)
        divisor += 1
    return True
async def print_matches(iterable, predicate):
    """Print ``Found: <item>`` for every item accepted by ``predicate``.

    ``predicate`` is called with each item in order and its result is
    awaited; items whose result is falsy are skipped silently.
    """
    for candidate in iterable:
        if not await predicate(candidate):
            continue
        print(f"Found: {candidate}")
async def repetitive_message(message, interval_seconds):
    """Print ``message`` forever, pausing ``interval_seconds`` between prints.

    The coroutine never returns on its own; it only stops when the
    surrounding task is cancelled or the event loop is shut down.
    """
    while True:
        print(message)
        # Suspend here so the event loop can run other tasks meanwhile.
        pause = asyncio.sleep(interval_seconds)
        await pause
async def twelve_digit_prime(x):
    """Return True when ``str(x)`` is 12 characters long and ``x`` is prime.

    The length check runs first so that the awaited ``is_prime`` test is
    only performed for values that can still qualify. The result is the
    same as testing primality first.
    """
    if len(str(x)) != 12:
        return False
    return await is_prime(x)
async def monitored_search(iterable, predicate, future):
    """Run ``search`` and report its outcome through ``future``.

    On success the found item becomes the future's result. If ``search``
    raises ``ValueError``, that exception is stored on the future instead of
    propagating out of this coroutine.
    """
    try:
        outcome = await search(iterable, predicate)
    except ValueError as error:
        future.set_exception(error)
        return
    # Only reached when the search completed without raising ValueError.
    future.set_result(outcome)
async def montitor_future(future, interval_seconds):
    """Print ``Waiting...`` until ``future`` is done, then print ``Done``.

    ``future.done()`` is polled once per cycle; between polls the coroutine
    sleeps for ``interval_seconds`` so the event loop can run other tasks.
    """
    while True:
        if future.done():
            break
        print("Waiting...")
        await asyncio.sleep(interval_seconds)
    print("Done")
# Demo driver: search the Lucas sequence for a twelve-digit prime while a
# second task reports progress once per second.
#
# The loop is created explicitly because calling asyncio.get_event_loop()
# with no running loop is deprecated and, on recent Python releases, no
# longer creates a loop implicitly.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    co_obj = search(lucas(), twelve_digit_prime)
    search_task = asyncio.ensure_future(co_obj, loop=loop)
    # The search task doubles as the future that the monitor polls.
    monitor_task = asyncio.ensure_future(montitor_future(search_task, 1.0), loop=loop)
    search_and_monitor_future = asyncio.gather(search_task, monitor_task)
    loop.run_until_complete(search_and_monitor_future)
    print(search_task.result())
finally:
    # Close the loop even when the run raises.
    loop.close()
@slabko
Copy link
Author

slabko commented Mar 17, 2018

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment