From 3406c5ec64afff44d5c71c1aa8fc56fd96715723 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pedro=20Mart=C3=ADn?= Date: Mon, 20 Apr 2026 17:22:05 +0200 Subject: [PATCH] chore(skills): improve prowler-compliance (#10627) --- skills/prowler-compliance/SKILL.md | 578 +++++++++++++++++- .../assets/audit_framework_template.py | 207 +++++++ .../assets/build_inventory.py | 100 +++ .../assets/configs/ccc.yaml | 120 ++++ .../prowler-compliance/assets/dump_section.py | 92 +++ .../assets/parsers/__init__.py | 0 .../assets/parsers/finos_ccc.py | 223 +++++++ .../prowler-compliance/assets/query_checks.py | 86 +++ .../assets/sync_framework.py | 536 ++++++++++++++++ 9 files changed, 1932 insertions(+), 10 deletions(-) create mode 100644 skills/prowler-compliance/assets/audit_framework_template.py create mode 100644 skills/prowler-compliance/assets/build_inventory.py create mode 100644 skills/prowler-compliance/assets/configs/ccc.yaml create mode 100644 skills/prowler-compliance/assets/dump_section.py create mode 100644 skills/prowler-compliance/assets/parsers/__init__.py create mode 100644 skills/prowler-compliance/assets/parsers/finos_ccc.py create mode 100644 skills/prowler-compliance/assets/query_checks.py create mode 100644 skills/prowler-compliance/assets/sync_framework.py diff --git a/skills/prowler-compliance/SKILL.md b/skills/prowler-compliance/SKILL.md index 1853d23d8b..51c68eb05f 100644 --- a/skills/prowler-compliance/SKILL.md +++ b/skills/prowler-compliance/SKILL.md @@ -1,16 +1,28 @@ --- name: prowler-compliance description: > - Creates and manages Prowler compliance frameworks. - Trigger: When working with compliance frameworks (CIS, NIST, PCI-DSS, SOC2, GDPR, ISO27001, ENS, MITRE ATT&CK). + Creates, syncs, audits and manages Prowler compliance frameworks end-to-end. + Covers the four-layer architecture (SDK models → JSON catalogs → output + formatters → API/UI), upstream sync workflows, cloud-auditor check-mapping + reviews, output formatter creation, and framework-specific attribute models. + Trigger: When working with compliance frameworks (CIS, NIST, PCI-DSS, SOC2, + GDPR, ISO27001, ENS, MITRE ATT&CK, CCC, C5, CSA CCM, KISA ISMS-P, + Prowler ThreatScore, FedRAMP, HIPAA), syncing with upstream catalogs, + auditing check-to-requirement mappings, adding output formatters, or fixing + compliance JSON bugs (duplicate IDs, empty Version, wrong Section, stale + check refs). license: Apache-2.0 metadata: author: prowler-cloud - version: "1.1" + version: "1.2" scope: [root, sdk] auto_invoke: - "Creating/updating compliance frameworks" - "Mapping checks to compliance controls" + - "Syncing compliance framework with upstream catalog" + - "Auditing check-to-requirement mappings as a cloud auditor" + - "Adding a compliance output formatter (per-provider class + table dispatcher)" + - "Fixing compliance JSON bugs (duplicate IDs, empty Section, stale refs)" allowed-tools: Read, Edit, Write, Glob, Grep, Bash, WebFetch, WebSearch, Task --- @@ -18,10 +30,82 @@ allowed-tools: Read, Edit, Write, Glob, Grep, Bash, WebFetch, WebSearch, Task Use this skill when: - Creating a new compliance framework for any provider +- **Syncing an existing framework with an upstream source of truth** (CIS, FINOS CCC, CSA CCM, NIST, ENS, etc.) 
- Adding requirements to existing frameworks - Mapping checks to compliance controls +- **Auditing existing check mappings as a cloud auditor** (user asks "are these mappings correct?", "which checks apply to this requirement?", "review the mappings") +- **Adding a new output formatter** (new framework needs a table dispatcher + per-provider classes + CSV models) +- **Fixing JSON bugs**: duplicate IDs, empty Version, wrong Section, stale check refs, inconsistent FamilyName, padded tangential check mappings +- **Registering a framework in the CLI table dispatcher or API export map** +- Investigating why a finding/check isn't showing under the expected compliance framework in the UI - Understanding compliance framework structures and attributes +## Four-Layer Architecture (Mental Model) + +Prowler compliance is a **four-layer system** hanging off one Pydantic model tree. Bugs usually happen where one layer doesn't match another, so know all four before touching anything. + +### Layer 1: SDK / Core Models — `prowler/lib/check/` + +- **`compliance_models.py`** — Pydantic **v1** model tree (`from pydantic.v1 import`). One `*_Requirement_Attribute` class per framework type + `Generic_Compliance_Requirement_Attribute` as fallback. +- `Compliance_Requirement.Attributes: list[Union[...]]` — **`Generic_Compliance_Requirement_Attribute` MUST be LAST** in the Union or every framework-specific attribute falls through to Generic (Pydantic v1 tries union members in order). +- **`compliance.py`** — runtime linker. `get_check_compliance()` builds the key as `f"{Framework}-{Version}"` **only if `Version` is non-empty**. An empty Version makes the key just `"{Framework}"` — this breaks downstream filters and tests that expect the versioned key. +- `Compliance.get_bulk(provider)` walks `prowler/compliance/{provider}/` and parses every `.json` file. No central index — just directory scan. + +### Layer 2: JSON Frameworks — `prowler/compliance/{provider}/` + +See "Compliance Framework Location" and "Framework-Specific Attribute Structures" sections below. + +### Layer 3: Output Formatters — `prowler/lib/outputs/compliance/{framework}/` + +**Every framework directory follows this exact convention** — do not deviate: + +``` +{framework}/ +├── __init__.py +├── {framework}.py # ONLY get_{framework}_table() — NO function docstring +├── {framework}_{provider}.py # One class per provider (e.g., CCC_AWS, CCC_Azure, CCC_GCP) +└── models.py # One Pydantic v2 BaseModel per provider (CSV columns) +``` + +- **`{framework}.py`** holds the **table dispatcher function** `get_{framework}_table()`. It prints the pass/fail/muted summary table. **Must NOT import `Finding` or `ComplianceOutput`** — doing so creates a circular import with `prowler/lib/outputs/compliance/compliance.py`. Only imports: `colorama`, `tabulate`, `prowler.config.config.orange_color`. +- **`{framework}_{provider}.py`** holds a per-provider class like `CCC_AWS(ComplianceOutput)` with a `transform()` method that walks findings and emits rows. This file IS allowed to import `Finding` because it's not on the dispatcher import chain. +- **`models.py`** holds one Pydantic v2 `BaseModel` per provider. Field names become CSV column headers (**public API** — renaming breaks downstream consumers). +- **Never collapse per-provider files into a unified parameterized class**, even when DRY-tempting. Every framework in Prowler follows the per-provider file pattern and reviewers will reject the refactor. 
CSV columns differ per provider (`AccountId`/`Region` vs `SubscriptionId`/`Location` vs `ProjectId`/`Location`) — three classes is the convention. +- **No function docstring on `get_{framework}_table()`** — no other framework has one; stay consistent. +- Register in `prowler/lib/outputs/compliance/compliance.py` → `display_compliance_table()` with an `elif compliance_framework.startswith("{framework}_"):` branch. Import the table function at the top of the file. + +### Layer 4: API / UI + +- **API table dispatcher**: `api/src/backend/tasks/jobs/export.py` → `COMPLIANCE_CLASS_MAP` keyed by provider. Uses `startswith` predicates: `(lambda name: name.startswith("ccc_"), CCC_AWS)`. **Never use exact match** (`name == "ccc_aws"`) — it's inconsistent and breaks versioning. +- **API lazy loader**: `api/src/backend/api/compliance.py` — `LazyComplianceTemplate` and `LazyChecksMapping` load compliance per provider on first access. +- **UI mapper routing**: `ui/lib/compliance/compliance-mapper.ts` routes framework names → per-framework mapper. +- **UI per-framework mapper**: `ui/lib/compliance/{framework}.tsx` flattens `Requirements` into a 3-level tree (Framework → Category → Control → Requirement) for the accordion view. Groups by `Attributes[0].FamilyName` and `Attributes[0].Section`. +- **UI detail panel**: `ui/components/compliance/compliance-custom-details/{framework}-details.tsx`. +- **UI types**: `ui/types/compliance.ts` — TypeScript mirrors of the attribute metadata. + +### The CLI Pipeline (end-to-end) + +``` +prowler aws --compliance ccc_aws + ↓ +Compliance.get_bulk("aws") → parses prowler/compliance/aws/*.json + ↓ +update_checks_metadata_with_compliance() → attaches compliance info to CheckMetadata + ↓ +execute_checks() → runs checks, produces Finding objects + ↓ +get_check_compliance(finding, "aws", bulk_checks_metadata) + → dict "{Framework}-{Version}" → [requirement_ids] + ↓ +CCC_AWS(findings, compliance).transform() → per-provider class builds CSV rows + ↓ +batch_write_data_to_file() → writes {output_filename}_ccc_aws.csv + ↓ +display_compliance_table() → get_ccc_table() → prints stdout summary +``` + +--- + ## Compliance Framework Location Frameworks are JSON files located in: `prowler/compliance/{provider}/{framework_name}_{provider}.json` @@ -455,14 +539,453 @@ Prowler ThreatScore is a custom security scoring framework developed by Prowler - **M365:** `cis_4.0_m365.json`, `iso27001_2022_m365.json` - **NHN:** `iso27001_2022_nhn.json` +## Workflow A: Sync a Framework With an Upstream Catalog + +Use when the framework is maintained upstream (CIS Benchmarks, FINOS CCC, CSA CCM, NIST, ENS, etc.) and Prowler needs to catch up. + +### Step 1 — Cache the upstream source + +Download every upstream file to a local cache so subsequent iterations don't hit the network. For FINOS CCC: + +```bash +mkdir -p /tmp/ccc_upstream +catalogs="core/ccc storage/object management/auditlog management/logging ..." 
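+# each entry in $catalogs is a catalog path under catalogs/ in the
+# finos/common-cloud-controls repo; list every catalog the framework needs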
+for p in $catalogs; do + safe=$(echo "$p" | tr '/' '_') + gh api "repos/finos/common-cloud-controls/contents/catalogs/$p/controls.yaml" \ + -H "Accept: application/vnd.github.raw" > "/tmp/ccc_upstream/${safe}.yaml" +done +``` + +### Step 2 — Run the generic sync runner against a framework config + +The sync tooling is split into three layers so adding a new framework only takes a YAML config (and optionally a new parser module for an unfamiliar upstream format): + +``` +skills/prowler-compliance/assets/ +├── sync_framework.py # generic runner — works for any framework +├── configs/ +│ └── ccc.yaml # per-framework config (canonical example) +└── parsers/ + ├── __init__.py + └── finos_ccc.py # parser module for FINOS CCC YAML +``` + +**For frameworks that already have a config + parser** (today: FINOS CCC), run: + +```bash +python skills/prowler-compliance/assets/sync_framework.py \ + skills/prowler-compliance/assets/configs/ccc.yaml +``` + +The runner loads the config, validates it, dynamically imports the parser declared in `parser.module`, calls `parser.parse_upstream(config) -> list[dict]`, then applies generic post-processing (id uniqueness safety net, `FamilyName` normalization, legacy check-mapping preservation) and writes the provider JSONs. + +**To add a new framework sync**: + +1. **Write a config file** at `skills/prowler-compliance/assets/configs/{framework}.yaml`. See `configs/ccc.yaml` as the canonical example. Required top-level sections: + - `framework` — `name`, `display_name`, `version` (**never empty** — empty Version silently breaks `get_check_compliance()` key construction, so the runner refuses to start), `description_template` (accepts `{provider_display}`, `{provider_key}`, `{framework_name}`, `{framework_display}`, `{version}` placeholders). + - `providers` — list of `{key, display}` pairs, one per Prowler provider the framework targets. + - `output.path_template` — supports `{provider}`, `{framework}`, `{version}` placeholders. Examples: `"prowler/compliance/{provider}/ccc_{provider}.json"` for unversioned file names, `"prowler/compliance/{provider}/cis_{version}_{provider}.json"` for versioned ones. + - `upstream.dir` — local cache directory (populate via Step 1). + - `parser.module` — name of the module under `parsers/` to load (without `.py`). Everything else under `parser.` is opaque to the runner and passed to the parser as config. + - `post_processing.check_preservation.primary_key` — top-level field name for the primary legacy-mapping lookup (almost always `Id`). + - `post_processing.check_preservation.fallback_keys` — **config-driven fallback keys** for preserving check mappings when ids change. Each entry is a list of `Attributes[0]` field names composed into a tuple. Examples: + - CCC: `- [Section, Applicability]` (because `Applicability` is a CCC-only attribute, verified in `compliance_models.py:213`). + - CIS would use `- [Section, Profile]`. + - NIST would use `- [ItemId]`. + - List-valued fields (like `Applicability`) are automatically frozen to `frozenset` so the tuple is hashable. + - `post_processing.family_name_normalization` (optional) — map of raw → canonical `FamilyName` values. The UI groups by `Attributes[0].FamilyName` exactly, so inconsistent upstream variants otherwise become separate tree branches. + +2. **Reuse an existing parser** if the upstream format matches one (currently only `finos_ccc` exists). 
Otherwise, **write a new parser** at `parsers/{name}.py` implementing: + + ```python + def parse_upstream(config: dict) -> list[dict]: + """Return Prowler-format requirements {Id, Description, Attributes: [...], Checks: []}. + + Ids MUST be unique in the returned list. The runner raises ValueError + on duplicates — it does NOT silently renumber, because mutating a + canonical upstream id (e.g. CIS '1.1.1' or NIST 'AC-2(1)') would be + catastrophic. The parser owns all upstream-format quirks: foreign-prefix + rewriting, genuine collision renumbering, shape handling. + """ + ``` + + The parser reads its own settings from `config['upstream']` and `config['parser']`. It does NOT load existing Prowler JSONs (the runner does that for check preservation) and does NOT write output (the runner does that too). + +**Gotchas the runner already handles for you** (learned from the FINOS CCC v2025.10 sync — they're documented here so you don't re-discover them): + +- **Multiple upstream YAML shapes**. Most FINOS CCC catalogs use `control-families: [...]`, but `storage/object` uses a top-level `controls: [...]` with a `family: "CCC.X.Y"` reference id and no human-readable family name. A parser that only handles shape 1 silently drops the shape-2 catalog — this exact bug dropped ObjStor from Prowler for a full iteration. `parsers/finos_ccc.py` handles both shapes; if you write a new parser for a similar format, test with at least one file of each shape. +- **Whitespace collapse**. Upstream YAML multi-line block scalars (`|`) preserve newlines. Prowler stores descriptions single-line. Collapse with `" ".join(value.split())` before emitting (see `parsers/finos_ccc.py::clean()`). +- **Foreign-prefix AR id rewriting**. Upstream sometimes aliases requirements across catalogs by keeping the original prefix (e.g., `CCC.AuditLog.CN08.AR01` appears nested under `CCC.Logging.CN03`). Rewrite the foreign id to fit its parent control: `CCC.Logging.CN03.AR01`. This logic is parser-specific because the id structure varies per framework (CCC uses 3-dot depth; CIS uses numeric dots; NIST uses `AC-2(1)`). +- **Genuine upstream collision renumbering**. Sometimes upstream has a real typo where two different requirements share the same id (e.g., `CCC.Core.CN14.AR02` defined twice for 30-day and 14-day backup variants). Renumber the second copy to the next free AR number (`.AR03`). The parser handles this; the runner asserts the final list has unique ids as a safety net. +- **Existing check mapping preservation**. The runner uses the `primary_key` + `fallback_keys` declared in config to look up the old `Checks` list for each requirement. For CCC this means primary index by `Id` plus fallback index by `(Section, frozenset(Applicability))` — the fallback recovers mappings for requirements whose ids were rewritten or renumbered by the parser. +- **FamilyName normalization**. Configured via `post_processing.family_name_normalization` — no code changes needed to collapse upstream variants like `"Logging & Monitoring"` → `"Logging and Monitoring"`. +- **Populate `Version`**. The runner refuses to start on empty `framework.version` — fail-fast replaces the silent bug where `get_check_compliance()` would build the key as just `"{Framework}"`. 
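To make the parser contract concrete, here is a minimal sketch of a `parse_upstream` implementation. Treat it as a sketch only: the upstream key names (`control-families`, `controls`, `assessment-requirements`, `id`, `text`, `title`) mirror the FINOS shapes described above but are assumptions to adapt per framework, and the real reference implementation in `parsers/finos_ccc.py` additionally does foreign-prefix rewriting and collision renumbering:

```python
from collections import Counter
from pathlib import Path

import yaml


def clean(value: str) -> str:
    # Collapse YAML block-scalar newlines into the single-line form Prowler stores.
    return " ".join((value or "").split())


def parse_upstream(config: dict) -> list[dict]:
    requirements: list[dict] = []
    for path in sorted(Path(config["upstream"]["dir"]).glob("*.yaml")):
        doc = yaml.safe_load(path.read_text()) or {}
        # Shape 1: control-families: [...]. Shape 2: top-level controls: [...]
        # with only a family reference id and no human-readable family name.
        families = doc.get("control-families") or [
            {"title": "", "controls": doc.get("controls", [])}
        ]
        for family in families:
            for control in family.get("controls", []):
                for ar in control.get("assessment-requirements", []):
                    requirements.append(
                        {
                            "Id": ar["id"],
                            "Description": clean(ar.get("text", "")),
                            "Attributes": [
                                {
                                    "Section": clean(control.get("title", "")),
                                    "FamilyName": clean(family.get("title", "")),
                                }
                            ],
                            # The runner re-attaches legacy check mappings;
                            # parsers always emit an empty Checks list.
                            "Checks": [],
                        }
                    )
    duplicated = [i for i, n in Counter(r["Id"] for r in requirements).items() if n > 1]
    if duplicated:
        # Resolve collisions here (rewrite or renumber) before returning;
        # the runner refuses duplicates rather than renumbering silently.
        raise ValueError(f"duplicate requirement ids: {duplicated}")
    return requirements
```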
+ +### Step 3 — Validate before committing + +```python +from prowler.lib.check.compliance_models import Compliance +for prov in ['aws', 'azure', 'gcp']: + c = Compliance.parse_file(f"prowler/compliance/{prov}/ccc_{prov}.json") + print(f"{prov}: {len(c.Requirements)} reqs, version={c.Version}") +``` + +Any `ValidationError` means the Attribute fields don't match the `*_Requirement_Attribute` model. Either fix the JSON or extend the model in `compliance_models.py` (remember: Generic stays last). + +### Step 4 — Verify every check id exists + +```python +import json +from pathlib import Path +for prov in ['aws', 'azure', 'gcp']: + existing = {p.stem.replace('.metadata','') + for p in Path(f'prowler/providers/{prov}/services').rglob('*.metadata.json')} + with open(f'prowler/compliance/{prov}/ccc_{prov}.json') as f: + data = json.load(f) + refs = {c for r in data['Requirements'] for c in r['Checks']} + missing = refs - existing + assert not missing, f"{prov} missing: {missing}" +``` + +A stale check id silently becomes dead weight — no finding will ever map to it. This pre-validation **must run on every write**; bake it into the generator script. + +### Step 5 — Add an attribute model if needed + +Only if the framework has fields beyond `Generic_Compliance_Requirement_Attribute`. Add the class to `prowler/lib/check/compliance_models.py` and register it in `Compliance_Requirement.Attributes: list[Union[...]]`. **Generic stays last.** + +--- + +## Workflow B: Audit Check Mappings as a Cloud Auditor + +Use when the user asks to review existing mappings ("are these correct?", "verify that the checks apply", "audit the CCC mappings"). This is the highest-value compliance task — it surfaces padded mappings with zero actual coverage and missing mappings for legitimate coverage. + +### The golden rule + +> A Prowler check's title/risk MUST **literally describe what the requirement text says**. "Related" is not enough. If no check actually addresses the requirement, leave `Checks: []` (MANUAL) — **honest MANUAL is worth more than padded coverage**. + +### Audit process + +**Step 1 — Build a per-provider check inventory** (cache in `/tmp/`): + +```python +import json +from pathlib import Path +for provider in ['aws', 'azure', 'gcp']: + inv = {} + for meta in Path(f'prowler/providers/{provider}/services').rglob('*.metadata.json'): + with open(meta) as f: + d = json.load(f) + cid = d.get('CheckID') or meta.stem.replace('.metadata','') + inv[cid] = { + 'service': d.get('ServiceName', ''), + 'title': d.get('CheckTitle', ''), + 'risk': d.get('Risk', ''), + 'description': d.get('Description', ''), + } + with open(f'/tmp/checks_{provider}.json', 'w') as f: + json.dump(inv, f, indent=2) +``` + +**Step 2 — Keyword/service query helper** — see [assets/query_checks.py](assets/query_checks.py): + +```bash +python assets/query_checks.py aws encryption transit # keyword AND-search +python assets/query_checks.py aws --service iam # all iam checks +python assets/query_checks.py aws --id kms_cmk_rotation_enabled # full metadata +``` + +**Step 3 — Dump a framework section with current mappings** — see [assets/dump_section.py](assets/dump_section.py): + +```bash +python assets/dump_section.py ccc "CCC.Core." # all Core ARs across 3 providers +python assets/dump_section.py ccc "CCC.AuditLog." # all AuditLog ARs +``` + +**Step 4 — Encode explicit REPLACE decisions** — see [assets/audit_framework_template.py](assets/audit_framework_template.py). 
Structure: + +```python +DECISIONS = {} + +DECISIONS["CCC.Core.CN01.AR01"] = { + "aws": [ + "cloudfront_distributions_https_enabled", + "cloudfront_distributions_origin_traffic_encrypted", + # ... + ], + "azure": [ + "storage_secure_transfer_required_is_enabled", + "app_minimum_tls_version_12", + # ... + ], + "gcp": [ + "cloudsql_instance_ssl_connections", + ], + # Missing provider key = leave the legacy mapping untouched +} + +# Empty list = EXPLICITLY MANUAL (overwrites legacy) +DECISIONS["CCC.Core.CN01.AR07"] = { + "aws": [], # Prowler has no IANA port/protocol check + "azure": [], + "gcp": [], +} +``` + +**REPLACE, not PATCH.** Encoding every mapping as a full list (not add/remove delta) makes the audit reproducible and surfaces hidden assumptions from the legacy data. + +**Step 5 — Pre-validation**. The audit script MUST validate every check id against the inventory and **abort with stderr listing typos**. Common typos caught during a real audit: + +- `fsx_file_system_encryption_at_rest_using_kms` (doesn't exist) +- `cosmosdb_account_encryption_at_rest_with_cmk` (doesn't exist) +- `sqlserver_geo_replication` (doesn't exist) +- `redshift_cluster_audit_logging` (should be `redshift_cluster_encrypted_at_rest`) +- `postgresql_flexible_server_require_secure_transport` (should be `postgresql_flexible_server_enforce_ssl_enabled`) +- `storage_secure_transfer_required_enabled` (should be `storage_secure_transfer_required_is_enabled`) +- `sqlserver_minimum_tls_version_12` (should be `sqlserver_recommended_minimal_tls_version`) + +**Step 6 — Apply + validate + test**: + +```bash +python /path/to/audit_script.py # applies decisions, pre-validates +python -m pytest tests/lib/outputs/compliance/ tests/lib/check/ -q +``` + +### Audit Reference Table: Requirement Text → Prowler Checks + +Use this table to map CCC-style / NIST-style / ISO-style requirements to the checks that actually verify them. Built from a real audit of 172 CCC ARs × 3 providers. 
+ +| Requirement text | AWS checks | Azure checks | GCP checks | +|---|---|---|---| +| **TLS in transit enforced** | `cloudfront_distributions_https_enabled`, `s3_bucket_secure_transport_policy`, `elbv2_ssl_listeners`, `elbv2_insecure_ssl_ciphers`, `elb_ssl_listeners`, `elb_insecure_ssl_ciphers`, `opensearch_service_domains_https_communications_enforced`, `rds_instance_transport_encrypted`, `redshift_cluster_in_transit_encryption_enabled`, `elasticache_redis_cluster_in_transit_encryption_enabled`, `dynamodb_accelerator_cluster_in_transit_encryption_enabled`, `dms_endpoint_ssl_enabled`, `kafka_cluster_in_transit_encryption_enabled`, `transfer_server_in_transit_encryption_enabled`, `glue_database_connections_ssl_enabled`, `sns_subscription_not_using_http_endpoints` | `storage_secure_transfer_required_is_enabled`, `storage_ensure_minimum_tls_version_12`, `postgresql_flexible_server_enforce_ssl_enabled`, `mysql_flexible_server_ssl_connection_enabled`, `mysql_flexible_server_minimum_tls_version_12`, `sqlserver_recommended_minimal_tls_version`, `app_minimum_tls_version_12`, `app_ensure_http_is_redirected_to_https`, `app_ftp_deployment_disabled` | `cloudsql_instance_ssl_connections` (almost only option) | +| **TLS 1.3 specifically** | Partial: `cloudfront_distributions_using_deprecated_ssl_protocols`, `elb*_insecure_ssl_ciphers`, `*_minimum_tls_version_12` | Partial: `*_minimum_tls_version_12` checks | None — accept as MANUAL | +| **SSH / port 22 hardening** | `ec2_instance_port_ssh_exposed_to_internet`, `ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22`, `ec2_networkacl_allow_ingress_tcp_port_22` | `network_ssh_internet_access_restricted`, `vm_linux_enforce_ssh_authentication` | `compute_firewall_ssh_access_from_the_internet_allowed`, `compute_instance_block_project_wide_ssh_keys_disabled`, `compute_project_os_login_enabled`, `compute_project_os_login_2fa_enabled` | +| **mTLS (mutual TLS)** | `kafka_cluster_mutual_tls_authentication_enabled`, `apigateway_restapi_client_certificate_enabled` | `app_client_certificates_on` | None — MANUAL | +| **Data at rest encrypted** | `s3_bucket_default_encryption`, `s3_bucket_kms_encryption`, `ec2_ebs_default_encryption`, `ec2_ebs_volume_encryption`, `rds_instance_storage_encrypted`, `rds_cluster_storage_encrypted`, `rds_snapshots_encrypted`, `dynamodb_tables_kms_cmk_encryption_enabled`, `redshift_cluster_encrypted_at_rest`, `neptune_cluster_storage_encrypted`, `documentdb_cluster_storage_encrypted`, `opensearch_service_domains_encryption_at_rest_enabled`, `kinesis_stream_encrypted_at_rest`, `firehose_stream_encrypted_at_rest`, `sns_topics_kms_encryption_at_rest_enabled`, `sqs_queues_server_side_encryption_enabled`, `efs_encryption_at_rest_enabled`, `athena_workgroup_encryption`, `glue_data_catalogs_metadata_encryption_enabled`, `backup_vaults_encrypted`, `backup_recovery_point_encrypted`, `cloudtrail_kms_encryption_enabled`, `cloudwatch_log_group_kms_encryption_enabled`, `eks_cluster_kms_cmk_encryption_in_secrets_enabled`, `sagemaker_notebook_instance_encryption_enabled`, `apigateway_restapi_cache_encrypted`, `kafka_cluster_encryption_at_rest_uses_cmk`, `dynamodb_accelerator_cluster_encryption_enabled`, `storagegateway_fileshare_encryption_enabled` | `storage_infrastructure_encryption_is_enabled`, `storage_ensure_encryption_with_customer_managed_keys`, `vm_ensure_attached_disks_encrypted_with_cmk`, `vm_ensure_unattached_disks_encrypted_with_cmk`, `sqlserver_tde_encryption_enabled`, `sqlserver_tde_encrypted_with_cmk`, 
`databricks_workspace_cmk_encryption_enabled`, `monitor_storage_account_with_activity_logs_cmk_encrypted` | `compute_instance_encryption_with_csek_enabled`, `dataproc_encrypted_with_cmks_disabled`, `bigquery_dataset_cmk_encryption`, `bigquery_table_cmk_encryption` | +| **CMEK required (customer-managed keys)** | `kms_cmk_are_used` | `storage_ensure_encryption_with_customer_managed_keys`, `vm_ensure_attached_disks_encrypted_with_cmk`, `vm_ensure_unattached_disks_encrypted_with_cmk`, `sqlserver_tde_encrypted_with_cmk`, `databricks_workspace_cmk_encryption_enabled` | `bigquery_dataset_cmk_encryption`, `bigquery_table_cmk_encryption`, `dataproc_encrypted_with_cmks_disabled`, `compute_instance_encryption_with_csek_enabled` | +| **Key rotation enabled** | `kms_cmk_rotation_enabled` | `keyvault_key_rotation_enabled`, `storage_key_rotation_90_days` | `kms_key_rotation_enabled` | +| **MFA for UI access** | `iam_root_mfa_enabled`, `iam_root_hardware_mfa_enabled`, `iam_user_mfa_enabled_console_access`, `iam_user_hardware_mfa_enabled`, `iam_administrator_access_with_mfa`, `cognito_user_pool_mfa_enabled` | `entra_privileged_user_has_mfa`, `entra_non_privileged_user_has_mfa`, `entra_user_with_vm_access_has_mfa`, `entra_security_defaults_enabled` | `compute_project_os_login_2fa_enabled` | +| **API access / credentials** | `iam_no_root_access_key`, `iam_user_no_setup_initial_access_key`, `apigateway_restapi_authorizers_enabled`, `apigateway_restapi_public_with_authorizer`, `apigatewayv2_api_authorizers_enabled` | `entra_conditional_access_policy_require_mfa_for_management_api`, `app_function_access_keys_configured`, `app_function_identity_is_configured` | `apikeys_api_restrictions_configured`, `apikeys_key_exists`, `apikeys_key_rotated_in_90_days` | +| **Log all admin/config changes** | `cloudtrail_multi_region_enabled`, `cloudtrail_multi_region_enabled_logging_management_events`, `cloudtrail_cloudwatch_logging_enabled`, `cloudtrail_log_file_validation_enabled`, `cloudwatch_log_metric_filter_*`, `cloudwatch_changes_to_*_alarm_configured`, `config_recorder_all_regions_enabled` | `monitor_diagnostic_settings_exists`, `monitor_diagnostic_setting_with_appropriate_categories`, `monitor_alert_*` | `iam_audit_logs_enabled`, `logging_log_metric_filter_and_alert_for_*`, `logging_sink_created` | +| **Log integrity (digital signatures)** | `cloudtrail_log_file_validation_enabled` (exact) | None | None | +| **Public access denied** | `s3_bucket_public_access`, `s3_bucket_public_list_acl`, `s3_bucket_public_write_acl`, `s3_account_level_public_access_blocks`, `apigateway_restapi_public`, `awslambda_function_url_public`, `awslambda_function_not_publicly_accessible`, `rds_instance_no_public_access`, `rds_snapshots_public_access`, `ec2_securitygroup_allow_ingress_from_internet_to_all_ports`, `sns_topics_not_publicly_accessible`, `sqs_queues_not_publicly_accessible` | `storage_blob_public_access_level_is_disabled`, `storage_ensure_private_endpoints_in_storage_accounts`, `containerregistry_not_publicly_accessible`, `keyvault_private_endpoints`, `app_function_not_publicly_accessible`, `aks_clusters_public_access_disabled`, `network_http_internet_access_restricted` | `cloudstorage_bucket_public_access`, `compute_instance_public_ip`, `cloudsql_instance_public_ip`, `compute_firewall_*_access_from_the_internet_allowed` | +| **IAM least privilege** | `iam_*_no_administrative_privileges`, `iam_policy_allows_privilege_escalation`, `iam_inline_policy_allows_privilege_escalation`, `iam_role_administratoraccess_policy`, 
`iam_group_administrator_access_policy`, `iam_user_administrator_access_policy`, `iam_policy_attached_only_to_group_or_roles`, `iam_role_cross_service_confused_deputy_prevention` | `iam_role_user_access_admin_restricted`, `iam_subscription_roles_owner_custom_not_created`, `iam_custom_role_has_permissions_to_administer_resource_locks` | `iam_sa_no_administrative_privileges`, `iam_no_service_roles_at_project_level`, `iam_role_kms_enforce_separation_of_duties`, `iam_role_sa_enforce_separation_of_duties` | +| **Password policy** | `iam_password_policy_minimum_length_14`, `iam_password_policy_uppercase`, `iam_password_policy_lowercase`, `iam_password_policy_symbol`, `iam_password_policy_number`, `iam_password_policy_expires_passwords_within_90_days_or_less`, `iam_password_policy_reuse_24` | None | None | +| **Credential rotation / unused** | `iam_rotate_access_key_90_days`, `iam_user_accesskey_unused`, `iam_user_console_access_unused` | None | `iam_sa_user_managed_key_rotate_90_days`, `iam_sa_user_managed_key_unused`, `iam_service_account_unused` | +| **VPC / flow logs** | `vpc_flow_logs_enabled` | `network_flow_log_captured_sent`, `network_watcher_enabled`, `network_flow_log_more_than_90_days` | `compute_subnet_flow_logs_enabled` | +| **Backup / DR / Multi-AZ** | `backup_vaults_exist`, `backup_plans_exist`, `backup_reportplans_exist`, `rds_instance_backup_enabled`, `rds_*_protected_by_backup_plan`, `rds_cluster_multi_az`, `neptune_cluster_backup_enabled`, `documentdb_cluster_backup_enabled`, `efs_have_backup_enabled`, `s3_bucket_cross_region_replication`, `dynamodb_table_protected_by_backup_plan` | `vm_backup_enabled`, `vm_sufficient_daily_backup_retention_period`, `storage_geo_redundant_enabled` | `cloudsql_instance_automated_backups`, `cloudstorage_bucket_log_retention_policy_lock`, `cloudstorage_bucket_sufficient_retention_period` | +| **Access analysis / discovery** | `accessanalyzer_enabled`, `accessanalyzer_enabled_without_findings` | None specific | `iam_account_access_approval_enabled`, `iam_cloud_asset_inventory_enabled` | +| **Object lock / retention** | `s3_bucket_object_lock`, `s3_bucket_object_versioning`, `s3_bucket_lifecycle_enabled`, `cloudtrail_bucket_requires_mfa_delete`, `s3_bucket_no_mfa_delete` | `storage_ensure_soft_delete_is_enabled`, `storage_blob_versioning_is_enabled`, `storage_ensure_file_shares_soft_delete_is_enabled` | `cloudstorage_bucket_log_retention_policy_lock`, `cloudstorage_bucket_soft_delete_enabled`, `cloudstorage_bucket_versioning_enabled`, `cloudstorage_bucket_sufficient_retention_period` | +| **Uniform bucket-level access** | `s3_bucket_acl_prohibited` | `storage_account_key_access_disabled`, `storage_default_to_entra_authorization_enabled` | `cloudstorage_bucket_uniform_bucket_level_access` | +| **Container vulnerability scanning** | `ecr_registry_scan_images_on_push_enabled`, `ecr_repositories_scan_vulnerabilities_in_latest_image` | `defender_container_images_scan_enabled`, `defender_container_images_resolved_vulnerabilities` | `artifacts_container_analysis_enabled`, `gcr_container_scanning_enabled` | +| **WAF / rate limiting** | `wafv2_webacl_with_rules`, `waf_*_webacl_with_rules`, `wafv2_webacl_logging_enabled`, `waf_global_webacl_logging_enabled` | None | None | +| **Deployment region restriction** | `organizations_scp_check_deny_regions` | None | None | +| **Secrets automatic rotation** | `secretsmanager_automatic_rotation_enabled`, `secretsmanager_secret_rotated_periodically` | `keyvault_rbac_secret_expiration_set`, 
`keyvault_non_rbac_secret_expiration_set` | None | +| **Certificate management** | `acm_certificates_expiration_check`, `acm_certificates_with_secure_key_algorithms`, `acm_certificates_transparency_logs_enabled` | `keyvault_key_expiration_set_in_non_rbac`, `keyvault_rbac_key_expiration_set`, `keyvault_non_rbac_secret_expiration_set` | None | +| **GenAI guardrails / input/output filtering** | `bedrock_guardrail_prompt_attack_filter_enabled`, `bedrock_guardrail_sensitive_information_filter_enabled`, `bedrock_agent_guardrail_enabled`, `bedrock_model_invocation_logging_enabled`, `bedrock_api_key_no_administrative_privileges`, `bedrock_api_key_no_long_term_credentials` | None | None | +| **ML dev environment security** | `sagemaker_notebook_instance_root_access_disabled`, `sagemaker_notebook_instance_without_direct_internet_access_configured`, `sagemaker_notebook_instance_vpc_settings_configured`, `sagemaker_models_vpc_settings_configured`, `sagemaker_training_jobs_vpc_settings_configured`, `sagemaker_training_jobs_network_isolation_enabled`, `sagemaker_training_jobs_volume_and_output_encryption_enabled` | None | None | +| **Threat detection / anomalous behavior** | `cloudtrail_threat_detection_enumeration`, `cloudtrail_threat_detection_privilege_escalation`, `cloudtrail_threat_detection_llm_jacking`, `guardduty_is_enabled`, `guardduty_no_high_severity_findings` | None | None | +| **Serverless private access** | `awslambda_function_inside_vpc`, `awslambda_function_not_publicly_accessible`, `awslambda_function_url_public` | `app_function_not_publicly_accessible` | None | + +### What Prowler Does NOT Cover (accept MANUAL honestly) + +Don't pad mappings for these — mark `Checks: []` and move on: + +- **TLS 1.3 version specifically** — Prowler verifies TLS is enforced, not always the exact version +- **IANA port-protocol consistency** — no check for "protocol running on its assigned port" +- **mTLS on most Azure/GCP services** — limited to App Service client certs on Azure, nothing on GCP +- **Rate limiting** on monitoring endpoints, load balancers, serverless invocations, vector ingestion +- **Session cookie expiry** (LB stickiness) +- **HTTP header scrubbing** (Server, X-Powered-By) +- **Certificate transparency verification for imports** +- **Model version pinning, red teaming, AI quality review** +- **Vector embedding validation, dimensional constraints, ANN vs exact search** +- **Secret region replication** (cross-region residency) +- **Lifecycle cleanup policies on container registries** +- **Row-level / column-level security in data warehouses** +- **Deployment region restriction on Azure/GCP** (AWS has `organizations_scp_check_deny_regions`, others don't) +- **Cross-tenant alert silencing permissions** +- **Field-level masking in logs** +- **Managed view enforcement for database access** +- **Automatic MFA delete on all S3 buckets** (only CloudTrail bucket variant exists for some frameworks — AWS has the generic `s3_bucket_no_mfa_delete` though) + +--- + +## Workflow C: Add a New Output Formatter + +Use when a new framework needs its own CSV columns or terminal table. Follow the c5/csa/ens layout exactly: + +```bash +mkdir -p prowler/lib/outputs/compliance/{framework} +touch prowler/lib/outputs/compliance/{framework}/__init__.py +``` + +### Step 1 — Create `{framework}.py` (table dispatcher ONLY) + +Copy from `prowler/lib/outputs/compliance/c5/c5.py` and change the function name + framework string. The `diff` between your file and `c5.py` should be just those two lines. 
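For orientation, here is a bare-imports skeleton of what the dispatcher ends up looking like. It is a sketch, not the real table layout: the status handling is simplified, and the `finding.status`/`finding.muted` attribute names are assumptions (the findings are used duck-typed precisely so `Finding` is never imported):

```python
from colorama import Fore, Style
from tabulate import tabulate

from prowler.config.config import orange_color


def get_ccc_table(
    findings,
    bulk_checks_metadata,
    compliance_framework,
    output_filename,
    output_directory,
    compliance_overview,
):
    pass_count = fail_count = muted_count = 0
    for finding in findings:
        # Attribute access only: no Finding import, which keeps this file
        # off the circular-import chain described below.
        if getattr(finding, "muted", False):
            muted_count += 1
        elif finding.status == "PASS":
            pass_count += 1
        elif finding.status == "FAIL":
            fail_count += 1
    print(
        f"\nCompliance Status of {Fore.YELLOW}{compliance_framework.upper()}"
        f"{Style.RESET_ALL} Framework:"
    )
    print(
        tabulate(
            [[
                f"{Fore.RED}{fail_count} FAIL{Style.RESET_ALL}",
                f"{Fore.GREEN}{pass_count} PASS{Style.RESET_ALL}",
                f"{orange_color}{muted_count} MUTED{Style.RESET_ALL}",
            ]],
            headers=["Failed", "Passed", "Muted"],
            tablefmt="rounded_grid",
        )
    )
```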
**No function docstring** — other frameworks don't have one, stay consistent. + +### Step 2 — Create `models.py` + +One Pydantic v2 `BaseModel` per provider. Field names become CSV column headers (public API — don't rename later without a migration). + +```python +from typing import Optional +from pydantic import BaseModel + +class {Framework}_AWSModel(BaseModel): + Provider: str + Description: str + AccountId: str + Region: str + AssessmentDate: str + Requirements_Id: str + Requirements_Description: str + # ... provider-specific columns + Status: str + StatusExtended: str + ResourceId: str + ResourceName: str + CheckId: str + Muted: bool +``` + +### Step 3 — Create `{framework}_{provider}.py` for each provider + +Copy from `prowler/lib/outputs/compliance/c5/c5_aws.py` etc. Contains the `{Framework}_AWS(ComplianceOutput)` class with `transform()` that walks findings and emits model rows. This file IS allowed to import `Finding`. + +### Step 4 — Register everywhere + +**`prowler/lib/outputs/compliance/compliance.py`** (CLI table dispatcher): +```python +from prowler.lib.outputs.compliance.{framework}.{framework} import get_{framework}_table + +def display_compliance_table(...): + ... + elif compliance_framework.startswith("{framework}_"): + get_{framework}_table(findings, bulk_checks_metadata, + compliance_framework, output_filename, + output_directory, compliance_overview) +``` + +**`prowler/__main__.py`** (CLI output writer per provider): +Add imports at the top: +```python +from prowler.lib.outputs.compliance.{framework}.{framework}_aws import {Framework}_AWS +from prowler.lib.outputs.compliance.{framework}.{framework}_azure import {Framework}_Azure +from prowler.lib.outputs.compliance.{framework}.{framework}_gcp import {Framework}_GCP +``` +Add provider-specific `elif compliance_name.startswith("{framework}_"):` branches that instantiate the class and call `batch_write_data_to_file()`. + +**`api/src/backend/tasks/jobs/export.py`** (API export dispatcher): +```python +from prowler.lib.outputs.compliance.{framework}.{framework}_aws import {Framework}_AWS +# ... azure, gcp + +COMPLIANCE_CLASS_MAP = { + "aws": [ + # ... + (lambda name: name.startswith("{framework}_"), {Framework}_AWS), + ], + # ... azure, gcp +} +``` + +**Always use `startswith`**, never `name == "framework_aws"`. Exact match is a regression. + +### Step 5 — Add tests + +Create `tests/lib/outputs/compliance/{framework}/` with `{framework}_aws_test.py`, `{framework}_azure_test.py`, `{framework}_gcp_test.py`. See the test template in [references/test_template.md](references/test_template.md). + +Add fixtures to `tests/lib/outputs/compliance/fixtures.py`: one `Compliance` object per provider with 1 evaluated + 1 manual requirement to exercise both code paths in `transform()`. + +### Circular import warning + +**The table dispatcher file (`{framework}.py`) MUST NOT import `Finding`** (directly or transitively). The cycle is: + +``` +compliance.compliance imports get_{framework}_table + → {framework}.py imports ComplianceOutput + → compliance_output imports Finding + → finding imports get_check_compliance from compliance.compliance + → CIRCULAR +``` + +Keep `{framework}.py` bare — only `colorama`, `tabulate`, `prowler.config.config`. Put anything that imports `Finding` in the per-provider `{framework}_{provider}.py` files. + +--- + +## Conventions and Hard-Won Gotchas + +These are lessons from the FINOS CCC v2025.10 sync + 172-AR audit pass (April 2026). Learn them once; save days of debugging. + +1. 
**Per-provider files are non-negotiable.** Never collapse `{framework}_aws.py`, `{framework}_azure.py`, `{framework}_gcp.py` into a single parameterized class, no matter how DRY-tempting. Every other framework in the codebase follows the per-provider pattern and reviewers will reject the refactor. The CSV column names differ per provider — three classes is the convention.
+2. **`{framework}.py` has NO function docstring.** Other frameworks don't have them. Don't add one to be "helpful".
+3. **Circular import protection**: the table dispatcher file MUST NOT import `Finding` (directly or transitively). Split the code so `{framework}.py` only has `get_{framework}_table()` with bare imports, and `{framework}_{provider}.py` holds the class that needs `Finding`.
+4. **`Generic_Compliance_Requirement_Attribute` is the fallback** — in the `Compliance_Requirement.Attributes` Union in `compliance_models.py`, Generic MUST be LAST because Pydantic v1 tries union members in order. Putting Generic first means every framework-specific attribute falls through to Generic and the specific model is never used.
+5. **Pydantic v1 imports.** `from pydantic.v1 import BaseModel` in `compliance_models.py` — not v2. Mixing causes validation errors. Pydantic v2 is used in the CSV models (`models.py`) — that's fine because they're separate trees.
+6. **`get_check_compliance()` key format** is `f"{Framework}-{Version}"` ONLY if Version is set. Empty Version → key is `"{Framework}"` (no version suffix). Tests that mock compliance dicts must match this exact format — when a framework ships with `Version: ""`, downstream code and tests break silently.
+7. **CSV column names from `models.py` are public API.** Don't rename a field without migrating downstream consumers — CSV headers change.
+8. **Upstream YAML multi-line scalars** (`|` block scalars) preserve newlines. Collapse to single-line with `" ".join(value.split())` before writing to JSON.
+9. **Upstream catalogs can use multiple shapes.** FINOS CCC uses `control-families: [...]` in most catalogs but `controls: [...]` at the top level in `storage/object`. Any sync script must handle both or silently drop entire catalogs.
+10. **Foreign-prefix AR ids.** Upstream sometimes "imports" requirements from one catalog into another by keeping the original id prefix (e.g., `CCC.AuditLog.CN08.AR01` appearing under `CCC.Logging.CN03`). Prowler's compliance model requires unique ids within a catalog — rewrite the foreign id to fit the parent control: `CCC.AuditLog.CN08.AR01` (inside `CCC.Logging.CN03`) → `CCC.Logging.CN03.AR01`.
+11. **Genuine upstream id collisions.** Sometimes upstream has a real typo where two different requirements share the same id (e.g., `CCC.Core.CN14.AR02` defined twice for 30-day and 14-day backup variants). Renumber the second copy to the next free AR number. Preserve check mappings by matching on `(Section, frozenset(Applicability))` since the renumbered id won't match by id.
+12. **`COMPLIANCE_CLASS_MAP` in `export.py` uses `startswith` predicates** for all modern frameworks. Exact match (`name == "ccc_aws"`) is an anti-pattern — it was present for CCC until April 2026 and was the reason CCC couldn't have versioned variants.
+13. **Pre-validate every check id** against the per-provider inventory before writing the JSON. A typo silently creates a reference to a check that doesn't exist, and no finding will ever map to it. The audit script MUST abort with stderr listing typos, not swallow them.
+14. **REPLACE is better than PATCH** for audit decisions.
Encoding every mapping explicitly makes the audit reproducible and surfaces hidden assumptions from the legacy data. A PATCH system that adds/removes is too easy to forget. +15. **When no check applies, MANUAL is correct.** Do not pad mappings with tangential checks "just in case". Prowler's compliance reports are meant to be actionable — padding them with noise breaks that. Honest manual reqs can be mapped later when new checks land. +16. **UI groups by `Attributes[0].FamilyName` and `Attributes[0].Section`.** If FamilyName has inconsistent variants within the same JSON (e.g., "Logging & Monitoring" vs "Logging and Monitoring"), the UI renders them as separate categories. Section empty → the requirement falls into an orphan control with label "". Normalize before shipping. +17. **Provider coverage is asymmetric.** AWS has dense coverage (~586 checks across 80+ services): in-transit encryption, IAM, database encryption, backup. Azure (~167 checks) and GCP (~102 checks) are thinner especially for in-transit encryption, mTLS, and ML/AI. Accept the asymmetry in mappings — don't force GCP parity where Prowler genuinely can't verify. + +--- + +## Useful One-Liners + +```bash +# Count requirements per service prefix (CCC, CIS sections, etc.) +jq -r '.Requirements[].Id | split(".")[1]' prowler/compliance/aws/ccc_aws.json | sort | uniq -c + +# Find duplicate requirement IDs +jq -r '.Requirements[].Id' file.json | sort | uniq -d + +# Count manual requirements (no checks) +jq '[.Requirements[] | select((.Checks | length) == 0)] | length' file.json + +# List all unique check references in a framework +jq -r '.Requirements[].Checks[]' file.json | sort -u + +# List all unique Sections (to spot inconsistency) +jq '[.Requirements[].Attributes[0].Section] | unique' file.json + +# List all unique FamilyNames (to spot inconsistency) +jq '[.Requirements[].Attributes[0].FamilyName] | unique' file.json + +# Diff requirement ids between two versions of the same framework +diff <(jq -r '.Requirements[].Id' a.json | sort) <(jq -r '.Requirements[].Id' b.json | sort) + +# Find where a check id is used across all frameworks +grep -rl "my_check_name" prowler/compliance/ + +# Check if a Prowler check exists +find prowler/providers/aws/services -name "{check_id}.metadata.json" + +# Validate a JSON with Pydantic +python -c "from prowler.lib.check.compliance_models import Compliance; print(Compliance.parse_file('prowler/compliance/aws/ccc_aws.json').Framework)" +``` + +--- + ## Best Practices 1. **Requirement IDs**: Follow the original framework numbering exactly (e.g., "1.1", "A.5.1", "T1190", "ac_2_1") -2. **Check Mapping**: Map to existing checks when possible. Use `Checks: []` for manual-only requirements +2. **Check Mapping**: Map to existing checks when possible. Use `Checks: []` for manual-only requirements — honest MANUAL beats padded coverage 3. **Completeness**: Include all framework requirements, even those without automated checks -4. **Version Control**: Include framework version in `Name` and `Version` fields +4. **Version Control**: Include framework version in `Name` and `Version` fields. **Never leave `Version: ""`** — it breaks `get_check_compliance()` key format 5. **File Naming**: Use format `{framework}_{version}_{provider}.json` -6. **Validation**: Prowler validates JSON against Pydantic models at startup - invalid JSON will cause errors +6. **Validation**: Prowler validates JSON against Pydantic models at startup — invalid JSON will cause errors +7. 
**Pre-validate check ids** against the provider's `*.metadata.json` inventory before every commit +8. **Normalize FamilyName and Section** to avoid inconsistent UI tree branches +9. **Register everywhere**: SDK model (if needed) → `compliance.py` dispatcher → `__main__.py` CLI writer → `export.py` API map → UI mapper. Skipping any layer results in silent failures +10. **Audit, don't pad**: when reviewing mappings, apply the golden rule — the check's title/risk MUST literally describe what the requirement text says. Tangential relation doesn't count ## Commands @@ -482,11 +1005,46 @@ prowler aws --compliance cis_5.0_aws -M csv json html ## Code References -- **Compliance Models:** `prowler/lib/check/compliance_models.py` -- **Compliance Processing:** `prowler/lib/check/compliance.py` -- **Compliance Output:** `prowler/lib/outputs/compliance/` +### Layer 1 — SDK / Core +- **Compliance Models:** `prowler/lib/check/compliance_models.py` (Pydantic v1 model tree) +- **Compliance Processing / Linker:** `prowler/lib/check/compliance.py` (`get_check_compliance`, `update_checks_metadata_with_compliance`) +- **Check Utils:** `prowler/lib/check/utils.py` (`list_compliance_modules`) + +### Layer 2 — JSON Catalogs +- **Framework JSONs:** `prowler/compliance/{provider}/` (auto-discovered via directory walk) + +### Layer 3 — Output Formatters +- **Per-framework folders:** `prowler/lib/outputs/compliance/{framework}/` +- **Shared base class:** `prowler/lib/outputs/compliance/compliance_output.py` (`ComplianceOutput` + `batch_write_data_to_file`) +- **CLI table dispatcher:** `prowler/lib/outputs/compliance/compliance.py` (`display_compliance_table`) +- **Finding model:** `prowler/lib/outputs/finding.py` (**do not import transitively from table dispatcher files — circular import**) +- **CLI writer:** `prowler/__main__.py` (per-provider `elif compliance_name.startswith(...)` branches that instantiate per-provider classes) + +### Layer 4 — API / UI +- **API lazy loader:** `api/src/backend/api/compliance.py` (`LazyComplianceTemplate`, `LazyChecksMapping`) +- **API export dispatcher:** `api/src/backend/tasks/jobs/export.py` (`COMPLIANCE_CLASS_MAP` with `startswith` predicates) +- **UI framework router:** `ui/lib/compliance/compliance-mapper.ts` +- **UI per-framework mapper:** `ui/lib/compliance/{framework}.tsx` +- **UI detail panel:** `ui/components/compliance/compliance-custom-details/{framework}-details.tsx` +- **UI types:** `ui/types/compliance.ts` +- **UI icon:** `ui/components/icons/compliance/{framework}.svg` + registration in `IconCompliance.tsx` + +### Tests +- **Output formatter tests:** `tests/lib/outputs/compliance/{framework}/{framework}_{provider}_test.py` +- **Shared fixtures:** `tests/lib/outputs/compliance/fixtures.py` ## Resources -- **Templates:** See [assets/](assets/) for framework JSON templates +- **JSON Templates:** See [assets/](assets/) for framework JSON templates (cis, ens, iso27001, mitre_attack, prowler_threatscore, generic) +- **Config-driven compliance sync** (any upstream-backed framework): + - [assets/sync_framework.py](assets/sync_framework.py) — generic runner. Loads a YAML config, dynamically imports the declared parser, applies generic post-processing (id uniqueness safety net, `FamilyName` normalization, legacy check-mapping preservation with config-driven fallback keys), and writes the provider JSONs with Pydantic post-validation. Framework-agnostic — works for any compliance framework. 
+  - [assets/configs/ccc.yaml](assets/configs/ccc.yaml) — canonical config example (FINOS CCC v2025.10). Copy and adapt for new frameworks.
+  - [assets/parsers/finos_ccc.py](assets/parsers/finos_ccc.py) — FINOS CCC YAML parser. Handles both upstream shapes (`control-families` and top-level `controls`), foreign-prefix AR rewriting, and genuine collision renumbering. Exposes `parse_upstream(config) -> list[dict]`.
+  - [assets/parsers/](assets/parsers/) — add new parser modules here for unfamiliar upstream formats (NIST OSCAL JSON, MITRE STIX, CIS Benchmarks, etc.). Each parser is a `{name}.py` file implementing `parse_upstream(config) -> list[dict]` with guaranteed-unique ids.
+- **Reusable audit tooling** (added April 2026 after the FINOS CCC v2025.10 sync):
+  - [assets/audit_framework_template.py](assets/audit_framework_template.py) — explicit REPLACE decision ledger with pre-validation against the per-provider inventory. Drop-in template for auditing any framework.
+  - [assets/query_checks.py](assets/query_checks.py) — keyword/service/id query helper over `/tmp/checks_{provider}.json`.
+  - [assets/dump_section.py](assets/dump_section.py) — dumps every AR for a given id prefix across all 3 providers with current check mappings.
+  - [assets/build_inventory.py](assets/build_inventory.py) — generates `/tmp/checks_{provider}.json` from `*.metadata.json` files.
 - **Documentation:** See [references/compliance-docs.md](references/compliance-docs.md) for additional resources
+- **Related skill:** [prowler-compliance-review](../prowler-compliance-review/SKILL.md) — PR review checklist and validator script for compliance framework PRs
diff --git a/skills/prowler-compliance/assets/audit_framework_template.py b/skills/prowler-compliance/assets/audit_framework_template.py
new file mode 100644
index 0000000000..f2d58603d7
--- /dev/null
+++ b/skills/prowler-compliance/assets/audit_framework_template.py
@@ -0,0 +1,207 @@
+#!/usr/bin/env python3
+"""
+Cloud-auditor pass template for any Prowler compliance framework.
+
+Encode explicit REPLACE decisions per (requirement_id, provider) pair below.
+Each decision FULLY overwrites the legacy Checks list for that requirement.
+
+Workflow:
+    1. Run build_inventory.py first to cache per-provider check metadata.
+    2. Run dump_section.py to see current mappings for the catalog you're auditing.
+    3. Fill in DECISIONS below with explicit check lists.
+    4. Run this script — it pre-validates every check id against the inventory
+       and aborts with stderr listing typos before writing.
+
+Decision rules (apply as a hostile cloud auditor):
+    - The Prowler check's title/risk MUST literally describe what the AR text says.
+      "Related" is not enough.
+    - If no check actually addresses the requirement, leave `[]` (= MANUAL).
+      HONEST MANUAL is worth more than padded coverage.
+    - Missing provider key = leave the legacy mapping untouched.
+    - Empty list `[]` = explicitly MANUAL (overwrites legacy).
+
+Usage:
+    # 1. Copy this file to /tmp/audit_{framework}.py and fill in DECISIONS
+    # 2. Edit FRAMEWORK_KEY below to match your framework file naming
+    # 3. Run:
+    python /tmp/audit_{framework}.py
+"""
+from __future__ import annotations
+
+import json
+import sys
+from pathlib import Path
+
+# ---------------------------------------------------------------------------
+# Configure for your framework
+# ---------------------------------------------------------------------------
+
+# Framework file basename inside prowler/compliance/{provider}/.
+# If your framework is called "cis_5.0_aws.json", FRAMEWORK_KEY is "cis_5.0". +# If the file is "ccc_aws.json", FRAMEWORK_KEY is "ccc". +FRAMEWORK_KEY = "ccc" + +# Which providers to apply decisions to. +PROVIDERS = ["aws", "azure", "gcp"] + +PROWLER_DIR = Path("prowler/compliance") +CHECK_INV = {prov: Path(f"/tmp/checks_{prov}.json") for prov in PROVIDERS} + + +# --------------------------------------------------------------------------- +# DECISIONS — encode one entry per requirement you want to audit +# --------------------------------------------------------------------------- + +# DECISIONS[requirement_id][provider] = list[str] of check ids +# See SKILL.md → "Audit Reference Table: Requirement Text → Prowler Checks" +# for a comprehensive mapping cheat sheet built from a 172-AR CCC audit. + +DECISIONS: dict[str, dict[str, list[str]]] = {} + +# ---- Example entries (delete and replace with your own) ---- + +# Example 1: TLS in transit enforced (non-SSH traffic) +# DECISIONS["CCC.Core.CN01.AR01"] = { +# "aws": [ +# "cloudfront_distributions_https_enabled", +# "cloudfront_distributions_origin_traffic_encrypted", +# "s3_bucket_secure_transport_policy", +# "elbv2_ssl_listeners", +# "rds_instance_transport_encrypted", +# "kafka_cluster_in_transit_encryption_enabled", +# "redshift_cluster_in_transit_encryption_enabled", +# "opensearch_service_domains_https_communications_enforced", +# ], +# "azure": [ +# "storage_secure_transfer_required_is_enabled", +# "app_minimum_tls_version_12", +# "postgresql_flexible_server_enforce_ssl_enabled", +# "sqlserver_recommended_minimal_tls_version", +# ], +# "gcp": [ +# "cloudsql_instance_ssl_connections", +# ], +# } + +# Example 2: MANUAL — no Prowler check exists +# DECISIONS["CCC.Core.CN01.AR07"] = { +# "aws": [], # no IANA port/protocol check exists in Prowler +# "azure": [], +# "gcp": [], +# } + +# Example 3: Reuse a decision for multiple sibling ARs +# DECISIONS["CCC.ObjStor.CN05.AR02"] = DECISIONS["CCC.ObjStor.CN05.AR01"] + + +# --------------------------------------------------------------------------- +# Driver — do not edit below +# --------------------------------------------------------------------------- + +def load_inventory(provider: str) -> dict: + path = CHECK_INV[provider] + if not path.exists(): + raise SystemExit( + f"Check inventory missing: {path}\n" + f"Run: python skills/prowler-compliance/assets/build_inventory.py {provider}" + ) + with open(path) as f: + return json.load(f) + + +def resolve_json_path(provider: str) -> Path: + """Resolve the JSON file path for a given provider. + + Handles both shapes: {FRAMEWORK_KEY}_{provider}.json (ccc_aws.json) and + cases where FRAMEWORK_KEY already contains the provider suffix. + """ + candidates = [ + PROWLER_DIR / provider / f"{FRAMEWORK_KEY}_{provider}.json", + PROWLER_DIR / provider / f"{FRAMEWORK_KEY}.json", + ] + for c in candidates: + if c.exists(): + return c + raise SystemExit( + f"Could not find framework JSON for provider={provider} " + f"with FRAMEWORK_KEY={FRAMEWORK_KEY}. Tried: {candidates}" + ) + + +def plan_for_provider( + provider: str, +) -> tuple[Path, dict, tuple[int, int, int], list[tuple[str, str]]]: + """Build the updated JSON for one provider without writing it. + + Returns (path, mutated_data, (touched, added, removed), unknowns). + Writing is deferred to a second pass so that a typo in any provider + aborts the whole run before any file on disk changes. 
+ """ + path = resolve_json_path(provider) + with open(path) as f: + data = json.load(f) + inv = load_inventory(provider) + + touched = 0 + add_count = 0 + rm_count = 0 + unknown: list[tuple[str, str]] = [] + + for req in data["Requirements"]: + rid = req["Id"] + if rid not in DECISIONS or provider not in DECISIONS[rid]: + continue + new_checks = list(dict.fromkeys(DECISIONS[rid][provider])) + for c in new_checks: + if c not in inv: + unknown.append((rid, c)) + before = set(req.get("Checks") or []) + after = set(new_checks) + rm_count += len(before - after) + add_count += len(after - before) + req["Checks"] = new_checks + touched += 1 + + return path, data, (touched, add_count, rm_count), unknown + + +def main() -> int: + if not DECISIONS: + print("No DECISIONS encoded. Fill in the DECISIONS dict and re-run.") + return 1 + print(f"Applying {len(DECISIONS)} decisions to framework '{FRAMEWORK_KEY}'...") + + # Pass 1: validate every provider before touching disk. A typo in any + # provider must abort the run before ANY file has been rewritten. + plans: list[tuple[str, Path, dict, tuple[int, int, int]]] = [] + all_unknown: list[tuple[str, str, str]] = [] + for provider in PROVIDERS: + path, data, counts, unknown = plan_for_provider(provider) + for rid, c in unknown: + all_unknown.append((provider, rid, c)) + plans.append((provider, path, data, counts)) + + if all_unknown: + print("\n!! UNKNOWN CHECK IDS (typos?):", file=sys.stderr) + for provider, rid, c in all_unknown: + print(f" {provider} {rid} -> {c}", file=sys.stderr) + print( + "\nAborting: fix the check ids above and re-run. " + "No files were modified.", + file=sys.stderr, + ) + return 2 + + # Pass 2: all providers validated cleanly — write. + for provider, path, data, (touched, added, removed) in plans: + with open(path, "w") as f: + json.dump(data, f, indent=2, ensure_ascii=False) + f.write("\n") + print( + f" {provider}: touched={touched} added={added} removed={removed}" + ) + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/skills/prowler-compliance/assets/build_inventory.py b/skills/prowler-compliance/assets/build_inventory.py new file mode 100644 index 0000000000..f743aa75a5 --- /dev/null +++ b/skills/prowler-compliance/assets/build_inventory.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +""" +Build a per-provider check inventory by scanning Prowler's check metadata files. + +Outputs one JSON per provider at /tmp/checks_{provider}.json with the shape: + { + "check_id": { + "service": "...", + "subservice": "...", + "resource": "...", + "severity": "...", + "title": "...", + "description": "...", + "risk": "..." + }, + ... + } + +This is the reference used by audit_framework_template.py for pre-validation +(every check id in the audit ledger must exist in the inventory) and by +query_checks.py for keyword/service lookup. + +Usage: + python skills/prowler-compliance/assets/build_inventory.py + # Or for a specific provider: + python skills/prowler-compliance/assets/build_inventory.py aws + +Output: + /tmp/checks_{provider}.json for every provider discovered under + prowler/providers/ with a services/ directory. +""" +from __future__ import annotations + +import json +import sys +from pathlib import Path + +PROVIDERS_ROOT = Path("prowler/providers") + + +def discover_providers() -> list[str]: + """Return every provider that currently has a services/ directory. + + Derived from the filesystem so new providers are picked up automatically + and stale hard-coded lists cannot drift from the repo. 
+ """ + if not PROVIDERS_ROOT.exists(): + return [] + return sorted( + p.name + for p in PROVIDERS_ROOT.iterdir() + if p.is_dir() and (p / "services").is_dir() + ) + + +def build_for_provider(provider: str) -> dict: + inventory: dict[str, dict] = {} + base = Path(f"prowler/providers/{provider}/services") + if not base.exists(): + print(f" skip {provider}: no services directory", file=sys.stderr) + return inventory + for meta_path in base.rglob("*.metadata.json"): + try: + with open(meta_path) as f: + data = json.load(f) + except Exception as exc: + print(f" warn: cannot parse {meta_path}: {exc}", file=sys.stderr) + continue + cid = data.get("CheckID") or meta_path.stem.replace(".metadata", "") + inventory[cid] = { + "service": data.get("ServiceName", ""), + "subservice": data.get("SubServiceName", ""), + "resource": data.get("ResourceType", ""), + "severity": data.get("Severity", ""), + "title": data.get("CheckTitle", ""), + "description": data.get("Description", ""), + "risk": data.get("Risk", ""), + } + return inventory + + +def main() -> int: + providers = sys.argv[1:] or discover_providers() + if not providers: + print( + f"error: no providers found under {PROVIDERS_ROOT}/", + file=sys.stderr, + ) + return 1 + for provider in providers: + inv = build_for_provider(provider) + out_path = Path(f"/tmp/checks_{provider}.json") + with open(out_path, "w") as f: + json.dump(inv, f, indent=2) + print(f" {provider}: {len(inv)} checks → {out_path}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/skills/prowler-compliance/assets/configs/ccc.yaml b/skills/prowler-compliance/assets/configs/ccc.yaml new file mode 100644 index 0000000000..deb757ffc9 --- /dev/null +++ b/skills/prowler-compliance/assets/configs/ccc.yaml @@ -0,0 +1,120 @@ +# FINOS Common Cloud Controls (CCC) sync config for sync_framework.py. +# +# Usage: +# python skills/prowler-compliance/assets/sync_framework.py \ +# skills/prowler-compliance/assets/configs/ccc.yaml +# +# Prerequisite: run the upstream fetch step from SKILL.md Workflow A Step 1 to +# populate upstream.dir with the raw FINOS catalog YAML files. + +framework: + name: CCC + display_name: Common Cloud Controls Catalog (CCC) + version: v2025.10 + # The {provider_display} placeholder is replaced at output time with the + # per-provider display string from the providers list below. + description_template: "Common Cloud Controls Catalog (CCC) for {provider_display}" + +providers: + - key: aws + display: AWS + - key: azure + display: Azure + - key: gcp + display: GCP + +output: + # Supported placeholders: {provider}, {framework}, {version}. + # For versioned frameworks like CIS the template would be + # "prowler/compliance/{provider}/cis_{version}_{provider}.json". + path_template: "prowler/compliance/{provider}/ccc_{provider}.json" + +upstream: + # Directory containing the cached FINOS catalog YAMLs. Populate via + # SKILL.md Workflow A Step 1 (gh api raw download commands). + dir: /tmp/ccc_upstream + fetch_docs: "See SKILL.md Workflow A Step 1 for gh api fetch commands" + +parser: + # Name of the parser module under parsers/ (loaded dynamically by the + # runner). For FINOS CCC YAML this is always finos_ccc. + module: finos_ccc + + # FINOS CCC catalog files in load order. Core first so its ARs render + # first in the output JSON. 
+ catalog_files: + - core_ccc.yaml + - management_auditlog.yaml + - management_logging.yaml + - management_monitoring.yaml + - storage_object.yaml + - networking_loadbalancer.yaml + - networking_vpc.yaml + - crypto_key.yaml + - crypto_secrets.yaml + - database_warehouse.yaml + - database_vector.yaml + - database_relational.yaml + - devtools_build.yaml + - devtools_container-registry.yaml + - identity_iam.yaml + - ai-ml_gen-ai.yaml + - ai-ml_mlde.yaml + - app-integration_message.yaml + - compute_serverless-computing.yaml + + # Shape-2 catalogs (storage/object) reference the family via id only + # (e.g. "CCC.ObjStor.Data") with no human-readable title or description + # in the YAML. Map the suffix (after the last dot) to a canonical title + # and description so the generated JSON has consistent FamilyName fields + # regardless of upstream shape. + family_id_title: + Data: Data + IAM: Identity and Access Management + Identity: Identity and Access Management + Encryption: Encryption + Logging: Logging and Monitoring + Network: Network Security + Availability: Availability + Integrity: Integrity + Confidentiality: Confidentiality + family_id_description: + Data: "The Data control family ensures the confidentiality, integrity, availability, and sovereignty of data across its lifecycle." + IAM: "The Identity and Access Management control family ensures that only trusted and authenticated entities can access resources." + +post_processing: + # Collapse FamilyName variants that appear inconsistently across upstream + # catalogs. The Prowler UI groups by Attributes[0].FamilyName exactly, + # so each variant would otherwise become a separate tree branch. + family_name_normalization: + "Logging & Monitoring": "Logging and Monitoring" + "Logging and Metrics Publication": "Logging and Monitoring" + + # Preserve existing Checks lists from the legacy Prowler JSON when + # regenerating. The runner builds two lookup tables from the legacy + # output: a primary index by Id, and fallback indexes composed of + # attribute field names. + # + # primary_key: the top-level requirement field to use as the primary + # lookup key (almost always "Id") + # fallback_keys: a list of composite keys. Each composite key is a list + # of Attributes[0] field names to join into a tuple. List-valued fields + # (like Applicability) are frozen to frozenset so the tuple is hashable. + # + # CCC uses (Section, Applicability) because Applicability is a CCC-only + # top-level attribute field. CIS would use (Section, Profile). NIST would + # use (ItemId,). The fallback is how renumbered or rewritten ids still + # recover their check mappings. + # + # legacy_path_template (optional): path to read legacy Checks FROM. + # Defaults to output.path_template, which is correct for unversioned + # frameworks (like CCC) where regeneration overwrites the same file. + # For versioned frameworks that write to a new file on each version + # bump (e.g. 
cis_5.1_aws.json while the legacy mappings live in + # cis_5.0_aws.json), set this to the previous-version path so Checks + # are preserved instead of lost: + # legacy_path_template: "prowler/compliance/{provider}/cis_5.0_{provider}.json" + check_preservation: + primary_key: Id + fallback_keys: + - [Section, Applicability] diff --git a/skills/prowler-compliance/assets/dump_section.py b/skills/prowler-compliance/assets/dump_section.py new file mode 100644 index 0000000000..ca2fff0e1b --- /dev/null +++ b/skills/prowler-compliance/assets/dump_section.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 +""" +Dump every requirement of a compliance framework for a given id prefix across +providers, with their current Check mappings. + +Useful for reviewing a whole control family in one pass before encoding audit +decisions in audit_framework_template.py. + +Usage: + # Dump all CCC.Core requirements across aws/azure/gcp + python skills/prowler-compliance/assets/dump_section.py ccc "CCC.Core." + + # Dump all CIS 5.0 section 1 requirements for AWS only + python skills/prowler-compliance/assets/dump_section.py cis_5.0_aws "1." + +Arguments: + framework_key: file prefix inside prowler/compliance/{provider}/ without + the provider suffix. Examples: + - "ccc" → loads ccc_aws.json / ccc_azure.json / ccc_gcp.json + - "cis_5.0_aws" → loads only that one file + - "iso27001_2022" → loads all providers + id_prefix: Requirement id prefix to filter by (e.g. "CCC.Core.", + "1.1.", "A.5."). +""" +from __future__ import annotations + +import json +import sys +from collections import defaultdict +from pathlib import Path + +PROWLER_COMPLIANCE_DIR = Path("prowler/compliance") + + +def main() -> int: + if len(sys.argv) < 3: + print(__doc__) + return 1 + + framework_key = sys.argv[1] + id_prefix = sys.argv[2] + + # Find matching JSON files across all providers + candidates: list[tuple[str, Path]] = [] + for prov_dir in sorted(PROWLER_COMPLIANCE_DIR.iterdir()): + if not prov_dir.is_dir(): + continue + for json_path in prov_dir.glob("*.json"): + stem = json_path.stem + if stem == framework_key or stem.startswith(f"{framework_key}_") \ + or stem == f"{framework_key}_{prov_dir.name}": + candidates.append((prov_dir.name, json_path)) + + if not candidates: + print(f"No files matching '{framework_key}'", file=sys.stderr) + return 2 + + discovered_providers = sorted({prov for prov, _ in candidates}) + + by_id: dict[str, dict] = defaultdict(dict) + for prov, path in candidates: + with open(path) as f: + data = json.load(f) + for req in data["Requirements"]: + if req["Id"].startswith(id_prefix): + by_id[req["Id"]][prov] = { + "desc": req.get("Description", ""), + "sec": (req.get("Attributes") or [{}])[0].get("Section", ""), + "obj": (req.get("Attributes") or [{}])[0].get( + "SubSectionObjective", "" + ), + "checks": req.get("Checks") or [], + } + + for ar_id in sorted(by_id): + rows = by_id[ar_id] + sample = next(iter(rows.values())) + print(f"\n### {ar_id}") + print(f" desc: {sample['desc']}") + if sample["sec"]: + print(f" sec : {sample['sec']}") + if sample["obj"]: + print(f" obj : {sample['obj']}") + for prov in discovered_providers: + if prov in rows: + checks = rows[prov]["checks"] + print(f" {prov}: ({len(checks)}) {checks}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/skills/prowler-compliance/assets/parsers/__init__.py b/skills/prowler-compliance/assets/parsers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/skills/prowler-compliance/assets/parsers/finos_ccc.py 
b/skills/prowler-compliance/assets/parsers/finos_ccc.py new file mode 100644 index 0000000000..a613b15857 --- /dev/null +++ b/skills/prowler-compliance/assets/parsers/finos_ccc.py @@ -0,0 +1,223 @@ +""" +FINOS Common Cloud Controls (CCC) YAML parser. + +Reads cached upstream YAML files and emits Prowler-format requirements +(``{Id, Description, Attributes: [...], Checks: []}``). This module is +agnostic to providers, JSON output paths, framework metadata and legacy +check-mapping preservation — those are handled by ``sync_framework.py``. + +Contract +-------- +``parse_upstream(config: dict) -> list[dict]`` + Returns a list of Prowler-format requirement dicts with **guaranteed + unique ids**. Foreign-prefix AR rewriting and genuine collision + renumbering both happen inside this module — the runner treats id + uniqueness as a contract violation, not as something to fix. + +Config keys consumed +-------------------- +This parser reads the following config entries (the rest of the config is +opaque to it): + +- ``upstream.dir`` — directory containing the cached YAMLs +- ``parser.catalog_files`` — ordered list of YAML filenames to load +- ``parser.family_id_title`` — suffix → canonical family title (shape 2) +- ``parser.family_id_description`` — suffix → family description (shape 2) + +Upstream shapes +--------------- +FINOS CCC catalogs come in two shapes: + +1. ``control-families: [{title, description, controls: [...]}]`` + (used by most catalogs) +2. ``controls: [{id, family: "CCC.X.Y", ...}]`` (no families wrapper; used + by ``storage/object``). The ``family`` field references a family id with + no human-readable title in the file — the title/description come from + ``config.parser.family_id_title`` / ``family_id_description``. + +Id rewriting rules +------------------ +- **Foreign-prefix rewriting**: upstream intentionally aliases requirements + across catalogs by keeping the original prefix (e.g. ``CCC.AuditLog.CN08.AR01`` + appears nested under ``CCC.Logging.CN03``). Prowler requires unique ids + within a catalog file, so we rename the AR to fit its parent control: + ``CCC.Logging.CN03.AR01``. See ``rewrite_ar_id()``. +- **Genuine collision renumbering**: sometimes upstream has a real typo + where two distinct requirements share the same id (e.g. + ``CCC.Core.CN14.AR02`` appears twice for 30-day and 14-day backup variants). + The second copy is renumbered to the next free AR number within the + control. See the ``seen_ids`` logic in ``emit_requirement()``. +""" +from __future__ import annotations + +from pathlib import Path + +import yaml + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def clean(value: str | None) -> str: + """Trim and collapse internal whitespace/newlines into single spaces. + + Upstream YAML uses ``|`` block scalars that preserve newlines; Prowler + stores descriptions as single-line text. + """ + if not value: + return "" + return " ".join(value.split()) + + +def flatten_mappings(mappings): + """Convert upstream ``{reference-id, entries: [{reference-id, ...}]}`` to + Prowler's ``{ReferenceId, Identifiers: [...]}``. 
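+
+    Example (illustrative upstream fragment):
+
+        >>> flatten_mappings([{"reference-id": "CCC",
+        ...                    "entries": [{"reference-id": "CCC.Core.TH01"}]}])
+        [{'ReferenceId': 'CCC', 'Identifiers': ['CCC.Core.TH01']}]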
+ """ + if not mappings: + return [] + out = [] + for m in mappings: + ids = [] + for entry in m.get("entries") or []: + eid = entry.get("reference-id") + if eid: + ids.append(eid) + out.append({"ReferenceId": m.get("reference-id", ""), "Identifiers": ids}) + return out + + +def ar_prefix(ar_id: str) -> str: + """Return the first three dot-segments of an AR id (the parent control). + + e.g. ``CCC.Core.CN01.AR01`` -> ``CCC.Core.CN01``. + """ + return ".".join(ar_id.split(".")[:3]) + + +def rewrite_ar_id(parent_control_id: str, original_ar_id: str, ar_index: int) -> str: + """If an AR's id doesn't share its parent control's prefix, rename it. + + Example + ------- + parent ``CCC.Logging.CN03`` + AR id ``CCC.AuditLog.CN08.AR01`` with + index 0 -> ``CCC.Logging.CN03.AR01``. + """ + if ar_prefix(original_ar_id) == parent_control_id: + return original_ar_id + return f"{parent_control_id}.AR{ar_index + 1:02d}" + + +def emit_requirement( + control: dict, + family_name: str, + family_desc: str, + seen_ids: set[str], + requirements: list[dict], +) -> None: + """Translate one FINOS control + its assessment-requirements into + Prowler-format requirement dicts and append them to ``requirements``. + + Applies foreign-prefix rewriting and genuine-collision renumbering so + the final list is guaranteed to have unique ids. + """ + control_id = clean(control.get("id")) + control_title = clean(control.get("title")) + section = f"{control_id} {control_title}".strip() + objective = clean(control.get("objective")) + threat_mappings = flatten_mappings(control.get("threat-mappings")) + guideline_mappings = flatten_mappings(control.get("guideline-mappings")) + ars = control.get("assessment-requirements") or [] + for idx, ar in enumerate(ars): + raw_id = clean(ar.get("id")) + if not raw_id: + continue + new_id = rewrite_ar_id(control_id, raw_id, idx) + # Renumber on genuine upstream collision (find next free AR number) + if new_id in seen_ids: + base = ".".join(new_id.split(".")[:-1]) + n = 1 + while f"{base}.AR{n:02d}" in seen_ids: + n += 1 + new_id = f"{base}.AR{n:02d}" + seen_ids.add(new_id) + + requirements.append( + { + "Id": new_id, + "Description": clean(ar.get("text")), + "Attributes": [ + { + "FamilyName": family_name, + "FamilyDescription": family_desc, + "Section": section, + "SubSection": "", + "SubSectionObjective": objective, + "Applicability": list(ar.get("applicability") or []), + "Recommendation": clean(ar.get("recommendation")), + "SectionThreatMappings": threat_mappings, + "SectionGuidelineMappings": guideline_mappings, + } + ], + "Checks": [], + } + ) + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def parse_upstream(config: dict) -> list[dict]: + """Walk upstream YAMLs and emit Prowler-format requirements. + + Handles both top-level shapes (``control-families`` and ``controls``). + Ids are guaranteed unique in the returned list. + """ + upstream_dir = Path(config["upstream"]["dir"]) + parser_cfg = config.get("parser") or {} + catalog_files = parser_cfg.get("catalog_files") or [] + family_id_title = parser_cfg.get("family_id_title") or {} + family_id_description = parser_cfg.get("family_id_description") or {} + + requirements: list[dict] = [] + seen_ids: set[str] = set() + + for filename in catalog_files: + path = upstream_dir / filename + if not path.exists(): + # parser.catalog_files is the closed set of upstream catalogs + # that define the framework. 
Silently skipping a missing file + # would emit valid-looking JSON with part of the framework + # dropped, defeating the whole point of a canonical sync. + raise FileNotFoundError( + f"upstream catalog file not found: {path}\n" + f" hint: refresh the upstream cache (see SKILL.md Workflow A " + f"Step 1), or remove {filename!r} from parser.catalog_files " + f"if it has been retired upstream." + ) + with open(path) as f: + doc = yaml.safe_load(f) or {} + + # Shape 1: control-families wrapper + for family in doc.get("control-families") or []: + family_name = clean(family.get("title")) + family_desc = clean(family.get("description")) + for control in family.get("controls") or []: + emit_requirement( + control, family_name, family_desc, seen_ids, requirements + ) + + # Shape 2: top-level controls with family reference id + for control in doc.get("controls") or []: + family_ref = clean(control.get("family")) + suffix = family_ref.split(".")[-1] if family_ref else "" + family_name = family_id_title.get(suffix, suffix or "Data") + family_desc = family_id_description.get(suffix, "") + emit_requirement( + control, family_name, family_desc, seen_ids, requirements + ) + + return requirements diff --git a/skills/prowler-compliance/assets/query_checks.py b/skills/prowler-compliance/assets/query_checks.py new file mode 100644 index 0000000000..46405be982 --- /dev/null +++ b/skills/prowler-compliance/assets/query_checks.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python3 +""" +Keyword/service/id lookup over a Prowler check inventory produced by +build_inventory.py. + +Usage: + # Keyword AND-search across id + title + risk + description + python skills/prowler-compliance/assets/query_checks.py aws encryption transit + + # Show all checks for a service + python skills/prowler-compliance/assets/query_checks.py aws --service iam + + # Show full metadata for one check id + python skills/prowler-compliance/assets/query_checks.py aws --id kms_cmk_rotation_enabled +""" +from __future__ import annotations + +import json +import sys + + +def main() -> int: + if len(sys.argv) < 3: + print(__doc__) + return 1 + + provider = sys.argv[1] + try: + with open(f"/tmp/checks_{provider}.json") as f: + inv = json.load(f) + except FileNotFoundError: + print( + f"No inventory for {provider}. 
Run build_inventory.py first.",
+            file=sys.stderr,
+        )
+        return 2
+
+    if sys.argv[2] == "--service":
+        if len(sys.argv) < 4:
+            print("usage: --service <service>")
+            return 1
+        svc = sys.argv[3]
+        hits = [cid for cid in sorted(inv) if inv[cid].get("service") == svc]
+        for cid in hits:
+            print(f"  {cid}")
+            print(f"      {inv[cid].get('title', '')}")
+        print(f"\n{len(hits)} checks in service '{svc}'")
+    elif sys.argv[2] == "--id":
+        if len(sys.argv) < 4:
+            print("usage: --id <check_id>")
+            return 1
+        cid = sys.argv[3]
+        if cid not in inv:
+            print(f"NOT FOUND: {cid}")
+            return 3
+        m = inv[cid]
+        print(f"== {cid} ==")
+        print(f"service : {m.get('service')}")
+        print(f"severity: {m.get('severity')}")
+        print(f"resource: {m.get('resource')}")
+        print(f"title   : {m.get('title')}")
+        print(f"desc    : {m.get('description', '')[:500]}")
+        print(f"risk    : {m.get('risk', '')[:500]}")
+    else:
+        keywords = [k.lower() for k in sys.argv[2:]]
+        hits = 0
+        for cid in sorted(inv):
+            m = inv[cid]
+            blob = " ".join(
+                [
+                    cid,
+                    m.get("title", ""),
+                    m.get("risk", ""),
+                    m.get("description", ""),
+                ]
+            ).lower()
+            if all(k in blob for k in keywords):
+                hits += 1
+                print(f"  {cid} [{m.get('service', '')}]")
+                print(f"      {m.get('title', '')[:120]}")
+        print(f"\n{hits} matches for {' + '.join(keywords)}")
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/skills/prowler-compliance/assets/sync_framework.py b/skills/prowler-compliance/assets/sync_framework.py
new file mode 100644
index 0000000000..9e070f2691
--- /dev/null
+++ b/skills/prowler-compliance/assets/sync_framework.py
@@ -0,0 +1,536 @@
+#!/usr/bin/env python3
+"""
+Generic, config-driven compliance framework sync runner.
+
+Usage:
+    python skills/prowler-compliance/assets/sync_framework.py \
+        skills/prowler-compliance/assets/configs/ccc.yaml
+
+Pipeline:
+    1. Load and validate the YAML config (fail fast on missing or empty
+       required fields — notably ``framework.version``, which silently
+       breaks ``get_check_compliance()`` key construction if empty).
+    2. Dynamically import the parser module declared in ``parser.module``
+       (resolved as ``parsers.{name}`` under this script's directory).
+    3. Call ``parser.parse_upstream(config) -> list[dict]`` to get raw
+       Prowler-format requirements. The parser owns all upstream-format
+       quirks (foreign-prefix AR rewriting, collision renumbering, shape
+       handling) and MUST return ids that are unique within the returned
+       list.
+    4. **Safety net**: assert id uniqueness. The runner raises
+       ``ValueError`` on any duplicate — it does NOT silently renumber,
+       because mutating a canonical upstream id (e.g. CIS ``1.1.1`` or
+       NIST ``AC-2(1)``) would be catastrophic.
+    5. Apply generic ``FamilyName`` normalization from
+       ``post_processing.family_name_normalization`` (optional).
+    6. Preserve legacy ``Checks`` lists from the existing Prowler JSON
+       using a config-driven primary key + fallback key chain. CCC uses
+       ``(Section, Applicability)`` as fallback; CIS would use
+       ``(Section, Profile)``; NIST would use ``(ItemId,)``.
+       For versioned frameworks (e.g. ``cis_<version>_<provider>.json``)
+       where a version bump writes to a brand-new file, set
+       ``post_processing.check_preservation.legacy_path_template`` to
+       point at the previous version's file so its Checks are preserved
+       instead of silently lost. Defaults to ``output.path_template``
+       when omitted, which is correct for unversioned frameworks.
+    7. Wrap each provider's requirements in the framework metadata dict
+       built from the config templates.
+    8. 
Write each provider's JSON to the path resolved from + ``output.path_template`` (supports ``{framework}``, ``{version}`` + and ``{provider}`` placeholders). + 9. Pydantic-validate the written JSON via ``Compliance.parse_file()`` + and report the load counts per provider. + +The runner is strictly generic — it never mentions CCC, knows nothing +about YAML shapes, and can handle any upstream-backed framework given a +parser module and a config file. +""" +from __future__ import annotations + +import importlib +import json +import sys +from pathlib import Path +from typing import Any + +import yaml + +# Make sibling `parsers/` package importable regardless of the runner's +# invocation directory. +_SCRIPT_DIR = Path(__file__).resolve().parent +if str(_SCRIPT_DIR) not in sys.path: + sys.path.insert(0, str(_SCRIPT_DIR)) + + +# --------------------------------------------------------------------------- +# Config loading and validation +# --------------------------------------------------------------------------- + + +class ConfigError(ValueError): + """Raised when the sync config is malformed or missing required fields.""" + + +def _require(cfg: dict, dotted_path: str) -> Any: + """Fetch a dotted-path key from nested dicts. Raises ConfigError on + missing or empty values (empty-string, empty-list, None).""" + current: Any = cfg + parts = dotted_path.split(".") + for i, part in enumerate(parts): + if not isinstance(current, dict) or part not in current: + raise ConfigError(f"config: missing required field '{dotted_path}'") + current = current[part] + if current in ("", None, [], {}): + raise ConfigError(f"config: field '{dotted_path}' must not be empty") + return current + + +def load_config(path: Path) -> dict: + if not path.exists(): + raise ConfigError(f"config file not found: {path}") + with open(path) as f: + cfg = yaml.safe_load(f) or {} + if not isinstance(cfg, dict): + raise ConfigError(f"config root must be a mapping, got {type(cfg).__name__}") + + # Required fields — fail fast. Empty Version in particular silently + # breaks get_check_compliance() key construction. 
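+    # Illustrative failure: a config with `version: ""` aborts right here with
+    # ConfigError("config: field 'framework.version' must not be empty")
+    # instead of shipping JSON whose runtime compliance key degrades from
+    # "CCC-v2025.10" to bare "CCC".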
+ _require(cfg, "framework.name") + _require(cfg, "framework.display_name") + _require(cfg, "framework.version") + _require(cfg, "framework.description_template") + _require(cfg, "providers") + _require(cfg, "output.path_template") + _require(cfg, "upstream.dir") + _require(cfg, "parser.module") + _require(cfg, "post_processing.check_preservation.primary_key") + + providers = cfg["providers"] + if not isinstance(providers, list) or not providers: + raise ConfigError("config: 'providers' must be a non-empty list") + for idx, p in enumerate(providers): + if not isinstance(p, dict) or "key" not in p or "display" not in p: + raise ConfigError( + f"config: providers[{idx}] must have 'key' and 'display' fields" + ) + + return cfg + + +# --------------------------------------------------------------------------- +# Parser loading +# --------------------------------------------------------------------------- + + +def load_parser(parser_module_name: str): + try: + return importlib.import_module(f"parsers.{parser_module_name}") + except ImportError as exc: + raise ConfigError( + f"cannot import parser 'parsers.{parser_module_name}': {exc}" + ) from exc + + +# --------------------------------------------------------------------------- +# Post-processing: id uniqueness safety net +# --------------------------------------------------------------------------- + + +def assert_unique_ids(requirements: list[dict]) -> None: + """Enforce the parser contract: every requirement must have a unique Id. + + The runner never renumbers silently — a duplicate is a parser bug. + """ + seen: set[str] = set() + dups: list[str] = [] + for req in requirements: + rid = req.get("Id") + if not rid: + raise ValueError(f"requirement missing Id: {req}") + if rid in seen: + dups.append(rid) + seen.add(rid) + if dups: + raise ValueError( + f"parser returned duplicate requirement ids: {sorted(set(dups))}" + ) + + +# --------------------------------------------------------------------------- +# Post-processing: FamilyName normalization +# --------------------------------------------------------------------------- + + +def normalize_family_names(requirements: list[dict], norm_map: dict[str, str]) -> None: + """Apply ``Attributes[0].FamilyName`` normalization in place.""" + if not norm_map: + return + for req in requirements: + for attr in req.get("Attributes") or []: + name = attr.get("FamilyName") + if name in norm_map: + attr["FamilyName"] = norm_map[name] + + +# --------------------------------------------------------------------------- +# Post-processing: legacy check-mapping preservation +# --------------------------------------------------------------------------- + + +def _freeze(value: Any) -> Any: + """Make a value hashable for use in composite lookup keys. + + Lists become frozensets (order-insensitive match). Scalars pass through. + """ + if isinstance(value, list): + return frozenset(value) + return value + + +def _build_fallback_key(attrs: dict, field_names: list[str]) -> tuple | None: + """Build a composite tuple key from the given attribute field names. + + Returns None if any field is missing or falsy — that key will be + skipped (the lookup table just won't have an entry for it). 
+ """ + parts = [] + for name in field_names: + if name not in attrs: + return None + value = attrs[name] + if value in ("", None, [], {}): + return None + parts.append(_freeze(value)) + return tuple(parts) + + +def load_legacy_check_maps( + legacy_path: Path, + primary_key: str, + fallback_keys: list[list[str]], +) -> tuple[dict[str, list[str]], list[dict[tuple, list[str]]]]: + """Read the existing Prowler JSON and build lookup tables for check + preservation. + + Fails fast on ambiguous preservation keys. If two distinct legacy + requirements share the same primary value or the same fallback tuple, + merging their ``Checks`` silently would corrupt the preserved mapping + for unrelated requirements. Raises ``ValueError`` listing every + conflict so the user can either dedupe the legacy data or strengthen + ``check_preservation`` in the sync config. + + Returns + ------- + by_primary : dict + ``{primary_value: [checks]}`` — e.g. ``{ar_id: [checks]}``. + by_fallback : list[dict] + One lookup dict per entry in ``fallback_keys``. Each maps a + composite tuple key to its preserved checks list. + """ + by_primary: dict[str, list[str]] = {} + by_fallback: list[dict[tuple, list[str]]] = [{} for _ in fallback_keys] + + if not legacy_path.exists(): + return by_primary, by_fallback + + with open(legacy_path) as f: + data = json.load(f) + + # Track which legacy requirement Ids contributed to each bucket so we + # can surface ambiguity after the scan completes. + primary_sources: dict[str, list[str]] = {} + fallback_sources: list[dict[tuple, list[str]]] = [{} for _ in fallback_keys] + + for req in data.get("Requirements") or []: + legacy_id = req.get("Id") or "" + checks = req.get("Checks") or [] + + pv = req.get(primary_key) + if pv: + primary_sources.setdefault(pv, []).append(legacy_id) + bucket = by_primary.setdefault(pv, []) + for c in checks: + if c not in bucket: + bucket.append(c) + + attributes = req.get("Attributes") or [] + if not attributes: + continue + attrs = attributes[0] + for i, field_names in enumerate(fallback_keys): + key = _build_fallback_key(attrs, field_names) + if key is None: + continue + fallback_sources[i].setdefault(key, []).append(legacy_id) + bucket = by_fallback[i].setdefault(key, []) + for c in checks: + if c not in bucket: + bucket.append(c) + + conflicts: list[str] = [] + for pv, ids in primary_sources.items(): + if len(ids) > 1: + conflicts.append( + f"primary_key={primary_key!r} value={pv!r} shared by {ids}" + ) + for i, field_names in enumerate(fallback_keys): + for key, ids in fallback_sources[i].items(): + if len(ids) > 1: + conflicts.append( + f"fallback_key={field_names} value={key!r} shared by {ids}" + ) + if conflicts: + details = "\n - ".join(conflicts) + raise ValueError( + f"ambiguous preservation keys in {legacy_path} — cannot " + f"faithfully preserve Checks across distinct requirements:\n" + f" - {details}\n" + f"Fix: dedupe the legacy JSON, or strengthen " + f"'post_processing.check_preservation' in the sync config " + f"(e.g. add a more discriminating field to fallback_keys)." 
+ ) + + return by_primary, by_fallback + + +def lookup_preserved_checks( + req: dict, + by_primary: dict, + by_fallback: list[dict], + primary_key: str, + fallback_keys: list[list[str]], +) -> list[str]: + """Return preserved check ids for a requirement, trying the primary + key first then each fallback in order.""" + pv = req.get(primary_key) + if pv and pv in by_primary: + return list(by_primary[pv]) + attributes = req.get("Attributes") or [] + if not attributes: + return [] + attrs = attributes[0] + for i, field_names in enumerate(fallback_keys): + key = _build_fallback_key(attrs, field_names) + if key and key in by_fallback[i]: + return list(by_fallback[i][key]) + return [] + + +# --------------------------------------------------------------------------- +# Provider output assembly +# --------------------------------------------------------------------------- + + +def resolve_output_path(template: str, framework: dict, provider_key: str) -> Path: + return Path( + template.format( + provider=provider_key, + framework=framework["name"].lower(), + version=framework["version"], + ) + ) + + +def build_provider_json( + config: dict, + provider: dict, + base_requirements: list[dict], +) -> tuple[dict, dict[str, int]]: + """Produce the provider-specific JSON dict ready to dump. + + Returns ``(json_dict, counts)`` where ``counts`` tracks how each + requirement's checks were resolved (primary, fallback, or none). + """ + framework = config["framework"] + preservation = config["post_processing"]["check_preservation"] + primary_key = preservation["primary_key"] + fallback_keys = preservation.get("fallback_keys") or [] + + # For versioned frameworks, the file we WRITE (output.path_template + # resolved at the new version) is not the file we want to READ legacy + # Checks from. Allow the config to override the legacy source path so + # a version bump can still preserve mappings from the previous file. 
+    legacy_template = (
+        preservation.get("legacy_path_template")
+        or config["output"]["path_template"]
+    )
+    legacy_path = resolve_output_path(
+        legacy_template, framework, provider["key"]
+    )
+    by_primary, by_fallback = load_legacy_check_maps(
+        legacy_path, primary_key, fallback_keys
+    )
+
+    counts = {"primary": 0, "fallback": 0, "none": 0}
+    enriched: list[dict] = []
+    for req in base_requirements:
+        # Try primary key first
+        pv = req.get(primary_key)
+        checks: list[str] = []
+        source = "none"
+        if pv and pv in by_primary:
+            checks = list(by_primary[pv])
+            source = "primary"
+        else:
+            attributes = req.get("Attributes") or []
+            if attributes:
+                attrs = attributes[0]
+                for i, field_names in enumerate(fallback_keys):
+                    key = _build_fallback_key(attrs, field_names)
+                    if key and key in by_fallback[i]:
+                        checks = list(by_fallback[i][key])
+                        source = "fallback"
+                        break
+        counts[source] += 1
+        enriched.append(
+            {
+                "Id": req["Id"],
+                "Description": req["Description"],
+                # Shallow-copy attribute dicts so providers don't share refs
+                "Attributes": [dict(a) for a in req.get("Attributes") or []],
+                "Checks": checks,
+            }
+        )
+
+    description = framework["description_template"].format(
+        provider_display=provider["display"],
+        provider_key=provider["key"],
+        framework_name=framework["name"],
+        framework_display=framework["display_name"],
+        version=framework["version"],
+    )
+    out = {
+        "Framework": framework["name"],
+        "Version": framework["version"],
+        "Provider": provider["display"],
+        "Name": framework["display_name"],
+        "Description": description,
+        "Requirements": enriched,
+    }
+    return out, counts
+
+
+# ---------------------------------------------------------------------------
+# Pydantic post-validation
+# ---------------------------------------------------------------------------
+
+
+def pydantic_validate(json_path: Path) -> int:
+    """Import Prowler lazily so the runner still works without Prowler
+    installed (validation step is skipped in that case)."""
+    try:
+        from prowler.lib.check.compliance_models import Compliance
+    except ImportError:
+        print(
+            "  note: prowler package not importable — skipping Pydantic validation",
+            file=sys.stderr,
+        )
+        return -1
+    try:
+        parsed = Compliance.parse_file(str(json_path))
+    except Exception as exc:
+        raise RuntimeError(
+            f"Pydantic validation failed for {json_path}: {exc}"
+        ) from exc
+    return len(parsed.Requirements)
+
+
+# ---------------------------------------------------------------------------
+# Driver
+# ---------------------------------------------------------------------------
+
+
+def main() -> int:
+    if len(sys.argv) != 2:
+        print("usage: sync_framework.py <config.yaml>", file=sys.stderr)
+        return 1
+
+    config_path = Path(sys.argv[1])
+    try:
+        config = load_config(config_path)
+    except ConfigError as exc:
+        print(f"config error: {exc}", file=sys.stderr)
+        return 2
+
+    framework_name = config["framework"]["name"]
+    upstream_dir = Path(config["upstream"]["dir"])
+    if not upstream_dir.exists():
+        print(
+            f"error: upstream cache dir {upstream_dir} not found\n"
+            f"  hint: {config['upstream'].get('fetch_docs', '(see SKILL.md Workflow A Step 1)')}",
+            file=sys.stderr,
+        )
+        return 3
+
+    parser_module_name = config["parser"]["module"]
+    print(
+        f"Sync: framework={framework_name} version={config['framework']['version']} "
+        f"parser={parser_module_name}"
+    )
+
+    try:
+        parser = load_parser(parser_module_name)
+    except ConfigError as exc:
+        print(f"parser error: {exc}", file=sys.stderr)
+        return 4
+
+    print(f"Parsing upstream from {upstream_dir}...")
+    
try: + base_requirements = parser.parse_upstream(config) + except FileNotFoundError as exc: + # A missing catalog declared in parser.catalog_files is a hard + # failure: emitting JSON with part of the framework silently + # dropped would violate the canonical-sync contract. + print(f"upstream error: {exc}", file=sys.stderr) + return 6 + print(f" parser returned {len(base_requirements)} requirements") + + # Safety-net: parser contract + try: + assert_unique_ids(base_requirements) + except ValueError as exc: + print(f"parser contract violation: {exc}", file=sys.stderr) + return 5 + + # Post-processing: family name normalization + norm_map = ( + config.get("post_processing", {}) + .get("family_name_normalization") + or {} + ) + normalize_family_names(base_requirements, norm_map) + + # Per-provider output + print() + for provider in config["providers"]: + provider_json, counts = build_provider_json( + config, provider, base_requirements + ) + out_path = resolve_output_path( + config["output"]["path_template"], + config["framework"], + provider["key"], + ) + out_path.parent.mkdir(parents=True, exist_ok=True) + with open(out_path, "w") as f: + json.dump(provider_json, f, indent=2, ensure_ascii=False) + f.write("\n") + + validated = pydantic_validate(out_path) + validated_msg = ( + f" pydantic_reqs={validated}" if validated >= 0 else " pydantic=skipped" + ) + print( + f" {provider['key']}: total={len(provider_json['Requirements'])} " + f"matched_primary={counts['primary']} " + f"matched_fallback={counts['fallback']} " + f"new_or_unmatched={counts['none']}{validated_msg}" + ) + print(f" wrote {out_path}") + + print("\nDone.") + return 0 + + +if __name__ == "__main__": + sys.exit(main())