

@johnnymo87
Last active September 18, 2024 20:57
Kafka Partition End Offset Checker
#!/usr/bin/env bash
# Kafka Partition End Offset Checker
# ==================================
#
# This script queries a Kafka cluster to retrieve the end offset information for all partitions
# of a specified topic. It provides the log end offset for each partition, which represents
# the offset where the next message would be appended.
#
# Usage:
#   ./kafka_end_offsets.bash
#   TOPIC=<topic> KAFKA_CONSUMERS=<host:port> ./kafka_end_offsets.bash
#
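# Prerequisites:
# - kafkacat must be installed and on the PATH under that name (newer releases
#   ship as kcat; put a kafkacat symlink on the PATH if only kcat is installed)
# - jq must be installed and available in the PATH
# - GNU grep, because the offset lookup uses grep -P
# - KAFKA_CONSUMERS should point at the Kafka brokers (defaults to localhost:9092)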
#
# Environment Variables:
# KAFKA_CONSUMERS: Kafka broker list passed to kafkacat -b (default: "localhost:9092")
# TOPIC: Kafka topic name to query (default: "recipe_events")
#
# Output:
# For each partition, the script will output:
# - Topic name
# - Partition number
# - Log end offset
#
# Note:
# - The log end offset is one past the offset of the last record in the partition.
#   Retention and compaction remove old records, so it is not a count of the messages currently stored.
# - The script does not provide information about consumer progress or lag.
#
# Error Handling:
# - set -e: exit immediately if any command fails (commands on the left of || are exempt)
# - set -u: treat unset variables as an error
# - set -o pipefail: a pipeline's status is that of the last command to exit non-zero,
#   so a failure anywhere in a pipeline is caught, not only in its last command
set -euo pipefail
# The broker list is read from KAFKA_CONSUMERS; fall back to a local broker.
KAFKA_BROKERS=${KAFKA_CONSUMERS:-"localhost:9092"}
TOPIC=${TOPIC:-"recipe_events"}
echo "Querying end offsets for topic: $TOPIC"
echo "Using Kafka brokers: $KAFKA_BROKERS"
echo "----------------------------------------"
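# Get the list of partition IDs from the topic metadata (-L lists metadata, -J emits JSON)
partitions=$(kafkacat -L -b "$KAFKA_BROKERS" -t "$TOPIC" -J | jq -r '.topics[0].partitions[].partition')
if [[ -z $partitions ]]; then
  echo "No partitions found for topic $TOPIC" >&2
  exit 1
fi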
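for partition in $partitions; do
  # Start consuming at the end of the partition and exit at EOF (-e). The end offset is
  # parsed from the end-of-partition notice kafkacat writes to stderr ("... offset N").
  # Print "unknown" rather than a misleading 0 if no offset could be parsed.
  log_end_offset=$(kafkacat -b "$KAFKA_BROKERS" -C -t "$TOPIC" -p "$partition" -o end -c 1 -e 2>&1 | grep -oP 'offset \K\d+' || echo "unknown")
  echo "Topic: $TOPIC, Partition: $partition, Log End Offset: $log_end_offset"
done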
echo "----------------------------------------"
echo "End offset query completed."
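The offset lookup depends on GNU grep's `-P` (PCRE) mode, which BSD/macOS grep does not have. Below is a minimal pure-bash sketch of the same extraction, assuming (as the original `grep -oP 'offset \K\d+'` pattern does) that kafkacat's end-of-partition notice contains `offset <N>`. The function name `parse_end_offset` is hypothetical, not part of the script.

```bash
#!/usr/bin/env bash
# Hypothetical helper: portable stand-in for `grep -oP 'offset \K\d+'`.
# Reads text on stdin (e.g. kafkacat's stderr) and prints the first number
# that follows "offset ". Returns 1 without printing if there is none.
parse_end_offset() {
  local line="" offset=""
  local re='offset ([0-9]+)'
  # `|| [[ -n $line ]]` also handles a final line with no trailing newline.
  while IFS= read -r line || [[ -n $line ]]; do
    # Keep the first match but keep reading, so the upstream command is never
    # cut off mid-write (a SIGPIPE there would fail the pipeline under pipefail).
    if [[ -z $offset && $line =~ $re ]]; then
      offset=${BASH_REMATCH[1]}
    fi
  done
  [[ -n $offset ]] || return 1
  printf '%s\n' "$offset"
}
```

To use it, define the function near the top of the script and pipe kafkacat's `2>&1` output into `parse_end_offset` instead of `grep -oP 'offset \K\d+'`. Because it returns non-zero when nothing matches, the caller still chooses the fallback with `|| echo ...`.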

johnnymo87 commented Sep 18, 2024

To copy the script into a pod:

kstage-green --namespace consumer cp print-offsets.bash blueaproncom-ba-exec-1726687909:print-offsets.bash

To run it in the pod after copying:

chmod +x print-offsets.bash 
./print-offsets.bash
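Before running it in a pod, it can help to confirm that the tools the script calls are actually installed there. Here is a minimal sketch. The function name `require_cmds` is hypothetical, and it only checks that each command exists on the PATH. It does not check, for example, that `grep` is the GNU build.

```bash
#!/usr/bin/env bash
# Hypothetical pre-flight check: report any listed command that is not on PATH.
# Returns 1 if at least one is missing, 0 otherwise.
require_cmds() {
  local cmd missing=0
  for cmd in "$@"; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
      echo "missing required command: $cmd" >&2
      missing=1
    fi
  done
  return "$missing"
}
```

For example, `require_cmds kafkacat jq grep && ./print-offsets.bash` runs the script only when all three commands are found.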

Example output

Querying end offsets for topic: recipe_events
Using Kafka brokers: kafka-new.staging.b--a.cloud
----------------------------------------
Topic: recipe_events, Partition: 0, Log End Offset: 643
Topic: recipe_events, Partition: 1, Log End Offset: 659
Topic: recipe_events, Partition: 2, Log End Offset: 751
Topic: recipe_events, Partition: 3, Log End Offset: 671
Topic: recipe_events, Partition: 4, Log End Offset: 590
Topic: recipe_events, Partition: 5, Log End Offset: 885
Topic: recipe_events, Partition: 6, Log End Offset: 787
Topic: recipe_events, Partition: 7, Log End Offset: 790
Topic: recipe_events, Partition: 8, Log End Offset: 682
Topic: recipe_events, Partition: 9, Log End Offset: 530
Topic: recipe_events, Partition: 10, Log End Offset: 571
Topic: recipe_events, Partition: 11, Log End Offset: 828
----------------------------------------
End offset query completed.
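To get a single topic-wide figure, you can total the per-partition values. The sum is the number of offsets assigned across all partitions. It is an upper bound on the records currently stored, not an exact count, because retention and compaction remove records without reusing their offsets. Here is a sketch that reads the checker's output on stdin. The function name `sum_end_offsets` is hypothetical.

```bash
#!/usr/bin/env bash
# Hypothetical helper: total the "Log End Offset" values printed by the checker.
# Reads the checker's output on stdin; lines without a numeric value after
# "Log End Offset: " (headers, separators) are ignored.
sum_end_offsets() {
  awk -F'Log End Offset: ' 'NF == 2 && $2 ~ /^[0-9]+$/ { total += $2 } END { print total + 0 }'
}
```

For example, `./print-offsets.bash | sum_end_offsets`. For the example output above, this gives 643 + 659 + ... + 828 = 8387.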

Often paired with

Read the last 10 messages of partition 5 and decode them from protobuf to JSON:

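# Partition 5's log end offset is 885 (see the example output), so its last 10 messages
# are at offsets 875-884. Offset 885 has not been written yet; without -e, kafkacat
# would wait there for the next message to be produced.
for offset in $(seq 875 884); do
  echo "$offset"
  # -D "" drops kafkacat's default newline delimiter so only the raw protobuf bytes reach STDIN.
  kafkacat -C -b "$KAFKA_CONSUMERS" -t recipe_events -p 5 -o "$offset" -c 1 -D "" | \
    bundle exec rails runner "puts Blueapron::Proto::CulinaryService::Events::RecipeEvent.decode(STDIN.read).to_json" | \
    jq '.'
done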
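The offsets for that loop come from the end offset printed by the checker. The end offset is the offset the *next* message will receive, so the last N existing messages span `end - N` through `end - 1`. For partition 5 in the example output, that is 875 through 884. Here is a small sketch that computes the range for any end offset, so the `seq` bounds need not be worked out by hand. The function name `last_n_offsets` is hypothetical. It clamps only at 0 and does not know the partition's real earliest offset after retention.

```bash
#!/usr/bin/env bash
# Hypothetical helper: print the offsets of the last N records before a
# partition's log end offset. The end offset is the offset the *next* record
# will receive, so the last N existing records span [end - N, end - 1].
# The start is clamped at 0; retention may have removed even older offsets.
last_n_offsets() {
  local end=$1 n=$2
  local start=$(( end - n ))
  if (( start < 0 )); then
    start=0
  fi
  if (( end > start )); then
    seq "$start" "$(( end - 1 ))"
  fi
}
```

For example, `for offset in $(last_n_offsets 885 10); do ... done` iterates over offsets 875 through 884.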
