-
-
Save cuongnguyen2190/97aa5c868a0c3163ce2db03e0dd43c5a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Demo of a bunch of asyncio + gevent scenarios using aiogevent. | |
These are not tests in the sense that there are no assertions, but pytest | |
is a nice way to run them in bulk or choose a specific "test" | |
Run like this: | |
python3 -m pytest test_coexist.py -s -v | |
""" | |
import random | |
import aiogevent | |
import asyncio | |
asyncio.set_event_loop_policy(aiogevent.EventLoopPolicy()) | |
import gevent | |
import gevent.queue | |
def looper_sync(): | |
print() | |
for i in range(10): | |
print("loop {}".format(i)) | |
gevent.sleep(0.4) | |
async def looper_async(): | |
print() | |
for i in range(10): | |
print("loop {}".format(i)) | |
gevent.sleep(0.4) | |
def g_produce(queue, n): | |
print() | |
for x in range(1, n + 1): | |
# produce an item | |
print('GEVENT: producing {}/{}'.format(x, n)) | |
# simulate i/o operation using sleep | |
gevent.sleep(random.random()) | |
item = str(x) | |
# put the item in the queue | |
queue.put(item) | |
# indicate the producer is done | |
queue.put(None) | |
def g_consume(queue): | |
while True: | |
# wait for an item from the producer | |
item = queue.get() | |
if item is None: | |
# the producer emits None to indicate that it is done | |
break | |
# process the item | |
print('GEVENT: consuming item {}...'.format(item)) | |
# simulate i/o operation using sleep | |
gevent.sleep(random.random()) | |
def run_gevent_sync(): | |
g_queue = gevent.queue.Queue() | |
gevent.joinall([ | |
gevent.spawn(g_produce, g_queue, 10), | |
gevent.spawn(g_consume, g_queue), | |
]) | |
async def run_gevent_async(): | |
g_queue = gevent.queue.Queue() | |
gevent.joinall([ | |
gevent.spawn(g_produce, g_queue, 10), | |
gevent.spawn(g_consume, g_queue), | |
]) | |
# --- | |
async def a_produce(queue, n): | |
print() | |
for x in range(1, n + 1): | |
# produce an item | |
print('AIO: producing {}/{}'.format(x, n)) | |
# simulate i/o operation using sleep | |
await asyncio.sleep(random.random()) | |
item = str(x) | |
# put the item in the queue | |
await queue.put(item) | |
# indicate the producer is done | |
await queue.put(None) | |
async def a_consume(queue): | |
while True: | |
# wait for an item from the producer | |
item = await queue.get() | |
if item is None: | |
# the producer emits None to indicate that it is done | |
break | |
# process the item | |
print('AIO: consuming item {}...'.format(item)) | |
# simulate i/o operation using sleep | |
await asyncio.sleep(random.random()) | |
def run_aio(): | |
loop = asyncio.get_event_loop() | |
a_queue = asyncio.Queue(loop=loop) | |
producer_coro = a_produce(a_queue, 10) | |
consumer_coro = a_consume(a_queue) | |
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro)) | |
# loop.close() | |
def test_aio_queue_and_gevent_queue(): | |
""" | |
2 sets of producers and consumers | |
gevent: call gevent.joinall inside a coroutine, and gather it | |
no work: async coroutines block while gevent greenlets run | |
""" | |
loop = asyncio.get_event_loop() | |
# asyncio | |
a_queue = asyncio.Queue(loop=loop) | |
producer_coro = a_produce(a_queue, 10) | |
consumer_coro = a_consume(a_queue) | |
# gevent: call gevent.joinall inside a coroutine, and gather it | |
gevent_coro = run_gevent_async() | |
loop.run_until_complete( | |
asyncio.gather(producer_coro, consumer_coro, gevent_coro)) | |
# loop.close() | |
def test_aio_queue_and_gevent_wrapped_queue(): | |
""" | |
2 sets of producers and consumers | |
gevent: create futures from greenlets and gather them | |
works | |
""" | |
loop = asyncio.get_event_loop() | |
# asyncio | |
a_queue = asyncio.Queue(loop=loop) | |
producer_coro = a_produce(a_queue, 10) | |
consumer_coro = a_consume(a_queue) | |
# gevent: create futures from greenlets and gather them | |
g_queue = gevent.queue.Queue() | |
producer_greenlet = gevent.spawn(g_produce, g_queue, 10) | |
producer_future = aiogevent.wrap_greenlet(producer_greenlet, loop) | |
consumer_greenlet = gevent.spawn(g_consume, g_queue) | |
consumer_future = aiogevent.wrap_greenlet(consumer_greenlet, loop) | |
# run | |
loop.run_until_complete( | |
asyncio.gather(producer_coro, | |
consumer_coro, | |
producer_future, | |
consumer_future)) | |
# loop.close() | |
async def gather_gevent(): | |
loop = asyncio.get_event_loop() | |
g_queue = gevent.queue.Queue() | |
producer_greenlet = gevent.spawn(g_produce, g_queue, 10) | |
producer_future = aiogevent.wrap_greenlet(producer_greenlet, loop) | |
consumer_greenlet = gevent.spawn(g_consume, g_queue) | |
consumer_future = aiogevent.wrap_greenlet(consumer_greenlet, loop) | |
await asyncio.gather(producer_future, consumer_future) | |
def test_aio_queue_and_gevent_nested_wrapped_queue(): | |
""" | |
2 sets of producers and consumers | |
gevent: call coroutine that gathers futures from greenlets | |
works | |
""" | |
loop = asyncio.get_event_loop() | |
# asyncio | |
a_queue = asyncio.Queue(loop=loop) | |
producer_coro = a_produce(a_queue, 10) | |
consumer_coro = a_consume(a_queue) | |
# gevent: call coroutine that gathers futures from greenlets | |
gevent_coro = gather_gevent() | |
# run | |
loop.run_until_complete( | |
asyncio.gather(producer_coro, | |
consumer_coro, | |
gevent_coro)) | |
# loop.close() | |
def test_aio_queue_and_gevent_nested_wrapped_spawned_queue(): | |
""" | |
2 sets of producers and consumers | |
gevent: create a future from a greenlet that spawns more greenlets and | |
joins them | |
works: | |
this is somewhat surprising since this is so similar to | |
`test_aio_queue_and_gevent_queue`. this indicates that gevent functions | |
do not behave as expected *directly* inside a coroutine, but do when | |
nested anywhere below a greenlet. | |
""" | |
loop = asyncio.get_event_loop() | |
# asyncio | |
a_queue = asyncio.Queue(loop=loop) | |
producer_coro = a_produce(a_queue, 10) | |
consumer_coro = a_consume(a_queue) | |
# gevent: create a future from a greenlet that spawns more greenlets and | |
# joins them | |
gevent_future = aiogevent.wrap_greenlet(gevent.spawn(run_gevent_sync)) | |
# run | |
loop.run_until_complete( | |
asyncio.gather(producer_coro, | |
consumer_coro, | |
gevent_future)) | |
# loop.close() | |
def test_aio_queue_and_gevent_looper(): | |
""" | |
one asyncio producer and consumer, and a loop with gevent.sleep | |
gevent: looper is asyncio coroutine | |
no work | |
""" | |
loop = asyncio.get_event_loop() | |
# asyncio | |
a_queue = asyncio.Queue(loop=loop) | |
producer_coro = a_produce(a_queue, 10) | |
consumer_coro = a_consume(a_queue) | |
gevent_coro = looper_async() | |
loop.run_until_complete( | |
asyncio.gather(producer_coro, consumer_coro, gevent_coro)) | |
# loop.close() | |
def test_aio_queue_and_gevent_wrapped_looper(): | |
""" | |
one asyncio producer and consumer, and a loop with gevent.sleep | |
gevent: looper is greenlet wrapped as future | |
works | |
""" | |
loop = asyncio.get_event_loop() | |
a_queue = asyncio.Queue(loop=loop) | |
producer_coro = a_produce(a_queue, 10) | |
consumer_coro = a_consume(a_queue) | |
looper_future = aiogevent.wrap_greenlet(gevent.spawn(looper_sync)) | |
loop.run_until_complete( | |
asyncio.gather(producer_coro, consumer_coro, looper_future)) | |
# loop.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment