Introduction
k8s_audit.py is a security auditing tool for Kubernetes clusters focused on misconfiguration enumeration and penetration testing in containerised environments. All operations are strictly read-only: the tool never modifies cluster state, making it safe to run against production environments without risk of disruption or unintended side effects.
The tool is organised into twelve independent audit modules, each targeting a specific attack surface within a Kubernetes cluster. Modules can be run individually or in any combination, allowing the scope of the audit to be tailored to the specific needs of each environment. The available modules are:
- cluster-info: server version currency and node health.
- rbac: cluster-admin bindings, wildcard ClusterRoles, and default service account token automount.
- pod-security: privileged pods, dangerous Linux capabilities, host namespace sharing, and root execution.
- network-policy: namespaces without a NetworkPolicy and policies permitting traffic from any source.
- secrets: legacy service account token secrets, suspicious key names, and plaintext credentials in environment variables.
- api-server: anonymous access, Dashboard exposure, insecure port, authorisation mode, and etcd configuration.
- service-accounts: service accounts with ClusterRoleBindings and mounted tokens.
- ingress: NodePort and LoadBalancer services, Ingress objects without TLS, and catch-all rules.
- psa: PodSecurityPolicies (deprecated) and Pod Security Admission labels per namespace.
- configmaps: keys and values matching credential patterns.
- images: images from public registries and images not pinned by SHA256 digest.
- logging: API server audit log configuration and presence of monitoring agents.
Every finding is classified into one of five severity levels: CRITICAL, HIGH, MEDIUM, LOW, and INFO. The CRITICAL level indicates an immediate risk requiring urgent attention, such as unauthenticated access with cluster administrator privileges. HIGH covers serious misconfigurations that can be exploited with little additional effort, such as privileged containers or dangerous Linux capabilities that enable container escape to the underlying node. MEDIUM groups notable risks that should be addressed in the short term, such as namespaces lacking network policies or service accounts with token automount enabled. LOW captures minor hardening gaps that do not represent a direct threat but that reduce the overall security posture of the cluster, such as images without a pinned version tag or containers with a writable root filesystem. Finally, INFO reports informational items with no associated risk that may be useful as context during the analysis.
In addition to severity classification, each finding includes a detailed description of the detected problem and a concrete remediation recommendation. Output can be filtered by one or more severity levels to focus attention on the most critical issues, or suppressed entirely using quiet mode so that only a summary table is printed at the end of the run. Results can be exported to a JSON file for integration with other tools, ingestion into a SIEM, or later review without needing to reconnect to the cluster. A previously saved report can be loaded and displayed at any time using the --input option, enabling teams to share findings across environments or review the history of past audits without requiring live cluster access.
Use of the tool
The following are the different options available in the application:
--kubeconfig FILE(-k): Path to the kubeconfig file. Defaults to$KUBECONFIGor~/.kube/config.--modules MODULE...(-m): Run only the specified modules. Omit to run all modules.--output FILE(-o): Save the full report to a JSON file.--input FILE(-i): Load and display a previously saved JSON report without connecting to the cluster.--severity LEVEL...(-s): Filter output by severity level. See--helpepilog for valid levels.--quiet(-q): Suppress individual findings during the run; only print the summary table at the end.--namespaces NS...(-n): Audit only the specified namespaces.--exclude-namespaces NS...(-e): Exclude the specified namespaces and audit all others.
Here are some practical examples of possible commands to execute:
# Full audit using the default kubeconfig
python3 k8s_audit.py
# Audit only the RBAC and pod security modules
python3 k8s_audit.py --modules rbac pod-security
# Audit only the "default" and "production" namespaces
python3 k8s_audit.py --namespaces default production
# Exclude system namespaces and save the report to JSON
python3 k8s_audit.py --exclude-namespaces kube-system kube-public --output report.json
# Show only critical and high severity findings
python3 k8s_audit.py --severity CRITICAL HIGH
# Run in quiet mode (summary only) and save the report
python3 k8s_audit.py --quiet --output report.json
# Review a previously saved report
python3 k8s_audit.py --input report.json
# Review a saved report filtering only critical findings
python3 k8s_audit.py --input report.json --severity CRITICAL
# Use a custom kubeconfig with specific modules
python3 k8s_audit.py --kubeconfig ~/.kube/prod-config --modules secrets configmaps api-server
When the application is run without parameters, a standard analysis is performed, as in this example:
+--------------------------------------------------------------+
| K8S SECURITY AUDIT TOOL - Read-Only Mode |
| Kubernetes Pentesting & Misconfiguration Scanner |
+--------------------------------------------------------------+
All checks are strictly read-only. No cluster state will be modified.
[INFO] Audit started : 2026-02-16 09:32:10 UTC
[INFO] Using kubeconfig : ~/.kube/config (default)
[INFO] Namespace scope : all namespaces
[INFO] Severity filter : all
[INFO] Quiet mode : off
[INFO] Modules to run : cluster-info, rbac, pod-security, ...
------------------------------------------------------------
CLUSTER INFORMATION
------------------------------------------------------------
[INFO] Client version : v1.29.2
[INFO] Server version : v1.26.9
[MEDIUM] Kubernetes version approaching EOL
Server is running v1.26. Consider upgrading.
-> Plan upgrade to latest stable release.
[INFO] Nodes found: 3
[INFO] Node: node-01 roles=['control-plane'] conditions=['Ready']
[INFO] Node: node-02 roles=['worker'] conditions=['Ready']
[INFO] Node: node-03 roles=['worker'] conditions=['Ready']
------------------------------------------------------------
RBAC - ROLES & BINDINGS
------------------------------------------------------------
[HIGH] ServiceAccount has cluster-admin
SA 'monitoring/prometheus-sa' bound via 'prometheus-cluster-admin'.
-> Restrict to least-privilege roles.
[MEDIUM] [default] Default SA auto-mounts token
Auto-mounted tokens in every pod increase attack surface.
-> Set automountServiceAccountToken: false on the default SA.
------------------------------------------------------------
POD SECURITY - PRIVILEGED / CAPABILITIES / HOST NAMESPACES
------------------------------------------------------------
[CRITICAL] [default/debug-pod/debug] Privileged container
Privileged containers can escape to the host.
-> Remove privileged: true and use fine-grained capabilities.
[HIGH] [default/debug-pod/debug] Dangerous capabilities: {'SYS_ADMIN'}
These capabilities can be used for container escape.
-> Drop all capabilities and add only what is needed.
------------------------------------------------------------
SUMMARY REPORT
------------------------------------------------------------
Total findings: 18
CRITICAL ## 2
HIGH ##### 5
MEDIUM ######## 8
LOW ### 3
INFO 0
Source code
#!/usr/bin/env python3
"""
k8s_audit.py - Kubernetes Security Audit Tool (Read-Only Enumeration)
Performs security audits and misconfiguration checks on Kubernetes clusters.
All operations are strictly read-only; no cluster state is ever modified.
Usage modes:
- Live audit : connect to a cluster and run one or more audit modules.
- Report mode : load a previously saved JSON report and display findings.
"""
import argparse
import json
import os
import subprocess
import sys
import textwrap
from datetime import datetime
import time
from typing import Optional
# GLOBAL STATE
# Accumulates every finding; always complete regardless of display filters.
findings: list[dict] = []
# Severity filter for display only (None = show everything).
severity_filter: Optional[set[str]] = None
# When True, individual findings are suppressed; only the summary is printed.
quiet_mode: bool = False
# OUTPUT HELPERS
def banner():
print("""
+--------------------------------------------------------------+
| K8S SECURITY AUDIT TOOL - Read-Only Mode |
| Kubernetes Pentesting & Misconfiguration Scanner |
+--------------------------------------------------------------+
All checks are strictly read-only. No cluster state will be modified.
""")
def section(name: str):
"""Print a visible section separator."""
print(f"\n {'-' * 60}")
print(f" {name}")
print(f" {'-' * 60}")
def info(msg: str):
print(f" [INFO] {msg}")
def ok(msg: str):
print(f" [PASS] {msg}")
def warn(msg: str):
print(f" [WARN] {msg}")
def error(msg: str):
print(f" [ERROR] {msg}")
def finding(severity: str, category: str, title: str,
detail: str, remediation: str = ""):
"""
Record a finding and optionally print it.
The finding is always appended to the global list so the saved JSON
report is always complete, regardless of active display filters.
"""
findings.append({
"severity": severity,
"category": category,
"title": title,
"detail": detail,
"remediation": remediation,
})
if quiet_mode:
return
if severity_filter is not None and severity not in severity_filter:
return
print(f" [{severity}] {title}")
if detail:
for line in textwrap.wrap(detail, width=90):
print(f" {line}")
if remediation:
print(f" -> {remediation}")
def print_finding_block(f: dict):
"""Print a single finding dict (used when replaying a saved report)."""
print(f" [{f.get('severity', '?')}] {f.get('title', '')}")
if detail := f.get("detail", ""):
for line in textwrap.wrap(detail, width=90):
print(f" {line}")
if rem := f.get("remediation", ""):
print(f" -> {rem}")
# CREDENTIAL PATTERN LISTS
# Used across multiple modules (secrets, configmaps, env vars).
# Substrings matched against key / variable *names*.
CREDENTIAL_KEY_PATTERNS = (
"password", "passwd", "pass",
"token", "secret", "key",
"credential", "api_key", "apikey",
"private", "cert", "aws_", "gcp_", "kubeconfig",
)
# Substrings matched against key / variable *values* (assignment-style
# patterns common in config files and env var content).
CREDENTIAL_VALUE_PATTERNS = (
"password=", "passwd=", "pass=",
"token=", "secret=",
"aws_access_key", "private key",
)
# NAMESPACE FILTER
class NsFilter:
"""
Controls which namespaces are in scope for the audit.
Priority:
1. --namespaces -> audit only those namespaces.
2. --exclude-namespaces -> audit all except those.
3. Neither flag -> audit every namespace.
"""
def __init__(self, include: Optional[list[str]], exclude: Optional[list[str]]):
self.include = set(include) if include else None
self.exclude = set(exclude) if exclude else set()
def allow(self, ns: str) -> bool:
if self.include is not None:
return ns in self.include
return ns not in self.exclude
def filter_items(self, items: list[dict]) -> list[dict]:
"""Keep resource dicts whose namespace is in scope.
Cluster-scoped resources (no namespace field) are always kept."""
return [i for i in items if self.allow(i["metadata"].get("namespace", ""))]
def filter_ns_list(self, ns_list: list[str]) -> list[str]:
"""Filter a plain list of namespace name strings."""
return [ns for ns in ns_list if self.allow(ns)]
def describe(self) -> str:
if self.include is not None:
return f"include={sorted(self.include)}"
if self.exclude:
return f"exclude={sorted(self.exclude)}"
return "all namespaces"
# KUBECTL WRAPPER
# Verbs that modify cluster state - blocked as a safety net.
_WRITE_VERBS = {
"apply", "create", "delete", "patch", "replace",
"edit", "label", "annotate", "taint", "drain",
"cordon", "uncordon", "scale", "rollout", "set",
"exec", "cp", "port-forward", "attach", "run",
}
def kubectl(args: list[str], kubeconfig: Optional[str] = None,
output: str = "json", namespace: str = "") -> Optional[dict | list | str]:
"""
Execute a read-only kubectl command and return the parsed result.
output="json" -> returns a parsed Python dict / list.
output="raw" -> returns the raw stdout string (for yes/no queries).
Raises PermissionError if a write verb is requested.
Returns None on any error (timeout, bad JSON, kubectl not found, etc.).
"""
verb = args[0].lower() if args else ""
if verb in _WRITE_VERBS:
raise PermissionError(f"Write verb '{verb}' is not allowed in audit mode.")
cmd = ["kubectl"] + args
if kubeconfig:
cmd += ["--kubeconfig", kubeconfig]
if namespace:
cmd += ["-n", namespace]
if output and output != "raw":
cmd += ["-o", output]
try:
result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
if result.returncode != 0:
return None
if output == "json":
return json.loads(result.stdout)
return result.stdout.strip()
except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
return None
# SMALL SHARED HELPERS
def _get_namespaces(kubeconfig) -> list[str]:
"""Return the list of all namespace names in the cluster."""
data = kubectl(["get", "namespaces"], kubeconfig, output="json")
return [n["metadata"]["name"] for n in (data or {}).get("items", [])]
def _cmd_str(container: dict) -> str:
"""Merge a container's command + args into a single string for flag checks."""
return " ".join(container.get("command", []) + container.get("args", []))
def _all_containers(pod: dict) -> list[dict]:
"""Return regular and init containers for a pod spec."""
spec = pod.get("spec", {})
return spec.get("containers", []) + spec.get("initContainers", [])
# AUDIT MODULES
def audit_cluster_info(kubeconfig, nsf: NsFilter):
"""Check server version currency and node health."""
section("CLUSTER INFORMATION")
version = kubectl(["version"], kubeconfig, output="json")
if not version:
error("Could not retrieve cluster version - check connectivity / kubeconfig.")
return
client = version.get("clientVersion", {}).get("gitVersion", "unknown")
server = version.get("serverVersion", {}).get("gitVersion", "unknown")
info(f"Client version : {client}")
info(f"Server version : {server}")
try:
major = int(version["serverVersion"]["major"])
minor = int(version["serverVersion"]["minor"].replace("+", ""))
if major == 1 and minor < 24:
finding("HIGH", "cluster-info", "Outdated Kubernetes version",
f"Server is running v{major}.{minor}. Versions < 1.24 have known CVEs.",
"Upgrade to a supported Kubernetes release.")
elif major == 1 and minor < 27:
finding("MEDIUM", "cluster-info", "Kubernetes version approaching EOL",
f"Server is running v{major}.{minor}. Consider upgrading.",
"Plan upgrade to latest stable release.")
else:
ok(f"Kubernetes version {major}.{minor} is recent.")
except Exception:
pass # Version fields missing or in an unexpected format.
nodes = kubectl(["get", "nodes"], kubeconfig, output="json")
if not nodes or "items" not in nodes:
return
info(f"Nodes found: {len(nodes['items'])}")
for node in nodes["items"]:
name = node["metadata"]["name"]
roles = [k.split("/")[-1] for k in node["metadata"].get("labels", {})
if "node-role.kubernetes.io" in k]
ready = [c["type"] for c in node["status"].get("conditions", [])
if c.get("status") == "True"]
info(f" Node: {name} roles={roles} conditions={ready}")
if node["spec"].get("unschedulable"):
finding("LOW", "cluster-info", f"Node '{name}' is unschedulable (cordoned)",
"Cordoned nodes may indicate an ongoing incident or forgotten maintenance.",
"Verify node status and uncordon if safe.")
def audit_rbac(kubeconfig, nsf: NsFilter):
"""Inspect ClusterRoleBindings, wildcard ClusterRoles, and default SA token automount."""
section("RBAC - ROLES & BINDINGS")
# cluster-admin bindings
crbs = kubectl(["get", "clusterrolebindings"], kubeconfig, output="json")
if crbs and "items" in crbs:
for crb in crbs["items"]:
if crb.get("roleRef", {}).get("name") != "cluster-admin":
continue
crb_name = crb["metadata"]["name"]
for s in crb.get("subjects") or []:
kind = s.get("kind")
name = s.get("name")
# Skip SA subjects whose namespace is out of scope.
if kind == "ServiceAccount":
sa_ns = s.get("namespace", "")
if sa_ns and not nsf.allow(sa_ns):
continue
if name == "system:anonymous":
finding("CRITICAL", "rbac",
"Anonymous user has cluster-admin",
f"ClusterRoleBinding '{crb_name}' grants cluster-admin to anonymous.",
"Remove this binding immediately.")
elif kind == "ServiceAccount":
finding("HIGH", "rbac",
"ServiceAccount has cluster-admin",
f"SA '{s.get('namespace','?')}/{name}' bound via '{crb_name}'.",
"Restrict to least-privilege roles.")
elif kind in ("User", "Group"):
finding("MEDIUM", "rbac",
f"{kind} '{name}' has cluster-admin",
f"Via ClusterRoleBinding '{crb_name}'.",
"Audit whether cluster-admin is truly required.")
# wildcard ClusterRoles
crs = kubectl(["get", "clusterroles"], kubeconfig, output="json")
if crs and "items" in crs:
for cr in crs["items"]:
name = cr["metadata"]["name"]
if name.startswith("system:"):
continue
for rule in cr.get("rules") or []:
if "*" in rule.get("verbs", []) and "*" in rule.get("resources", []):
finding("HIGH", "rbac",
f"ClusterRole '{name}' uses wildcard verbs+resources",
"Wildcards grant excessive permissions.",
"Replace wildcards with explicit verbs and resources.")
break # One finding per ClusterRole is enough.
# default SA token automount
for ns in nsf.filter_ns_list(_get_namespaces(kubeconfig)):
sa = kubectl(["get", "serviceaccount", "default"],
kubeconfig, output="json", namespace=ns)
if sa and sa.get("automountServiceAccountToken") is not False:
finding("MEDIUM", "rbac",
f"[{ns}] Default SA auto-mounts token",
"Auto-mounted tokens in every pod increase the attack surface.",
"Set automountServiceAccountToken: false on the default SA.")
def audit_pod_security(kubeconfig, nsf: NsFilter):
"""Check pods for dangerous security configurations."""
section("POD SECURITY - PRIVILEGED / CAPABILITIES / HOST NAMESPACES")
pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
if not pods or "items" not in pods:
warn("No pods found or insufficient permissions.")
return
DANGEROUS_CAPS = {"SYS_ADMIN", "NET_ADMIN", "SYS_PTRACE",
"SYS_MODULE", "DAC_READ_SEARCH", "ALL"}
for pod in nsf.filter_items(pods["items"]):
ns = pod["metadata"]["namespace"]
name = pod["metadata"]["name"]
spec = pod.get("spec", {})
# Host namespace sharing lets a compromised pod affect the node directly.
for flag, label in [("hostNetwork", "hostNetwork"), ("hostPID", "hostPID"), ("hostIPC", "hostIPC")]:
if spec.get(flag):
finding("HIGH", "pod-security", f"[{ns}/{name}] {label}: true",
f"Pod shares the host {label.replace('host', '').lower()} namespace.",
f"Remove {label} unless strictly required.")
for c in _all_containers(pod):
sc = c.get("securityContext", {})
cname = c.get("name", "?")
ref = f"[{ns}/{name}/{cname}]"
if sc.get("privileged"):
finding("CRITICAL", "pod-security", f"{ref} Privileged container",
"Privileged containers can escape to the host.",
"Remove privileged: true and use fine-grained capabilities.")
if sc.get("allowPrivilegeEscalation") is not False:
finding("MEDIUM", "pod-security",
f"{ref} allowPrivilegeEscalation not set to false",
"Container may escalate privileges via setuid binaries.",
"Add allowPrivilegeEscalation: false to securityContext.")
risky = set(sc.get("capabilities", {}).get("add", [])) & DANGEROUS_CAPS
if risky:
finding("HIGH", "pod-security", f"{ref} Dangerous capabilities: {risky}",
"These capabilities can be used for container escape.",
"Drop all capabilities and add only what is needed.")
if not sc.get("readOnlyRootFilesystem"):
finding("LOW", "pod-security", f"{ref} No readOnlyRootFilesystem",
"A writable root filesystem eases attacker persistence.",
"Set readOnlyRootFilesystem: true.")
if not sc.get("runAsNonRoot") and sc.get("runAsUser", 0) == 0:
finding("MEDIUM", "pod-security", f"{ref} May run as root",
"Container does not enforce non-root execution.",
"Set runAsNonRoot: true and a non-zero runAsUser.")
image = c.get("image", "")
if image.endswith(":latest") or ":" not in image.split("/")[-1]:
finding("LOW", "pod-security", f"{ref} Image uses 'latest' or no tag",
f"Image: {image}",
"Pin images to a specific digest or version tag.")
def audit_network_policies(kubeconfig, nsf: NsFilter):
"""Verify that every in-scope namespace has at least one NetworkPolicy."""
section("NETWORK POLICIES")
ns_list = nsf.filter_ns_list(_get_namespaces(kubeconfig))
netpols = kubectl(["get", "networkpolicies", "--all-namespaces"],
kubeconfig, output="json")
covered_ns = {np["metadata"]["namespace"]
for np in (netpols or {}).get("items", [])}
for ns in ns_list:
if ns not in covered_ns:
finding("MEDIUM", "network-policy",
f"Namespace '{ns}' has no NetworkPolicy",
"All pods in this namespace can communicate freely.",
"Add a default-deny NetworkPolicy and explicit allow rules.")
else:
ok(f"Namespace '{ns}' has at least one NetworkPolicy.")
# Flag policies that allow ingress from the entire internet.
if netpols and "items" in netpols:
for np in nsf.filter_items(netpols["items"]):
ns = np["metadata"]["namespace"]
name = np["metadata"]["name"]
for rule in np.get("spec", {}).get("ingress") or []:
for frm in rule.get("from") or []:
if frm.get("ipBlock", {}).get("cidr") == "0.0.0.0/0":
finding("MEDIUM", "network-policy",
f"[{ns}/{name}] Ingress from 0.0.0.0/0",
"Policy allows ingress from any IP address.",
"Restrict CIDR to known source ranges.")
def audit_secrets(kubeconfig, nsf: NsFilter):
"""Look for legacy SA tokens, suspicious secret key names, and plaintext env vars."""
section("SECRETS & SENSITIVE DATA")
secrets = kubectl(["get", "secrets", "--all-namespaces"], kubeconfig, output="json")
if not secrets or "items" not in secrets:
warn("Could not list secrets (may require elevated permissions).")
return
in_scope = nsf.filter_items(secrets["items"])
info(f"Total secrets in scope: {len(in_scope)}")
for s in in_scope:
ns = s["metadata"]["namespace"]
name = s["metadata"]["name"]
stype = s.get("type", "")
data = s.get("data") or {}
# Long-lived SA token secrets were deprecated in Kubernetes 1.24.
if stype == "kubernetes.io/service-account-token":
finding("LOW", "secrets",
f"[{ns}/{name}] Legacy SA token secret",
"Long-lived SA token secrets are deprecated; prefer bound tokens.",
"Remove and rely on projected ServiceAccount tokens.")
# For Opaque secrets, scan key names for credential-related substrings.
if stype == "Opaque":
suspicious = [k for k in data
if any(p in k.lower() for p in CREDENTIAL_KEY_PATTERNS)]
if suspicious:
finding("INFO", "secrets",
f"[{ns}/{name}] Secret contains sensitive-looking keys",
f"Keys: {', '.join(suspicious[:5])}",
"Ensure secrets are encrypted at rest (EncryptionConfiguration).")
# Scan pod env vars for hardcoded credentials (secretKeyRef should be used instead).
pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
if not pods or "items" not in pods:
return
for pod in nsf.filter_items(pods["items"]):
ns = pod["metadata"]["namespace"]
name = pod["metadata"]["name"]
for c in pod["spec"].get("containers", []):
for env in c.get("env") or []:
# Only flag literal "value" fields, not valueFrom references.
if "value" not in env:
continue
key = env.get("name", "").lower()
if any(p in key for p in CREDENTIAL_KEY_PATTERNS):
finding("HIGH", "secrets",
f"[{ns}/{name}] Plaintext sensitive env var '{env['name']}'",
"Hardcoded sensitive values in pod env are exposed via the API.",
"Use secretKeyRef or a secrets manager instead.")
def audit_api_server(kubeconfig, nsf: NsFilter):
"""
Test anonymous access and inspect kube-apiserver / etcd process flags.
kube-system checks are not namespace-filtered because they target
cluster-level infrastructure that is relevant to every audit scope.
"""
section("API SERVER & ETCD CONFIGURATION")
# anonymous access
anon_pods = kubectl(["auth", "can-i", "list", "pods",
"--as=system:anonymous", "-n", "default"],
kubeconfig, output="raw")
if anon_pods and anon_pods.strip().lower() == "yes":
finding("CRITICAL", "api-server",
"Anonymous user can list pods in default namespace",
"API server allows unauthenticated requests.",
"Set --anonymous-auth=false on kube-apiserver.")
else:
ok("Anonymous access to list pods is denied.")
anon_secrets = kubectl(["auth", "can-i", "get", "secrets",
"--as=system:anonymous", "-n", "kube-system"],
kubeconfig, output="raw")
if anon_secrets and anon_secrets.strip().lower() == "yes":
finding("CRITICAL", "api-server",
"Anonymous user can read kube-system secrets",
"Critical secrets (bootstrap tokens, etc.) are exposed.",
"Set --anonymous-auth=false and review RBAC immediately.")
# dashboard exposure
svcs = kubectl(["get", "services", "--all-namespaces"], kubeconfig, output="json")
if svcs and "items" in svcs:
for svc in nsf.filter_items(svcs["items"]):
ns = svc["metadata"]["namespace"]
name = svc["metadata"]["name"]
stype = svc["spec"].get("type", "ClusterIP")
if "dashboard" not in name.lower():
continue
if stype in ("LoadBalancer", "NodePort"):
finding("HIGH", "api-server",
f"[{ns}/{name}] Kubernetes Dashboard exposed externally",
f"Service type: {stype}",
"Use kubectl proxy or restrict with NetworkPolicy/authentication.")
else:
finding("INFO", "api-server",
f"[{ns}/{name}] Kubernetes Dashboard deployed (ClusterIP)",
"Ensure access is protected with authentication.",
"Use token-based auth and avoid skip-login.")
# apiserver & etcd startup flags
kube_pods = kubectl(["get", "pods", "-n", "kube-system"], kubeconfig, output="json")
if not kube_pods or "items" not in kube_pods:
return
for pod in kube_pods["items"]:
pod_name = pod["metadata"]["name"]
for c in pod["spec"].get("containers", []):
cmd = _cmd_str(c)
if "kube-apiserver" in pod_name or "apiserver" in cmd:
if "--insecure-port=0" not in cmd and "--insecure-port" in cmd:
finding("CRITICAL", "api-server",
"kube-apiserver insecure port may be enabled",
"The insecure port bypasses authentication and authorisation.",
"Set --insecure-port=0 on kube-apiserver.")
if "--authorization-mode" in cmd and "AlwaysAllow" in cmd:
finding("CRITICAL", "api-server",
"kube-apiserver uses AlwaysAllow authorisation",
"Every request is authorised without checking.",
"Use RBAC as the authorisation mode.")
if "--enable-admission-plugins" in cmd:
if "PodSecurityPolicy" not in cmd and "NodeRestriction" not in cmd:
finding("MEDIUM", "api-server",
"Admission plugins may be missing",
"NodeRestriction and security admission plugins are recommended.",
"Enable NodeRestriction and appropriate admission controllers.")
if "etcd" in pod_name:
if "--client-cert-auth=true" not in cmd:
finding("HIGH", "api-server",
"etcd may not require client certificate authentication",
"etcd without mTLS can be accessed without credentials.",
"Set --client-cert-auth=true on etcd.")
if "--auto-tls=true" in cmd:
finding("HIGH", "api-server",
"etcd uses auto-TLS (self-signed, no CA validation)",
"Auto-TLS does not validate client identity properly.",
"Use proper PKI certificates for etcd.")
def audit_service_accounts(kubeconfig, nsf: NsFilter):
"""Detect pods whose service account has a ClusterRoleBinding and token automount on."""
section("SERVICE ACCOUNTS")
pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
if not pods or "items" not in pods:
return
# Fetch ClusterRoleBindings once and reuse across all pod checks.
# cluster-admin bindings
crbs = kubectl(["get", "clusterrolebindings"], kubeconfig, output="json")
crb_items = (crbs or {}).get("items", [])
for pod in nsf.filter_items(pods["items"]):
ns = pod["metadata"]["namespace"]
name = pod["metadata"]["name"]
sa = pod["spec"].get("serviceAccountName", "default")
automount = pod["spec"].get("automountServiceAccountToken", True)
# Default SA automount is already covered in the RBAC module.
if not automount or sa == "default":
continue
for crb in crb_items:
for subj in crb.get("subjects") or []:
if (subj.get("kind") == "ServiceAccount"
and subj.get("name") == sa
and subj.get("namespace") == ns):
role = crb["roleRef"]["name"]
finding("HIGH", "service-accounts",
f"[{ns}/{name}] Pod SA '{sa}' has ClusterRoleBinding",
f"Role: {role} - token is mounted and reachable from within the pod.",
"Scope bindings to namespace roles; disable token automount if possible.")
def audit_ingress_services(kubeconfig, nsf: NsFilter):
"""Identify externally exposed services and Ingress objects lacking TLS."""
section("INGRESS & EXPOSED SERVICES")
svcs = kubectl(["get", "services", "--all-namespaces"], kubeconfig, output="json")
if svcs and "items" in svcs:
for svc in nsf.filter_items(svcs["items"]):
ns = svc["metadata"]["namespace"]
name = svc["metadata"]["name"]
stype = svc["spec"].get("type", "ClusterIP")
if stype == "NodePort":
ports = [str(p.get("nodePort", "?"))
for p in svc["spec"].get("ports", []) if p.get("nodePort")]
finding("MEDIUM", "ingress-services",
f"[{ns}/{name}] NodePort service",
f"Exposed on node ports: {', '.join(ports)}",
"Prefer LoadBalancer with firewall rules or use Ingress.")
if stype == "LoadBalancer":
lb_ingress = svc["status"].get("loadBalancer", {}).get("ingress", [])
ips = [i.get("ip") or i.get("hostname", "pending") for i in lb_ingress]
finding("INFO", "ingress-services",
f"[{ns}/{name}] LoadBalancer service",
f"External IPs: {', '.join(ips) if ips else 'pending'}",
"Ensure only intended services are internet-facing.")
ingresses = kubectl(["get", "ingresses", "--all-namespaces"],
kubeconfig, output="json")
if not ingresses or "items" not in ingresses:
return
for ing in nsf.filter_items(ingresses["items"]):
ns = ing["metadata"]["namespace"]
name = ing["metadata"]["name"]
spec = ing.get("spec", {})
if not spec.get("tls"):
finding("MEDIUM", "ingress-services",
f"[{ns}/{name}] Ingress without TLS",
"Traffic may be served over plain HTTP.",
"Add a TLS section with a valid certificate.")
for rule in spec.get("rules") or []:
if not rule.get("host"):
finding("LOW", "ingress-services",
f"[{ns}/{name}] Ingress rule without host (catch-all)",
"Catch-all rules may expose unexpected backends.",
"Specify explicit host names for all Ingress rules.")
def audit_psa(kubeconfig, nsf: NsFilter):
"""Check PodSecurityPolicies (deprecated) and Pod Security Admission labels."""
section("POD SECURITY ADMISSION / POD SECURITY POLICIES")
# PodSecurityPolicy was removed in Kubernetes 1.25.
psps = kubectl(["get", "podsecuritypolicies"], kubeconfig, output="json")
if psps and "items" in psps:
info(f"PodSecurityPolicies found: {len(psps['items'])} (deprecated resource)")
for psp in psps["items"]:
name = psp["metadata"]["name"]
spec = psp.get("spec", {})
if spec.get("privileged"):
finding("HIGH", "psa",
f"PSP '{name}' allows privileged containers",
"Any pod bound to this PSP can run as privileged.",
"Migrate to Pod Security Admission and restrict privileged.")
if spec.get("hostNetwork") or spec.get("hostPID") or spec.get("hostIPC"):
finding("MEDIUM", "psa",
f"PSP '{name}' allows host namespace access",
"Pods can access host network/PID/IPC namespaces.",
"Disallow host namespaces in PSP spec.")
else:
ok("No PodSecurityPolicies found (or not applicable for this cluster version).")
# Check PSA enforcement labels on each in-scope namespace.
namespaces = kubectl(["get", "namespaces"], kubeconfig, output="json")
if not namespaces or "items" not in namespaces:
return
ns_map = {n["metadata"]["name"]: n for n in namespaces["items"]}
for ns_name in nsf.filter_ns_list(list(ns_map)):
labels = ns_map[ns_name]["metadata"].get("labels") or {}
enforce = labels.get("pod-security.kubernetes.io/enforce")
if not enforce:
finding("MEDIUM", "psa",
f"Namespace '{ns_name}' has no PSA enforce label",
"Pod Security Admission is not enforced in this namespace.",
"Add pod-security.kubernetes.io/enforce=restricted (or baseline).")
elif enforce == "privileged":
finding("HIGH", "psa",
f"Namespace '{ns_name}' PSA enforce=privileged",
"No pod security restrictions are applied.",
"Change the PSA enforce level to 'baseline' or 'restricted'.")
else:
ok(f"Namespace '{ns_name}' PSA enforce={enforce}")
def audit_configmaps(kubeconfig, nsf: NsFilter):
"""Scan ConfigMap keys and values for credential-like content."""
section("CONFIGMAPS - SENSITIVE DATA EXPOSURE")
cms = kubectl(["get", "configmaps", "--all-namespaces"], kubeconfig, output="json")
if not cms or "items" not in cms:
return
for cm in nsf.filter_items(cms["items"]):
ns = cm["metadata"]["namespace"]
name = cm["metadata"]["name"]
data = cm.get("data") or {}
for k, v in data.items():
# Check the key name first.
if any(p in k.lower() for p in CREDENTIAL_KEY_PATTERNS):
finding("MEDIUM", "configmaps",
f"[{ns}/{name}] ConfigMap key '{k}' looks sensitive",
"Sensitive data should not be stored in ConfigMaps.",
"Move to a Kubernetes Secret or external secrets manager.")
break
# Then check the value for assignment-style patterns.
if isinstance(v, str) and len(v) > 10:
if any(p in v.lower() for p in CREDENTIAL_VALUE_PATTERNS):
finding("HIGH", "configmaps",
f"[{ns}/{name}] ConfigMap value in key '{k}' may contain credentials",
"Credential-like strings found in ConfigMap value.",
"Move credentials to Secrets and rotate immediately.")
break
def audit_images(kubeconfig, nsf: NsFilter):
"""Flag images from public registries and images not pinned by SHA256 digest."""
section("IMAGE SECURITY & SUPPLY CHAIN")
pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
if not pods or "items" not in pods:
return
PUBLIC_REGISTRIES = {"docker.io", "ghcr.io", "quay.io", "gcr.io", "public.ecr.aws"}
seen: set[str] = set() # Avoid duplicate findings for the same image.
for pod in nsf.filter_items(pods["items"]):
ns = pod["metadata"]["namespace"]
name = pod["metadata"]["name"]
for c in _all_containers(pod):
image = c.get("image", "")
if image in seen:
continue
seen.add(image)
if any(reg in image for reg in PUBLIC_REGISTRIES):
finding("LOW", "images",
f"[{ns}/{name}] Image from public registry",
f"Image: {image}",
"Use a private registry with image scanning enabled.")
# Without a @sha256: digest a tag can be silently replaced (tag mutation).
if "@sha256:" not in image:
finding("LOW", "images",
f"[{ns}/{name}] Image not pinned by digest",
f"Image: {image}",
"Pin images by SHA256 digest for supply-chain integrity.")
def audit_logging(kubeconfig, nsf: NsFilter):
"""Verify API server audit logging flags and detect common monitoring agents."""
section("LOGGING & AUDIT CONFIGURATION")
# Inspect kube-apiserver static pod for audit log flags.
kube_pods = kubectl(["get", "pods", "-n", "kube-system"], kubeconfig, output="json")
if kube_pods and "items" in kube_pods:
for pod in kube_pods["items"]:
for c in pod["spec"].get("containers", []):
cmd = _cmd_str(c)
if "kube-apiserver" not in pod["metadata"]["name"] and "apiserver" not in cmd:
continue
if "--audit-log-path" not in cmd:
finding("MEDIUM", "logging",
"kube-apiserver audit logging not configured",
"API requests are not being logged.",
"Configure --audit-log-path and --audit-policy-file.")
else:
ok("kube-apiserver audit logging is configured.")
if "--audit-log-maxage" not in cmd:
finding("LOW", "logging",
"kube-apiserver audit log max-age not set",
"Old audit logs may accumulate or be deleted prematurely.",
"Set --audit-log-maxage to comply with your retention policy.")
# Check whether any well-known monitoring agent is running in the cluster.
LOG_KEYWORDS = {"fluentd", "fluent-bit", "filebeat", "logstash",
"falco", "prometheus", "loki", "grafana"}
all_pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
has_agent = False
if all_pods and "items" in all_pods:
for pod in nsf.filter_items(all_pods["items"]):
if any(kw in pod["metadata"]["name"].lower() for kw in LOG_KEYWORDS):
has_agent = True
break
if has_agent:
ok("Logging/monitoring agent detected in cluster.")
else:
finding("MEDIUM", "logging",
"No common logging/monitoring agent detected",
"Falco, Prometheus, Fluentd, etc. were not found.",
"Deploy a runtime security and logging solution.")
# REPORT LOADING (--input mode)
def load_and_display_report(path: str, severity_filter: Optional[set[str]] = None):
"""
Load a previously saved JSON report and display it in the same format
as a live audit, then print the summary counts.
"""
banner()
info(f"Loading saved report: {path}")
try:
with open(path) as fh:
report = json.load(fh)
except FileNotFoundError:
error(f"File not found: {path}")
sys.exit(1)
except json.JSONDecodeError as exc:
error(f"Invalid JSON in report file: {exc}")
sys.exit(1)
info(f"Report generated : {report.get('generated', 'unknown')}")
loaded = report.get("findings", [])
if not loaded:
warn("No findings found in the report file.")
print()
return
# Group findings by category and display each section.
by_category: dict[str, list[dict]] = {}
for f in loaded:
by_category.setdefault(f.get("category", "uncategorized"), []).append(f)
for cat, items in by_category.items():
visible = [f for f in items
if severity_filter is None or f.get("severity") in severity_filter]
if not visible:
continue
section(cat.upper())
for f in visible:
print_finding_block(f)
# Summary always reflects the full report totals, not just the filtered view.
section("SUMMARY REPORT")
counts = report.get("summary") or {}
if not counts:
counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0}
for f in loaded:
sev = f.get("severity", "INFO")
counts[sev] = counts.get(sev, 0) + 1
total = sum(counts.values())
print(f"\n Total findings: {total}")
for sev, cnt in counts.items():
bar = "#" * min(cnt, 40)
print(f" {sev:<10} {bar} {cnt}")
print()
# SUMMARY REPORT
def print_summary(output_file: Optional[str] = None):
"""Print the counts-per-severity table and optionally save the report to JSON."""
section("SUMMARY REPORT")
counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0}
for f in findings:
counts[f["severity"]] = counts.get(f["severity"], 0) + 1
total = sum(counts.values())
print(f"\n Total findings: {total}")
for sev, cnt in counts.items():
bar = "#" * min(cnt, 40)
print(f" {sev:<10} {bar} {cnt}")
if output_file:
report = {
"generated": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + f" ({time.tzname[time.daylight]})",
"summary": counts,
"findings": findings,
}
with open(output_file, "w") as fh:
json.dump(report, fh, indent=2)
print(f"\n JSON report saved to: {output_file}")
print()
# CLI
MODULES = {
"cluster-info": audit_cluster_info,
"rbac": audit_rbac,
"pod-security": audit_pod_security,
"network-policy": audit_network_policies,
"secrets": audit_secrets,
"api-server": audit_api_server,
"service-accounts": audit_service_accounts,
"ingress": audit_ingress_services,
"psa": audit_psa,
"configmaps": audit_configmaps,
"images": audit_images,
"logging": audit_logging,
}
def main():
parser = argparse.ArgumentParser(
prog="k8s_audit.py",
description="Kubernetes Security Audit Tool - read-only enumeration / pentesting",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=(
"Available audit modules:\n"
" cluster-info\n"
" rbac\n"
" pod-security\n"
" network-policy\n"
" secrets\n"
" api-server\n"
" service-accounts\n"
" ingress\n"
" psa\n"
" configmaps\n"
" images\n"
" logging\n"
"\n"
"Severity levels:\n"
" CRITICAL Immediate risk (e.g. anonymous cluster-admin access)\n"
" HIGH Serious misconfiguration (e.g. privileged container)\n"
" MEDIUM Notable risk requiring attention\n"
" LOW Minor hardening gap\n"
" INFO Informational, no direct risk\n"
"\n"
"Examples:\n"
" %(prog)s\n"
" %(prog)s --quiet\n"
" %(prog)s --modules rbac pod-security\n"
" %(prog)s --namespaces default kube-system\n"
" %(prog)s --exclude-namespaces kube-system kube-public\n"
" %(prog)s --kubeconfig /path/to/config --output report.json\n"
" %(prog)s --input report.json\n"
" %(prog)s --input report.json --severity CRITICAL HIGH\n"
),
)
parser.add_argument(
"--kubeconfig", "-k", metavar="FILE", default=None,
help="Path to kubeconfig file (default: $KUBECONFIG or ~/.kube/config)",
)
parser.add_argument(
"--modules", "-m", nargs="+", metavar="MODULE",
choices=list(MODULES.keys()), default=None,
help="Run specific audit module(s). Omit for full audit.",
)
parser.add_argument(
"--output", "-o", metavar="FILE", default=None,
help="Save findings to a JSON report file after a live audit.",
)
parser.add_argument(
"--input", "-i", metavar="FILE", default=None,
help="Load and display a previously saved JSON report (no cluster connection).",
)
parser.add_argument(
"--severity", "-s", nargs="+", metavar="LEVEL", default=None,
choices=["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"],
help="Filter output by severity level (see epilog for valid levels).",
)
parser.add_argument(
"--quiet", "-q", action="store_true", default=False,
help="Suppress all finding output during the audit; only print the summary at the end.",
)
ns_group = parser.add_mutually_exclusive_group()
ns_group.add_argument(
"--namespaces", "-n", nargs="+", metavar="NS", default=None,
help="Audit only these namespaces (space-separated).",
)
ns_group.add_argument(
"--exclude-namespaces", "-e", nargs="+", metavar="NS", default=None,
help="Skip these namespaces; audit all others (space-separated).",
)
args = parser.parse_args()
# Report-loading mode: display a saved JSON report then exit.
if args.input:
load_and_display_report(
args.input,
severity_filter=set(args.severity) if args.severity else None,
)
sys.exit(0)
# Live audit mode.
global severity_filter, quiet_mode
severity_filter = set(args.severity) if args.severity else None
quiet_mode = args.quiet
nsf = NsFilter(include=args.namespaces, exclude=args.exclude_namespaces)
banner()
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + f" ({time.tzname[time.daylight]})"
info(f"Audit started : {now}")
kc = args.kubeconfig or os.environ.get("KUBECONFIG", "~/.kube/config")
info(f"Using kubeconfig : {kc}{'' if args.kubeconfig else ' (default)'}")
info(f"Namespace scope : {nsf.describe()}")
info(f"Severity filter : {', '.join(sorted(severity_filter)) if severity_filter else 'all'}")
info(f"Quiet mode : {'on (only summary will be shown)' if quiet_mode else 'off'}")
selected = args.modules or list(MODULES.keys())
info(f"Modules to run : {', '.join(selected)}\n")
for mod_name in selected:
try:
MODULES[mod_name](args.kubeconfig, nsf)
except PermissionError as exc:
error(f"[{mod_name}] Blocked write attempt: {exc}")
except Exception as exc:
error(f"[{mod_name}] Unexpected error: {exc}")
print_summary(args.output)
if __name__ == "__main__":
main()