Last active
December 28, 2023 21:18
-
-
Save zulrang/0f2de61e045ce493d5dcdd58370ca86a to your computer and use it in GitHub Desktop.
Simulating Goroutine and Channel functionality in Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from typing import Any, AsyncGenerator, Coroutine, Generator, Never | |
class Channel: | |
def __init__(self, maxsize: int = 0): | |
self._queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize) | |
async def push(self, item): | |
return await self._queue.put(item) | |
async def _async_gen(self): | |
yield await self._queue.get() | |
self._queue.task_done() | |
def __aiter__(self) -> AsyncGenerator[Any, None]: | |
return self._async_gen() | |
def go(coroutine: Generator[Any, None, Never] | Coroutine[Any, Any, Never]): | |
loop = asyncio.get_event_loop() | |
loop.create_task(coroutine) | |
async def producer(c: Channel): | |
i = 0 | |
while True: | |
await asyncio.sleep(0.5) # Simulating some work | |
await c.push(f'Item {i}') | |
print(f'Item {i} added to queue') | |
i += 1 | |
async def consumer(c: Channel, id: int): | |
async for item in c: | |
print(f'Processing {item} in consumer {id}') | |
await asyncio.sleep(3) | |
async def main(): | |
c = Channel(maxsize=10) | |
# Creating producer and consumer coroutines | |
go(producer(c)) | |
go(consumer(c, 1)) | |
go(consumer(c, 2)) | |
go(consumer(c, 3)) | |
# Wait for all tasks to be processed | |
await asyncio.Future() | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment