Here is a self-contained Python script designed to run inside a Kubernetes pod (such as an Ubuntu container). It uses only standard Python libraries, meaning you won't need to install requests or the kubernetes SDK inside your container.
This script checks the service account directory, verifies file permissions, extracts and decodes the JWT token to show its claims (like the service account name), and attempts a live connection test to the Kubernetes API server using that token.
import os
import json
import base64
import ssl
import urllib.request
import urllib.error
# Canonical mount point that Kubernetes documents for service account
# credentials. The '/run/secrets/...' spelling only resolves on images where
# /var/run is a symlink to /run (true on Ubuntu, not guaranteed elsewhere).
SA_PATH = '/var/run/secrets/kubernetes.io/serviceaccount'


def _read_text(path):
    """Return the whitespace-stripped UTF-8 contents of *path*."""
    with open(path, 'r', encoding='utf-8') as handle:
        return handle.read().strip()


def _decode_jwt_segment(segment):
    """Decode one JWT segment into a dict of claims (signature NOT verified).

    JWT segments are base64url-encoded with the trailing '=' padding removed,
    so the URL-safe alphabet must be used and the padding restored first.

    Raises:
        ValueError: if the segment is not valid base64url-encoded UTF-8 JSON
            or the JSON document is not an object.
    """
    # -len % 4 yields 0..3, so an already-aligned segment gets no padding.
    padded = segment + '=' * (-len(segment) % 4)
    claims = json.loads(base64.urlsafe_b64decode(padded).decode('utf-8'))
    if not isinstance(claims, dict):
        raise ValueError('JWT payload is not a JSON object')
    return claims


def _claim(claims, *path):
    """Walk nested dicts along *path*; return None if any level is missing."""
    node = claims
    for key in path:
        if not isinstance(node, dict):
            return None
        node = node.get(key)
    return node


def _print_token_claims(token):
    """Print the identifying claims found in *token*, or a warning if unparsable."""
    parts = token.split('.')
    if len(parts) != 3:
        print("⚠️ [WARN] Token format doesn't look like a standard 3-part JWT.")
        return
    try:
        claims = _decode_jwt_segment(parts[1])
    except ValueError as jwt_err:
        print(f"⚠️ [WARN] Could not parse JWT token payload: {jwt_err}")
        return
    print("🔒 [INFO] Decoded JWT Metadata:")
    print(f" - Service Account: {_claim(claims, 'kubernetes.io', 'serviceaccount', 'name')}")
    print(f" - Pod Name: {_claim(claims, 'kubernetes.io', 'pod', 'name')}")
    print(f" - Iss/Sub: {claims.get('sub')}")


def _api_error_message(http_error):
    """Return the 'message' field of a Kubernetes error body, or None."""
    try:
        body = json.loads(http_error.read().decode('utf-8'))
    except (OSError, ValueError):
        # Best effort only: the body may be empty, truncated, or not JSON.
        return None
    return body.get('message') if isinstance(body, dict) else None


def test_service_account(sa_path=SA_PATH):
    """Print a step-by-step diagnostic of this pod's service account mount.

    The steps are: (1) the mount directory exists and can be listed, (2) the
    namespace file can be read, (3) the token file can be read and its JWT
    payload decoded for display, (4) ca.crt is present and readable, and
    (5) a live "list pods" request against the API server using the token.

    Args:
        sa_path: Directory holding the 'namespace', 'token' and 'ca.crt'
            files. Defaults to the standard in-pod mount point.

    Returns:
        None. All results are reported on stdout; the function returns early
        when a step that later steps depend on has failed.
    """
    print("==================================================")
    print("=== KUBERNETES SERVICE ACCOUNT PERMISSION TEST ===")
    print("==================================================\n")

    # 1. Check Directory Existence
    print(f"## 1. Checking Directory: {sa_path}")
    if not os.path.exists(sa_path):
        print(f"❌ [ERROR] Directory '{sa_path}' does not exist.")
        print(" Ensure this pod has 'automountServiceAccountToken: true' in its spec.\n")
        return
    try:
        files = os.listdir(sa_path)
    except OSError as e:
        print(f"❌ [ERROR] Failed to list directory contents: {e}\n")
        return
    print(f"✅ [OK] Directory is readable. Found files: {files}\n")

    # 2. Read Namespace
    print("## 2. Reading Namespace")
    namespace = None
    try:
        namespace = _read_text(os.path.join(sa_path, 'namespace'))
    except (OSError, ValueError) as e:
        print(f"❌ [ERROR] Could not read namespace file: {e}\n")
    else:
        print(f"✅ [OK] Mounted Namespace: '{namespace}'\n")

    # 3. Read and Decode Token
    print("## 3. Reading and Parsing Token")
    token = None
    try:
        token = _read_text(os.path.join(sa_path, 'token'))
    except (OSError, ValueError) as e:
        print(f"❌ [ERROR] Could not read token file: {e}\n")
    else:
        print(f"✅ [OK] Token read successfully ({len(token)} characters).")
        _print_token_claims(token)
        print()

    # 4. Check CA Certificate
    print("## 4. Checking CA Certificate")
    ca_path = os.path.join(sa_path, 'ca.crt')
    if not os.path.exists(ca_path):
        print(f"⚠️ [WARN] 'ca.crt' not found at {ca_path}.\n")
    elif os.access(ca_path, os.R_OK):
        print("✅ [OK] 'ca.crt' is present and readable.\n")
    else:
        print(f"⚠️ [WARN] 'ca.crt' exists at {ca_path} but is not readable by this user.\n")

    # 5. Live Test Against Kubernetes API Server
    print("## 5. Testing Live API Permissions")
    host = os.environ.get('KUBERNETES_SERVICE_HOST')
    port = os.environ.get('KUBERNETES_SERVICE_PORT')
    if not host or not port:
        print("❌ [ERROR] KUBERNETES_SERVICE_HOST or PORT env variables are missing.")
        print(" Are you sure this script is running inside a Kubernetes cluster?\n")
        return
    if not token or not namespace:
        print("❌ [SKIP] Cannot test API connectivity because token or namespace is missing.\n")
        return

    # A bare IPv6 literal must be bracketed to form a valid URL authority.
    url_host = f"[{host}]" if ':' in host and not host.startswith('[') else host
    # Listing pods in the pod's own namespace serves as the baseline check.
    api_url = f"https://{url_host}:{port}/api/v1/namespaces/{namespace}/pods"
    print(f" Attempting GET request to: {api_url}")
    req = urllib.request.Request(api_url)
    req.add_header('Authorization', f'Bearer {token}')
    req.add_header('Accept', 'application/json')

    # Trust the cluster's own CA so the API server certificate verifies.
    try:
        context = ssl.create_default_context(cafile=ca_path)
    except OSError as e:  # missing/unreadable file or ssl.SSLError on bad PEM
        print(f"⚠️ [WARN] Failed to initialize SSL context with ca.crt ({e}). Falling back to unverified SSL.")
        # NOTE(security): this deliberate diagnostic fallback sends the bearer
        # token over a connection whose peer is not authenticated.
        context = ssl.create_default_context()
        context.check_hostname = False  # must be disabled before CERT_NONE
        context.verify_mode = ssl.CERT_NONE

    try:
        with urllib.request.urlopen(req, context=context, timeout=5) as response:
            status = response.status
            body = response.read()
    except urllib.error.HTTPError as e:
        if e.code == 401:
            # 401 is an authentication failure: RBAC was never evaluated.
            print(f"❌ [UNAUTHORIZED] API responded with HTTP {e.code}.")
            print(" The API server rejected the token itself (invalid, expired, or issued for another cluster/audience).")
        elif e.code == 403:
            print(f"❌ [FORBIDDEN] API responded with HTTP {e.code}.")
            print(" The Service Account token is valid, but it DOES NOT have RBAC permissions (Role/ClusterRole) to list pods in this namespace.")
        else:
            print(f"❌ [ERROR] API Server responded with HTTP {e.code}: {e.reason}")
        message = _api_error_message(e)
        if message:
            print(f" Details: {message}")
    except Exception as e:
        # Diagnostic boundary: urllib can surface URLError, socket timeouts,
        # TLS errors and http.client exceptions; all mean "request failed".
        print(f"❌ [CONNECTION FAILED] Could not reach the API server: {e}")
    else:
        print(f"✅ [SUCCESS] API call allowed! HTTP Status: {status}")
        try:
            data = json.loads(body.decode('utf-8'))
            pods_found = len(data.get('items') or [])
        except (ValueError, AttributeError, TypeError):
            print(f" RBAC Permission Verified: This service account CAN list pods in '{namespace}' (response body could not be parsed to count pods).")
        else:
            print(f" RBAC Permission Verified: This service account CAN list pods in '{namespace}' (Found {pods_found} pods).")
    print("\n==================================================")
if __name__ == '__main__':
    test_service_account()

If you are already interactively attached to your Ubuntu container (e.g., via `kubectl exec -it <pod-name> -- bash`), follow these steps:
- Create the file:
cat << 'EOF' > k8s_sa_test.py
# Paste the python script here
EOF
- Run the script:
python3 k8s_sa_test.py
- If Sections 1 & 2 fail: The Service Account token wasn't mounted. Check your Pod Spec to ensure `automountServiceAccountToken` isn't set to `false`.
- If Section 5 returns HTTP 403 (Forbidden): The token is perfectly healthy, but your Kubernetes cluster's RBAC rules deny the request. You need to bind a `Role` or `ClusterRole` to this `ServiceAccount` via a `RoleBinding` to give it the access you need.
Turned this into something more useful at https://gitlab.com/flrichar/appcd/-/tree/dc38851e42e076a34c86e3eb396a3b159f0ae694/dev/tools ...
why stop at pods?