Last active
April 7, 2026 12:56
-
-
Save luiscoms/d03bee25cd3391d92dfd496939c5b22e to your computer and use it in GitHub Desktop.
This script is used to send the contents of a specified file to a Kafka topic.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
source: https://gist.github.com/luiscoms/d03bee25cd3391d92dfd496939c5b22e

This script is used to send the contents of a specified file to a Kafka topic.

Pre-requisites:
- Python 3.x
- Kafka broker running and accessible
- kafka-python installed (json and logging are part of the standard library)

The file is expected to contain JSON data, which will be read, processed, and sent as a message to the Kafka topic.
The script supports both single file sending and bulk sending from a list of files.
It also includes logging with unique identifiers for batch, trace, and message IDs to facilitate tracking.

Usage:
-- install dependencies
    pip install kafka-python
-- send a single file to Kafka topic
    python kafka_send_file.py -f <filename> -b <kafka-broker> -t <topic-name> [--username <username>] [--password <password>]
-- send multiple files listed in a file to Kafka topic
    python kafka_send_file.py --bulk-send <file-with-list-of-files> -b <kafka-broker> -t <topic-name> [--username <username>] [--password <password>]
-- send file to a specific partition
    python kafka_send_file.py -f <filename> -b <kafka-broker> -t <topic-name> -p <partition-number> [--username <username>] [--password <password>]
"""
import argparse
import json
import logging
import logging.config
import time
from os.path import isfile
from uuid import uuid4

from kafka import KafkaProducer
from kafka.errors import NoBrokersAvailable, KafkaError
| BATCH_ID = str(uuid4()) | |
| class SystemLogFilter(logging.Filter): | |
| def filter(self, record): | |
| if not hasattr(record, 'batchId'): | |
| record.batchId = BATCH_ID | |
| if not hasattr(record, 'traceId'): | |
| record.traceId = '--' | |
| if not hasattr(record, 'messageId'): | |
| record.messageId = '--' | |
| return super().filter(record) | |
# dictConfig schema: a single stdout handler with SystemLogFilter attached so
# every record carries the batch/trace/message ids the formatter requires.
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'standard': {
            'format': '%(asctime)s %(levelname)s BATCH_ID="%(batchId)s" TRACE_ID="%(traceId)s" MESSAGE_ID="%(messageId)s" %(message)s',
        },
    },
    'filters': {
        'message_params': {
            # Filter resolved by dotted path — assumes this file is importable
            # as module `kafka_send_file`; TODO confirm against the real filename.
            '()': 'kafka_send_file.SystemLogFilter',
        }
    },
    'handlers': {
        'default': {
            'level': 'DEBUG',
            'formatter': 'standard',
            'class': 'logging.StreamHandler',
            'stream': 'ext://sys.stdout',  # Default is stderr
            'filters': ['message_params'],
        },
    },
    'loggers': {
        '': {  # root logger
            'handlers': ['default'],
            'level': 'DEBUG',
            'propagate': False
        },
        'kafka': {  # kafka-python client library logger (raised to WARNING)
            'handlers': ['default'],
            'level': 'WARNING',
            'propagate': False
        },
        '__main__': {  # logger used when the file runs as a script
            'handlers': ['default'],
            'level': 'DEBUG',
            'propagate': False
        },
    }
}
# Apply the config at import time so module-level logging calls are formatted.
logging.config.dictConfig(LOGGING_CONFIG)
def read_file_contents(filename):
    """Return the full text contents of *filename*.

    Raises:
        FileNotFoundError: if the file does not exist (logged before re-raising).
    """
    try:
        with open(filename, 'r') as file:
            return file.read()
    except FileNotFoundError:
        # Bug fix: the message previously contained a literal placeholder and
        # never reported which file was missing.
        logging.error("File '%s' not found", filename)
        raise
def file_to_message(filename):
    """Read *filename*, parse it as JSON, and stamp tracking ids on the payload.

    Adds a fresh messageId and traceId (UUIDs) plus the run-wide batchId so
    consumers can correlate every message with this script invocation.

    Raises:
        FileNotFoundError: if the file is missing.
        json.JSONDecodeError: if the contents are not valid JSON.
    """
    content = read_file_contents(filename)
    logging.debug('read file %s', filename)
    try:
        data = json.loads(content)
    except json.JSONDecodeError as e:
        # Bug fix: the message previously contained a literal placeholder
        # instead of interpolating the offending filename.
        logging.error("Error decoding JSON from file '%s': %s", filename, e)
        raise
    # append message and trace ids in message payload
    data["messageId"] = str(uuid4())
    data["traceId"] = str(uuid4())
    data["batchId"] = BATCH_ID
    return data
def create_kafka_producer(kafka_broker, username=None, password=None):
    """Create and return a KafkaProducer that JSON-encodes message values.

    SASL_SSL with SCRAM-SHA-512 is enabled only when both *username* and
    *password* are supplied; otherwise the connection is plaintext.
    """
    use_sasl = bool(username and password)
    return KafkaProducer(
        bootstrap_servers=[kafka_broker],
        value_serializer=lambda payload: json.dumps(payload).encode('utf-8'),
        security_protocol='SASL_SSL' if use_sasl else 'PLAINTEXT',
        sasl_mechanism='SCRAM-SHA-512' if use_sasl else None,
        sasl_plain_username=username,
        sasl_plain_password=password,
        # Throughput tuning: accumulate up to 32KB or wait up to 100ms per batch.
        batch_size=32768,
        linger_ms=100,
        # Consider a send complete only once all in-sync replicas acknowledge.
        acks='all',
    )
def send_message_to_kafka(producer, message, topic_name, partition=None):
    """Queue *message* on *topic_name* using an already-open producer.

    Delivery is asynchronous; the caller is responsible for flushing and
    closing *producer* (bulk-send path). The message key is derived from the
    payload's traceId. Returns the future from producer.send().
    """
    key = f"script-send-{message['traceId']}".encode('utf-8')
    future = producer.send(topic_name, message, key, partition=partition)
    logging.debug(
        'Message sent to Kafka topic %s (async)',
        topic_name,
        extra={'traceId': message['traceId'], 'messageId': message['messageId']},
    )
    return future
def send_message_to_kafka_sync(message, kafka_broker, topic_name, partition=None, username=None, password=None):
    """Send one message synchronously: open a producer, send, flush, close.

    Used by the single-file (-f) path where a shared producer buys nothing.
    flush() blocks until the buffered message is delivered, which is what
    makes this call synchronous.
    """
    producer = create_kafka_producer(kafka_broker, username, password)
    logging.debug(
        "KafkaProducer initialized with broker: %s, topic: %s, partition: %s, username: %s",
        kafka_broker, topic_name, partition, username
    )
    topic_key = f"script-send-{message['traceId']}"
    # Fix: close the producer even if send()/flush() raises (was leaked on error).
    try:
        producer.send(topic_name, message, topic_key.encode('utf-8'), partition=partition)
        producer.flush()
    finally:
        producer.close()
    logging.debug(
        # Fix: broker and topic arguments were swapped relative to the message text.
        'Message %s sent to %s Kafka topic %s successfully',
        topic_key, kafka_broker, topic_name,
        extra=dict(traceId=message['traceId'], messageId=message['messageId'])
    )
def list_files(filename):
    """Read *filename* (one path per line) and return the existing files.

    Blank lines are skipped silently; paths that are not regular files are
    logged as warnings and dropped from the result.
    """
    with open(filename, 'r') as handle:
        lines = handle.read().splitlines()
    valid = []
    for line in lines:
        if not line:
            continue
        if isfile(line):
            valid.append(line)
        else:
            # Fix: was a list comprehension used only for its side effect;
            # a plain loop also avoids calling isfile() twice per path.
            logging.warning("%s is not a valid file", line)
    return valid
def send_file_to_kafka(filename, kafka_broker, topic_name, partition=None, username=None, password=None, producer=None):
    """Convert *filename*'s JSON contents to a message and send it to Kafka.

    With *producer* supplied (bulk mode) the message is queued asynchronously
    on that shared producer; without it, a dedicated producer is created for
    one synchronous send and then closed.
    """
    message = file_to_message(filename)
    if producer:
        # Bulk send mode: use shared producer (async)
        send_message_to_kafka(producer, message, topic_name, partition)
    else:
        # Single file mode: create producer, send, and close (sync)
        send_message_to_kafka_sync(message, kafka_broker, topic_name, partition, username, password)
    logging.info(
        # Fix: broker and topic arguments were swapped relative to the message text.
        'Message of file %s sent to %s Kafka topic %s successfully',
        filename, kafka_broker, topic_name,
        extra=dict(traceId=message['traceId'], messageId=message['messageId'])
    )
def main():
    """Parse CLI arguments and dispatch to single-file or bulk-send mode."""
    logging.info("Starting the script")
    parser = argparse.ArgumentParser(description='Send file contents to Kafka topic', prefix_chars='-')
    parser.add_argument('-f', '--filename', metavar='filename', type=str, default=None, help='Name of file to read')
    parser.add_argument('-b', '--kafka-broker',
                        metavar='kafka-broker', type=str, default='127.0.0.1:9092', help='Kafka broker address')
    # Path of a file containing one filename per line; each listed file is sent.
    parser.add_argument('--bulk-send',
                        metavar='bulk-send', type=str)
    parser.add_argument('-t', '--topic-name',
                        metavar='topic-name', type=str, required=True, help='Kafka topic name')
    parser.add_argument('-p', '--partition',
                        metavar='partition', type=int, default=None, help='Kafka topic partition number')
    parser.add_argument("--username", type=str, help="Kafka username", required=False)
    parser.add_argument("--password", type=str, help="Kafka password", required=False)
    args = parser.parse_args()
    if args.filename:
        # Single-file mode: one synchronous send on a throwaway producer.
        send_file_to_kafka(args.filename, args.kafka_broker, args.topic_name, args.partition, args.username, args.password)
    elif args.bulk_send:
        files = list_files(args.bulk_send)
        skipped = 0
        sent = 0
        # Create producer once for entire bulk operation
        producer = create_kafka_producer(args.kafka_broker, args.username, args.password)
        logging.info("KafkaProducer initialized for bulk send. Files to process: %d", len(files))
        try:
            for file in files:
                print("Processing file:", file)
                # Up to 3 attempts per file, 3s apart, on broker/Kafka errors;
                # a file that still fails is skipped, not fatal to the batch.
                for attempt in range(1, 4):
                    try:
                        send_file_to_kafka(file, args.kafka_broker, args.topic_name, args.partition, args.username, args.password, producer=producer)
                        sent += 1
                        break
                    except (NoBrokersAvailable, KafkaError) as e:
                        if attempt < 3:
                            logging.warning(
                                "Attempt %d/3 failed for file %s: %s. Retrying in 3s...",
                                attempt, file, e
                            )
                            time.sleep(3)
                        else:
                            logging.error(
                                "Skipping file %s after 3 failed attempts: %s",
                                file, e
                            )
                            skipped += 1
        finally:
            # Flush and close producer
            logging.info("Flushing remaining messages...")
            producer.flush()
            producer.close()
        if skipped or sent:
            # Summary at WARNING level so it stays visible at raised log thresholds.
            logging.warning("Bulk send completed. %d file(s) sent, %d file(s) skipped due to broker errors.", sent, skipped)
    logging.info("Script finished")
# Bug fix: the guard referenced undefined `name`, raising NameError at import.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Code review notes: