Last active
November 21, 2024 13:36
-
-
Save ram-pi/03d7056a1f0a692e8db4be9fa3fcdb5a to your computer and use it in GitHub Desktop.
Change replication factor script
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
# Usage: change_replication_factor.sh <bootstrap-server> <broker-list> <topic-file> <replication-factor> <min-isr> <extra-var> | |
# Example: ./change_replication_factor.sh localhost:9092 1,2,3 /tmp/topic.txt 3 2 --command-config client.properties | |
# Example: export PATH=$PATH:/path/to/kafka/cli/bin ; ./change_replication_factor.sh localhost:9092 1,2,3 /tmp/topic.txt 3 2 --command-config client.properties | |
# Requires: jq, kafka-topics, kafka-reassign-partitions, kafka-configs | |
# Author: @ram-pi | |
# Version: 0.3 | |
# Date: 2024-11-21 | |
# Description: Change replication factor and min.insync.replicas for topics defined in a file | |
# NOTE: It will not work for topics with a replication factor greater than the number of brokers in the broker-list
# Validate the command line. The five leading positional parameters
# (bootstrap server, broker list, topic file, replication factor, min ISR)
# are all mandatory; anything after them is optional and is handed on to the
# Kafka CLI tools. Checking only for "no arguments at all" would let a
# partial invocation continue with empty settings.
if [ $# -lt 5 ]; then
  {
    echo "Usage: change_replication_factor.sh <bootstrap-server> <broker-list> <topic-file> <replication-factor> <min-isr> <extra-var>"
    echo "Example: ./change_replication_factor.sh localhost:9092 1,2,3 /tmp/topic.txt 3 2 --command-config client.properties"
    echo "Example: export PATH=\$PATH:/path/to/kafka/cli/bin ; ./change_replication_factor.sh localhost:9092 1,2,3 /tmp/topic.txt 3 2 --command-config client.properties"
  } >&2
  exit 1
fi

# Create topics template (pretty-printed by jq) in the current directory.
# The identity filter '.' is passed explicitly instead of relying on jq's
# implicit default program.
echo '{"topics":[{"topic":"!TOPIC!"}],"version":1}' | jq . > topics.template.json

BOOTSTRAP_SERVER=$1
BROKER_LIST=$2
TOPIC_FILE=$3
RF=$4
MIN_ISR=$5
# Drop the five consumed parameters so that "$@" holds only the extra
# arguments meant for the Kafka CLI tools.
shift 5
# Shuffle the given arguments (Fisher-Yates, driven by $RANDOM).
# https://stackoverflow.com/questions/5533569/simple-method-to-shuffle-the-elements-of-an-array-in-bash-shell
# Arguments: the elements to shuffle
# Outputs:   all elements in random order on a single line, separated by
#            single spaces (an empty line when called without arguments)
shuffle_array() {
  local arr=("$@")
  local i j temp
  # Walk from the last index down, swapping each slot with a random slot at
  # or below it.
  for ((i = ${#arr[@]} - 1; i > 0; i--)); do
    j=$((RANDOM % (i + 1)))
    temp=${arr[i]}
    arr[i]=${arr[j]}
    arr[j]=$temp
  done
  # printf rather than echo: echo would treat elements such as "-n" or "-e"
  # as its own options and drop them from the output.
  local IFS=' '
  printf '%s\n' "${arr[*]}"
}
# Everything still left in "$@" is forwarded verbatim to each Kafka CLI
# invocation (e.g. --command-config client.properties).
EXTRA_VAR=("$@")
echo "Bootstrap server: $BOOTSTRAP_SERVER"
echo "Broker list: $BROKER_LIST"
echo "Topic file list: $TOPIC_FILE"
echo "Replication factor: $RF"
echo "Min ISR: $MIN_ISR"
echo "EXTRA_VAR:" "${EXTRA_VAR[@]}"

# Split the comma-separated broker list once; it is the same for every topic.
IFS=',' read -ra brokers <<< "$BROKER_LIST"
for broker in "${brokers[@]}"; do
  if ! [[ "$broker" =~ ^[0-9]+$ ]]; then
    echo "ERROR: invalid broker id '$broker' in broker list '$BROKER_LIST'" >&2
    exit 1
  fi
done

# Each partition gets RF distinct brokers taken from the broker list, so RF
# must be a positive integer no larger than the number of brokers. Without
# this check the slice below would silently produce fewer replicas.
if ! [[ "$RF" =~ ^[1-9][0-9]*$ ]]; then
  echo "ERROR: replication factor must be a positive integer, got '$RF'" >&2
  exit 1
fi
if (( RF > ${#brokers[@]} )); then
  echo "ERROR: replication factor $RF exceeds the number of brokers (${#brokers[@]}) in '$BROKER_LIST'" >&2
  exit 1
fi

# Get list of topics
# (alternative: kafka-topics --bootstrap-server "$BOOTSTRAP_SERVER" --list "${EXTRA_VAR[@]}")
if [[ ! -r "$TOPIC_FILE" ]]; then
  echo "ERROR: cannot read topic file '$TOPIC_FILE'" >&2
  exit 1
fi
# Topic names are whitespace-separated. They are collected into an array
# first so that the commands in the loop below cannot consume the file
# through stdin, and so that no glob expansion is applied to the names.
topics=()
while read -r -a words || (( ${#words[@]} > 0 )); do
  topics+=("${words[@]}")
done < "$TOPIC_FILE"

failed=0

# Iterate over topics
for topic in "${topics[@]}"; do
  # Reassign partitions for topic
  echo "Reassign topic: $topic..."
  # Build the topics-to-move file with jq so the topic name is passed as data
  # and is never interpreted as part of a sed or jq program.
  jq -n --arg topic "$topic" '{topics: [{topic: $topic}], version: 1}' > topics.json

  # Generate proposed assignment
  echo "Generate proposed assignment for topic: $topic..."
  cat topics.json
  # Capture the output and parse it afterwards. Parsing inside asynchronous
  # process substitutions would let the next step read the JSON files before
  # they have been written.
  generate_status=0
  generate_output=$(kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" \
    --topics-to-move-json-file topics.json --broker-list "$BROKER_LIST" \
    --generate "${EXTRA_VAR[@]}") || generate_status=$?
  printf '%s\n' "$generate_output"
  if (( generate_status != 0 )); then
    echo "ERROR: could not generate a reassignment for topic '$topic', skipping" >&2
    failed=$((failed + 1))
    continue
  fi

  # The JSON document is on the line that follows each heading.
  awk '/Current partition replica assignment/ { getline; print }' <<< "$generate_output" \
    | jq . > /tmp/topic.current.json
  awk '/Proposed partition reassignment configuration/ { getline; print }' <<< "$generate_output" \
    | jq . > /tmp/topic.proposed.json
  if ! jq -e '(.partitions | length) > 0' /tmp/topic.proposed.json > /dev/null; then
    echo "ERROR: no proposed assignment found for topic '$topic', skipping" >&2
    failed=$((failed + 1))
    continue
  fi

  # Start from the proposal with log_dirs removed and every replica list
  # emptied; the replica lists are filled in partition by partition below.
  jq 'del(.partitions[] | .log_dirs) | .partitions[].replicas = []' \
    /tmp/topic.proposed.json > /tmp/topic.reassign.json

  partition_count=$(jq '.partitions | length' /tmp/topic.reassign.json)
  echo "Partition: $partition_count"
  # Copy of the skeleton (empty replica lists) kept next to the final file.
  cp /tmp/topic.reassign.json /tmp/topic.reassign.json.bak

  for (( i = 0; i < partition_count; i++ )); do
    # Pick the first RF brokers of a fresh shuffle for this partition.
    read -ra shuffled <<< "$(shuffle_array "${brokers[@]}")"
    replica_csv=""
    for broker in "${shuffled[@]:0:RF}"; do
      replica_csv+="${replica_csv:+,}$broker"
    done
    echo "Partition: $i"
    echo "Brokers: $replica_csv"
    # One jq call per partition; index and broker ids are passed as
    # arguments. The file is only replaced when jq succeeded.
    if ! jq --argjson idx "$i" --arg ids "$replica_csv" \
      '.partitions[$idx].replicas = ($ids | split(",") | map(tonumber))' \
      /tmp/topic.reassign.json > /tmp/tmp.json; then
      echo "ERROR: could not build the replica list for topic '$topic', skipping" >&2
      failed=$((failed + 1))
      continue 2
    fi
    mv /tmp/tmp.json /tmp/topic.reassign.json
  done
  cat /tmp/topic.reassign.json

  # Reassign partitions
  if ! kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" \
    --reassignment-json-file /tmp/topic.reassign.json --execute "${EXTRA_VAR[@]}"; then
    # min.insync.replicas is left untouched here: raising it on a topic whose
    # replication factor was not changed could leave it without enough
    # in-sync replicas.
    echo "ERROR: reassignment failed for topic '$topic', min.insync.replicas not changed" >&2
    failed=$((failed + 1))
    continue
  fi
  kafka-reassign-partitions --bootstrap-server "$BOOTSTRAP_SERVER" \
    --reassignment-json-file /tmp/topic.reassign.json --verify "${EXTRA_VAR[@]}"

  # Update min.insync.replicas
  if ! kafka-configs --bootstrap-server "$BOOTSTRAP_SERVER" --alter --entity-type topics \
    --entity-name "$topic" --add-config min.insync.replicas="$MIN_ISR" "${EXTRA_VAR[@]}"; then
    echo "ERROR: could not set min.insync.replicas for topic '$topic'" >&2
    failed=$((failed + 1))
  fi
done

# Report overall failure so callers can detect partially processed runs.
if (( failed > 0 )); then
  echo "ERROR: $failed topic(s) could not be fully processed" >&2
  exit 1
fi
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment