Introduction

k8s_audit.py is a security audit tool for Kubernetes clusters, designed to enumerate misconfigurations and support penetration testing of containerized environments. All of its operations are strictly read-only: the tool never modifies cluster state, which makes it safe to run against production environments without risk of disruption or unintended side effects.

The tool is organized into twelve independent audit modules that cover the main attack surfaces of a Kubernetes cluster. Each module can run on its own or in combination with others, so the audit scope can be tailored to each environment. The available modules are:

  • cluster-info: server version and node health.
  • rbac: cluster-admin bindings, wildcard ClusterRoles, and token automount on default service accounts.
  • pod-security: privileged pods, dangerous Linux capabilities, host namespace usage, and running as root.
  • network-policy: namespaces without a NetworkPolicy and policies that allow traffic from any source.
  • secrets: legacy service account tokens, suspiciously named keys, and plaintext credentials in environment variables.
  • api-server: anonymous access, Dashboard exposure, insecure port, authorization mode, and etcd configuration.
  • service-accounts: service accounts with a ClusterRoleBinding and a mounted token.
  • ingress: NodePort and LoadBalancer services, Ingress without TLS, and wildcard rules.
  • psa: (deprecated) PodSecurityPolicies and per-namespace Pod Security Admission labels.
  • configmaps: keys and values matching credential patterns.
  • images: images from public registries and images without a SHA256 digest.
  • logging: audit configuration on the API server and presence of monitoring agents.

Each finding is classified into one of five severity levels:

  • CRITICAL: an immediate risk requiring urgent attention, such as anonymous access with cluster-admin permissions.
  • HIGH: seriously flawed configurations exploitable with little additional effort, such as privileged containers or dangerous Linux capabilities that ease escape from the container to the underlying node.
  • MEDIUM: notable risks that should be fixed in the short term, such as namespaces without a network policy or service accounts with token automount enabled.
  • LOW: minor hardening gaps that pose no direct threat but weaken the cluster's overall security posture, such as unpinned images or writable root filesystems.
  • INFO: informational items with no associated risk that may provide useful context during analysis.
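As a sketch of how such a classification can drive filtering, the levels can be given a total order and findings kept at or above a chosen threshold. The helper below is illustrative only and not part of k8s_audit.py:

```python
# Illustrative helper - NOT part of k8s_audit.py.
# Severities from least to most urgent, as described above.
SEVERITY_ORDER = ("INFO", "LOW", "MEDIUM", "HIGH", "CRITICAL")
_RANK = {s: i for i, s in enumerate(SEVERITY_ORDER)}

def findings_at_least(findings, threshold):
    """Return the findings whose severity ranks at or above `threshold`."""
    cut = _RANK[threshold]
    return [f for f in findings
            if _RANK.get(f.get("severity", "INFO"), 0) >= cut]
```

This mirrors the effect of passing several levels to --severity, but as a single lower bound instead of an explicit set.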

In addition to the severity classification, each finding includes a detailed description of the detected problem and a concrete remediation recommendation. Output can be filtered by one or more severity levels to focus on the most critical issues, or suppressed entirely with quiet mode to print only the summary table at the end of the run. Results can be exported to a JSON file for integration with other tools, ingestion into a SIEM, or later review without reconnecting to the cluster. A previously saved report can be loaded and displayed at any time with the --input option, making it possible to share results between teams or review past audits without live cluster access.
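A saved report can also be post-processed outside the tool. The sketch below assumes the JSON file holds the findings as a list of dicts with a "severity" key, matching the finding() schema in the source code; the exact top-level layout of the saved file is an assumption here:

```python
import json
from collections import Counter

def summarize_report(path):
    """Tally findings by severity in a saved k8s_audit.py report.

    Assumption: the report is either a list of finding dicts, or a dict
    holding such a list under a "findings" key; each entry carries a
    "severity" field, as in the finding() schema from the source code.
    """
    with open(path, encoding="utf-8") as fh:
        report = json.load(fh)
    items = report.get("findings", []) if isinstance(report, dict) else report
    return Counter(f.get("severity", "INFO") for f in items)
```

A count like this can gate a CI pipeline, for example failing the job when the CRITICAL count is non-zero.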

Using the application

The application supports the following options:

  • --kubeconfig FILE (-k): Path to the kubeconfig file. Defaults to $KUBECONFIG or ~/.kube/config.
  • --modules MODULE... (-m): Run only the listed modules. Without this option, all modules run.
  • --output FILE (-o): Save the full report in JSON format.
  • --input FILE (-i): Load and display a previously saved JSON report without connecting to the cluster.
  • --severity LEVEL... (-s): Filter output by severity level. See the --help epilog for valid levels.
  • --quiet (-q): Suppress individual findings during the run; print only the final summary table.
  • --namespaces NS... (-n): Audit only the listed namespaces.
  • --exclude-namespaces NS... (-e): Exclude the listed namespaces and audit the rest.

Some practical example commands:

# Full audit with the default kubeconfig
python3 k8s_audit.py

# Audit only the RBAC and pod security modules
python3 k8s_audit.py --modules rbac pod-security

# Audit only the "default" and "production" namespaces
python3 k8s_audit.py --namespaces default production

# Exclude system namespaces and save the report as JSON
python3 k8s_audit.py --exclude-namespaces kube-system kube-public --output informe.json

# Show only critical and high findings
python3 k8s_audit.py --severity CRITICAL HIGH

# Run in quiet mode (summary only) and save the report
python3 k8s_audit.py --quiet --output informe.json

# Review a previously saved report
python3 k8s_audit.py --input informe.json

# Review a report, filtering to critical findings only
python3 k8s_audit.py --input informe.json --severity CRITICAL

# Use a custom kubeconfig with specific modules
python3 k8s_audit.py --kubeconfig ~/.kube/prod-config --modules secrets configmaps api-server

Running the application without parameters performs a standard scan, as in this sample run:

$ python3 k8s_audit.py
+--------------------------------------------------------------+
|          K8S SECURITY AUDIT TOOL  -  Read-Only Mode          |
|          Kubernetes Pentesting & Misconfiguration Scanner    |
+--------------------------------------------------------------+
  All checks are strictly read-only. No cluster state will be modified.

  [INFO] Audit started    : 2026-02-16 09:32:10 UTC
  [INFO] Using kubeconfig : ~/.kube/config (default)
  [INFO] Namespace scope  : all namespaces
  [INFO] Severity filter  : all
  [INFO] Quiet mode       : off
  [INFO] Modules to run   : cluster-info, rbac, pod-security, ...

  ------------------------------------------------------------
  CLUSTER INFORMATION
  ------------------------------------------------------------
  [INFO] Client version : v1.29.2
  [INFO] Server version : v1.26.9
  [MEDIUM] Kubernetes version approaching EOL
         Server is running v1.26. Consider upgrading.
         -> Plan upgrade to latest stable release.
  [INFO] Nodes found: 3
  [INFO]   Node: node-01  roles=['control-plane']  conditions=['Ready']
  [INFO]   Node: node-02  roles=['worker']  conditions=['Ready']
  [INFO]   Node: node-03  roles=['worker']  conditions=['Ready']

  ------------------------------------------------------------
  RBAC - ROLES & BINDINGS
  ------------------------------------------------------------
  [HIGH] ServiceAccount has cluster-admin
         SA 'monitoring/prometheus-sa' bound via 'prometheus-cluster-admin'.
         -> Restrict to least-privilege roles.
  [MEDIUM] [default] Default SA auto-mounts token
         Auto-mounted tokens in every pod increase attack surface.
         -> Set automountServiceAccountToken: false on the default SA.

  ------------------------------------------------------------
  POD SECURITY - PRIVILEGED / CAPABILITIES / HOST NAMESPACES
  ------------------------------------------------------------
  [CRITICAL] [default/debug-pod/debug] Privileged container
         Privileged containers can escape to the host.
         -> Remove privileged: true and use fine-grained capabilities.
  [HIGH] [default/debug-pod/debug] Dangerous capabilities: {'SYS_ADMIN'}
         These capabilities can be used for container escape.
         -> Drop all capabilities and add only what is needed.

  ------------------------------------------------------------
  SUMMARY REPORT
  ------------------------------------------------------------

  Total findings: 18
  CRITICAL   ## 2
  HIGH       ##### 5
  MEDIUM     ######## 8
  LOW        ### 3
  INFO       0

Source code

#!/usr/bin/env python3
"""
k8s_audit.py - Kubernetes Security Audit Tool (Read-Only Enumeration)

Performs security audits and misconfiguration checks on Kubernetes clusters.
All operations are strictly read-only; no cluster state is ever modified.

Usage modes:
  - Live audit  : connect to a cluster and run one or more audit modules.
  - Report mode : load a previously saved JSON report and display findings.
"""

import argparse
import json
import os
import subprocess
import sys
import textwrap
from datetime import datetime
import time
from typing import Optional


# GLOBAL STATE

# Accumulates every finding; always complete regardless of display filters.
findings: list[dict] = []

# Severity filter for display only (None = show everything).
severity_filter: Optional[set[str]] = None

# When True, individual findings are suppressed; only the summary is printed.
quiet_mode: bool = False


# OUTPUT HELPERS

def banner():
    print("""
+--------------------------------------------------------------+
|          K8S SECURITY AUDIT TOOL  -  Read-Only Mode          |
|          Kubernetes Pentesting & Misconfiguration Scanner    |
+--------------------------------------------------------------+
  All checks are strictly read-only. No cluster state will be modified.
""")


def section(name: str):
    """Print a visible section separator."""
    print(f"\n  {'-' * 60}")
    print(f"  {name}")
    print(f"  {'-' * 60}")


def info(msg: str):
    print(f"  [INFO] {msg}")


def ok(msg: str):
    print(f"  [PASS] {msg}")


def warn(msg: str):
    print(f"  [WARN] {msg}")


def error(msg: str):
    print(f"  [ERROR] {msg}")


def finding(severity: str, category: str, title: str,
            detail: str, remediation: str = ""):
    """
    Record a finding and optionally print it.

    The finding is always appended to the global list so the saved JSON
    report is always complete, regardless of active display filters.
    """
    findings.append({
        "severity":    severity,
        "category":    category,
        "title":       title,
        "detail":      detail,
        "remediation": remediation,
    })

    if quiet_mode:
        return
    if severity_filter is not None and severity not in severity_filter:
        return

    print(f"  [{severity}] {title}")
    if detail:
        for line in textwrap.wrap(detail, width=90):
            print(f"         {line}")
    if remediation:
        print(f"         -> {remediation}")


def print_finding_block(f: dict):
    """Print a single finding dict (used when replaying a saved report)."""
    print(f"  [{f.get('severity', '?')}] {f.get('title', '')}")
    if detail := f.get("detail", ""):
        for line in textwrap.wrap(detail, width=90):
            print(f"         {line}")
    if rem := f.get("remediation", ""):
        print(f"         -> {rem}")


# CREDENTIAL PATTERN LISTS
# Used across multiple modules (secrets, configmaps, env vars).

# Substrings matched against key / variable *names*.
CREDENTIAL_KEY_PATTERNS = (
    "password", "passwd", "pass",
    "token",    "secret", "key",
    "credential", "api_key", "apikey",
    "private",  "cert",   "aws_", "gcp_", "kubeconfig",
)

# Substrings matched against key / variable *values* (assignment-style
# patterns common in config files and env var content).
CREDENTIAL_VALUE_PATTERNS = (
    "password=", "passwd=",  "pass=",
    "token=",    "secret=",
    "aws_access_key", "private key",
)


# NAMESPACE FILTER

class NsFilter:
    """
    Controls which namespaces are in scope for the audit.

    Priority:
      1. --namespaces   -> audit only those namespaces.
      2. --exclude-namespaces -> audit all except those.
      3. Neither flag   -> audit every namespace.
    """

    def __init__(self, include: Optional[list[str]], exclude: Optional[list[str]]):
        self.include = set(include) if include else None
        self.exclude = set(exclude) if exclude else set()

    def allow(self, ns: str) -> bool:
        if self.include is not None:
            return ns in self.include
        return ns not in self.exclude

    def filter_items(self, items: list[dict]) -> list[dict]:
        """Keep resource dicts whose namespace is in scope.
        Cluster-scoped resources (no namespace field) are always kept."""
        return [i for i in items if self.allow(i["metadata"].get("namespace", ""))]

    def filter_ns_list(self, ns_list: list[str]) -> list[str]:
        """Filter a plain list of namespace name strings."""
        return [ns for ns in ns_list if self.allow(ns)]

    def describe(self) -> str:
        if self.include is not None:
            return f"include={sorted(self.include)}"
        if self.exclude:
            return f"exclude={sorted(self.exclude)}"
        return "all namespaces"


# KUBECTL WRAPPER

# Verbs that modify cluster state - blocked as a safety net.
_WRITE_VERBS = {
    "apply", "create", "delete", "patch", "replace",
    "edit", "label", "annotate", "taint", "drain",
    "cordon", "uncordon", "scale", "rollout", "set",
    "exec", "cp", "port-forward", "attach", "run",
}


def kubectl(args: list[str], kubeconfig: Optional[str] = None,
            output: str = "json", namespace: str = "") -> Optional[dict | list | str]:
    """
    Execute a read-only kubectl command and return the parsed result.

    output="json" -> returns a parsed Python dict / list.
    output="raw"  -> returns the raw stdout string (for yes/no queries).

    Raises PermissionError if a write verb is requested.
    Returns None on any error (timeout, bad JSON, kubectl not found, etc.).
    """
    verb = args[0].lower() if args else ""
    if verb in _WRITE_VERBS:
        raise PermissionError(f"Write verb '{verb}' is not allowed in audit mode.")

    cmd = ["kubectl"] + args
    if kubeconfig:
        cmd += ["--kubeconfig", kubeconfig]
    if namespace:
        cmd += ["-n", namespace]
    if output and output != "raw":
        cmd += ["-o", output]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
        if result.returncode != 0:
            return None
        if output == "json":
            return json.loads(result.stdout)
        return result.stdout.strip()
    except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError):
        return None


# SMALL SHARED HELPERS

def _get_namespaces(kubeconfig) -> list[str]:
    """Return the list of all namespace names in the cluster."""
    data = kubectl(["get", "namespaces"], kubeconfig, output="json")
    return [n["metadata"]["name"] for n in (data or {}).get("items", [])]


def _cmd_str(container: dict) -> str:
    """Merge a container's command + args into a single string for flag checks."""
    return " ".join(container.get("command", []) + container.get("args", []))


def _all_containers(pod: dict) -> list[dict]:
    """Return regular and init containers for a pod spec."""
    spec = pod.get("spec", {})
    return spec.get("containers", []) + spec.get("initContainers", [])


# AUDIT MODULES

def audit_cluster_info(kubeconfig, nsf: NsFilter):
    """Check server version currency and node health."""
    section("CLUSTER INFORMATION")

    version = kubectl(["version"], kubeconfig, output="json")
    if not version:
        error("Could not retrieve cluster version - check connectivity / kubeconfig.")
        return

    client = version.get("clientVersion", {}).get("gitVersion", "unknown")
    server = version.get("serverVersion", {}).get("gitVersion", "unknown")
    info(f"Client version : {client}")
    info(f"Server version : {server}")

    try:
        major = int(version["serverVersion"]["major"])
        minor = int(version["serverVersion"]["minor"].replace("+", ""))
        if major == 1 and minor < 24:
            finding("HIGH", "cluster-info", "Outdated Kubernetes version",
                    f"Server is running v{major}.{minor}. Versions < 1.24 have known CVEs.",
                    "Upgrade to a supported Kubernetes release.")
        elif major == 1 and minor < 27:
            finding("MEDIUM", "cluster-info", "Kubernetes version approaching EOL",
                    f"Server is running v{major}.{minor}. Consider upgrading.",
                    "Plan upgrade to latest stable release.")
        else:
            ok(f"Kubernetes version {major}.{minor} is recent.")
    except Exception:
        pass  # Version fields missing or in an unexpected format.

    nodes = kubectl(["get", "nodes"], kubeconfig, output="json")
    if not nodes or "items" not in nodes:
        return

    info(f"Nodes found: {len(nodes['items'])}")
    for node in nodes["items"]:
        name  = node["metadata"]["name"]
        roles = [k.split("/")[-1] for k in node["metadata"].get("labels", {})
                 if "node-role.kubernetes.io" in k]
        ready = [c["type"] for c in node["status"].get("conditions", [])
                 if c.get("status") == "True"]
        info(f"  Node: {name}  roles={roles}  conditions={ready}")

        if node["spec"].get("unschedulable"):
            finding("LOW", "cluster-info", f"Node '{name}' is unschedulable (cordoned)",
                    "Cordoned nodes may indicate an ongoing incident or forgotten maintenance.",
                    "Verify node status and uncordon if safe.")


def audit_rbac(kubeconfig, nsf: NsFilter):
    """Inspect ClusterRoleBindings, wildcard ClusterRoles, and default SA token automount."""
    section("RBAC - ROLES & BINDINGS")

    # cluster-admin bindings
    crbs = kubectl(["get", "clusterrolebindings"], kubeconfig, output="json")
    if crbs and "items" in crbs:
        for crb in crbs["items"]:
            if crb.get("roleRef", {}).get("name") != "cluster-admin":
                continue
            crb_name = crb["metadata"]["name"]
            for s in crb.get("subjects") or []:
                kind = s.get("kind")
                name = s.get("name")

                # Skip SA subjects whose namespace is out of scope.
                if kind == "ServiceAccount":
                    sa_ns = s.get("namespace", "")
                    if sa_ns and not nsf.allow(sa_ns):
                        continue

                if name == "system:anonymous":
                    finding("CRITICAL", "rbac",
                            "Anonymous user has cluster-admin",
                            f"ClusterRoleBinding '{crb_name}' grants cluster-admin to anonymous.",
                            "Remove this binding immediately.")
                elif kind == "ServiceAccount":
                    finding("HIGH", "rbac",
                            "ServiceAccount has cluster-admin",
                            f"SA '{s.get('namespace','?')}/{name}' bound via '{crb_name}'.",
                            "Restrict to least-privilege roles.")
                elif kind in ("User", "Group"):
                    finding("MEDIUM", "rbac",
                            f"{kind} '{name}' has cluster-admin",
                            f"Via ClusterRoleBinding '{crb_name}'.",
                            "Audit whether cluster-admin is truly required.")

    # wildcard ClusterRoles
    crs = kubectl(["get", "clusterroles"], kubeconfig, output="json")
    if crs and "items" in crs:
        for cr in crs["items"]:
            name = cr["metadata"]["name"]
            if name.startswith("system:"):
                continue
            for rule in cr.get("rules") or []:
                if "*" in rule.get("verbs", []) and "*" in rule.get("resources", []):
                    finding("HIGH", "rbac",
                            f"ClusterRole '{name}' uses wildcard verbs+resources",
                            "Wildcards grant excessive permissions.",
                            "Replace wildcards with explicit verbs and resources.")
                    break  # One finding per ClusterRole is enough.

    # default SA token automount
    for ns in nsf.filter_ns_list(_get_namespaces(kubeconfig)):
        sa = kubectl(["get", "serviceaccount", "default"],
                     kubeconfig, output="json", namespace=ns)
        if sa and sa.get("automountServiceAccountToken") is not False:
            finding("MEDIUM", "rbac",
                    f"[{ns}] Default SA auto-mounts token",
                    "Auto-mounted tokens in every pod increase the attack surface.",
                    "Set automountServiceAccountToken: false on the default SA.")


def audit_pod_security(kubeconfig, nsf: NsFilter):
    """Check pods for dangerous security configurations."""
    section("POD SECURITY - PRIVILEGED / CAPABILITIES / HOST NAMESPACES")

    pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
    if not pods or "items" not in pods:
        warn("No pods found or insufficient permissions.")
        return

    DANGEROUS_CAPS = {"SYS_ADMIN", "NET_ADMIN", "SYS_PTRACE",
                      "SYS_MODULE", "DAC_READ_SEARCH", "ALL"}

    for pod in nsf.filter_items(pods["items"]):
        ns   = pod["metadata"]["namespace"]
        name = pod["metadata"]["name"]
        spec = pod.get("spec", {})

        # Host namespace sharing lets a compromised pod affect the node directly.
        for flag, label in [("hostNetwork", "hostNetwork"), ("hostPID", "hostPID"), ("hostIPC", "hostIPC")]:
            if spec.get(flag):
                finding("HIGH", "pod-security", f"[{ns}/{name}] {label}: true",
                        f"Pod shares the host {label.replace('host', '').lower()} namespace.",
                        f"Remove {label} unless strictly required.")

        for c in _all_containers(pod):
            sc    = c.get("securityContext", {})
            cname = c.get("name", "?")
            ref   = f"[{ns}/{name}/{cname}]"

            if sc.get("privileged"):
                finding("CRITICAL", "pod-security", f"{ref} Privileged container",
                        "Privileged containers can escape to the host.",
                        "Remove privileged: true and use fine-grained capabilities.")

            if sc.get("allowPrivilegeEscalation") is not False:
                finding("MEDIUM", "pod-security",
                        f"{ref} allowPrivilegeEscalation not set to false",
                        "Container may escalate privileges via setuid binaries.",
                        "Add allowPrivilegeEscalation: false to securityContext.")

            risky = set(sc.get("capabilities", {}).get("add", [])) & DANGEROUS_CAPS
            if risky:
                finding("HIGH", "pod-security", f"{ref} Dangerous capabilities: {risky}",
                        "These capabilities can be used for container escape.",
                        "Drop all capabilities and add only what is needed.")

            if not sc.get("readOnlyRootFilesystem"):
                finding("LOW", "pod-security", f"{ref} No readOnlyRootFilesystem",
                        "A writable root filesystem eases attacker persistence.",
                        "Set readOnlyRootFilesystem: true.")

            if not sc.get("runAsNonRoot") and sc.get("runAsUser", 0) == 0:
                finding("MEDIUM", "pod-security", f"{ref} May run as root",
                        "Container does not enforce non-root execution.",
                        "Set runAsNonRoot: true and a non-zero runAsUser.")

            image = c.get("image", "")
            if image.endswith(":latest") or ":" not in image.split("/")[-1]:
                finding("LOW", "pod-security", f"{ref} Image uses 'latest' or no tag",
                        f"Image: {image}",
                        "Pin images to a specific digest or version tag.")


def audit_network_policies(kubeconfig, nsf: NsFilter):
    """Verify that every in-scope namespace has at least one NetworkPolicy."""
    section("NETWORK POLICIES")

    ns_list = nsf.filter_ns_list(_get_namespaces(kubeconfig))

    netpols    = kubectl(["get", "networkpolicies", "--all-namespaces"],
                         kubeconfig, output="json")
    covered_ns = {np["metadata"]["namespace"]
                  for np in (netpols or {}).get("items", [])}

    for ns in ns_list:
        if ns not in covered_ns:
            finding("MEDIUM", "network-policy",
                    f"Namespace '{ns}' has no NetworkPolicy",
                    "All pods in this namespace can communicate freely.",
                    "Add a default-deny NetworkPolicy and explicit allow rules.")
        else:
            ok(f"Namespace '{ns}' has at least one NetworkPolicy.")

    # Flag policies that allow ingress from the entire internet.
    if netpols and "items" in netpols:
        for np in nsf.filter_items(netpols["items"]):
            ns   = np["metadata"]["namespace"]
            name = np["metadata"]["name"]
            for rule in np.get("spec", {}).get("ingress") or []:
                for frm in rule.get("from") or []:
                    if frm.get("ipBlock", {}).get("cidr") == "0.0.0.0/0":
                        finding("MEDIUM", "network-policy",
                                f"[{ns}/{name}] Ingress from 0.0.0.0/0",
                                "Policy allows ingress from any IP address.",
                                "Restrict CIDR to known source ranges.")


def audit_secrets(kubeconfig, nsf: NsFilter):
    """Look for legacy SA tokens, suspicious secret key names, and plaintext env vars."""
    section("SECRETS & SENSITIVE DATA")

    secrets = kubectl(["get", "secrets", "--all-namespaces"], kubeconfig, output="json")
    if not secrets or "items" not in secrets:
        warn("Could not list secrets (may require elevated permissions).")
        return

    in_scope = nsf.filter_items(secrets["items"])
    info(f"Total secrets in scope: {len(in_scope)}")

    for s in in_scope:
        ns    = s["metadata"]["namespace"]
        name  = s["metadata"]["name"]
        stype = s.get("type", "")
        data  = s.get("data") or {}

        # Long-lived SA token secrets were deprecated in Kubernetes 1.24.
        if stype == "kubernetes.io/service-account-token":
            finding("LOW", "secrets",
                    f"[{ns}/{name}] Legacy SA token secret",
                    "Long-lived SA token secrets are deprecated; prefer bound tokens.",
                    "Remove and rely on projected ServiceAccount tokens.")

        # For Opaque secrets, scan key names for credential-related substrings.
        if stype == "Opaque":
            suspicious = [k for k in data
                          if any(p in k.lower() for p in CREDENTIAL_KEY_PATTERNS)]
            if suspicious:
                finding("INFO", "secrets",
                        f"[{ns}/{name}] Secret contains sensitive-looking keys",
                        f"Keys: {', '.join(suspicious[:5])}",
                        "Ensure secrets are encrypted at rest (EncryptionConfiguration).")

    # Scan pod env vars for hardcoded credentials (secretKeyRef should be used instead).
    pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
    if not pods or "items" not in pods:
        return

    for pod in nsf.filter_items(pods["items"]):
        ns   = pod["metadata"]["namespace"]
        name = pod["metadata"]["name"]
        for c in pod["spec"].get("containers", []):
            for env in c.get("env") or []:
                # Only flag literal "value" fields, not valueFrom references.
                if "value" not in env:
                    continue
                key = env.get("name", "").lower()
                if any(p in key for p in CREDENTIAL_KEY_PATTERNS):
                    finding("HIGH", "secrets",
                            f"[{ns}/{name}] Plaintext sensitive env var '{env['name']}'",
                            "Hardcoded sensitive values in pod env are exposed via the API.",
                            "Use secretKeyRef or a secrets manager instead.")


def audit_api_server(kubeconfig, nsf: NsFilter):
    """
    Test anonymous access and inspect kube-apiserver / etcd process flags.

    kube-system checks are not namespace-filtered because they target
    cluster-level infrastructure that is relevant to every audit scope.
    """
    section("API SERVER & ETCD CONFIGURATION")

    # anonymous access
    anon_pods = kubectl(["auth", "can-i", "list", "pods",
                         "--as=system:anonymous", "-n", "default"],
                        kubeconfig, output="raw")
    if anon_pods and anon_pods.strip().lower() == "yes":
        finding("CRITICAL", "api-server",
                "Anonymous user can list pods in default namespace",
                "API server allows unauthenticated requests.",
                "Set --anonymous-auth=false on kube-apiserver.")
    else:
        ok("Anonymous access to list pods is denied.")

    anon_secrets = kubectl(["auth", "can-i", "get", "secrets",
                            "--as=system:anonymous", "-n", "kube-system"],
                           kubeconfig, output="raw")
    if anon_secrets and anon_secrets.strip().lower() == "yes":
        finding("CRITICAL", "api-server",
                "Anonymous user can read kube-system secrets",
                "Critical secrets (bootstrap tokens, etc.) are exposed.",
                "Set --anonymous-auth=false and review RBAC immediately.")

    # dashboard exposure
    svcs = kubectl(["get", "services", "--all-namespaces"], kubeconfig, output="json")
    if svcs and "items" in svcs:
        for svc in nsf.filter_items(svcs["items"]):
            ns    = svc["metadata"]["namespace"]
            name  = svc["metadata"]["name"]
            stype = svc["spec"].get("type", "ClusterIP")
            if "dashboard" not in name.lower():
                continue
            if stype in ("LoadBalancer", "NodePort"):
                finding("HIGH", "api-server",
                        f"[{ns}/{name}] Kubernetes Dashboard exposed externally",
                        f"Service type: {stype}",
                        "Use kubectl proxy or restrict with NetworkPolicy/authentication.")
            else:
                finding("INFO", "api-server",
                        f"[{ns}/{name}] Kubernetes Dashboard deployed (ClusterIP)",
                        "Ensure access is protected with authentication.",
                        "Use token-based auth and avoid skip-login.")

    # apiserver & etcd startup flags
    kube_pods = kubectl(["get", "pods", "-n", "kube-system"], kubeconfig, output="json")
    if not kube_pods or "items" not in kube_pods:
        return

    for pod in kube_pods["items"]:
        pod_name = pod["metadata"]["name"]
        for c in pod["spec"].get("containers", []):
            cmd = _cmd_str(c)

            if "kube-apiserver" in pod_name or "apiserver" in cmd:
                if "--insecure-port=0" not in cmd and "--insecure-port" in cmd:
                    finding("CRITICAL", "api-server",
                            "kube-apiserver insecure port may be enabled",
                            "The insecure port bypasses authentication and authorisation.",
                            "Set --insecure-port=0 on kube-apiserver.")
                if "--authorization-mode" in cmd and "AlwaysAllow" in cmd:
                    finding("CRITICAL", "api-server",
                            "kube-apiserver uses AlwaysAllow authorisation",
                            "Every request is authorised without checking.",
                            "Use RBAC as the authorisation mode.")
                if "--enable-admission-plugins" in cmd:
                    if "PodSecurityPolicy" not in cmd and "NodeRestriction" not in cmd:
                        finding("MEDIUM", "api-server",
                                "Admission plugins may be missing",
                                "NodeRestriction and security admission plugins are recommended.",
                                "Enable NodeRestriction and appropriate admission controllers.")

            if "etcd" in pod_name:
                if "--client-cert-auth=true" not in cmd:
                    finding("HIGH", "api-server",
                            "etcd may not require client certificate authentication",
                            "etcd without mTLS can be accessed without credentials.",
                            "Set --client-cert-auth=true on etcd.")
                if "--auto-tls=true" in cmd:
                    finding("HIGH", "api-server",
                            "etcd uses auto-TLS (self-signed, no CA validation)",
                            "Auto-TLS does not validate client identity properly.",
                            "Use proper PKI certificates for etcd.")


def audit_service_accounts(kubeconfig, nsf: NsFilter):
    """Detect pods whose service account has a ClusterRoleBinding and token automount on."""
    section("SERVICE ACCOUNTS")

    pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
    if not pods or "items" not in pods:
        return

    # Fetch ClusterRoleBindings once and reuse across all pod checks.
    crbs = kubectl(["get", "clusterrolebindings"], kubeconfig, output="json")
    crb_items = (crbs or {}).get("items", [])

    for pod in nsf.filter_items(pods["items"]):
        ns        = pod["metadata"]["namespace"]
        name      = pod["metadata"]["name"]
        sa        = pod["spec"].get("serviceAccountName", "default")
        automount = pod["spec"].get("automountServiceAccountToken", True)

        # Default SA automount is already covered in the RBAC module.
        if not automount or sa == "default":
            continue

        for crb in crb_items:
            for subj in crb.get("subjects") or []:
                if (subj.get("kind") == "ServiceAccount"
                        and subj.get("name") == sa
                        and subj.get("namespace") == ns):
                    role = crb["roleRef"]["name"]
                    finding("HIGH", "service-accounts",
                            f"[{ns}/{name}] Pod SA '{sa}' has ClusterRoleBinding",
                            f"Role: {role} - token is mounted and reachable from within the pod.",
                            "Scope bindings to namespace roles; disable token automount if possible.")


def audit_ingress_services(kubeconfig, nsf: NsFilter):
    """Identify externally exposed services and Ingress objects lacking TLS."""
    section("INGRESS & EXPOSED SERVICES")

    svcs = kubectl(["get", "services", "--all-namespaces"], kubeconfig, output="json")
    if svcs and "items" in svcs:
        for svc in nsf.filter_items(svcs["items"]):
            ns    = svc["metadata"]["namespace"]
            name  = svc["metadata"]["name"]
            stype = svc["spec"].get("type", "ClusterIP")

            if stype == "NodePort":
                ports = [str(p.get("nodePort", "?"))
                         for p in svc["spec"].get("ports", []) if p.get("nodePort")]
                finding("MEDIUM", "ingress-services",
                        f"[{ns}/{name}] NodePort service",
                        f"Exposed on node ports: {', '.join(ports)}",
                        "Prefer LoadBalancer with firewall rules or use Ingress.")

            if stype == "LoadBalancer":
                lb_ingress = svc["status"].get("loadBalancer", {}).get("ingress", [])
                ips = [i.get("ip") or i.get("hostname", "pending") for i in lb_ingress]
                finding("INFO", "ingress-services",
                        f"[{ns}/{name}] LoadBalancer service",
                        f"External IPs: {', '.join(ips) if ips else 'pending'}",
                        "Ensure only intended services are internet-facing.")

    ingresses = kubectl(["get", "ingresses", "--all-namespaces"],
                        kubeconfig, output="json")
    if not ingresses or "items" not in ingresses:
        return

    for ing in nsf.filter_items(ingresses["items"]):
        ns   = ing["metadata"]["namespace"]
        name = ing["metadata"]["name"]
        spec = ing.get("spec", {})

        if not spec.get("tls"):
            finding("MEDIUM", "ingress-services",
                    f"[{ns}/{name}] Ingress without TLS",
                    "Traffic may be served over plain HTTP.",
                    "Add a TLS section with a valid certificate.")

        for rule in spec.get("rules") or []:
            if not rule.get("host"):
                finding("LOW", "ingress-services",
                        f"[{ns}/{name}] Ingress rule without host (catch-all)",
                        "Catch-all rules may expose unexpected backends.",
                        "Specify explicit host names for all Ingress rules.")


def audit_psa(kubeconfig, nsf: NsFilter):
    """Check PodSecurityPolicies (deprecated) and Pod Security Admission labels."""
    section("POD SECURITY ADMISSION / POD SECURITY POLICIES")

    # PodSecurityPolicy was removed in Kubernetes 1.25.
    psps = kubectl(["get", "podsecuritypolicies"], kubeconfig, output="json")
    if psps and "items" in psps:
        info(f"PodSecurityPolicies found: {len(psps['items'])} (deprecated resource)")
        for psp in psps["items"]:
            name = psp["metadata"]["name"]
            spec = psp.get("spec", {})
            if spec.get("privileged"):
                finding("HIGH", "psa",
                        f"PSP '{name}' allows privileged containers",
                        "Any pod bound to this PSP can run as privileged.",
                        "Migrate to Pod Security Admission and restrict privileged.")
            if spec.get("hostNetwork") or spec.get("hostPID") or spec.get("hostIPC"):
                finding("MEDIUM", "psa",
                        f"PSP '{name}' allows host namespace access",
                        "Pods can access host network/PID/IPC namespaces.",
                        "Disallow host namespaces in PSP spec.")
    else:
        ok("No PodSecurityPolicies found (or not applicable for this cluster version).")

    # Check PSA enforcement labels on each in-scope namespace.
    namespaces = kubectl(["get", "namespaces"], kubeconfig, output="json")
    if not namespaces or "items" not in namespaces:
        return

    ns_map = {n["metadata"]["name"]: n for n in namespaces["items"]}
    for ns_name in nsf.filter_ns_list(list(ns_map)):
        labels  = ns_map[ns_name]["metadata"].get("labels") or {}
        enforce = labels.get("pod-security.kubernetes.io/enforce")
        if not enforce:
            finding("MEDIUM", "psa",
                    f"Namespace '{ns_name}' has no PSA enforce label",
                    "Pod Security Admission is not enforced in this namespace.",
                    "Add pod-security.kubernetes.io/enforce=restricted (or baseline).")
        elif enforce == "privileged":
            finding("HIGH", "psa",
                    f"Namespace '{ns_name}' PSA enforce=privileged",
                    "No pod security restrictions are applied.",
                    "Change the PSA enforce level to 'baseline' or 'restricted'.")
        else:
            ok(f"Namespace '{ns_name}' PSA enforce={enforce}")


def audit_configmaps(kubeconfig, nsf: NsFilter):
    """Scan ConfigMap keys and values for credential-like content."""
    section("CONFIGMAPS - SENSITIVE DATA EXPOSURE")

    cms = kubectl(["get", "configmaps", "--all-namespaces"], kubeconfig, output="json")
    if not cms or "items" not in cms:
        return

    for cm in nsf.filter_items(cms["items"]):
        ns   = cm["metadata"]["namespace"]
        name = cm["metadata"]["name"]
        data = cm.get("data") or {}

        for k, v in data.items():
            # Check the key name first; a match skips the value check for this
            # key but still scans the remaining keys.
            if any(p in k.lower() for p in CREDENTIAL_KEY_PATTERNS):
                finding("MEDIUM", "configmaps",
                        f"[{ns}/{name}] ConfigMap key '{k}' looks sensitive",
                        "Sensitive data should not be stored in ConfigMaps.",
                        "Move to a Kubernetes Secret or external secrets manager.")
                continue

            # Then check the value for assignment-style patterns.
            if isinstance(v, str) and len(v) > 10:
                if any(p in v.lower() for p in CREDENTIAL_VALUE_PATTERNS):
                    finding("HIGH", "configmaps",
                            f"[{ns}/{name}] ConfigMap value in key '{k}' may contain credentials",
                            "Credential-like strings found in ConfigMap value.",
                            "Move credentials to Secrets and rotate immediately.")


def audit_images(kubeconfig, nsf: NsFilter):
    """Flag images from public registries and images not pinned by SHA256 digest."""
    section("IMAGE SECURITY & SUPPLY CHAIN")

    pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
    if not pods or "items" not in pods:
        return

    PUBLIC_REGISTRIES = {"docker.io", "ghcr.io", "quay.io", "gcr.io", "public.ecr.aws"}
    seen: set[str] = set()  # Avoid duplicate findings for the same image.

    for pod in nsf.filter_items(pods["items"]):
        ns   = pod["metadata"]["namespace"]
        name = pod["metadata"]["name"]
        for c in _all_containers(pod):
            image = c.get("image", "")
            if image in seen:
                continue
            seen.add(image)

            # The part before the first "/" names a registry only when it
            # contains "." or ":"; bare references such as "nginx" or
            # "library/redis" implicitly come from docker.io.
            first, _, _ = image.partition("/")
            registry = first if ("/" in image and ("." in first or ":" in first)) else "docker.io"
            if registry in PUBLIC_REGISTRIES:
                finding("LOW", "images",
                        f"[{ns}/{name}] Image from public registry",
                        f"Image: {image}",
                        "Use a private registry with image scanning enabled.")

            # Without a @sha256: digest a tag can be silently replaced (tag mutation).
            if "@sha256:" not in image:
                finding("LOW", "images",
                        f"[{ns}/{name}] Image not pinned by digest",
                        f"Image: {image}",
                        "Pin images by SHA256 digest for supply-chain integrity.")


def audit_logging(kubeconfig, nsf: NsFilter):
    """Verify API server audit logging flags and detect common monitoring agents."""
    section("LOGGING & AUDIT CONFIGURATION")

    # Inspect kube-apiserver static pod for audit log flags.
    kube_pods = kubectl(["get", "pods", "-n", "kube-system"], kubeconfig, output="json")
    if kube_pods and "items" in kube_pods:
        for pod in kube_pods["items"]:
            for c in pod["spec"].get("containers", []):
                cmd = _cmd_str(c)
                if "kube-apiserver" not in pod["metadata"]["name"] and "apiserver" not in cmd:
                    continue
                if "--audit-log-path" not in cmd:
                    finding("MEDIUM", "logging",
                            "kube-apiserver audit logging not configured",
                            "API requests are not being logged.",
                            "Configure --audit-log-path and --audit-policy-file.")
                else:
                    ok("kube-apiserver audit logging is configured.")
                if "--audit-log-maxage" not in cmd:
                    finding("LOW", "logging",
                            "kube-apiserver audit log max-age not set",
                            "Old audit logs may accumulate or be deleted prematurely.",
                            "Set --audit-log-maxage to comply with your retention policy.")

    # Check whether any well-known monitoring agent is running in the cluster.
    LOG_KEYWORDS = {"fluentd", "fluent-bit", "filebeat", "logstash",
                    "falco", "prometheus", "loki", "grafana"}
    all_pods = kubectl(["get", "pods", "--all-namespaces"], kubeconfig, output="json")
    has_agent = False
    if all_pods and "items" in all_pods:
        for pod in nsf.filter_items(all_pods["items"]):
            if any(kw in pod["metadata"]["name"].lower() for kw in LOG_KEYWORDS):
                has_agent = True
                break

    if has_agent:
        ok("Logging/monitoring agent detected in cluster.")
    else:
        finding("MEDIUM", "logging",
                "No common logging/monitoring agent detected",
                "Falco, Prometheus, Fluentd, etc. were not found.",
                "Deploy a runtime security and logging solution.")


# REPORT LOADING (--input mode)

def load_and_display_report(path: str, severity_filter: Optional[set[str]] = None):
    """
    Load a previously saved JSON report and display it in the same format
    as a live audit, then print the summary counts.
    """
    banner()
    info(f"Loading saved report: {path}")

    try:
        with open(path) as fh:
            report = json.load(fh)
    except FileNotFoundError:
        error(f"File not found: {path}")
        sys.exit(1)
    except json.JSONDecodeError as exc:
        error(f"Invalid JSON in report file: {exc}")
        sys.exit(1)

    info(f"Report generated  : {report.get('generated', 'unknown')}")

    loaded = report.get("findings", [])
    if not loaded:
        warn("No findings found in the report file.")
        print()
        return

    # Group findings by category and display each section.
    by_category: dict[str, list[dict]] = {}
    for f in loaded:
        by_category.setdefault(f.get("category", "uncategorized"), []).append(f)

    for cat, items in by_category.items():
        visible = [f for f in items
                   if severity_filter is None or f.get("severity") in severity_filter]
        if not visible:
            continue
        section(cat.upper())
        for f in visible:
            print_finding_block(f)

    # Summary always reflects the full report totals, not just the filtered view.
    section("SUMMARY REPORT")
    counts = report.get("summary") or {}
    if not counts:
        counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0}
        for f in loaded:
            sev = f.get("severity", "INFO")
            counts[sev] = counts.get(sev, 0) + 1

    total = sum(counts.values())
    print(f"\n  Total findings: {total}")
    for sev, cnt in counts.items():
        bar = "#" * min(cnt, 40)
        print(f"  {sev:<10} {bar} {cnt}")
    print()
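
# The JSON shape this loader expects matches what print_summary() writes with
# --output (field values here are illustrative):
#
#   {
#     "generated": "2024-01-01 12:00:00 (UTC)",
#     "summary":   {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 0, "INFO": 3},
#     "findings":  [{"severity": "HIGH", "category": "rbac", ...}]
#   }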


# SUMMARY REPORT

def print_summary(output_file: Optional[str] = None):
    """Print the counts-per-severity table and optionally save the report to JSON."""
    section("SUMMARY REPORT")

    counts = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "INFO": 0}
    for f in findings:
        counts[f["severity"]] = counts.get(f["severity"], 0) + 1

    total = sum(counts.values())
    print(f"\n  Total findings: {total}")
    for sev, cnt in counts.items():
        bar = "#" * min(cnt, 40)
        print(f"  {sev:<10} {bar} {cnt}")

    if output_file:
        report = {
            "generated": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + f" ({time.tzname[time.daylight]})",
            "summary":   counts,
            "findings":  findings,
        }
        with open(output_file, "w") as fh:
            json.dump(report, fh, indent=2)
        print(f"\n  JSON report saved to: {output_file}")

    print()


# CLI

MODULES = {
    "cluster-info":     audit_cluster_info,
    "rbac":             audit_rbac,
    "pod-security":     audit_pod_security,
    "network-policy":   audit_network_policies,
    "secrets":          audit_secrets,
    "api-server":       audit_api_server,
    "service-accounts": audit_service_accounts,
    "ingress":          audit_ingress_services,
    "psa":              audit_psa,
    "configmaps":       audit_configmaps,
    "images":           audit_images,
    "logging":          audit_logging,
}


def main():
    parser = argparse.ArgumentParser(
        prog="k8s_audit.py",
        description="Kubernetes Security Audit Tool - read-only enumeration / pentesting",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=(
            "Available audit modules:\n"
            "  cluster-info\n"
            "  rbac\n"
            "  pod-security\n"
            "  network-policy\n"
            "  secrets\n"
            "  api-server\n"
            "  service-accounts\n"
            "  ingress\n"
            "  psa\n"
            "  configmaps\n"
            "  images\n"
            "  logging\n"
            "\n"
            "Severity levels:\n"
            "  CRITICAL  Immediate risk (e.g. anonymous cluster-admin access)\n"
            "  HIGH      Serious misconfiguration (e.g. privileged container)\n"
            "  MEDIUM    Notable risk requiring attention\n"
            "  LOW       Minor hardening gap\n"
            "  INFO      Informational, no direct risk\n"
            "\n"
            "Examples:\n"
            "  %(prog)s\n"
            "  %(prog)s --quiet\n"
            "  %(prog)s --modules rbac pod-security\n"
            "  %(prog)s --namespaces default kube-system\n"
            "  %(prog)s --exclude-namespaces kube-system kube-public\n"
            "  %(prog)s --kubeconfig /path/to/config --output report.json\n"
            "  %(prog)s --input report.json\n"
            "  %(prog)s --input report.json --severity CRITICAL HIGH\n"
        ),
    )

    parser.add_argument(
        "--kubeconfig", "-k", metavar="FILE", default=None,
        help="Path to kubeconfig file (default: $KUBECONFIG or ~/.kube/config)",
    )
    parser.add_argument(
        "--modules", "-m", nargs="+", metavar="MODULE",
        choices=list(MODULES.keys()), default=None,
        help="Run specific audit module(s). Omit for full audit.",
    )
    parser.add_argument(
        "--output", "-o", metavar="FILE", default=None,
        help="Save findings to a JSON report file after a live audit.",
    )
    parser.add_argument(
        "--input", "-i", metavar="FILE", default=None,
        help="Load and display a previously saved JSON report (no cluster connection).",
    )
    parser.add_argument(
        "--severity", "-s", nargs="+", metavar="LEVEL", default=None,
        choices=["CRITICAL", "HIGH", "MEDIUM", "LOW", "INFO"],
        help="Filter output by severity level (see epilog for valid levels).",
    )
    parser.add_argument(
        "--quiet", "-q", action="store_true", default=False,
        help="Suppress all finding output during the audit; only print the summary at the end.",
    )

    ns_group = parser.add_mutually_exclusive_group()
    ns_group.add_argument(
        "--namespaces", "-n", nargs="+", metavar="NS", default=None,
        help="Audit only these namespaces (space-separated).",
    )
    ns_group.add_argument(
        "--exclude-namespaces", "-e", nargs="+", metavar="NS", default=None,
        help="Skip these namespaces; audit all others (space-separated).",
    )

    args = parser.parse_args()

    # Report-loading mode: display a saved JSON report then exit.
    if args.input:
        load_and_display_report(
            args.input,
            severity_filter=set(args.severity) if args.severity else None,
        )
        sys.exit(0)

    # Live audit mode.
    global severity_filter, quiet_mode
    severity_filter = set(args.severity) if args.severity else None
    quiet_mode      = args.quiet

    nsf = NsFilter(include=args.namespaces, exclude=args.exclude_namespaces)

    banner()
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + f" ({time.tzname[time.localtime().tm_isdst > 0]})"
    info(f"Audit started    : {now}")
    kc = args.kubeconfig or os.environ.get("KUBECONFIG", "~/.kube/config")
    info(f"Using kubeconfig : {kc}{'' if args.kubeconfig else ' (default)'}")
    info(f"Namespace scope  : {nsf.describe()}")
    info(f"Severity filter  : {', '.join(sorted(severity_filter)) if severity_filter else 'all'}")
    info(f"Quiet mode       : {'on (only summary will be shown)' if quiet_mode else 'off'}")

    selected = args.modules or list(MODULES.keys())
    info(f"Modules to run   : {', '.join(selected)}\n")

    for mod_name in selected:
        try:
            MODULES[mod_name](args.kubeconfig, nsf)
        except PermissionError as exc:
            error(f"[{mod_name}] Blocked write attempt: {exc}")
        except Exception as exc:
            error(f"[{mod_name}] Unexpected error: {exc}")

    print_summary(args.output)


if __name__ == "__main__":
    main()