Skip to content

Instantly share code, notes, and snippets.

@knthls
Last active September 10, 2022 19:58
Show Gist options
  • Select an option

  • Save knthls/7734001996bf951484ee71cd49d62e1a to your computer and use it in GitHub Desktop.

Select an option

Save knthls/7734001996bf951484ee71cd49d62e1a to your computer and use it in GitHub Desktop.
A demo implementation to use consus distributed semaphore system. No guarantees of whatsoever are given. Feel free to use and modify.
#!/usr/bin/env bash
set -Eeuo pipefail
trap cleanup SIGINT SIGTERM ERR EXIT
script_dir=$(cd "$(dirname "${BASH_SOURCE[0]}")" &>/dev/null && pwd -P)
usage() {
cat <<EOF
Usage: $(basename "${BASH_SOURCE[0]}") [-h] [-v] -s my_service/semaphore -t my-distributed-task
A demo implementation of a consul distributed semaphore as shown here
https://learn.hashicorp.com/tutorials/consul/distributed-semaphore
Consul is assumed to be configured using mTLS.
Dependencies are bash, curl and jq.
Available options:
-h, --help Print this help and exit
-v, --verbose Print script debug info
-s, --session-key-prefix Key prefix for consul k/v store
-t, --task-name Task name to associate with metadata
-l, --slot-limit Semaphore slot limit, defaults to 1
-f, --state-file File to write the state to - states are leader and
follower. Defaults to .state.
--host Name of the node to tie the session to - defaults to hostname
Thanks for their helpful blogposts go to
* Maciej Radzikowski for the article https://betterdev.blog/minimal-safe-bash-script-template/
* Emilien Kenler for https://blog.emilienkenler.com/2017/05/20/distributed-locking-with-consul/
and of course for the poeple at hashicorp for their amazing work.
EOF
exit
}
cleanup() {
trap - SIGINT SIGTERM ERR EXIT
[[ -n "${session_id-}" ]] && release_semaphore_slot "${session_id}"
[[ -n "${session_id-}" ]] && delete_lock_contender_entry "${session_id}"
[[ -n "${session_id-}" ]] && destroy_session "${session_id}"
}
msg() {
echo >&2 -e "${1-}"
return 0
}
die() {
local msg=$1
local code=${2-1} # default exit status 1
msg "$msg"
exit "$code"
}
parse_params() {
# default values of variables set from params
session_key_prefix=''
host=$(hostname)
task_name=''
slot_limit=1
state_file="state"
while :; do
case "${1-}" in
-h | --help) usage ;;
-v | --verbose) set -x ;;
-s | --session-key-prefix)
session_key_prefix="${2-}"
shift
;;
-t | --task-name)
task_name="${2-}"
shift
;;
-l | --slot-limit)
slot_limit="${2-}"
shift
;;
-f | --state-file)
state_file="${2-}"
shift
;;
-H | --host)
host="${2-}"
shift
;;
-?*) die "Unknown option: $1" ;;
*) break ;;
esac
shift
done
args=("$@")
# check required params and arguments
[[ -z "${session_key_prefix-}" ]] && die "Missing argument: session-key-prefix"
[[ -z "${task_name-}" ]] && die "Missing argument: task-name"
return 0
}
verify_consul_env_vars() {
msg "checking required environment variables"
local required_vars=(
CONSUL_CLIENT_CERT
CONSUL_CLIENT_KEY
CONSUL_CACERT
CONSUL_HTTP_ADDR
CONSUL_TOKEN
)
for var in "${required_vars[@]}"; do
[[ -z "${!var-}" ]] && die "Missing required environment variable $var"
done
return 0
}
verify_consul_tls_files_exist() {
msg "checking required files exist"
local required_files=(
CONSUL_CLIENT_CERT
CONSUL_CLIENT_KEY
CONSUL_CACERT
)
for file in "${required_files[@]}"; do
[[ ! -f "${!file-}" ]] && die "Not a file ${!file}"
printf ' - %-70s [OK]\n' "${!file}"
done
return 0
}
init_session() {
msg "Initialize session on host $host"
local endpoint="${CONSUL_HTTP_ADDR}/v1/session/create"
cat <<EOF | curl "${curl_args[@]}" -X PUT $endpoint --data-binary "@-" | jq -r '.ID'
{
"Name": "${task_name}",
"Node": "${host}",
"Behavior": "release"
}
EOF
return 0;
}
destroy_session() {
local session_id="${1}"
msg "Destroy session $session_id on host $host"
local endpoint="${CONSUL_HTTP_ADDR}/v1/session/destroy/$session_id"
result=$(curl "${curl_args[@]}" -X PUT $endpoint)
[ "${result}" == "true" ] && msg "Session ${session_id} destroyed"
}
check_session() {
local session_id="${1}"
msg "Check session $session_id on host $host"
local endpoint="${CONSUL_HTTP_ADDR}/v1/session/info/${session_id}"
local result=$(curl "${curl_args[@]}" "$endpoint")
local n_entries=$(echo "$result" | jq -r 'length')
if (( "$n_entries" > 0 )); then
msg "Session ${session_id} is valid" && return 0
fi
die "Session was invalidated"
}
acquire_lock_contender_entry() {
local session_id="${1}"
local endpoint="$CONSUL_HTTP_ADDR/v1/kv/$session_key_prefix/$session_id?acquire=$session_id"
msg "Trying to acquire lock contender entry via endpoint $endpoint"
local result=$(cat <<EOF | curl "${curl_args[@]}" -X PUT "$endpoint" --data-binary "@-"
{
"Name": "${task_name}"
}
EOF
)
[ "${result}" == "true" ] && msg "Lock acquired on session ${session_id}" && return 0;
return 1;
}
delete_lock_contender_entry() {
local session_id=${1}
local endpoint="$CONSUL_HTTP_ADDR/v1/kv/$session_key_prefix/$session_id"
echo "Delete lock contender entry."
local result=$(curl "${curl_args[@]}" -X DELETE $endpoint)
[ "${result}" == "true" ] && echo "Lock entry deleted"
}
# the return value of this function indicates if the process has successfully acquired
# a slot on the first try
init_semaphore() {
session_id="${1}"
slot_limit="${2}"
local endpoint="$CONSUL_HTTP_ADDR/v1/kv/$session_key_prefix/.lock?cas=0"
result=$(cat <<EOF | curl "${curl_args[@]}" -X PUT "$endpoint" --data-binary "@-"
{
"Limit": ${slot_limit},
"Holders": ["${session_id}"]
}
EOF
)
[ "${result}" == "true" ] && msg "Slot acquired" && return 0;
return 1;
}
get_semaphore_state() {
msg "Query semaphore state"
local endpoint="$CONSUL_HTTP_ADDR/v1/kv/$session_key_prefix?recurse"
curl "${curl_args[@]}" "$endpoint"
}
get_semaphore_holders() {
local state="${1}"
echo "$state" | jq -r ".[0].Value" | base64 -d | jq -r '.Holders | .[]'
}
get_active_sessions() {
local state="${1}"
echo "$state" | jq -r ".[] | select(.Key!=\"$session_key_prefix/.lock\") | .Session"
}
prune_semaphore_holders() {
# compare semaphore holders to active sessions and return holders that are alive
local state="${1}"
local holders=$(get_semaphore_holders "$state")
local sessions="$(get_active_sessions "$state")"
echo ${sessions[@]} ${holders[@]} | tr ' ' '\n' | sort | uniq -d
}
acquire_semaphore_slot() {
msg "Acquire semaphore slot"
local new_holders="${1}"
local lock_modify_index="${2}"
local endpoint="$CONSUL_HTTP_ADDR/v1/kv/$session_key_prefix/.lock?cas=$lock_modify_index"
result=$(cat <<EOF | jq | curl "${curl_args[@]}" -X PUT "$endpoint" --data-binary "@-"
{
"Limit": ${slot_limit},
"Holders": $(echo "$new_holders" | tr ' ' '\n' | jq -nR '[ inputs | select(length > 0) ]')
}
EOF
)
[ "${result}" == "true" ] && msg "Slot acquired" && return 0;
return 1
}
release_semaphore_slot() {
msg "Release semaphore slot"
local session_id="${1}"
local state=$(get_semaphore_state)
local holders=$(get_semaphore_holders "$state")
local lock_modify_index=$(get_modify_index "$state")
local endpoint="$CONSUL_HTTP_ADDR/v1/kv/$session_key_prefix/.lock?cas=$lock_modify_index"
result=$(cat <<EOF | curl "${curl_args[@]}" -X PUT "$endpoint" --data-binary "@-"
{
"Limit": ${slot_limit},
"Holders": $(echo "$holders" | tr ' ' '\n' | grep -v "$session_id" | jq -nR '[ inputs | select(length > 0) ]')
}
EOF
)
[ "${result}" == "true" ] && msg "Slot released"
}
get_modify_index() {
local state="${1}"
echo "$state" | jq -r ".[] | select(.Key==\"$session_key_prefix/.lock\") | .ModifyIndex"
}
########
# MAIN #
########
parse_params "$@"
# make sure that expected environment variables exist
verify_consul_env_vars
verify_consul_tls_files_exist
# these curl arguments are required for communication with consul
curl_args=(
-s
--cacert "$CONSUL_CACERT"
--key "$CONSUL_CLIENT_KEY"
--cert "$CONSUL_CLIENT_CERT"
--header "X-Consul-Token: $CONSUL_TOKEN"
)
# Initialize session
session_id=$(init_session)
[[ -z "${session_id-}" ]] && die "Could not init session"
acquire_lock_contender_entry "$session_id" \
|| die "Could not register lock contender entry."
if init_semaphore "$session_id" "$slot_limit"; then
echo "Leader" > "$state_file"
else
echo "Follower" > "$state_file"
fi
watch_semaphore() {
msg "Watch semaphore"
state=$(get_semaphore_state)
pruned=$(prune_semaphore_holders "$state")
live_holders=$(echo "$pruned" | wc -w)
msg "Found $live_holders semaphore holders that are alive"
if (( $live_holders < $slot_limit )); then
msg "Acquire Leadership"
modify_index=$(get_modify_index "$state")
acquire_semaphore_slot "$(echo $pruned $session_id)" "$modify_index" \
&& echo "Leader" > "$state_file"
fi
}
while true; do
check_session "$session_id" && watch_semaphore
sleep 5;
done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment