Last active
January 8, 2024 23:57
-
-
Save zulrang/d07c64b4f445c92351b4067544aa5346 to your computer and use it in GitHub Desktop.
Async DynamoDB Batch Writer using aioboto3 (typed)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aioboto3 | |
from types_aiobotocore_dynamodb.service_resource import Table | |
class DynamoDBBatchWriter: | |
""" | |
A class that batches up to 25 writes to DynamoDB. Flushes automatically when the batch reaches 25 items, | |
5 seconds pass, or when the `flush()` method is called. | |
This class is not thread-safe. | |
""" | |
def __init__(self, table: Table): | |
self.table = table | |
self._batch = [] | |
self._futures = set() | |
self._running = True | |
# flush periodically | |
self._periodic_task = asyncio.create_task(self._flush_periodically()) | |
async def _flush_periodically(self): | |
while self._running: | |
await asyncio.sleep(5) | |
print(f"Writer status: {len(self._batch)} items in batch, {len(self._futures)} futures") | |
await self._flush() | |
async def put_item(self, item: dict): | |
self._batch.append(item) | |
if len(self._batch) >= 25: | |
await self._flush() | |
def _remove_done(self, task: asyncio.Task): | |
self._futures.remove(task) | |
async def _flush(self): | |
async with self.table.batch_writer() as batch_writer: | |
for item in self._batch: | |
task = asyncio.create_task(batch_writer.put_item(Item=item)) | |
task.add_done_callback(self._remove_done) | |
self._futures.add(task) | |
self._batch = [] | |
async def flush(self): | |
await self._flush() | |
await asyncio.gather(*self._futures) | |
self._futures = [] | |
async def stop(self): | |
self._running = False | |
await self.flush() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment