Skip to content

Instantly share code, notes, and snippets.

@hernandohhoyos
Created August 8, 2024 20:28
Show Gist options
  • Save hernandohhoyos/91272909a09b922cda18585a1e95480c to your computer and use it in GitHub Desktop.
Save hernandohhoyos/91272909a09b922cda18585a1e95480c to your computer and use it in GitHub Desktop.
from confluent_kafka import TopicPartition, Consumer, Producer
from confluent_kafka.admin import AdminClient
from confluent_kafka.cimpl import NewTopic
# Sample message used by the example calls at the bottom of the file.
message = {"topic": "my-topic", "payload": "mensajito"}

# Client settings for the producer (also used for the admin client below).
producer_config = {
    "bootstrap.servers": "broker:port,broker:port",
    "security.protocol": "SSL",
}

# Client settings for the consumer; 300000 ms is five minutes.
consumer_config = {
    "bootstrap.servers": "broker:port,broker:port",
    "group.id": "my_group",
    "security.protocol": "SSL",
    "max.poll.interval.ms": 300000,
}

# Combined view of both configurations, keyed by client role.
config = {
    "producer_config": producer_config,
    "consumer_config": consumer_config,
}
def create_topics(provider, topics, num_partitions=3, replication_factor=2):
    """ Create topics and subscribe a fresh consumer to each created topic.

    :param provider: object exposing ``admin_client`` and ``consumer`` attributes.
    :param topics: iterable of topic names to create.
    :param num_partitions: partitions per new topic (default 3).
    :param replication_factor: replicas per partition (default 2).

    Failures are reported on stdout per topic; nothing is raised for them.
    """
    topics = list(topics)
    if not topics:
        # Nothing to create; avoid handing the admin client an empty request.
        return
    new_topics = [
        NewTopic(
            topic,
            num_partitions=num_partitions,
            replication_factor=replication_factor,
        )
        for topic in topics
    ]
    # Call create_topics to asynchronously create topics, a dict
    # of <topic,future> is returned.
    fs = provider.admin_client.create_topics(new_topics)
    # Wait for operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} created".format(topic))
            # NOTE(review): the previous provider.consumer is replaced without
            # being closed, and only the last created topic stays subscribed.
            provider.consumer = Consumer(consumer_config)
            provider.consumer.subscribe([topic])
        except Exception as e:
            print("Failed to create topic {}: {}".format(topic, e))
def delete_topics(provider, topics, operation_timeout=30):
    """ Delete topics and report the outcome of each deletion on stdout.

    :param provider: object exposing an ``admin_client`` attribute.
    :param topics: iterable of topic names to delete.
    :param operation_timeout: value forwarded to the admin client's
        ``delete_topics`` call (default 30).

    Failures are printed per topic; nothing is raised for them.
    """
    topics = list(topics)
    if not topics:
        # Nothing to delete; avoid handing the admin client an empty request.
        return
    # Returns a dict of <topic,future>.
    fs = provider.admin_client.delete_topics(
        topics, operation_timeout=operation_timeout
    )
    # Wait for operation to finish.
    for topic, f in fs.items():
        try:
            f.result()  # The result itself is None
            print("Topic {} deleted".format(topic))
        except Exception as e:
            print("Failed to delete topic {}: {}".format(topic, e))
def get_list_topics(provider):
    """ Return the names of all topics reported by the admin client.

    As a side effect, prints each topic for which the provider's consumer
    has at least one committed offset.

    :param provider: object exposing ``admin_client`` and ``consumer`` attributes.
    :return: list of topic names.
    """
    # Get all topics
    topics = list(provider.admin_client.list_topics().topics)
    if not topics:
        return topics
    # Fetch the consumer-side metadata once instead of once per topic.
    consumer_topics = provider.consumer.list_topics().topics
    for topic in topics:
        metadata = consumer_topics.get(topic)
        if metadata is None:
            # Topic not visible to the consumer yet; nothing to inspect.
            continue
        # One TopicPartition per partition id listed in the topic metadata.
        partitions = [TopicPartition(topic, p) for p in metadata.partitions]
        # Get the consumer group offsets for this topic
        offsets = provider.consumer.committed(partitions)
        # -1001 is the "no committed offset" sentinel (OFFSET_INVALID); any
        # other value means the consumer group is consuming from this topic.
        if any(offset.offset != -1001 for offset in offsets):
            print(topic)
    return topics
def publish_message(provider, message):
    """ Publish a broker message, creating its topic first when missing.

    :param provider: object exposing ``producer`` (and the attributes used by
        ``get_list_topics`` / ``create_topics``).
    :param message: dict with ``"topic"`` and ``"payload"`` keys.
    """
    topic = message["topic"]
    if topic not in get_list_topics(provider):
        create_topics(provider, [topic])
    # Serve delivery callbacks from earlier produce() calls.
    provider.producer.poll(0)
    provider.producer.produce(
        topic=topic,
        value=message["payload"],
        # Providers without a delivery_report attribute publish without a
        # per-message callback instead of failing with AttributeError.
        callback=getattr(provider, "delivery_report", None),
    )
    # Block until the message has been delivered or has failed.
    provider.producer.flush()
def listen_messages(provider, topics=None, keyword="hhurtado"):
    """ Poll the provider's consumer forever and print matching messages.

    :param provider: object exposing a ``consumer`` attribute.
    :param topics: optional list of topics to subscribe to before polling;
        when omitted the consumer's current subscription is left as is.
    :param keyword: only messages whose payload contains this text are printed.

    This function never returns on its own; it loops until an exception
    (for example KeyboardInterrupt) propagates out of it.
    """
    if topics is not None:
        provider.consumer.subscribe(topics)
    # Payloads may be bytes, so compare bytes against bytes.
    needle = keyword.encode("utf-8") if isinstance(keyword, str) else keyword
    while True:
        msg = provider.consumer.poll(1.0)
        if msg is None:
            # Poll timed out without a message.
            continue
        if msg.error():
            print("Consumer error: {}".format(msg.error()))
            continue
        value = msg.value()
        if value is None:
            continue
        haystack = value.encode("utf-8") if isinstance(value, str) else value
        if needle in haystack:
            print(value)
class KafkaProvider(object):
    """ Holder for the Kafka clients shared by the helper functions. """

    # Clients are attached after construction; None means "not set yet".
    admin_client = None
    consumer = None
    producer = None

    def delivery_report(self, err, msg):
        """ Print the outcome of a produced message.

        Passed as the ``callback`` argument of ``producer.produce``.

        :param err: delivery error, or None on success.
        :param msg: the message the report refers to.
        """
        if err is not None:
            print("Message delivery failed: {}".format(err))
            return
        print(
            "Message delivered to {} [{}] at offset {}".format(
                msg.topic(), msg.partition(), msg.offset()
            )
        )
# Wire up one provider with an admin client, a consumer and a producer.
# Python has no `new` keyword: calling the class creates the instance.
provider = KafkaProvider()
provider.admin_client = AdminClient(producer_config)
provider.consumer = Consumer(consumer_config)
provider.producer = Producer(producer_config)
# Example calls, left disabled:
# delete_topics(provider, [message["topic"]])
# create_topics(provider, [message["topic"]])
# publish_message(provider, message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment