Created
January 30, 2021 10:15
-
-
Save myarik/bd6a9f22b3d0fb3209857ba803c309da to your computer and use it in GitHub Desktop.
Kafka examples
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from dataclasses import dataclass | |
import random | |
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer | |
async def consume(loop, total_events=10):
    """Print every message received on the ``monitoring`` topic.

    Creates an ``AIOKafkaConsumer`` for the ``monitoring`` topic in consumer
    group ``my-group`` against the broker at ``kafka:9093``, then prints the
    value and timestamp of each message until iteration ends or the
    coroutine is interrupted/cancelled.

    Args:
        loop: Event loop handed to ``AIOKafkaConsumer``.
            NOTE(review): newer aiokafka releases appear to deprecate the
            explicit ``loop`` argument -- confirm against the installed
            version before removing it.
        total_events: Not used by this coroutine; kept so the signature
            stays parallel to ``send`` and existing callers keep working.
    """
    consumer = AIOKafkaConsumer(
        'monitoring',
        loop=loop, bootstrap_servers='kafka:9093',
        group_id="my-group")
    # Get cluster layout and join group `my-group`
    await consumer.start()
    try:
        # Consume messages. A single pass is enough: the previous outer
        # `while` loop only re-entered the iterator after it had finished,
        # which could spin without making progress.
        async for msg in consumer:
            print(f"Message received: {msg.value} at {msg.timestamp}")
    except KeyboardInterrupt:
        # Keep the original behaviour of ending quietly when Ctrl-C is
        # delivered inside this coroutine; cleanup happens in `finally`.
        pass
    finally:
        # Always release the consumer -- on normal exit, on errors and on
        # task cancellation -- not only on KeyboardInterrupt.
        # Will leave consumer group; perform autocommit if enabled.
        await consumer.stop()
        print("Stoping consumer...")
async def send(loop, total_events=10):
    """Send ``total_events`` test messages to the ``monitoring`` topic.

    Creates an ``AIOKafkaProducer`` against the broker at ``kafka:9093``,
    sends the payload ``b"test"`` once per event (awaiting delivery each
    time) and sleeps two seconds after every message. The producer is
    stopped when the coroutine finishes, whether it succeeded or not.

    Args:
        loop: Event loop handed to ``AIOKafkaProducer``.
            NOTE(review): newer aiokafka releases appear to deprecate the
            explicit ``loop`` argument -- confirm against the installed
            version before removing it.
        total_events: Number of messages to send. Values below 1 send
            nothing (the range is empty).
    """
    producer = AIOKafkaProducer(
        loop=loop, bootstrap_servers='kafka:9093')
    # Get cluster layout and initial topic/partition leadership information
    await producer.start()
    try:
        for event_number in range(1, total_events + 1):
            # Produce message
            print(f"Sending event number {event_number}")
            await producer.send_and_wait("monitoring", b"test")
            # sleep for 2 seconds
            await asyncio.sleep(2)
    finally:
        # `finally` instead of the loop's `else` clause: `else` is skipped
        # when the loop body raises, which left the producer running.
        # Wait for all pending messages to be delivered or expire.
        await producer.stop()
        print("Stoping producer...")
if __name__ == "__main__":
    # Script entry point: run the producer example to completion.
    # Create the loop explicitly instead of relying on
    # asyncio.get_event_loop() implicitly creating one, which is deprecated
    # in recent Python versions when no loop is running.
    loop = asyncio.new_event_loop()
    try:
        # A single coroutine does not need asyncio.gather(); the loop is
        # still passed to send() because its signature expects it.
        loop.run_until_complete(send(loop))
    finally:
        # Release the loop's resources even if send() raised.
        loop.close()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment