Skip to content

Instantly share code, notes, and snippets.

@ram-pi
Created June 18, 2025 13:33
Show Gist options
  • Select an option

  • Save ram-pi/0835790f564f38b3c8343976d2e56c2a to your computer and use it in GitHub Desktop.

Select an option

Save ram-pi/0835790f564f38b3c8343976d2e56c2a to your computer and use it in GitHub Desktop.
#!/usr/bin/env bash
#
# Single Message Consumer
#
# Reads exactly one Avro message from a Kafka topic at a given partition and
# offset with kafka-avro-console-consumer, saves it to /tmp/message.json and
# optionally sends it back to the same topic with kafka-avro-console-producer.
#
# Usage: <script> <partition> <offset> [topic]
#
# Files read (relative to the current working directory):
#   ./kafka/client.properties          client config; bootstrap.servers is taken from it
#   ./kafka/registry.properties        schemaRegistryURL, schemaRegistryUser,
#                                      schemaRegistryPassword
#   ./schema/schema-test-value-v1.avsc value schema used when re-producing
#
# NOTE(review): the schema registry credentials are passed to the Kafka tools
# as command-line arguments and are therefore visible in the process list
# while the tools run.

# Failures are handled explicitly below, so -e is not used; -u catches typos in
# variable names and pipefail keeps a failing pipeline stage from being masked.
set -uo pipefail

# Configuration paths
readonly SCHEMA_PATH="./schema/schema-test-value-v1.avsc"
readonly CONFIG_PATH="./kafka/client.properties"
readonly REGISTRY_PATH="./kafka/registry.properties"
# Where the consumed message is stored (and read back from when re-producing).
readonly MESSAGE_PATH="/tmp/message.json"

# Temp file collecting the consumer's stderr; removed on exit.
STDERR_LOG=""

cleanup() {
  if [[ -n "$STDERR_LOG" ]]; then
    rm -f -- "$STDERR_LOG"
  fi
}
trap cleanup EXIT

# Print an error message to stderr and exit with status 1.
die() {
  printf 'Error: %s\n' "$*" >&2
  exit 1
}

# Print the usage text to stderr and exit with status 1.
usage() {
  {
    echo "Usage: $0 <partition> <offset> [topic]"
    echo " partition: The partition number to read from"
    echo " offset: The offset to start reading from"
    echo " topic: Optional topic name (defaults to 'test')"
  } >&2
  exit 1
}

#######################################
# Print the value of a key from a "key=value" properties file.
# Only '=' is treated as the separator. The key must match exactly (comment
# lines starting with '#' or '!' are ignored), everything after the first '='
# is kept so values containing '=' survive, and surrounding whitespace
# (including a trailing CR) is trimmed. If the key occurs several times the
# last occurrence wins.
# Arguments: $1 - key, $2 - path to the properties file
# Outputs:   the value on stdout
# Returns:   0 if the key was found with a non-empty value, 1 otherwise
#######################################
read_property() {
  local key=$1 file=$2
  awk -v key="$key" '
    /^[ \t]*[#!]/ { next }
    {
      sep = index($0, "=")
      if (sep == 0) next
      name = substr($0, 1, sep - 1)
      gsub(/^[ \t]+|[ \t\r]+$/, "", name)
      if (name != key) next
      value = substr($0, sep + 1)
      gsub(/^[ \t]+|[ \t\r]+$/, "", value)
    }
    END {
      if (value == "") exit 1
      print value
    }
  ' "$file"
}

#######################################
# Print a command line on a single line with the schema registry password
# replaced by asterisks.
# Arguments: the command and its arguments
# Outputs:   the masked command line on stdout
#######################################
print_masked_command() {
  local arg
  local -a shown=()
  for arg in "$@"; do
    if [[ "$arg" == basic.auth.user.info=* ]]; then
      # Keep "basic.auth.user.info=<user>" and hide everything after the colon.
      arg="${arg%%:*}:********"
    fi
    shown+=("$arg")
  done
  printf '%s\n' "${shown[*]}"
}

main() {
  # Check if partition and offset are provided
  if (( $# < 2 )); then
    usage
  fi

  local partition=$1
  local offset=$2
  local topic=${3:-test} # Default to "test" if not provided

  # Reject bad input up front; the consumer's own diagnostics are noisy.
  [[ "$partition" =~ ^[0-9]+$ ]] \
    || die "partition must be a non-negative integer, got '$partition'"
  [[ "$offset" =~ ^([0-9]+|earliest|latest)$ ]] \
    || die "offset must be a non-negative integer, 'earliest' or 'latest', got '$offset'"

  local path
  for path in "$CONFIG_PATH" "$REGISTRY_PATH"; do
    [[ -r "$path" ]] || die "cannot read $path"
  done

  # Extract schema registry details
  local registry_url registry_user registry_password
  registry_url=$(read_property "schemaRegistryURL" "$REGISTRY_PATH") \
    || die "schemaRegistryURL not found in $REGISTRY_PATH"
  # Credentials may legitimately be absent; an empty user/password is passed on.
  registry_user=$(read_property "schemaRegistryUser" "$REGISTRY_PATH") \
    || registry_user=""
  registry_password=$(read_property "schemaRegistryPassword" "$REGISTRY_PATH") \
    || registry_password=""

  # Extract bootstrap servers
  local bootstrap_servers
  bootstrap_servers=$(read_property "bootstrap.servers" "$CONFIG_PATH") \
    || die "bootstrap.servers not found in $CONFIG_PATH"

  echo "=== Single Message Consumer ==="
  echo "Topic: $topic"
  echo "Partition: $partition"
  echo "Offset: $offset"

  # Construct the command as an array so that every value stays one argument.
  # The print.key / print.offset / print.partition / print.timestamp properties
  # are intentionally not set (they were disabled in the executed command);
  # presumably the extra fields would make the saved output unusable as
  # producer input -- TODO confirm.
  local -a consumer_cmd=(
    kafka-avro-console-consumer
    --bootstrap-server "$bootstrap_servers"
    --consumer.config "$CONFIG_PATH"
    --topic "$topic"
    --property "schema.registry.url=$registry_url"
    --property "basic.auth.credentials.source=USER_INFO"
    --property "basic.auth.user.info=$registry_user:$registry_password"
    --partition "$partition"
    --offset "$offset"
    --max-messages 1
  )

  # Print the command (hiding password for security)
  echo "Command to be executed:"
  print_masked_command "${consumer_cmd[@]}"

  # Consume the single message. stderr is kept out of the terminal to avoid
  # tool noise, but it is captured so it can be shown if something goes wrong.
  echo "Reading single message from topic: $topic, partition: $partition, offset: $offset"
  STDERR_LOG=$(mktemp) || die "mktemp failed"
  local -a statuses
  "${consumer_cmd[@]}" 2> "$STDERR_LOG" | tee "$MESSAGE_PATH"
  statuses=("${PIPESTATUS[@]}")
  if (( statuses[0] != 0 || statuses[1] != 0 )) || [[ ! -s "$MESSAGE_PATH" ]]; then
    cat -- "$STDERR_LOG" >&2
    die "failed to consume a message from topic $topic, partition $partition, offset $offset"
  fi

  # Ask if user wants to send the message back
  echo ""
  echo "Message consumed successfully and saved to $MESSAGE_PATH"
  echo ""
  local answer=""
  # A failed read (e.g. stdin closed) is treated as "no".
  read -p "Do you want to send this message back using kafka-avro-console-producer? (y/N): " -n 1 -r answer || true
  echo ""

  if [[ "$answer" =~ ^[Yy]$ ]]; then
    [[ -r "$SCHEMA_PATH" ]] || die "cannot read schema file $SCHEMA_PATH"
    echo "Sending message back to topic $topic..."
    local -a producer_cmd=(
      kafka-avro-console-producer
      --bootstrap-server "$bootstrap_servers"
      --producer.config "$CONFIG_PATH"
      --topic "$topic"
      --property "schema.registry.url=$registry_url"
      --property "basic.auth.credentials.source=USER_INFO"
      --property "basic.auth.user.info=$registry_user:$registry_password"
      --property "value.schema.file=$SCHEMA_PATH"
    )
    if "${producer_cmd[@]}" < "$MESSAGE_PATH"; then
      echo "Message sent successfully!"
    else
      die "kafka-avro-console-producer failed; message was not sent"
    fi
  else
    echo "Skipping message send."
  fi
}

main "$@"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment