Skip to content

Instantly share code, notes, and snippets.

@filipecifali
Created April 6, 2025 01:25
Show Gist options
  • Save filipecifali/9c1c8d9e23110aca51592b848c5fab02 to your computer and use it in GitHub Desktop.
Save filipecifali/9c1c8d9e23110aca51592b848c5fab02 to your computer and use it in GitHub Desktop.
import subprocess
import json
def check_docker_bin():
    """Report the version of the Docker client found on PATH.

    Package: docker

    Returns:
        The tuple ``("Docker version:", <client version>)`` on success, or
        the string ``"Unable to access Docker Binary"`` when the binary is
        missing, exits non-zero, or prints output that cannot be parsed.
    """
    try:
        r = subprocess.run(
            ["docker", "version", "-f", "json"],
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        version = json.loads(r.stdout)["Client"]["Version"]
    # OSError covers a missing or non-executable binary (FileNotFoundError,
    # PermissionError); ValueError covers json.JSONDecodeError.
    except (OSError, subprocess.CalledProcessError, ValueError, KeyError, TypeError):
        return "Unable to access Docker Binary"
    return ("Docker version:", version)
def check_kubectl_bin():
    """Report the version of the kubectl client found on PATH.

    Download URL: https://kubernetes.io/docs/tasks/tools/install-kubectl-linux/#install-kubectl-binary-with-curl-on-linux
    Download cURL: curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl"

    Returns:
        The tuple ``("Kubectl version:", <gitVersion>)`` on success, or the
        string ``"Unable to access kubectl Binary"`` when the binary is
        missing, exits non-zero, or prints output that cannot be parsed.
    """
    try:
        r = subprocess.run(
            ["kubectl", "version", "-o", "json", "--client=true"],
            # Without check=True a failing kubectl never raises
            # CalledProcessError and the error branch below is dead code.
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        version = json.loads(r.stdout)["clientVersion"]["gitVersion"]
    # OSError covers a missing or non-executable binary; ValueError covers
    # json.JSONDecodeError.
    except (OSError, subprocess.CalledProcessError, ValueError, KeyError, TypeError):
        return "Unable to access kubectl Binary"
    return ("Kubectl version:", version)
def check_git_bin():
    """Report the version of the git binary found on PATH.

    Package: git

    Returns:
        The tuple ``("Git version:", <version>)`` on success, or the string
        ``"Unable to access git Binary"`` when the binary is missing, exits
        non-zero, or prints fewer than three space-separated fields.
    """
    try:
        r = subprocess.run(
            ["git", "version"],
            # Without check=True a failing git never raises
            # CalledProcessError and the error branch below is dead code.
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        # The version is taken from the third space-separated field.
        version = r.stdout.split(" ")[2].rstrip("\n")
    # OSError covers a missing or non-executable binary.
    except (OSError, subprocess.CalledProcessError, IndexError):
        return "Unable to access git Binary"
    return ("Git version:", version)
def check_kind_bin():
    """Report the version of the kind binary found on PATH.

    Download URL: https://kind.sigs.k8s.io/docs/user/quick-start/#installing-with-a-package-manager
    Download cURL: [ $(uname -m) = x86_64 ] && curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.26.0/kind-linux-amd64

    Returns:
        The tuple ``("Kind version:", <version>)`` on success, or the string
        ``"Unable to access kind Binary"`` when the binary is missing, exits
        non-zero, or prints fewer than two space-separated fields.
    """
    try:
        r = subprocess.run(
            ["kind", "version"],
            # Without check=True a failing kind never raises
            # CalledProcessError and the error branch below is dead code.
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        # The version is taken from the second space-separated field.
        version = r.stdout.split(" ")[1]
    # OSError covers a missing or non-executable binary.
    except (OSError, subprocess.CalledProcessError, IndexError):
        return "Unable to access kind Binary"
    return ("Kind version:", version)
def check_bins():
    """Run every binary check and collect the outcomes.

    Returns:
        A list with the results of the docker, kubectl, git and kind checks,
        in that order. Previously the results were discarded, so callers had
        no way to see which tools were available.
    """
    return [
        check_docker_bin(),
        check_kubectl_bin(),
        check_git_bin(),
        check_kind_bin(),
    ]
def check_kind_clusters():
    """Ensure the kind cluster named 'local' exists, creating it if absent.

    Returns:
        ``("Kind cluster already exists:", <kind's cluster listing>)`` when
        'local' is already present; the result of ``create_kind_cluster()``
        when it is not; or an error string when kind cannot be run or the
        listing fails.
    """
    try:
        r = subprocess.run(
            ["kind", "get", "clusters"],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    except OSError:
        # Missing or non-executable binary.
        return "Unable to access kind Binary"
    if r.returncode != 0:
        # A failed listing must not be reported as "cluster already exists".
        return "Unable to list kind clusters"
    # With no clusters stdout is empty, so this also covers the case where
    # kind prints "No kind clusters found." on stderr. Checking for 'local'
    # specifically matters because the later steps use the 'kind-local'
    # context: an unrelated existing cluster must not suppress creation.
    if "local" not in r.stdout.split():
        return create_kind_cluster()
    return ("Kind cluster already exists:", r.stdout.rstrip("\n"))
def create_kind_cluster():
    """Create the kind cluster described by ``kind-config.yaml``.

    The config path is relative, so the file is looked up in the current
    working directory.

    Returns:
        ``"Kind cluster 'local' created"`` on success, or
        ``"Unable to create the kind cluster"`` when kind is missing or
        exits non-zero.
    """
    try:
        subprocess.run(
            ["kind", "create", "cluster", "--config", "kind-config.yaml"],
            # Without check=True a failed creation was reported as success.
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    # OSError covers a missing or non-executable binary.
    except (OSError, subprocess.CalledProcessError):
        return "Unable to create the kind cluster"
    return "Kind cluster 'local' created"
def configure_kubectl_context():
    """Switch kubectl's current context to ``kind-local``.

    Returns:
        ``"kubectl config use-context kind-local ran successfully."`` on
        success, or ``"Unable to update kubectl current context"`` when
        kubectl is missing or exits non-zero.
    """
    try:
        subprocess.run(
            ["kubectl", "config", "use-context", "kind-local"],
            # Without check=True a failed switch was reported as success.
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
    # OSError covers a missing or non-executable binary.
    except (OSError, subprocess.CalledProcessError):
        return "Unable to update kubectl current context"
    return "kubectl config use-context kind-local ran successfully."
def check_kind_access():
    """Verify the cluster is reachable by listing its nodes with kubectl.

    Returns:
        A tuple of the success message and the number of entries in the
        ``items`` list of ``kubectl get nodes -o json``, or the string
        ``"Unable to connect to local kind cluster"`` when kubectl is
        missing, exits non-zero, or prints output that cannot be parsed.
    """
    try:
        r = subprocess.run(
            ["kubectl", "get", "nodes", "-o", "json"],
            # Without check=True a failing kubectl produced empty stdout and
            # crashed in json.loads instead of reaching the error branch.
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        node_count = len(json.loads(r.stdout)["items"])
    # OSError covers a missing or non-executable binary; ValueError covers
    # json.JSONDecodeError.
    except (OSError, subprocess.CalledProcessError, ValueError, KeyError, TypeError):
        return "Unable to connect to local kind cluster"
    return (
        "Successfully contact local kind control-plane, total number of nodes:",
        node_count,
    )
if __name__ == "__main__":
    # Run the setup steps in order and print every outcome. Previously only
    # the final step's result was shown, so a failure while creating the
    # cluster or switching the kubectl context went unreported.
    for step in (check_kind_clusters, configure_kubectl_context, check_kind_access):
        print(step())
# kind cluster definition: one control-plane node plus six workers, labelled
# in pairs as backend, frontend and db.
# NOTE(review): presumably this is the kind-config.yaml passed to
# `kind create cluster --config` by the setup script — confirm the file name.
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
# Cluster name.
name: local
nodes:
  # Control-plane node.
  - role: control-plane
  # Workers labelled app=backend.
  - role: worker
    labels:
      app: backend
  - role: worker
    labels:
      app: backend
  # Workers labelled app=frontend.
  - role: worker
    labels:
      app: frontend
  - role: worker
    labels:
      app: frontend
  # Workers labelled app=db.
  - role: worker
    labels:
      app: db
  - role: worker
    labels:
      app: db
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment