Compare commits


1 Commit

Author SHA1 Message Date
dependabot[bot]
5e4f5b0ffd chore(deps-dev): bump pytest from 8.3.5 to 9.0.2
Bumps [pytest](https://github.com/pytest-dev/pytest) from 8.3.5 to 9.0.2.
- [Release notes](https://github.com/pytest-dev/pytest/releases)
- [Changelog](https://github.com/pytest-dev/pytest/blob/main/CHANGELOG.rst)
- [Commits](https://github.com/pytest-dev/pytest/compare/8.3.5...9.0.2)

---
updated-dependencies:
- dependency-name: pytest
  dependency-version: 9.0.2
  dependency-type: direct:development
  update-type: version-update:semver-major
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-04-13 12:12:41 +00:00
11 changed files with 28 additions and 1896 deletions

poetry.lock generated
View File

@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.2.1 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -1888,7 +1888,6 @@ files = [
{file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
{file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
]
markers = {dev = "platform_system == \"Windows\" or sys_platform == \"win32\""}
[[package]]
name = "contextlib2"
@@ -3084,7 +3083,7 @@ files = [
[package.dependencies]
attrs = ">=22.2.0"
jsonschema-specifications = ">=2023.3.6"
jsonschema-specifications = ">=2023.03.6"
referencing = ">=0.28.4"
rpds-py = ">=0.7.1"
@@ -3164,7 +3163,7 @@ files = [
]
[package.dependencies]
certifi = ">=14.5.14"
certifi = ">=14.05.14"
durationpy = ">=0.7"
google-auth = ">=1.0.1"
oauthlib = ">=3.2.2"
@@ -4978,7 +4977,7 @@ files = [
]
[package.dependencies]
astroid = ">=3.3.8,<=3.4.0.dev0"
astroid = ">=3.3.8,<=3.4.0-dev0"
colorama = {version = ">=0.4.5", markers = "sys_platform == \"win32\""}
dill = [
{version = ">=0.2", markers = "python_version < \"3.11\""},
@@ -5074,26 +5073,27 @@ diagrams = ["jinja2", "railroad-diagrams"]
[[package]]
name = "pytest"
version = "8.3.5"
version = "9.0.2"
description = "pytest: simple powerful testing with Python"
optional = false
python-versions = ">=3.8"
python-versions = ">=3.10"
groups = ["dev"]
files = [
{file = "pytest-8.3.5-py3-none-any.whl", hash = "sha256:c69214aa47deac29fad6c2a4f590b9c4a9fdb16a403176fe154b79c0b4d4d820"},
{file = "pytest-8.3.5.tar.gz", hash = "sha256:f4efe70cc14e511565ac476b57c279e12a855b11f48f212af1080ef2263d3845"},
{file = "pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b"},
{file = "pytest-9.0.2.tar.gz", hash = "sha256:75186651a92bd89611d1d9fc20f0b4345fd827c41ccd5c299a868a05d70edf11"},
]
[package.dependencies]
colorama = {version = "*", markers = "sys_platform == \"win32\""}
exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version < \"3.11\""}
iniconfig = "*"
packaging = "*"
colorama = {version = ">=0.4", markers = "sys_platform == \"win32\""}
exceptiongroup = {version = ">=1", markers = "python_version < \"3.11\""}
iniconfig = ">=1.0.1"
packaging = ">=22"
pluggy = ">=1.5,<2"
pygments = ">=2.7.2"
tomli = {version = ">=1", markers = "python_version < \"3.11\""}
[package.extras]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
dev = ["argcomplete", "attrs (>=19.2)", "hypothesis (>=3.56)", "mock", "requests", "setuptools", "xmlschema"]
[[package]]
name = "pytest-cov"
@@ -5824,10 +5824,10 @@ files = [
]
[package.dependencies]
botocore = ">=1.37.4,<2.0a0"
botocore = ">=1.37.4,<2.0a.0"
[package.extras]
crt = ["botocore[crt] (>=1.37.4,<2.0a0)"]
crt = ["botocore[crt] (>=1.37.4,<2.0a.0)"]
[[package]]
name = "safety"
@@ -6745,4 +6745,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = ">=3.10,<3.13"
content-hash = "4050d3a95f5bc5448576ca0361fd899b35aa04de28d379cdfd3c2b0db67848ad"
content-hash = "df4910c598915fb11969028276d0ab7fd3833deb068d2ae50e42f3feeae605ec"

View File

@@ -129,7 +129,7 @@ openapi-schema-validator = "0.6.3"
openapi-spec-validator = "0.7.1"
pre-commit = "4.2.0"
pylint = "3.3.4"
pytest = "8.3.5"
pytest = "9.0.2"
pytest-cov = "6.0.0"
pytest-env = "1.1.5"
pytest-randomly = "3.16.0"

View File

@@ -1,28 +1,16 @@
---
name: prowler-compliance
description: >
Creates, syncs, audits and manages Prowler compliance frameworks end-to-end.
Covers the four-layer architecture (SDK models → JSON catalogs → output
formatters → API/UI), upstream sync workflows, cloud-auditor check-mapping
reviews, output formatter creation, and framework-specific attribute models.
Trigger: When working with compliance frameworks (CIS, NIST, PCI-DSS, SOC2,
GDPR, ISO27001, ENS, MITRE ATT&CK, CCC, C5, CSA CCM, KISA ISMS-P,
Prowler ThreatScore, FedRAMP, HIPAA), syncing with upstream catalogs,
auditing check-to-requirement mappings, adding output formatters, or fixing
compliance JSON bugs (duplicate IDs, empty Version, wrong Section, stale
check refs).
license: Apache-2.0
metadata:
author: prowler-cloud
version: "1.2"
version: "1.1"
scope: [root, sdk]
auto_invoke:
- "Creating/updating compliance frameworks"
- "Mapping checks to compliance controls"
- "Syncing compliance framework with upstream catalog"
- "Auditing check-to-requirement mappings as a cloud auditor"
- "Adding a compliance output formatter (per-provider class + table dispatcher)"
- "Fixing compliance JSON bugs (duplicate IDs, empty Section, stale refs)"
allowed-tools: Read, Edit, Write, Glob, Grep, Bash, WebFetch, WebSearch, Task
---
@@ -30,82 +18,10 @@ allowed-tools: Read, Edit, Write, Glob, Grep, Bash, WebFetch, WebSearch, Task
Use this skill when:
- Creating a new compliance framework for any provider
- **Syncing an existing framework with an upstream source of truth** (CIS, FINOS CCC, CSA CCM, NIST, ENS, etc.)
- Adding requirements to existing frameworks
- Mapping checks to compliance controls
- **Auditing existing check mappings as a cloud auditor** (user asks "are these mappings correct?", "which checks apply to this requirement?", "review the mappings")
- **Adding a new output formatter** (new framework needs a table dispatcher + per-provider classes + CSV models)
- **Fixing JSON bugs**: duplicate IDs, empty Version, wrong Section, stale check refs, inconsistent FamilyName, padded tangential check mappings
- **Registering a framework in the CLI table dispatcher or API export map**
- Investigating why a finding/check isn't showing under the expected compliance framework in the UI
- Understanding compliance framework structures and attributes
## Four-Layer Architecture (Mental Model)
Prowler compliance is a **four-layer system** hanging off one Pydantic model tree. Bugs usually happen where one layer doesn't match another, so know all four before touching anything.
### Layer 1: SDK / Core Models — `prowler/lib/check/`
- **`compliance_models.py`** — Pydantic **v1** model tree (`from pydantic.v1 import`). One `*_Requirement_Attribute` class per framework type + `Generic_Compliance_Requirement_Attribute` as fallback.
- In `Compliance_Requirement.Attributes: list[Union[...]]`, **`Generic_Compliance_Requirement_Attribute` MUST be LAST** in the Union or every framework-specific attribute falls through to Generic (Pydantic v1 tries union members in order).
- **`compliance.py`** — runtime linker. `get_check_compliance()` builds the key as `f"{Framework}-{Version}"` **only if `Version` is non-empty**. An empty Version makes the key just `"{Framework}"` — this breaks downstream filters and tests that expect the versioned key (see the sketch after this list).
- `Compliance.get_bulk(provider)` walks `prowler/compliance/{provider}/` and parses every `.json` file. No central index — just directory scan.
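A runnable sketch of the two behaviors above; `compliance_key` mirrors the real key construction, while `discover_frameworks` only illustrates the directory scan (neither is the actual Prowler function body):
```python
from pathlib import Path

def compliance_key(framework: str, version: str) -> str:
    # Mirrors get_check_compliance() key construction: the version suffix
    # is only appended when Version is non-empty; an empty Version
    # silently yields just "{Framework}" and breaks downstream filters.
    return f"{framework}-{version}" if version else framework

def discover_frameworks(provider: str) -> list[Path]:
    # Mirrors Compliance.get_bulk(): no central index, just a directory
    # scan of prowler/compliance/{provider}/ for every .json file.
    return sorted(Path(f"prowler/compliance/{provider}").glob("*.json"))

assert compliance_key("cis", "5.0") == "cis-5.0"
assert compliance_key("ccc", "") == "ccc"  # the silent-bug case
```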
### Layer 2: JSON Frameworks — `prowler/compliance/{provider}/`
See "Compliance Framework Location" and "Framework-Specific Attribute Structures" sections below.
### Layer 3: Output Formatters — `prowler/lib/outputs/compliance/{framework}/`
**Every framework directory follows this exact convention** — do not deviate:
```
{framework}/
├── __init__.py
├── {framework}.py # ONLY get_{framework}_table() — NO function docstring
├── {framework}_{provider}.py # One class per provider (e.g., CCC_AWS, CCC_Azure, CCC_GCP)
└── models.py # One Pydantic v2 BaseModel per provider (CSV columns)
```
- **`{framework}.py`** holds the **table dispatcher function** `get_{framework}_table()`. It prints the pass/fail/muted summary table. **Must NOT import `Finding` or `ComplianceOutput`** — doing so creates a circular import with `prowler/lib/outputs/compliance/compliance.py`. Only imports: `colorama`, `tabulate`, `prowler.config.config.orange_color`.
- **`{framework}_{provider}.py`** holds a per-provider class like `CCC_AWS(ComplianceOutput)` with a `transform()` method that walks findings and emits rows. This file IS allowed to import `Finding` because it's not on the dispatcher import chain.
- **`models.py`** holds one Pydantic v2 `BaseModel` per provider. Field names become CSV column headers (**public API** — renaming breaks downstream consumers).
- **Never collapse per-provider files into a unified parameterized class**, even when DRY-tempting. Every framework in Prowler follows the per-provider file pattern and reviewers will reject the refactor. CSV columns differ per provider (`AccountId`/`Region` vs `SubscriptionId`/`Location` vs `ProjectId`/`Location`) — three classes is the convention.
- **No function docstring on `get_{framework}_table()`** — no other framework has one; stay consistent.
- Register in `prowler/lib/outputs/compliance/compliance.py` (`display_compliance_table()`) with an `elif compliance_framework.startswith("{framework}_"):` branch. Import the table function at the top of the file.
### Layer 4: API / UI
- **API table dispatcher**: `api/src/backend/tasks/jobs/export.py` defines `COMPLIANCE_CLASS_MAP`, keyed by provider. Uses `startswith` predicates: `(lambda name: name.startswith("ccc_"), CCC_AWS)`. **Never use exact match** (`name == "ccc_aws"`) — it's inconsistent and breaks versioning.
- **API lazy loader**: `api/src/backend/api/compliance.py` provides `LazyComplianceTemplate` and `LazyChecksMapping`, which load compliance per provider on first access.
- **UI mapper routing**: `ui/lib/compliance/compliance-mapper.ts` routes framework names → per-framework mapper.
- **UI per-framework mapper**: `ui/lib/compliance/{framework}.tsx` flattens `Requirements` into a 3-level tree (Framework → Category → Control → Requirement) for the accordion view. Groups by `Attributes[0].FamilyName` and `Attributes[0].Section`.
- **UI detail panel**: `ui/components/compliance/compliance-custom-details/{framework}-details.tsx`.
- **UI types**: `ui/types/compliance.ts` — TypeScript mirrors of the attribute metadata.
### The CLI Pipeline (end-to-end)
```
prowler aws --compliance ccc_aws
Compliance.get_bulk("aws") → parses prowler/compliance/aws/*.json
update_checks_metadata_with_compliance() → attaches compliance info to CheckMetadata
execute_checks() → runs checks, produces Finding objects
get_check_compliance(finding, "aws", bulk_checks_metadata)
→ dict "{Framework}-{Version}" → [requirement_ids]
CCC_AWS(findings, compliance).transform() → per-provider class builds CSV rows
batch_write_data_to_file() → writes {output_filename}_ccc_aws.csv
display_compliance_table() → get_ccc_table() → prints stdout summary
```
---
## Compliance Framework Location
Frameworks are JSON files located in: `prowler/compliance/{provider}/{framework_name}_{provider}.json`
@@ -539,453 +455,14 @@ Prowler ThreatScore is a custom security scoring framework developed by Prowler
- **M365:** `cis_4.0_m365.json`, `iso27001_2022_m365.json`
- **NHN:** `iso27001_2022_nhn.json`
## Workflow A: Sync a Framework With an Upstream Catalog
Use when the framework is maintained upstream (CIS Benchmarks, FINOS CCC, CSA CCM, NIST, ENS, etc.) and Prowler needs to catch up.
### Step 1 — Cache the upstream source
Download every upstream file to a local cache so subsequent iterations don't hit the network. For FINOS CCC:
```bash
mkdir -p /tmp/ccc_upstream
catalogs="core/ccc storage/object management/auditlog management/logging ..."
for p in $catalogs; do
safe=$(echo "$p" | tr '/' '_')
gh api "repos/finos/common-cloud-controls/contents/catalogs/$p/controls.yaml" \
-H "Accept: application/vnd.github.raw" > "/tmp/ccc_upstream/${safe}.yaml"
done
```
### Step 2 — Run the generic sync runner against a framework config
The sync tooling is split into three layers so adding a new framework only takes a YAML config (and optionally a new parser module for an unfamiliar upstream format):
```
skills/prowler-compliance/assets/
├── sync_framework.py # generic runner — works for any framework
├── configs/
│ └── ccc.yaml # per-framework config (canonical example)
└── parsers/
├── __init__.py
└── finos_ccc.py # parser module for FINOS CCC YAML
```
**For frameworks that already have a config + parser** (today: FINOS CCC), run:
```bash
python skills/prowler-compliance/assets/sync_framework.py \
skills/prowler-compliance/assets/configs/ccc.yaml
```
The runner loads the config, validates it, dynamically imports the parser declared in `parser.module`, calls `parser.parse_upstream(config) -> list[dict]`, then applies generic post-processing (id uniqueness safety net, `FamilyName` normalization, legacy check-mapping preservation) and writes the provider JSONs.
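A hedged sketch of that dispatch, assuming the config layout of `configs/ccc.yaml` and PyYAML; the real runner also validates the config and applies the post-processing described below:
```python
import importlib

import yaml  # PyYAML

def load_and_parse(config_path: str) -> list[dict]:
    with open(config_path) as f:
        config = yaml.safe_load(f)
    # parser.module names a file under assets/parsers/ (no .py suffix);
    # everything else under parser.* is opaque config passed straight through.
    # Assumes the assets/ directory is on sys.path so "parsers" imports.
    parser = importlib.import_module(f"parsers.{config['parser']['module']}")
    return parser.parse_upstream(config)  # list[dict] with unique Ids
```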
**To add a new framework sync**:
1. **Write a config file** at `skills/prowler-compliance/assets/configs/{framework}.yaml`. See `configs/ccc.yaml` as the canonical example. Required top-level sections:
- `framework`: `name`, `display_name`, `version` (**never empty** — empty Version silently breaks `get_check_compliance()` key construction, so the runner refuses to start), `description_template` (accepts `{provider_display}`, `{provider_key}`, `{framework_name}`, `{framework_display}`, `{version}` placeholders).
- `providers` — list of `{key, display}` pairs, one per Prowler provider the framework targets.
- `output.path_template` — supports `{provider}`, `{framework}`, `{version}` placeholders. Examples: `"prowler/compliance/{provider}/ccc_{provider}.json"` for unversioned file names, `"prowler/compliance/{provider}/cis_{version}_{provider}.json"` for versioned ones.
- `upstream.dir` — local cache directory (populate via Step 1).
- `parser.module` — name of the module under `parsers/` to load (without `.py`). Everything else under `parser.` is opaque to the runner and passed to the parser as config.
- `post_processing.check_preservation.primary_key` — top-level field name for the primary legacy-mapping lookup (almost always `Id`).
- `post_processing.check_preservation.fallback_keys`: **config-driven fallback keys** for preserving check mappings when ids change (see the preservation sketch after this list). Each entry is a list of `Attributes[0]` field names composed into a tuple. Examples:
- CCC: `- [Section, Applicability]` (because `Applicability` is a CCC-only attribute, verified in `compliance_models.py:213`).
- CIS would use `- [Section, Profile]`.
- NIST would use `- [ItemId]`.
- List-valued fields (like `Applicability`) are automatically frozen to `frozenset` so the tuple is hashable.
- `post_processing.family_name_normalization` (optional) — map of raw → canonical `FamilyName` values. The UI groups by `Attributes[0].FamilyName` exactly, so inconsistent upstream variants otherwise become separate tree branches.
2. **Reuse an existing parser** if the upstream format matches one (currently only `finos_ccc` exists). Otherwise, **write a new parser** at `parsers/{name}.py` implementing:
```python
def parse_upstream(config: dict) -> list[dict]:
"""Return Prowler-format requirements {Id, Description, Attributes: [...], Checks: []}.
Ids MUST be unique in the returned list. The runner raises ValueError
on duplicates — it does NOT silently renumber, because mutating a
canonical upstream id (e.g. CIS '1.1.1' or NIST 'AC-2(1)') would be
catastrophic. The parser owns all upstream-format quirks: foreign-prefix
rewriting, genuine collision renumbering, shape handling.
"""
```
The parser reads its own settings from `config['upstream']` and `config['parser']`. It does NOT load existing Prowler JSONs (the runner does that for check preservation) and does NOT write output (the runner does that too).
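A minimal sketch of the legacy check-mapping preservation step, using the CCC fallback key `(Section, frozenset(Applicability))` from the config section above; the helper name is illustrative:
```python
def preserve_checks(new_reqs: list[dict], old_reqs: list[dict]) -> None:
    # Primary index by Id, fallback index by (Section, frozenset(Applicability)).
    by_id = {r["Id"]: r.get("Checks", []) for r in old_reqs}

    def fallback_key(req: dict):
        attrs = req["Attributes"][0]
        # List-valued fields are frozen so the tuple stays hashable.
        return (attrs.get("Section"), frozenset(attrs.get("Applicability") or []))

    by_fallback = {fallback_key(r): r.get("Checks", []) for r in old_reqs}
    for req in new_reqs:
        # Ids rewritten or renumbered by the parser fall through to the
        # fallback index; requirements new upstream get an empty list.
        req["Checks"] = by_id.get(req["Id"]) or by_fallback.get(fallback_key(req)) or []
```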
**Gotchas the runner already handles for you** (learned from the FINOS CCC v2025.10 sync — they're documented here so you don't re-discover them):
- **Multiple upstream YAML shapes**. Most FINOS CCC catalogs use `control-families: [...]`, but `storage/object` uses a top-level `controls: [...]` with a `family: "CCC.X.Y"` reference id and no human-readable family name. A parser that only handles shape 1 silently drops the shape-2 catalog — this exact bug dropped ObjStor from Prowler for a full iteration. `parsers/finos_ccc.py` handles both shapes; if you write a new parser for a similar format, test with at least one file of each shape (see the sketch after this list).
- **Whitespace collapse**. Upstream YAML multi-line block scalars (`|`) preserve newlines. Prowler stores descriptions single-line. Collapse with `" ".join(value.split())` before emitting (see `parsers/finos_ccc.py::clean()`).
- **Foreign-prefix AR id rewriting**. Upstream sometimes aliases requirements across catalogs by keeping the original prefix (e.g., `CCC.AuditLog.CN08.AR01` appears nested under `CCC.Logging.CN03`). Rewrite the foreign id to fit its parent control: `CCC.Logging.CN03.AR01`. This logic is parser-specific because the id structure varies per framework (CCC uses 3-dot depth; CIS uses numeric dots; NIST uses `AC-2(1)`).
- **Genuine upstream collision renumbering**. Sometimes upstream has a real typo where two different requirements share the same id (e.g., `CCC.Core.CN14.AR02` defined twice for 30-day and 14-day backup variants). Renumber the second copy to the next free AR number (`.AR03`). The parser handles this; the runner asserts the final list has unique ids as a safety net.
- **Existing check mapping preservation**. The runner uses the `primary_key` + `fallback_keys` declared in config to look up the old `Checks` list for each requirement. For CCC this means primary index by `Id` plus fallback index by `(Section, frozenset(Applicability))` — the fallback recovers mappings for requirements whose ids were rewritten or renumbered by the parser.
- **FamilyName normalization**. Configured via `post_processing.family_name_normalization` — no code changes needed to collapse upstream variants like `"Logging & Monitoring"` → `"Logging and Monitoring"`.
- **Populate `Version`**. The runner refuses to start on empty `framework.version` — fail-fast replaces the silent bug where `get_check_compliance()` would build the key as just `"{Framework}"`.
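The sketch below illustrates the first two gotchas (two upstream shapes, whitespace collapse) in the spirit of `parsers/finos_ccc.py`; treat the exact YAML field names as assumptions:
```python
def clean(value: str) -> str:
    # Collapse YAML block scalars (|) to the single-line form Prowler stores.
    return " ".join(value.split())

def iter_controls(catalog: dict):
    if "control-families" in catalog:      # shape 1: most catalogs
        for family in catalog["control-families"]:
            for control in family.get("controls", []):
                yield family.get("title", ""), control
    elif "controls" in catalog:            # shape 2: storage/object
        for control in catalog["controls"]:
            # family is a "CCC.X.Y" reference id, no human-readable name
            yield control.get("family", ""), control
```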
### Step 3 — Validate before committing
```python
from prowler.lib.check.compliance_models import Compliance
for prov in ['aws', 'azure', 'gcp']:
c = Compliance.parse_file(f"prowler/compliance/{prov}/ccc_{prov}.json")
print(f"{prov}: {len(c.Requirements)} reqs, version={c.Version}")
```
Any `ValidationError` means the Attribute fields don't match the `*_Requirement_Attribute` model. Either fix the JSON or extend the model in `compliance_models.py` (remember: Generic stays last).
### Step 4 — Verify every check id exists
```python
import json
from pathlib import Path
for prov in ['aws', 'azure', 'gcp']:
existing = {p.stem.replace('.metadata','')
for p in Path(f'prowler/providers/{prov}/services').rglob('*.metadata.json')}
with open(f'prowler/compliance/{prov}/ccc_{prov}.json') as f:
data = json.load(f)
refs = {c for r in data['Requirements'] for c in r['Checks']}
missing = refs - existing
assert not missing, f"{prov} missing: {missing}"
```
A stale check id silently becomes dead weight — no finding will ever map to it. This pre-validation **must run on every write**; bake it into the generator script.
### Step 5 — Add an attribute model if needed
Only if the framework has fields beyond `Generic_Compliance_Requirement_Attribute`. Add the class to `prowler/lib/check/compliance_models.py` and register it in `Compliance_Requirement.Attributes: list[Union[...]]`. **Generic stays last.**
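A hedged sketch with illustrative field names; the real attribute class must mirror the framework's JSON `Attributes` keys exactly:
```python
from typing import List, Optional

from pydantic.v1 import BaseModel  # compliance_models.py is Pydantic v1

class MyFramework_Requirement_Attribute(BaseModel):
    # Field names here are assumptions for illustration only.
    Section: str
    FamilyName: Optional[str]
    Applicability: Optional[List[str]]

# Registration: the Union in Compliance_Requirement is tried in order
# (Pydantic v1), so the specific model goes before Generic:
#   Attributes: list[Union[..., MyFramework_Requirement_Attribute,
#                          Generic_Compliance_Requirement_Attribute]]
```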
---
## Workflow B: Audit Check Mappings as a Cloud Auditor
Use when the user asks to review existing mappings ("are these correct?", "verify that the checks apply", "audit the CCC mappings"). This is the highest-value compliance task — it surfaces padded mappings with zero actual coverage and missing mappings for legitimate coverage.
### The golden rule
> A Prowler check's title/risk MUST **literally describe what the requirement text says**. "Related" is not enough. If no check actually addresses the requirement, leave `Checks: []` (MANUAL) — **honest MANUAL is worth more than padded coverage**.
### Audit process
**Step 1 — Build a per-provider check inventory** (cache in `/tmp/`):
```python
import json
from pathlib import Path
for provider in ['aws', 'azure', 'gcp']:
inv = {}
for meta in Path(f'prowler/providers/{provider}/services').rglob('*.metadata.json'):
with open(meta) as f:
d = json.load(f)
cid = d.get('CheckID') or meta.stem.replace('.metadata','')
inv[cid] = {
'service': d.get('ServiceName', ''),
'title': d.get('CheckTitle', ''),
'risk': d.get('Risk', ''),
'description': d.get('Description', ''),
}
with open(f'/tmp/checks_{provider}.json', 'w') as f:
json.dump(inv, f, indent=2)
```
**Step 2 — Keyword/service query helper** — see [assets/query_checks.py](assets/query_checks.py):
```bash
python assets/query_checks.py aws encryption transit # keyword AND-search
python assets/query_checks.py aws --service iam # all iam checks
python assets/query_checks.py aws --id kms_cmk_rotation_enabled # full metadata
```
**Step 3 — Dump a framework section with current mappings** — see [assets/dump_section.py](assets/dump_section.py):
```bash
python assets/dump_section.py ccc "CCC.Core." # all Core ARs across 3 providers
python assets/dump_section.py ccc "CCC.AuditLog." # all AuditLog ARs
```
**Step 4 — Encode explicit REPLACE decisions** — see [assets/audit_framework_template.py](assets/audit_framework_template.py). Structure:
```python
DECISIONS = {}
DECISIONS["CCC.Core.CN01.AR01"] = {
"aws": [
"cloudfront_distributions_https_enabled",
"cloudfront_distributions_origin_traffic_encrypted",
# ...
],
"azure": [
"storage_secure_transfer_required_is_enabled",
"app_minimum_tls_version_12",
# ...
],
"gcp": [
"cloudsql_instance_ssl_connections",
],
# Missing provider key = leave the legacy mapping untouched
}
# Empty list = EXPLICITLY MANUAL (overwrites legacy)
DECISIONS["CCC.Core.CN01.AR07"] = {
"aws": [], # Prowler has no IANA port/protocol check
"azure": [],
"gcp": [],
}
```
**REPLACE, not PATCH.** Encoding every mapping as a full list (not add/remove delta) makes the audit reproducible and surfaces hidden assumptions from the legacy data.
**Step 5 — Pre-validation**. The audit script MUST validate every check id against the inventory and **abort with stderr listing typos** (a minimal sketch follows the list of typos below). Common typos caught during a real audit:
- `fsx_file_system_encryption_at_rest_using_kms` (doesn't exist)
- `cosmosdb_account_encryption_at_rest_with_cmk` (doesn't exist)
- `sqlserver_geo_replication` (doesn't exist)
- `redshift_cluster_audit_logging` (should be `redshift_cluster_encrypted_at_rest`)
- `postgresql_flexible_server_require_secure_transport` (should be `postgresql_flexible_server_enforce_ssl_enabled`)
- `storage_secure_transfer_required_enabled` (should be `storage_secure_transfer_required_is_enabled`)
- `sqlserver_minimum_tls_version_12` (should be `sqlserver_recommended_minimal_tls_version`)
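A minimal sketch of that pre-validation, matching the inventory file from Step 1 and the `DECISIONS` shape from Step 4:
```python
import json
import sys

def prevalidate(decisions: dict, provider: str) -> None:
    # Inventory cached by Step 1 / build_inventory.py
    with open(f"/tmp/checks_{provider}.json") as f:
        inventory = set(json.load(f))
    typos = [(rid, check)
             for rid, per_provider in decisions.items()
             for check in per_provider.get(provider, [])
             if check not in inventory]
    if typos:
        for rid, check in typos:
            print(f"  {rid} -> {check}", file=sys.stderr)
        sys.exit(2)  # abort before any JSON is written
```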
**Step 6 — Apply + validate + test**:
```bash
python /path/to/audit_script.py # applies decisions, pre-validates
python -m pytest tests/lib/outputs/compliance/ tests/lib/check/ -q
```
### Audit Reference Table: Requirement Text → Prowler Checks
Use this table to map CCC-style / NIST-style / ISO-style requirements to the checks that actually verify them. Built from a real audit of 172 CCC ARs × 3 providers.
| Requirement text | AWS checks | Azure checks | GCP checks |
|---|---|---|---|
| **TLS in transit enforced** | `cloudfront_distributions_https_enabled`, `s3_bucket_secure_transport_policy`, `elbv2_ssl_listeners`, `elbv2_insecure_ssl_ciphers`, `elb_ssl_listeners`, `elb_insecure_ssl_ciphers`, `opensearch_service_domains_https_communications_enforced`, `rds_instance_transport_encrypted`, `redshift_cluster_in_transit_encryption_enabled`, `elasticache_redis_cluster_in_transit_encryption_enabled`, `dynamodb_accelerator_cluster_in_transit_encryption_enabled`, `dms_endpoint_ssl_enabled`, `kafka_cluster_in_transit_encryption_enabled`, `transfer_server_in_transit_encryption_enabled`, `glue_database_connections_ssl_enabled`, `sns_subscription_not_using_http_endpoints` | `storage_secure_transfer_required_is_enabled`, `storage_ensure_minimum_tls_version_12`, `postgresql_flexible_server_enforce_ssl_enabled`, `mysql_flexible_server_ssl_connection_enabled`, `mysql_flexible_server_minimum_tls_version_12`, `sqlserver_recommended_minimal_tls_version`, `app_minimum_tls_version_12`, `app_ensure_http_is_redirected_to_https`, `app_ftp_deployment_disabled` | `cloudsql_instance_ssl_connections` (almost only option) |
| **TLS 1.3 specifically** | Partial: `cloudfront_distributions_using_deprecated_ssl_protocols`, `elb*_insecure_ssl_ciphers`, `*_minimum_tls_version_12` | Partial: `*_minimum_tls_version_12` checks | None — accept as MANUAL |
| **SSH / port 22 hardening** | `ec2_instance_port_ssh_exposed_to_internet`, `ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22`, `ec2_networkacl_allow_ingress_tcp_port_22` | `network_ssh_internet_access_restricted`, `vm_linux_enforce_ssh_authentication` | `compute_firewall_ssh_access_from_the_internet_allowed`, `compute_instance_block_project_wide_ssh_keys_disabled`, `compute_project_os_login_enabled`, `compute_project_os_login_2fa_enabled` |
| **mTLS (mutual TLS)** | `kafka_cluster_mutual_tls_authentication_enabled`, `apigateway_restapi_client_certificate_enabled` | `app_client_certificates_on` | None — MANUAL |
| **Data at rest encrypted** | `s3_bucket_default_encryption`, `s3_bucket_kms_encryption`, `ec2_ebs_default_encryption`, `ec2_ebs_volume_encryption`, `rds_instance_storage_encrypted`, `rds_cluster_storage_encrypted`, `rds_snapshots_encrypted`, `dynamodb_tables_kms_cmk_encryption_enabled`, `redshift_cluster_encrypted_at_rest`, `neptune_cluster_storage_encrypted`, `documentdb_cluster_storage_encrypted`, `opensearch_service_domains_encryption_at_rest_enabled`, `kinesis_stream_encrypted_at_rest`, `firehose_stream_encrypted_at_rest`, `sns_topics_kms_encryption_at_rest_enabled`, `sqs_queues_server_side_encryption_enabled`, `efs_encryption_at_rest_enabled`, `athena_workgroup_encryption`, `glue_data_catalogs_metadata_encryption_enabled`, `backup_vaults_encrypted`, `backup_recovery_point_encrypted`, `cloudtrail_kms_encryption_enabled`, `cloudwatch_log_group_kms_encryption_enabled`, `eks_cluster_kms_cmk_encryption_in_secrets_enabled`, `sagemaker_notebook_instance_encryption_enabled`, `apigateway_restapi_cache_encrypted`, `kafka_cluster_encryption_at_rest_uses_cmk`, `dynamodb_accelerator_cluster_encryption_enabled`, `storagegateway_fileshare_encryption_enabled` | `storage_infrastructure_encryption_is_enabled`, `storage_ensure_encryption_with_customer_managed_keys`, `vm_ensure_attached_disks_encrypted_with_cmk`, `vm_ensure_unattached_disks_encrypted_with_cmk`, `sqlserver_tde_encryption_enabled`, `sqlserver_tde_encrypted_with_cmk`, `databricks_workspace_cmk_encryption_enabled`, `monitor_storage_account_with_activity_logs_cmk_encrypted` | `compute_instance_encryption_with_csek_enabled`, `dataproc_encrypted_with_cmks_disabled`, `bigquery_dataset_cmk_encryption`, `bigquery_table_cmk_encryption` |
| **CMEK required (customer-managed keys)** | `kms_cmk_are_used` | `storage_ensure_encryption_with_customer_managed_keys`, `vm_ensure_attached_disks_encrypted_with_cmk`, `vm_ensure_unattached_disks_encrypted_with_cmk`, `sqlserver_tde_encrypted_with_cmk`, `databricks_workspace_cmk_encryption_enabled` | `bigquery_dataset_cmk_encryption`, `bigquery_table_cmk_encryption`, `dataproc_encrypted_with_cmks_disabled`, `compute_instance_encryption_with_csek_enabled` |
| **Key rotation enabled** | `kms_cmk_rotation_enabled` | `keyvault_key_rotation_enabled`, `storage_key_rotation_90_days` | `kms_key_rotation_enabled` |
| **MFA for UI access** | `iam_root_mfa_enabled`, `iam_root_hardware_mfa_enabled`, `iam_user_mfa_enabled_console_access`, `iam_user_hardware_mfa_enabled`, `iam_administrator_access_with_mfa`, `cognito_user_pool_mfa_enabled` | `entra_privileged_user_has_mfa`, `entra_non_privileged_user_has_mfa`, `entra_user_with_vm_access_has_mfa`, `entra_security_defaults_enabled` | `compute_project_os_login_2fa_enabled` |
| **API access / credentials** | `iam_no_root_access_key`, `iam_user_no_setup_initial_access_key`, `apigateway_restapi_authorizers_enabled`, `apigateway_restapi_public_with_authorizer`, `apigatewayv2_api_authorizers_enabled` | `entra_conditional_access_policy_require_mfa_for_management_api`, `app_function_access_keys_configured`, `app_function_identity_is_configured` | `apikeys_api_restrictions_configured`, `apikeys_key_exists`, `apikeys_key_rotated_in_90_days` |
| **Log all admin/config changes** | `cloudtrail_multi_region_enabled`, `cloudtrail_multi_region_enabled_logging_management_events`, `cloudtrail_cloudwatch_logging_enabled`, `cloudtrail_log_file_validation_enabled`, `cloudwatch_log_metric_filter_*`, `cloudwatch_changes_to_*_alarm_configured`, `config_recorder_all_regions_enabled` | `monitor_diagnostic_settings_exists`, `monitor_diagnostic_setting_with_appropriate_categories`, `monitor_alert_*` | `iam_audit_logs_enabled`, `logging_log_metric_filter_and_alert_for_*`, `logging_sink_created` |
| **Log integrity (digital signatures)** | `cloudtrail_log_file_validation_enabled` (exact) | None | None |
| **Public access denied** | `s3_bucket_public_access`, `s3_bucket_public_list_acl`, `s3_bucket_public_write_acl`, `s3_account_level_public_access_blocks`, `apigateway_restapi_public`, `awslambda_function_url_public`, `awslambda_function_not_publicly_accessible`, `rds_instance_no_public_access`, `rds_snapshots_public_access`, `ec2_securitygroup_allow_ingress_from_internet_to_all_ports`, `sns_topics_not_publicly_accessible`, `sqs_queues_not_publicly_accessible` | `storage_blob_public_access_level_is_disabled`, `storage_ensure_private_endpoints_in_storage_accounts`, `containerregistry_not_publicly_accessible`, `keyvault_private_endpoints`, `app_function_not_publicly_accessible`, `aks_clusters_public_access_disabled`, `network_http_internet_access_restricted` | `cloudstorage_bucket_public_access`, `compute_instance_public_ip`, `cloudsql_instance_public_ip`, `compute_firewall_*_access_from_the_internet_allowed` |
| **IAM least privilege** | `iam_*_no_administrative_privileges`, `iam_policy_allows_privilege_escalation`, `iam_inline_policy_allows_privilege_escalation`, `iam_role_administratoraccess_policy`, `iam_group_administrator_access_policy`, `iam_user_administrator_access_policy`, `iam_policy_attached_only_to_group_or_roles`, `iam_role_cross_service_confused_deputy_prevention` | `iam_role_user_access_admin_restricted`, `iam_subscription_roles_owner_custom_not_created`, `iam_custom_role_has_permissions_to_administer_resource_locks` | `iam_sa_no_administrative_privileges`, `iam_no_service_roles_at_project_level`, `iam_role_kms_enforce_separation_of_duties`, `iam_role_sa_enforce_separation_of_duties` |
| **Password policy** | `iam_password_policy_minimum_length_14`, `iam_password_policy_uppercase`, `iam_password_policy_lowercase`, `iam_password_policy_symbol`, `iam_password_policy_number`, `iam_password_policy_expires_passwords_within_90_days_or_less`, `iam_password_policy_reuse_24` | None | None |
| **Credential rotation / unused** | `iam_rotate_access_key_90_days`, `iam_user_accesskey_unused`, `iam_user_console_access_unused` | None | `iam_sa_user_managed_key_rotate_90_days`, `iam_sa_user_managed_key_unused`, `iam_service_account_unused` |
| **VPC / flow logs** | `vpc_flow_logs_enabled` | `network_flow_log_captured_sent`, `network_watcher_enabled`, `network_flow_log_more_than_90_days` | `compute_subnet_flow_logs_enabled` |
| **Backup / DR / Multi-AZ** | `backup_vaults_exist`, `backup_plans_exist`, `backup_reportplans_exist`, `rds_instance_backup_enabled`, `rds_*_protected_by_backup_plan`, `rds_cluster_multi_az`, `neptune_cluster_backup_enabled`, `documentdb_cluster_backup_enabled`, `efs_have_backup_enabled`, `s3_bucket_cross_region_replication`, `dynamodb_table_protected_by_backup_plan` | `vm_backup_enabled`, `vm_sufficient_daily_backup_retention_period`, `storage_geo_redundant_enabled` | `cloudsql_instance_automated_backups`, `cloudstorage_bucket_log_retention_policy_lock`, `cloudstorage_bucket_sufficient_retention_period` |
| **Access analysis / discovery** | `accessanalyzer_enabled`, `accessanalyzer_enabled_without_findings` | None specific | `iam_account_access_approval_enabled`, `iam_cloud_asset_inventory_enabled` |
| **Object lock / retention** | `s3_bucket_object_lock`, `s3_bucket_object_versioning`, `s3_bucket_lifecycle_enabled`, `cloudtrail_bucket_requires_mfa_delete`, `s3_bucket_no_mfa_delete` | `storage_ensure_soft_delete_is_enabled`, `storage_blob_versioning_is_enabled`, `storage_ensure_file_shares_soft_delete_is_enabled` | `cloudstorage_bucket_log_retention_policy_lock`, `cloudstorage_bucket_soft_delete_enabled`, `cloudstorage_bucket_versioning_enabled`, `cloudstorage_bucket_sufficient_retention_period` |
| **Uniform bucket-level access** | `s3_bucket_acl_prohibited` | `storage_account_key_access_disabled`, `storage_default_to_entra_authorization_enabled` | `cloudstorage_bucket_uniform_bucket_level_access` |
| **Container vulnerability scanning** | `ecr_registry_scan_images_on_push_enabled`, `ecr_repositories_scan_vulnerabilities_in_latest_image` | `defender_container_images_scan_enabled`, `defender_container_images_resolved_vulnerabilities` | `artifacts_container_analysis_enabled`, `gcr_container_scanning_enabled` |
| **WAF / rate limiting** | `wafv2_webacl_with_rules`, `waf_*_webacl_with_rules`, `wafv2_webacl_logging_enabled`, `waf_global_webacl_logging_enabled` | None | None |
| **Deployment region restriction** | `organizations_scp_check_deny_regions` | None | None |
| **Secrets automatic rotation** | `secretsmanager_automatic_rotation_enabled`, `secretsmanager_secret_rotated_periodically` | `keyvault_rbac_secret_expiration_set`, `keyvault_non_rbac_secret_expiration_set` | None |
| **Certificate management** | `acm_certificates_expiration_check`, `acm_certificates_with_secure_key_algorithms`, `acm_certificates_transparency_logs_enabled` | `keyvault_key_expiration_set_in_non_rbac`, `keyvault_rbac_key_expiration_set`, `keyvault_non_rbac_secret_expiration_set` | None |
| **GenAI guardrails / input/output filtering** | `bedrock_guardrail_prompt_attack_filter_enabled`, `bedrock_guardrail_sensitive_information_filter_enabled`, `bedrock_agent_guardrail_enabled`, `bedrock_model_invocation_logging_enabled`, `bedrock_api_key_no_administrative_privileges`, `bedrock_api_key_no_long_term_credentials` | None | None |
| **ML dev environment security** | `sagemaker_notebook_instance_root_access_disabled`, `sagemaker_notebook_instance_without_direct_internet_access_configured`, `sagemaker_notebook_instance_vpc_settings_configured`, `sagemaker_models_vpc_settings_configured`, `sagemaker_training_jobs_vpc_settings_configured`, `sagemaker_training_jobs_network_isolation_enabled`, `sagemaker_training_jobs_volume_and_output_encryption_enabled` | None | None |
| **Threat detection / anomalous behavior** | `cloudtrail_threat_detection_enumeration`, `cloudtrail_threat_detection_privilege_escalation`, `cloudtrail_threat_detection_llm_jacking`, `guardduty_is_enabled`, `guardduty_no_high_severity_findings` | None | None |
| **Serverless private access** | `awslambda_function_inside_vpc`, `awslambda_function_not_publicly_accessible`, `awslambda_function_url_public` | `app_function_not_publicly_accessible` | None |
### What Prowler Does NOT Cover (accept MANUAL honestly)
Don't pad mappings for these — mark `Checks: []` and move on:
- **TLS 1.3 version specifically** — Prowler verifies TLS is enforced, not always the exact version
- **IANA port-protocol consistency** — no check for "protocol running on its assigned port"
- **mTLS on most Azure/GCP services** — limited to App Service client certs on Azure, nothing on GCP
- **Rate limiting** on monitoring endpoints, load balancers, serverless invocations, vector ingestion
- **Session cookie expiry** (LB stickiness)
- **HTTP header scrubbing** (Server, X-Powered-By)
- **Certificate transparency verification for imports**
- **Model version pinning, red teaming, AI quality review**
- **Vector embedding validation, dimensional constraints, ANN vs exact search**
- **Secret region replication** (cross-region residency)
- **Lifecycle cleanup policies on container registries**
- **Row-level / column-level security in data warehouses**
- **Deployment region restriction on Azure/GCP** (AWS has `organizations_scp_check_deny_regions`, others don't)
- **Cross-tenant alert silencing permissions**
- **Field-level masking in logs**
- **Managed view enforcement for database access**
- **Automatic MFA delete on all S3 buckets** (only CloudTrail bucket variant exists for some frameworks — AWS has the generic `s3_bucket_no_mfa_delete` though)
---
## Workflow C: Add a New Output Formatter
Use when a new framework needs its own CSV columns or terminal table. Follow the c5/csa/ens layout exactly:
```bash
mkdir -p prowler/lib/outputs/compliance/{framework}
touch prowler/lib/outputs/compliance/{framework}/__init__.py
```
### Step 1 — Create `{framework}.py` (table dispatcher ONLY)
Copy from `prowler/lib/outputs/compliance/c5/c5.py` and change the function name + framework string. The `diff` between your file and `c5.py` should be just those two lines. **No function docstring** — other frameworks don't have one, stay consistent.
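For orientation only, a sketch of the dispatcher shape; the signature matches the registration snippet in Step 4, and the body belongs to `c5.py`, not this sketch:
```python
# {framework}.py: bare imports only, no Finding, no ComplianceOutput
from colorama import Fore, Style
from tabulate import tabulate

from prowler.config.config import orange_color

def get_myframework_table(
    findings,
    bulk_checks_metadata,
    compliance_framework,
    output_filename,
    output_directory,
    compliance_overview,
):
    # Prints the pass/fail/muted summary table; copy the real body from
    # c5.py and change only the function name + framework string.
    ...
```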
### Step 2 — Create `models.py`
One Pydantic v2 `BaseModel` per provider. Field names become CSV column headers (public API — don't rename later without a migration).
```python
from typing import Optional
from pydantic import BaseModel
class {Framework}_AWSModel(BaseModel):
Provider: str
Description: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
# ... provider-specific columns
Status: str
StatusExtended: str
ResourceId: str
ResourceName: str
CheckId: str
Muted: bool
```
### Step 3 — Create `{framework}_{provider}.py` for each provider
Copy from `prowler/lib/outputs/compliance/c5/c5_aws.py` etc. Contains the `{Framework}_AWS(ComplianceOutput)` class with `transform()` that walks findings and emits model rows. This file IS allowed to import `Finding`.
### Step 4 — Register everywhere
**`prowler/lib/outputs/compliance/compliance.py`** (CLI table dispatcher):
```python
from prowler.lib.outputs.compliance.{framework}.{framework} import get_{framework}_table
def display_compliance_table(...):
...
elif compliance_framework.startswith("{framework}_"):
get_{framework}_table(findings, bulk_checks_metadata,
compliance_framework, output_filename,
output_directory, compliance_overview)
```
**`prowler/__main__.py`** (CLI output writer per provider):
Add imports at the top:
```python
from prowler.lib.outputs.compliance.{framework}.{framework}_aws import {Framework}_AWS
from prowler.lib.outputs.compliance.{framework}.{framework}_azure import {Framework}_Azure
from prowler.lib.outputs.compliance.{framework}.{framework}_gcp import {Framework}_GCP
```
Add provider-specific `elif compliance_name.startswith("{framework}_"):` branches that instantiate the class and call `batch_write_data_to_file()`.
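A hedged sketch of one branch, reusing the `{Framework}_AWS(findings, compliance)` construction from the CLI pipeline diagram; the exact `batch_write_data_to_file()` call shape is an assumption, so copy a neighboring framework's branch verbatim:
```python
# prowler/__main__.py, per-provider writer branch (sketch)
elif compliance_name.startswith("myframework_"):
    myframework_output = MyFramework_AWS(findings, compliance)
    # Writes {output_filename}_myframework_aws.csv per the pipeline above;
    # call shape assumed, mirror the adjacent frameworks' branches.
    myframework_output.batch_write_data_to_file()
```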
**`api/src/backend/tasks/jobs/export.py`** (API export dispatcher):
```python
from prowler.lib.outputs.compliance.{framework}.{framework}_aws import {Framework}_AWS
# ... azure, gcp
COMPLIANCE_CLASS_MAP = {
"aws": [
# ...
(lambda name: name.startswith("{framework}_"), {Framework}_AWS),
],
# ... azure, gcp
}
```
**Always use `startswith`**, never `name == "framework_aws"`. Exact match is a regression.
### Step 5 — Add tests
Create `tests/lib/outputs/compliance/{framework}/` with `{framework}_aws_test.py`, `{framework}_azure_test.py`, `{framework}_gcp_test.py`. See the test template in [references/test_template.md](references/test_template.md).
Add fixtures to `tests/lib/outputs/compliance/fixtures.py`: one `Compliance` object per provider with 1 evaluated + 1 manual requirement to exercise both code paths in `transform()`.
### Circular import warning
**The table dispatcher file (`{framework}.py`) MUST NOT import `Finding`** (directly or transitively). The cycle is:
```
compliance.compliance imports get_{framework}_table
→ {framework}.py imports ComplianceOutput
→ compliance_output imports Finding
→ finding imports get_check_compliance from compliance.compliance
→ CIRCULAR
```
Keep `{framework}.py` bare — only `colorama`, `tabulate`, `prowler.config.config`. Put anything that imports `Finding` in the per-provider `{framework}_{provider}.py` files.
---
## Conventions and Hard-Won Gotchas
These are lessons from the FINOS CCC v2025.10 sync + 172-AR audit pass (April 2026). Learn them once; save days of debugging.
1. **Per-provider files are non-negotiable.** Never collapse `{framework}_aws.py`, `{framework}_azure.py`, `{framework}_gcp.py` into a single parameterized class, no matter how DRY-tempting. Every other framework in the codebase follows the per-provider pattern and reviewers will reject the refactor. The CSV column names differ per provider — three classes is the convention.
2. **`{framework}.py` has NO function docstring.** Other frameworks don't have them. Don't add one to be "helpful".
3. **Circular import protection**: the table dispatcher file MUST NOT import `Finding` (directly or transitively). Split the code so `{framework}.py` only has `get_{framework}_table()` with bare imports, and `{framework}_{provider}.py` holds the class that needs `Finding`.
4. **`Generic_Compliance_Requirement_Attribute` is the fallback** — in the `Compliance_Requirement.Attributes` Union in `compliance_models.py`, Generic MUST be LAST because Pydantic v1 tries union members in order. Putting Generic first means every framework-specific attribute falls through to Generic and the specific model is never used.
5. **Pydantic v1 imports.** `from pydantic.v1 import BaseModel` in `compliance_models.py` — not v2. Mixing causes validation errors. Pydantic v2 is used in the CSV models (`models.py`) — that's fine because they're separate trees.
6. **`get_check_compliance()` key format** is `f"{Framework}-{Version}"` ONLY if Version is set. Empty Version → key is `"{Framework}"` (no version suffix). Tests that mock compliance dicts must match this exact format — when a framework ships with `Version: ""`, downstream code and tests break silently.
7. **CSV column names from `models.py` are public API.** Don't rename a field without migrating downstream consumers — CSV headers change.
8. **Upstream YAML multi-line scalars** (`|` block scalars) preserve newlines. Collapse to single-line with `" ".join(value.split())` before writing to JSON.
9. **Upstream catalogs can use multiple shapes.** FINOS CCC uses `control-families: [...]` in most catalogs but `controls: [...]` at the top level in `storage/object`. Any sync script must handle both or silently drop entire catalogs.
10. **Foreign-prefix AR ids.** Upstream sometimes "imports" requirements from one catalog into another by keeping the original id prefix (e.g., `CCC.AuditLog.CN08.AR01` appearing under `CCC.Logging.CN03`). Prowler's compliance model requires unique ids within a catalog — rewrite the foreign id to fit the parent control: `CCC.AuditLog.CN08.AR01` (inside `CCC.Logging.CN03`) → `CCC.Logging.CN03.AR01`.
11. **Genuine upstream id collisions.** Sometimes upstream has a real typo where two different requirements share the same id (e.g., `CCC.Core.CN14.AR02` defined twice for 30-day and 14-day backup variants). Renumber the second copy to the next free AR number. Preserve check mappings by matching on `(Section, frozenset(Applicability))` since the renumbered id won't match by id.
12. **`COMPLIANCE_CLASS_MAP` in `export.py` uses `startswith` predicates** for all modern frameworks. Exact match (`name == "ccc_aws"`) is an anti-pattern — it was present for CCC until April 2026 and was the reason CCC couldn't have versioned variants.
13. **Pre-validate every check id** against the per-provider inventory before writing the JSON. A typo silently creates a dead reference that no finding will ever map to. The audit script MUST abort with stderr listing typos, not swallow them.
14. **REPLACE is better than PATCH** for audit decisions. Encoding every mapping explicitly makes the audit reproducible and surfaces hidden assumptions from the legacy data. A PATCH system that adds/removes is too easy to forget.
15. **When no check applies, MANUAL is correct.** Do not pad mappings with tangential checks "just in case". Prowler's compliance reports are meant to be actionable — padding them with noise breaks that. Honest manual reqs can be mapped later when new checks land.
16. **UI groups by `Attributes[0].FamilyName` and `Attributes[0].Section`.** If FamilyName has inconsistent variants within the same JSON (e.g., "Logging & Monitoring" vs "Logging and Monitoring"), the UI renders them as separate categories. Section empty → the requirement falls into an orphan control with label "". Normalize before shipping.
17. **Provider coverage is asymmetric.** AWS has dense coverage (~586 checks across 80+ services): in-transit encryption, IAM, database encryption, backup. Azure (~167 checks) and GCP (~102 checks) are thinner especially for in-transit encryption, mTLS, and ML/AI. Accept the asymmetry in mappings — don't force GCP parity where Prowler genuinely can't verify.
---
## Useful One-Liners
```bash
# Count requirements per service prefix (CCC, CIS sections, etc.)
jq -r '.Requirements[].Id | split(".")[1]' prowler/compliance/aws/ccc_aws.json | sort | uniq -c
# Find duplicate requirement IDs
jq -r '.Requirements[].Id' file.json | sort | uniq -d
# Count manual requirements (no checks)
jq '[.Requirements[] | select((.Checks | length) == 0)] | length' file.json
# List all unique check references in a framework
jq -r '.Requirements[].Checks[]' file.json | sort -u
# List all unique Sections (to spot inconsistency)
jq '[.Requirements[].Attributes[0].Section] | unique' file.json
# List all unique FamilyNames (to spot inconsistency)
jq '[.Requirements[].Attributes[0].FamilyName] | unique' file.json
# Diff requirement ids between two versions of the same framework
diff <(jq -r '.Requirements[].Id' a.json | sort) <(jq -r '.Requirements[].Id' b.json | sort)
# Find where a check id is used across all frameworks
grep -rl "my_check_name" prowler/compliance/
# Check if a Prowler check exists
find prowler/providers/aws/services -name "{check_id}.metadata.json"
# Validate a JSON with Pydantic
python -c "from prowler.lib.check.compliance_models import Compliance; print(Compliance.parse_file('prowler/compliance/aws/ccc_aws.json').Framework)"
```
---
## Best Practices
1. **Requirement IDs**: Follow the original framework numbering exactly (e.g., "1.1", "A.5.1", "T1190", "ac_2_1")
2. **Check Mapping**: Map to existing checks when possible. Use `Checks: []` for manual-only requirements — honest MANUAL beats padded coverage
3. **Completeness**: Include all framework requirements, even those without automated checks
4. **Version Control**: Include framework version in `Name` and `Version` fields. **Never leave `Version: ""`** — it breaks `get_check_compliance()` key format
5. **File Naming**: Use format `{framework}_{version}_{provider}.json`
6. **Validation**: Prowler validates JSON against Pydantic models at startup; invalid JSON will cause errors
7. **Pre-validate check ids** against the provider's `*.metadata.json` inventory before every commit
8. **Normalize FamilyName and Section** to avoid inconsistent UI tree branches
9. **Register everywhere**: SDK model (if needed) → `compliance.py` dispatcher → `__main__.py` CLI writer → `export.py` API map → UI mapper. Skipping any layer results in silent failures
10. **Audit, don't pad**: when reviewing mappings, apply the golden rule — the check's title/risk MUST literally describe what the requirement text says. Tangential relation doesn't count
## Commands
@@ -1005,46 +482,11 @@ prowler aws --compliance cis_5.0_aws -M csv json html
## Code References
### Layer 1 — SDK / Core
- **Compliance Models:** `prowler/lib/check/compliance_models.py` (Pydantic v1 model tree)
- **Compliance Processing / Linker:** `prowler/lib/check/compliance.py` (`get_check_compliance`, `update_checks_metadata_with_compliance`)
- **Check Utils:** `prowler/lib/check/utils.py` (`list_compliance_modules`)
### Layer 2 — JSON Catalogs
- **Framework JSONs:** `prowler/compliance/{provider}/` (auto-discovered via directory walk)
### Layer 3 — Output Formatters
- **Per-framework folders:** `prowler/lib/outputs/compliance/{framework}/`
- **Shared base class:** `prowler/lib/outputs/compliance/compliance_output.py` (`ComplianceOutput` + `batch_write_data_to_file`)
- **CLI table dispatcher:** `prowler/lib/outputs/compliance/compliance.py` (`display_compliance_table`)
- **Finding model:** `prowler/lib/outputs/finding.py` (**do not import transitively from table dispatcher files — circular import**)
- **CLI writer:** `prowler/__main__.py` (per-provider `elif compliance_name.startswith(...)` branches that instantiate per-provider classes)
### Layer 4 — API / UI
- **API lazy loader:** `api/src/backend/api/compliance.py` (`LazyComplianceTemplate`, `LazyChecksMapping`)
- **API export dispatcher:** `api/src/backend/tasks/jobs/export.py` (`COMPLIANCE_CLASS_MAP` with `startswith` predicates)
- **UI framework router:** `ui/lib/compliance/compliance-mapper.ts`
- **UI per-framework mapper:** `ui/lib/compliance/{framework}.tsx`
- **UI detail panel:** `ui/components/compliance/compliance-custom-details/{framework}-details.tsx`
- **UI types:** `ui/types/compliance.ts`
- **UI icon:** `ui/components/icons/compliance/{framework}.svg` + registration in `IconCompliance.tsx`
### Tests
- **Output formatter tests:** `tests/lib/outputs/compliance/{framework}/{framework}_{provider}_test.py`
- **Shared fixtures:** `tests/lib/outputs/compliance/fixtures.py`
## Resources
- **JSON Templates:** See [assets/](assets/) for framework JSON templates (cis, ens, iso27001, mitre_attack, prowler_threatscore, generic)
- **Config-driven compliance sync** (any upstream-backed framework):
- [assets/sync_framework.py](assets/sync_framework.py) — generic runner. Loads a YAML config, dynamically imports the declared parser, applies generic post-processing (id uniqueness safety net, `FamilyName` normalization, legacy check-mapping preservation with config-driven fallback keys), and writes the provider JSONs with Pydantic post-validation. Framework-agnostic — works for any compliance framework.
- [assets/configs/ccc.yaml](assets/configs/ccc.yaml) — canonical config example (FINOS CCC v2025.10). Copy and adapt for new frameworks.
- [assets/parsers/finos_ccc.py](assets/parsers/finos_ccc.py) — FINOS CCC YAML parser. Handles both upstream shapes (`control-families` and top-level `controls`), foreign-prefix AR rewriting, and genuine collision renumbering. Exposes `parse_upstream(config) -> list[dict]`.
- [assets/parsers/](assets/parsers/) — add new parser modules here for unfamiliar upstream formats (NIST OSCAL JSON, MITRE STIX, CIS Benchmarks, etc.). Each parser is a `{name}.py` file implementing `parse_upstream(config) -> list[dict]` with guaranteed-unique ids.
- **Reusable audit tooling** (added April 2026 after the FINOS CCC v2025.10 sync):
- [assets/audit_framework_template.py](assets/audit_framework_template.py) — explicit REPLACE decision ledger with pre-validation against the per-provider inventory. Drop-in template for auditing any framework.
- [assets/query_checks.py](assets/query_checks.py) — keyword/service/id query helper over `/tmp/checks_{provider}.json`.
- [assets/dump_section.py](assets/dump_section.py) — dumps every AR for a given id prefix across all 3 providers with current check mappings.
- [assets/build_inventory.py](assets/build_inventory.py) — generates `/tmp/checks_{provider}.json` from `*.metadata.json` files.
- **Documentation:** See [references/compliance-docs.md](references/compliance-docs.md) for additional resources
- **Related skill:** [prowler-compliance-review](../prowler-compliance-review/SKILL.md) — PR review checklist and validator script for compliance framework PRs

View File

@@ -1,188 +0,0 @@
#!/usr/bin/env python3
"""
Cloud-auditor pass template for any Prowler compliance framework.

Encode explicit REPLACE decisions per (requirement_id, provider) pair below.
Each decision FULLY overwrites the legacy Checks list for that requirement.

Workflow:
  1. Run build_inventory.py first to cache per-provider check metadata.
  2. Run dump_section.py to see current mappings for the catalog you're auditing.
  3. Fill in DECISIONS below with explicit check lists.
  4. Run this script — it pre-validates every check id against the inventory
     and aborts before writing, listing any typos on stderr.

Decision rules (apply as a hostile cloud auditor):
  - The Prowler check's title/risk MUST literally describe what the AR text says.
    "Related" is not enough.
  - If no check actually addresses the requirement, leave `[]` (= MANUAL).
    HONEST MANUAL is worth more than padded coverage.
  - Missing provider key = leave the legacy mapping untouched.
  - Empty list `[]` = explicitly MANUAL (overwrites legacy).

Usage:
    # 1. Copy this file to /tmp/audit_<framework>.py and fill in DECISIONS
    # 2. Edit FRAMEWORK_KEY below to match your framework file naming
    # 3. Run:
    python /tmp/audit_<framework>.py
"""
from __future__ import annotations

import json
import sys
from pathlib import Path

# ---------------------------------------------------------------------------
# Configure for your framework
# ---------------------------------------------------------------------------
# Framework file basename inside prowler/compliance/{provider}/.
# If your framework is called "cis_5.0_aws.json", FRAMEWORK_KEY is "cis_5.0".
# If the file is "ccc_aws.json", FRAMEWORK_KEY is "ccc".
FRAMEWORK_KEY = "ccc"

# Which providers to apply decisions to.
PROVIDERS = ["aws", "azure", "gcp"]

PROWLER_DIR = Path("prowler/compliance")
CHECK_INV = {prov: Path(f"/tmp/checks_{prov}.json") for prov in PROVIDERS}

# ---------------------------------------------------------------------------
# DECISIONS — encode one entry per requirement you want to audit
# ---------------------------------------------------------------------------
# DECISIONS[requirement_id][provider] = list[str] of check ids
# See SKILL.md → "Audit Reference Table: Requirement Text → Prowler Checks"
# for a comprehensive mapping cheat sheet built from a 172-AR CCC audit.
DECISIONS: dict[str, dict[str, list[str]]] = {}

# ---- Example entries (delete and replace with your own) ----

# Example 1: TLS in transit enforced (non-SSH traffic)
# DECISIONS["CCC.Core.CN01.AR01"] = {
#     "aws": [
#         "cloudfront_distributions_https_enabled",
#         "cloudfront_distributions_origin_traffic_encrypted",
#         "s3_bucket_secure_transport_policy",
#         "elbv2_ssl_listeners",
#         "rds_instance_transport_encrypted",
#         "kafka_cluster_in_transit_encryption_enabled",
#         "redshift_cluster_in_transit_encryption_enabled",
#         "opensearch_service_domains_https_communications_enforced",
#     ],
#     "azure": [
#         "storage_secure_transfer_required_is_enabled",
#         "app_minimum_tls_version_12",
#         "postgresql_flexible_server_enforce_ssl_enabled",
#         "sqlserver_recommended_minimal_tls_version",
#     ],
#     "gcp": [
#         "cloudsql_instance_ssl_connections",
#     ],
# }

# Example 2: MANUAL — no Prowler check exists
# DECISIONS["CCC.Core.CN01.AR07"] = {
#     "aws": [],  # no IANA port/protocol check exists in Prowler
#     "azure": [],
#     "gcp": [],
# }

# Example 3: Reuse a decision for multiple sibling ARs
# DECISIONS["CCC.ObjStor.CN05.AR02"] = DECISIONS["CCC.ObjStor.CN05.AR01"]

# ---------------------------------------------------------------------------
# Driver — do not edit below
# ---------------------------------------------------------------------------
def load_inventory(provider: str) -> dict:
    path = CHECK_INV[provider]
    if not path.exists():
        raise SystemExit(
            f"Check inventory missing: {path}\n"
            f"Run: python skills/prowler-compliance/assets/build_inventory.py {provider}"
        )
    with open(path) as f:
        return json.load(f)


def resolve_json_path(provider: str) -> Path:
    """Resolve the JSON file path for a given provider.

    Handles both shapes: {FRAMEWORK_KEY}_{provider}.json (ccc_aws.json) and
    cases where FRAMEWORK_KEY already contains the provider suffix.
    """
    candidates = [
        PROWLER_DIR / provider / f"{FRAMEWORK_KEY}_{provider}.json",
        PROWLER_DIR / provider / f"{FRAMEWORK_KEY}.json",
    ]
    for c in candidates:
        if c.exists():
            return c
    raise SystemExit(
        f"Could not find framework JSON for provider={provider} "
        f"with FRAMEWORK_KEY={FRAMEWORK_KEY}. Tried: {candidates}"
    )
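
# Illustrative resolution, assuming the defaults above (FRAMEWORK_KEY = "ccc"):
#   resolve_json_path("aws") tries prowler/compliance/aws/ccc_aws.json first,
#   then prowler/compliance/aws/ccc.json, returning the first that exists.
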
def apply_for_provider(provider: str) -> tuple[int, int, int]:
    """Apply DECISIONS to the JSON for one provider.

    Returns (touched, added, removed).
    """
    path = resolve_json_path(provider)
    with open(path) as f:
        data = json.load(f)
    inv = load_inventory(provider)

    touched = 0
    add_count = 0
    rm_count = 0
    unknown: list[tuple[str, str]] = []

    for req in data["Requirements"]:
        rid = req["Id"]
        if rid not in DECISIONS or provider not in DECISIONS[rid]:
            continue
        new_checks = list(dict.fromkeys(DECISIONS[rid][provider]))
        for c in new_checks:
            if c not in inv:
                unknown.append((rid, c))
        before = set(req.get("Checks") or [])
        after = set(new_checks)
        rm_count += len(before - after)
        add_count += len(after - before)
        req["Checks"] = new_checks
        touched += 1

    if unknown:
        print(f"\n!! {provider} — UNKNOWN CHECK IDS (typos?):", file=sys.stderr)
        for rid, c in unknown:
            print(f"   {rid} -> {c}", file=sys.stderr)
        print("\nAborting: fix the check ids above and re-run.", file=sys.stderr)
        sys.exit(2)

    with open(path, "w") as f:
        json.dump(data, f, indent=2, ensure_ascii=False)
        f.write("\n")
    return touched, add_count, rm_count


def main() -> int:
    if not DECISIONS:
        print("No DECISIONS encoded. Fill in the DECISIONS dict and re-run.")
        return 1
    print(f"Applying {len(DECISIONS)} decisions to framework '{FRAMEWORK_KEY}'...")
    for provider in PROVIDERS:
        touched, added, removed = apply_for_provider(provider)
        print(f"  {provider}: touched={touched} added={added} removed={removed}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python3
"""
Build a per-provider check inventory by scanning Prowler's check metadata files.

Outputs one JSON per provider at /tmp/checks_{provider}.json with the shape:

    {
      "check_id": {
        "service": "...",
        "subservice": "...",
        "resource": "...",
        "severity": "...",
        "title": "...",
        "description": "...",
        "risk": "..."
      },
      ...
    }

This is the reference used by audit_framework_template.py for pre-validation
(every check id in the audit ledger must exist in the inventory) and by
query_checks.py for keyword/service lookup.

Usage:
    python skills/prowler-compliance/assets/build_inventory.py
    # Or for a specific provider:
    python skills/prowler-compliance/assets/build_inventory.py aws

Output:
    /tmp/checks_{provider}.json for every provider discovered under
    prowler/providers/ with a services/ directory.
"""
from __future__ import annotations

import json
import sys
from pathlib import Path

PROVIDERS_ROOT = Path("prowler/providers")


def discover_providers() -> list[str]:
    """Return every provider that currently has a services/ directory.

    Derived from the filesystem so new providers are picked up automatically
    and stale hard-coded lists cannot drift from the repo.
    """
    if not PROVIDERS_ROOT.exists():
        return []
    return sorted(
        p.name
        for p in PROVIDERS_ROOT.iterdir()
        if p.is_dir() and (p / "services").is_dir()
    )


def build_for_provider(provider: str) -> dict:
    inventory: dict[str, dict] = {}
    base = Path(f"prowler/providers/{provider}/services")
    if not base.exists():
        print(f"  skip {provider}: no services directory", file=sys.stderr)
        return inventory
    for meta_path in base.rglob("*.metadata.json"):
        try:
            with open(meta_path) as f:
                data = json.load(f)
        except Exception as exc:
            print(f"  warn: cannot parse {meta_path}: {exc}", file=sys.stderr)
            continue
        cid = data.get("CheckID") or meta_path.stem.replace(".metadata", "")
        inventory[cid] = {
            "service": data.get("ServiceName", ""),
            "subservice": data.get("SubServiceName", ""),
            "resource": data.get("ResourceType", ""),
            "severity": data.get("Severity", ""),
            "title": data.get("CheckTitle", ""),
            "description": data.get("Description", ""),
            "risk": data.get("Risk", ""),
        }
    return inventory


def main() -> int:
    providers = sys.argv[1:] or discover_providers()
    if not providers:
        print(
            f"error: no providers found under {PROVIDERS_ROOT}/",
            file=sys.stderr,
        )
        return 1
    for provider in providers:
        inv = build_for_provider(provider)
        out_path = Path(f"/tmp/checks_{provider}.json")
        with open(out_path, "w") as f:
            json.dump(inv, f, indent=2)
        print(f"  {provider}: {len(inv)} checks → {out_path}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -1,111 +0,0 @@
# FINOS Common Cloud Controls (CCC) sync config for sync_framework.py.
#
# Usage:
#   python skills/prowler-compliance/assets/sync_framework.py \
#     skills/prowler-compliance/assets/configs/ccc.yaml
#
# Prerequisite: run the upstream fetch step from SKILL.md Workflow A Step 1 to
# populate upstream.dir with the raw FINOS catalog YAML files.

framework:
  name: CCC
  display_name: Common Cloud Controls Catalog (CCC)
  version: v2025.10
  # The {provider_display} placeholder is replaced at output time with the
  # per-provider display string from the providers list below.
  description_template: "Common Cloud Controls Catalog (CCC) for {provider_display}"

providers:
  - key: aws
    display: AWS
  - key: azure
    display: Azure
  - key: gcp
    display: GCP

output:
  # Supported placeholders: {provider}, {framework}, {version}.
  # For versioned frameworks like CIS the template would be
  # "prowler/compliance/{provider}/cis_{version}_{provider}.json".
  path_template: "prowler/compliance/{provider}/ccc_{provider}.json"

upstream:
  # Directory containing the cached FINOS catalog YAMLs. Populate via
  # SKILL.md Workflow A Step 1 (gh api raw download commands).
  dir: /tmp/ccc_upstream
  fetch_docs: "See SKILL.md Workflow A Step 1 for gh api fetch commands"

parser:
  # Name of the parser module under parsers/ (loaded dynamically by the
  # runner). For FINOS CCC YAML this is always finos_ccc.
  module: finos_ccc
  # FINOS CCC catalog files in load order. Core first so its ARs render
  # first in the output JSON.
  catalog_files:
    - core_ccc.yaml
    - management_auditlog.yaml
    - management_logging.yaml
    - management_monitoring.yaml
    - storage_object.yaml
    - networking_loadbalancer.yaml
    - networking_vpc.yaml
    - crypto_key.yaml
    - crypto_secrets.yaml
    - database_warehouse.yaml
    - database_vector.yaml
    - database_relational.yaml
    - devtools_build.yaml
    - devtools_container-registry.yaml
    - identity_iam.yaml
    - ai-ml_gen-ai.yaml
    - ai-ml_mlde.yaml
    - app-integration_message.yaml
    - compute_serverless-computing.yaml
  # Shape-2 catalogs (storage/object) reference the family via id only
  # (e.g. "CCC.ObjStor.Data") with no human-readable title or description
  # in the YAML. Map the suffix (after the last dot) to a canonical title
  # and description so the generated JSON has consistent FamilyName fields
  # regardless of upstream shape.
  family_id_title:
    Data: Data
    IAM: Identity and Access Management
    Identity: Identity and Access Management
    Encryption: Encryption
    Logging: Logging and Monitoring
    Network: Network Security
    Availability: Availability
    Integrity: Integrity
    Confidentiality: Confidentiality
  family_id_description:
    Data: "The Data control family ensures the confidentiality, integrity, availability, and sovereignty of data across its lifecycle."
    IAM: "The Identity and Access Management control family ensures that only trusted and authenticated entities can access resources."

post_processing:
  # Collapse FamilyName variants that appear inconsistently across upstream
  # catalogs. The Prowler UI groups by Attributes[0].FamilyName exactly,
  # so each variant would otherwise become a separate tree branch.
  family_name_normalization:
    "Logging & Monitoring": "Logging and Monitoring"
    "Logging and Metrics Publication": "Logging and Monitoring"
  # Preserve existing Checks lists from the legacy Prowler JSON when
  # regenerating. The runner builds two lookup tables from the legacy
  # output: a primary index by Id, and fallback indexes composed of
  # attribute field names.
  #
  # primary_key: the top-level requirement field to use as the primary
  #   lookup key (almost always "Id")
  # fallback_keys: a list of composite keys. Each composite key is a list
  #   of Attributes[0] field names to join into a tuple. List-valued fields
  #   (like Applicability) are frozen to frozenset so the tuple is hashable.
  #
  # CCC uses (Section, Applicability) because Applicability is a CCC-only
  # top-level attribute field. CIS would use (Section, Profile). NIST would
  # use (ItemId,). The fallback is how renumbered or rewritten ids still
  # recover their check mappings.
  check_preservation:
    primary_key: Id
    fallback_keys:
      - [Section, Applicability]
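
# Illustrative only (hypothetical, not consumed by this CCC config): a CIS
# sync config would keep the same primary key but swap the fallback tuple,
# since Applicability is CCC-specific:
#
#   check_preservation:
#     primary_key: Id
#     fallback_keys:
#       - [Section, Profile]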

View File

@@ -1,91 +0,0 @@
#!/usr/bin/env python3
"""
Dump every requirement of a compliance framework for a given id prefix across
providers, with their current Check mappings.

Useful for reviewing a whole control family in one pass before encoding audit
decisions in audit_framework_template.py.

Usage:
    # Dump all CCC.Core requirements across aws/azure/gcp
    python skills/prowler-compliance/assets/dump_section.py ccc "CCC.Core."

    # Dump all CIS 5.0 section 1 requirements for AWS only
    python skills/prowler-compliance/assets/dump_section.py cis_5.0_aws "1."

Arguments:
    framework_key: file prefix inside prowler/compliance/{provider}/ without
        the provider suffix. Examples:
        - "ccc" → loads ccc_aws.json / ccc_azure.json / ccc_gcp.json
        - "cis_5.0_aws" → loads only that one file
        - "iso27001_2022" → loads all providers
    id_prefix: Requirement id prefix to filter by (e.g. "CCC.Core.",
        "1.1.", "A.5.").
"""
from __future__ import annotations

import json
import sys
from collections import defaultdict
from pathlib import Path

PROWLER_COMPLIANCE_DIR = Path("prowler/compliance")


def main() -> int:
    if len(sys.argv) < 3:
        print(__doc__)
        return 1
    framework_key = sys.argv[1]
    id_prefix = sys.argv[2]

    # Find matching JSON files across all providers
    candidates: list[tuple[str, Path]] = []
    for prov_dir in sorted(PROWLER_COMPLIANCE_DIR.iterdir()):
        if not prov_dir.is_dir():
            continue
        for json_path in prov_dir.glob("*.json"):
            stem = json_path.stem
            if (
                stem == framework_key
                or stem.startswith(f"{framework_key}_")
                or stem == f"{framework_key}_{prov_dir.name}"
            ):
                candidates.append((prov_dir.name, json_path))
    if not candidates:
        print(f"No files matching '{framework_key}'", file=sys.stderr)
        return 2

    by_id: dict[str, dict] = defaultdict(dict)
    for prov, path in candidates:
        with open(path) as f:
            data = json.load(f)
        for req in data["Requirements"]:
            if req["Id"].startswith(id_prefix):
                by_id[req["Id"]][prov] = {
                    "desc": req.get("Description", ""),
                    "sec": (req.get("Attributes") or [{}])[0].get("Section", ""),
                    "obj": (req.get("Attributes") or [{}])[0].get(
                        "SubSectionObjective", ""
                    ),
                    "checks": req.get("Checks") or [],
                }

    for ar_id in sorted(by_id):
        rows = by_id[ar_id]
        sample = next(iter(rows.values()))
        print(f"\n### {ar_id}")
        print(f"  desc: {sample['desc']}")
        if sample["sec"]:
            print(f"  sec : {sample['sec']}")
        if sample["obj"]:
            print(f"  obj : {sample['obj']}")
        for prov in ["aws", "azure", "gcp", "kubernetes", "m365", "github",
                     "oraclecloud", "alibabacloud"]:
            if prov in rows:
                checks = rows[prov]["checks"]
                print(f"  {prov}: ({len(checks)}) {checks}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -1,219 +0,0 @@
"""
FINOS Common Cloud Controls (CCC) YAML parser.
Reads cached upstream YAML files and emits Prowler-format requirements
(``{Id, Description, Attributes: [...], Checks: []}``). This module is
agnostic to providers, JSON output paths, framework metadata and legacy
check-mapping preservation — those are handled by ``sync_framework.py``.
Contract
--------
``parse_upstream(config: dict) -> list[dict]``
Returns a list of Prowler-format requirement dicts with **guaranteed
unique ids**. Foreign-prefix AR rewriting and genuine collision
renumbering both happen inside this module — the runner treats id
uniqueness as a contract violation, not as something to fix.
Config keys consumed
--------------------
This parser reads the following config entries (the rest of the config is
opaque to it):
- ``upstream.dir`` — directory containing the cached YAMLs
- ``parser.catalog_files`` — ordered list of YAML filenames to load
- ``parser.family_id_title`` — suffix → canonical family title (shape 2)
- ``parser.family_id_description`` — suffix → family description (shape 2)
Upstream shapes
---------------
FINOS CCC catalogs come in two shapes:
1. ``control-families: [{title, description, controls: [...]}]``
(used by most catalogs)
2. ``controls: [{id, family: "CCC.X.Y", ...}]`` (no families wrapper; used
by ``storage/object``). The ``family`` field references a family id with
no human-readable title in the file — the title/description come from
``config.parser.family_id_title`` / ``family_id_description``.
Id rewriting rules
------------------
- **Foreign-prefix rewriting**: upstream intentionally aliases requirements
across catalogs by keeping the original prefix (e.g. ``CCC.AuditLog.CN08.AR01``
appears nested under ``CCC.Logging.CN03``). Prowler requires unique ids
within a catalog file, so we rename the AR to fit its parent control:
``CCC.Logging.CN03.AR01``. See ``rewrite_ar_id()``.
- **Genuine collision renumbering**: sometimes upstream has a real typo
where two distinct requirements share the same id (e.g.
``CCC.Core.CN14.AR02`` appears twice for 30-day and 14-day backup variants).
The second copy is renumbered to the next free AR number within the
control. See the ``seen_ids`` logic in ``emit_requirement()``.
"""
from __future__ import annotations
from pathlib import Path
import yaml
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def clean(value: str | None) -> str:
"""Trim and collapse internal whitespace/newlines into single spaces.
Upstream YAML uses ``|`` block scalars that preserve newlines; Prowler
stores descriptions as single-line text.
"""
if not value:
return ""
return " ".join(value.split())
def flatten_mappings(mappings):
"""Convert upstream ``{reference-id, entries: [{reference-id, ...}]}`` to
Prowler's ``{ReferenceId, Identifiers: [...]}``.
"""
if not mappings:
return []
out = []
for m in mappings:
ids = []
for entry in m.get("entries") or []:
eid = entry.get("reference-id")
if eid:
ids.append(eid)
out.append({"ReferenceId": m.get("reference-id", ""), "Identifiers": ids})
return out
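
# Illustrative round-trip (hypothetical ids), matching the docstring above:
#   flatten_mappings([{"reference-id": "CCC", "entries": [{"reference-id": "CCC.TH01"}]}])
#   -> [{"ReferenceId": "CCC", "Identifiers": ["CCC.TH01"]}]
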
def ar_prefix(ar_id: str) -> str:
    """Return the first three dot-segments of an AR id (the parent control).

    e.g. ``CCC.Core.CN01.AR01`` -> ``CCC.Core.CN01``.
    """
    return ".".join(ar_id.split(".")[:3])


def rewrite_ar_id(parent_control_id: str, original_ar_id: str, ar_index: int) -> str:
    """If an AR's id doesn't share its parent control's prefix, rename it.

    Example
    -------
    parent ``CCC.Logging.CN03`` + AR id ``CCC.AuditLog.CN08.AR01`` with
    index 0 -> ``CCC.Logging.CN03.AR01``.
    """
    if ar_prefix(original_ar_id) == parent_control_id:
        return original_ar_id
    return f"{parent_control_id}.AR{ar_index + 1:02d}"


def emit_requirement(
    control: dict,
    family_name: str,
    family_desc: str,
    seen_ids: set[str],
    requirements: list[dict],
) -> None:
    """Translate one FINOS control + its assessment-requirements into
    Prowler-format requirement dicts and append them to ``requirements``.

    Applies foreign-prefix rewriting and genuine-collision renumbering so
    the final list is guaranteed to have unique ids.
    """
    control_id = clean(control.get("id"))
    control_title = clean(control.get("title"))
    section = f"{control_id} {control_title}".strip()
    objective = clean(control.get("objective"))
    threat_mappings = flatten_mappings(control.get("threat-mappings"))
    guideline_mappings = flatten_mappings(control.get("guideline-mappings"))
    ars = control.get("assessment-requirements") or []
    for idx, ar in enumerate(ars):
        raw_id = clean(ar.get("id"))
        if not raw_id:
            continue
        new_id = rewrite_ar_id(control_id, raw_id, idx)
        # Renumber on genuine upstream collision (find next free AR number)
        if new_id in seen_ids:
            base = ".".join(new_id.split(".")[:-1])
            n = 1
            while f"{base}.AR{n:02d}" in seen_ids:
                n += 1
            new_id = f"{base}.AR{n:02d}"
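            # Worked example (illustrative): if a second CCC.Core.CN14.AR02
            # arrives while AR01 and AR02 are already taken, the loop stops
            # at n=3 and the duplicate lands as CCC.Core.CN14.AR03.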
        seen_ids.add(new_id)
        requirements.append(
            {
                "Id": new_id,
                "Description": clean(ar.get("text")),
                "Attributes": [
                    {
                        "FamilyName": family_name,
                        "FamilyDescription": family_desc,
                        "Section": section,
                        "SubSection": "",
                        "SubSectionObjective": objective,
                        "Applicability": list(ar.get("applicability") or []),
                        "Recommendation": clean(ar.get("recommendation")),
                        "SectionThreatMappings": threat_mappings,
                        "SectionGuidelineMappings": guideline_mappings,
                    }
                ],
                "Checks": [],
            }
        )


# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
def parse_upstream(config: dict) -> list[dict]:
    """Walk upstream YAMLs and emit Prowler-format requirements.

    Handles both top-level shapes (``control-families`` and ``controls``).
    Ids are guaranteed unique in the returned list.
    """
    upstream_dir = Path(config["upstream"]["dir"])
    parser_cfg = config.get("parser") or {}
    catalog_files = parser_cfg.get("catalog_files") or []
    family_id_title = parser_cfg.get("family_id_title") or {}
    family_id_description = parser_cfg.get("family_id_description") or {}

    requirements: list[dict] = []
    seen_ids: set[str] = set()

    for filename in catalog_files:
        path = upstream_dir / filename
        if not path.exists():
            # The runner handles fatal errors; a missing optional catalog
            # file is surfaced as a warning on stderr.
            print(f"warn: missing upstream file {filename}", file=sys.stderr)
            continue
        with open(path) as f:
            doc = yaml.safe_load(f) or {}

        # Shape 1: control-families wrapper
        for family in doc.get("control-families") or []:
            family_name = clean(family.get("title"))
            family_desc = clean(family.get("description"))
            for control in family.get("controls") or []:
                emit_requirement(
                    control, family_name, family_desc, seen_ids, requirements
                )

        # Shape 2: top-level controls with family reference id
        for control in doc.get("controls") or []:
            family_ref = clean(control.get("family"))
            suffix = family_ref.split(".")[-1] if family_ref else ""
            family_name = family_id_title.get(suffix, suffix or "Data")
            family_desc = family_id_description.get(suffix, "")
            emit_requirement(
                control, family_name, family_desc, seen_ids, requirements
            )

    return requirements

View File

@@ -1,86 +0,0 @@
#!/usr/bin/env python3
"""
Keyword/service/id lookup over a Prowler check inventory produced by
build_inventory.py.

Usage:
    # Keyword AND-search across id + title + risk + description
    python skills/prowler-compliance/assets/query_checks.py aws encryption transit

    # Show all checks for a service
    python skills/prowler-compliance/assets/query_checks.py aws --service iam

    # Show full metadata for one check id
    python skills/prowler-compliance/assets/query_checks.py aws --id kms_cmk_rotation_enabled
"""
from __future__ import annotations

import json
import sys


def main() -> int:
    if len(sys.argv) < 3:
        print(__doc__)
        return 1
    provider = sys.argv[1]
    try:
        with open(f"/tmp/checks_{provider}.json") as f:
            inv = json.load(f)
    except FileNotFoundError:
        print(
            f"No inventory for {provider}. Run build_inventory.py first.",
            file=sys.stderr,
        )
        return 2

    if sys.argv[2] == "--service":
        if len(sys.argv) < 4:
            print("usage: --service <service_name>")
            return 1
        svc = sys.argv[3]
        hits = [cid for cid in sorted(inv) if inv[cid].get("service") == svc]
        for cid in hits:
            print(f"  {cid}")
            print(f"    {inv[cid].get('title', '')}")
        print(f"\n{len(hits)} checks in service '{svc}'")
    elif sys.argv[2] == "--id":
        if len(sys.argv) < 4:
            print("usage: --id <check_id>")
            return 1
        cid = sys.argv[3]
        if cid not in inv:
            print(f"NOT FOUND: {cid}")
            return 3
        m = inv[cid]
        print(f"== {cid} ==")
        print(f"service : {m.get('service')}")
        print(f"severity: {m.get('severity')}")
        print(f"resource: {m.get('resource')}")
        print(f"title   : {m.get('title')}")
        print(f"desc    : {m.get('description', '')[:500]}")
        print(f"risk    : {m.get('risk', '')[:500]}")
    else:
        keywords = [k.lower() for k in sys.argv[2:]]
        hits = 0
        for cid in sorted(inv):
            m = inv[cid]
            blob = " ".join(
                [
                    cid,
                    m.get("title", ""),
                    m.get("risk", ""),
                    m.get("description", ""),
                ]
            ).lower()
            if all(k in blob for k in keywords):
                hits += 1
                print(f"  {cid} [{m.get('service', '')}]")
                print(f"    {m.get('title', '')[:120]}")
        print(f"\n{hits} matches for {' + '.join(keywords)}")
    return 0


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -1,515 +0,0 @@
#!/usr/bin/env python3
"""
Generic, config-driven compliance framework sync runner.

Usage:
    python skills/prowler-compliance/assets/sync_framework.py \
        skills/prowler-compliance/assets/configs/ccc.yaml

Pipeline:
  1. Load and validate the YAML config (fail fast on missing or empty
     required fields — notably ``framework.version``, which silently
     breaks ``get_check_compliance()`` key construction if empty).
  2. Dynamically import the parser module declared in ``parser.module``
     (resolved as ``parsers.{name}`` under this script's directory).
  3. Call ``parser.parse_upstream(config) -> list[dict]`` to get raw
     Prowler-format requirements. The parser owns all upstream-format
     quirks (foreign-prefix AR rewriting, collision renumbering, shape
     handling) and MUST return ids that are unique within the returned
     list.
  4. **Safety net**: assert id uniqueness. The runner raises
     ``ValueError`` on any duplicate — it does NOT silently renumber,
     because mutating a canonical upstream id (e.g. CIS ``1.1.1`` or
     NIST ``AC-2(1)``) would be catastrophic.
  5. Apply generic ``FamilyName`` normalization from
     ``post_processing.family_name_normalization`` (optional).
  6. Preserve legacy ``Checks`` lists from the existing Prowler JSON
     using a config-driven primary key + fallback key chain. CCC uses
     ``(Section, Applicability)`` as fallback; CIS would use
     ``(Section, Profile)``; NIST would use ``(ItemId,)``.
  7. Wrap each provider's requirements in the framework metadata dict
     built from the config templates.
  8. Write each provider's JSON to the path resolved from
     ``output.path_template`` (supports ``{framework}``, ``{version}``
     and ``{provider}`` placeholders).
  9. Pydantic-validate the written JSON via ``Compliance.parse_file()``
     and report the load counts per provider.

The runner is strictly generic — it never mentions CCC, knows nothing
about YAML shapes, and can handle any upstream-backed framework given a
parser module and a config file.
"""
from __future__ import annotations

import importlib
import json
import sys
from pathlib import Path
from typing import Any

import yaml

# Make sibling `parsers/` package importable regardless of the runner's
# invocation directory.
_SCRIPT_DIR = Path(__file__).resolve().parent
if str(_SCRIPT_DIR) not in sys.path:
    sys.path.insert(0, str(_SCRIPT_DIR))


# ---------------------------------------------------------------------------
# Config loading and validation
# ---------------------------------------------------------------------------
class ConfigError(ValueError):
    """Raised when the sync config is malformed or missing required fields."""


def _require(cfg: dict, dotted_path: str) -> Any:
    """Fetch a dotted-path key from nested dicts. Raises ConfigError on
    missing or empty values (empty-string, empty-list, None)."""
    current: Any = cfg
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            raise ConfigError(f"config: missing required field '{dotted_path}'")
        current = current[part]
    if current in ("", None, [], {}):
        raise ConfigError(f"config: field '{dotted_path}' must not be empty")
    return current


def load_config(path: Path) -> dict:
    if not path.exists():
        raise ConfigError(f"config file not found: {path}")
    with open(path) as f:
        cfg = yaml.safe_load(f) or {}
    if not isinstance(cfg, dict):
        raise ConfigError(f"config root must be a mapping, got {type(cfg).__name__}")
    # Required fields — fail fast. An empty Version in particular silently
    # breaks get_check_compliance() key construction.
    _require(cfg, "framework.name")
    _require(cfg, "framework.display_name")
    _require(cfg, "framework.version")
    _require(cfg, "framework.description_template")
    _require(cfg, "providers")
    _require(cfg, "output.path_template")
    _require(cfg, "upstream.dir")
    _require(cfg, "parser.module")
    _require(cfg, "post_processing.check_preservation.primary_key")
    providers = cfg["providers"]
    if not isinstance(providers, list) or not providers:
        raise ConfigError("config: 'providers' must be a non-empty list")
    for idx, p in enumerate(providers):
        if not isinstance(p, dict) or "key" not in p or "display" not in p:
            raise ConfigError(
                f"config: providers[{idx}] must have 'key' and 'display' fields"
            )
    return cfg


# ---------------------------------------------------------------------------
# Parser loading
# ---------------------------------------------------------------------------
def load_parser(parser_module_name: str):
    try:
        return importlib.import_module(f"parsers.{parser_module_name}")
    except ImportError as exc:
        raise ConfigError(
            f"cannot import parser 'parsers.{parser_module_name}': {exc}"
        ) from exc


# ---------------------------------------------------------------------------
# Post-processing: id uniqueness safety net
# ---------------------------------------------------------------------------
def assert_unique_ids(requirements: list[dict]) -> None:
    """Enforce the parser contract: every requirement must have a unique Id.

    The runner never renumbers silently — a duplicate is a parser bug.
    """
    seen: set[str] = set()
    dups: list[str] = []
    for req in requirements:
        rid = req.get("Id")
        if not rid:
            raise ValueError(f"requirement missing Id: {req}")
        if rid in seen:
            dups.append(rid)
        seen.add(rid)
    if dups:
        raise ValueError(
            f"parser returned duplicate requirement ids: {sorted(set(dups))}"
        )


# ---------------------------------------------------------------------------
# Post-processing: FamilyName normalization
# ---------------------------------------------------------------------------
def normalize_family_names(requirements: list[dict], norm_map: dict[str, str]) -> None:
    """Apply ``Attributes[0].FamilyName`` normalization in place."""
    if not norm_map:
        return
    for req in requirements:
        for attr in req.get("Attributes") or []:
            name = attr.get("FamilyName")
            if name in norm_map:
                attr["FamilyName"] = norm_map[name]


# ---------------------------------------------------------------------------
# Post-processing: legacy check-mapping preservation
# ---------------------------------------------------------------------------
def _freeze(value: Any) -> Any:
    """Make a value hashable for use in composite lookup keys.

    Lists become frozensets (order-insensitive match). Scalars pass through.
    """
    if isinstance(value, list):
        return frozenset(value)
    return value


def _build_fallback_key(attrs: dict, field_names: list[str]) -> tuple | None:
    """Build a composite tuple key from the given attribute field names.

    Returns None if any field is missing or falsy — that key will be
    skipped (the lookup table just won't have an entry for it).
    """
    parts = []
    for name in field_names:
        if name not in attrs:
            return None
        value = attrs[name]
        if value in ("", None, [], {}):
            return None
        parts.append(_freeze(value))
    return tuple(parts)
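
# Illustrative composite key (hypothetical attribute values) for the CCC
# fallback [Section, Applicability]:
#   _build_fallback_key(
#       {"Section": "CCC.Core.CN01 Encrypt Data for Transmission",
#        "Applicability": ["tlp-amber", "tlp-red"]},
#       ["Section", "Applicability"],
#   )
#   -> ("CCC.Core.CN01 Encrypt Data for Transmission",
#       frozenset({"tlp-amber", "tlp-red"}))
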
def load_legacy_check_maps(
    legacy_path: Path,
    primary_key: str,
    fallback_keys: list[list[str]],
) -> tuple[dict[str, list[str]], list[dict[tuple, list[str]]]]:
    """Read the existing Prowler JSON and build lookup tables for check
    preservation.

    Fails fast on ambiguous preservation keys. If two distinct legacy
    requirements share the same primary value or the same fallback tuple,
    merging their ``Checks`` silently would corrupt the preserved mapping
    for unrelated requirements. Raises ``ValueError`` listing every
    conflict so the user can either dedupe the legacy data or strengthen
    ``check_preservation`` in the sync config.

    Returns
    -------
    by_primary : dict
        ``{primary_value: [checks]}`` — e.g. ``{ar_id: [checks]}``.
    by_fallback : list[dict]
        One lookup dict per entry in ``fallback_keys``. Each maps a
        composite tuple key to its preserved checks list.
    """
    by_primary: dict[str, list[str]] = {}
    by_fallback: list[dict[tuple, list[str]]] = [{} for _ in fallback_keys]
    if not legacy_path.exists():
        return by_primary, by_fallback
    with open(legacy_path) as f:
        data = json.load(f)

    # Track which legacy requirement Ids contributed to each bucket so we
    # can surface ambiguity after the scan completes.
    primary_sources: dict[str, list[str]] = {}
    fallback_sources: list[dict[tuple, list[str]]] = [{} for _ in fallback_keys]

    for req in data.get("Requirements") or []:
        legacy_id = req.get("Id") or "<missing-Id>"
        checks = req.get("Checks") or []
        pv = req.get(primary_key)
        if pv:
            primary_sources.setdefault(pv, []).append(legacy_id)
            bucket = by_primary.setdefault(pv, [])
            for c in checks:
                if c not in bucket:
                    bucket.append(c)
        attributes = req.get("Attributes") or []
        if not attributes:
            continue
        attrs = attributes[0]
        for i, field_names in enumerate(fallback_keys):
            key = _build_fallback_key(attrs, field_names)
            if key is None:
                continue
            fallback_sources[i].setdefault(key, []).append(legacy_id)
            bucket = by_fallback[i].setdefault(key, [])
            for c in checks:
                if c not in bucket:
                    bucket.append(c)

    conflicts: list[str] = []
    for pv, ids in primary_sources.items():
        if len(ids) > 1:
            conflicts.append(
                f"primary_key={primary_key!r} value={pv!r} shared by {ids}"
            )
    for i, field_names in enumerate(fallback_keys):
        for key, ids in fallback_sources[i].items():
            if len(ids) > 1:
                conflicts.append(
                    f"fallback_key={field_names} value={key!r} shared by {ids}"
                )
    if conflicts:
        details = "\n  - ".join(conflicts)
        raise ValueError(
            f"ambiguous preservation keys in {legacy_path} — cannot "
            f"faithfully preserve Checks across distinct requirements:\n"
            f"  - {details}\n"
            f"Fix: dedupe the legacy JSON, or strengthen "
            f"'post_processing.check_preservation' in the sync config "
            f"(e.g. add a more discriminating field to fallback_keys)."
        )
    return by_primary, by_fallback
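
# Failure mode this guards against (sketch, hypothetical data): two legacy
# requirements with distinct Ids but the identical fallback tuple, e.g.
#   ("CCC.Core.CN01 ...", frozenset({"tlp-amber"}))
# would otherwise merge their Checks into one bucket; the scan instead
# records both contributing Ids and raises ValueError naming the conflict.
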
def lookup_preserved_checks(
    req: dict,
    by_primary: dict,
    by_fallback: list[dict],
    primary_key: str,
    fallback_keys: list[list[str]],
) -> list[str]:
    """Return preserved check ids for a requirement, trying the primary
    key first then each fallback in order."""
    pv = req.get(primary_key)
    if pv and pv in by_primary:
        return list(by_primary[pv])
    attributes = req.get("Attributes") or []
    if not attributes:
        return []
    attrs = attributes[0]
    for i, field_names in enumerate(fallback_keys):
        key = _build_fallback_key(attrs, field_names)
        if key and key in by_fallback[i]:
            return list(by_fallback[i][key])
    return []


# ---------------------------------------------------------------------------
# Provider output assembly
# ---------------------------------------------------------------------------
def resolve_output_path(template: str, framework: dict, provider_key: str) -> Path:
    return Path(
        template.format(
            provider=provider_key,
            framework=framework["name"].lower(),
            version=framework["version"],
        )
    )
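
# Illustrative resolution using the CCC config's template (placeholders the
# template doesn't use are simply ignored by str.format):
#   resolve_output_path(
#       "prowler/compliance/{provider}/ccc_{provider}.json",
#       {"name": "CCC", "version": "v2025.10"},
#       "aws",
#   )
#   -> Path("prowler/compliance/aws/ccc_aws.json")
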
def build_provider_json(
    config: dict,
    provider: dict,
    base_requirements: list[dict],
) -> tuple[dict, dict[str, int]]:
    """Produce the provider-specific JSON dict ready to dump.

    Returns ``(json_dict, counts)`` where ``counts`` tracks how each
    requirement's checks were resolved (primary, fallback, or none).
    """
    framework = config["framework"]
    preservation = config["post_processing"]["check_preservation"]
    primary_key = preservation["primary_key"]
    fallback_keys = preservation.get("fallback_keys") or []
    legacy_path = resolve_output_path(
        config["output"]["path_template"], framework, provider["key"]
    )
    by_primary, by_fallback = load_legacy_check_maps(
        legacy_path, primary_key, fallback_keys
    )

    counts = {"primary": 0, "fallback": 0, "none": 0}
    enriched: list[dict] = []
    for req in base_requirements:
        # Try primary key first
        pv = req.get(primary_key)
        checks: list[str] = []
        source = "none"
        if pv and pv in by_primary:
            checks = list(by_primary[pv])
            source = "primary"
        else:
            attributes = req.get("Attributes") or []
            if attributes:
                attrs = attributes[0]
                for i, field_names in enumerate(fallback_keys):
                    key = _build_fallback_key(attrs, field_names)
                    if key and key in by_fallback[i]:
                        checks = list(by_fallback[i][key])
                        source = "fallback"
                        break
        counts[source] += 1
        enriched.append(
            {
                "Id": req["Id"],
                "Description": req["Description"],
                # Shallow-copy attribute dicts so providers don't share refs
                "Attributes": [dict(a) for a in req.get("Attributes") or []],
                "Checks": checks,
            }
        )

    description = framework["description_template"].format(
        provider_display=provider["display"],
        provider_key=provider["key"],
        framework_name=framework["name"],
        framework_display=framework["display_name"],
        version=framework["version"],
    )
    out = {
        "Framework": framework["name"],
        "Version": framework["version"],
        "Provider": provider["display"],
        "Name": framework["display_name"],
        "Description": description,
        "Requirements": enriched,
    }
    return out, counts


# ---------------------------------------------------------------------------
# Pydantic post-validation
# ---------------------------------------------------------------------------
def pydantic_validate(json_path: Path) -> int:
    """Import Prowler lazily so the runner still works without Prowler
    installed (validation step is skipped in that case)."""
    try:
        from prowler.lib.check.compliance_models import Compliance
    except ImportError:
        print(
            "  note: prowler package not importable — skipping Pydantic validation",
            file=sys.stderr,
        )
        return -1
    try:
        parsed = Compliance.parse_file(str(json_path))
    except Exception as exc:
        raise RuntimeError(
            f"Pydantic validation failed for {json_path}: {exc}"
        ) from exc
    return len(parsed.Requirements)


# ---------------------------------------------------------------------------
# Driver
# ---------------------------------------------------------------------------
def main() -> int:
    if len(sys.argv) != 2:
        print("usage: sync_framework.py <config.yaml>", file=sys.stderr)
        return 1
    config_path = Path(sys.argv[1])
    try:
        config = load_config(config_path)
    except ConfigError as exc:
        print(f"config error: {exc}", file=sys.stderr)
        return 2

    framework_name = config["framework"]["name"]
    upstream_dir = Path(config["upstream"]["dir"])
    if not upstream_dir.exists():
        print(
            f"error: upstream cache dir {upstream_dir} not found\n"
            f"  hint: {config['upstream'].get('fetch_docs', '(see SKILL.md Workflow A Step 1)')}",
            file=sys.stderr,
        )
        return 3

    parser_module_name = config["parser"]["module"]
    print(
        f"Sync: framework={framework_name} version={config['framework']['version']} "
        f"parser={parser_module_name}"
    )
    try:
        parser = load_parser(parser_module_name)
    except ConfigError as exc:
        print(f"parser error: {exc}", file=sys.stderr)
        return 4

    print(f"Parsing upstream from {upstream_dir}...")
    base_requirements = parser.parse_upstream(config)
    print(f"  parser returned {len(base_requirements)} requirements")

    # Safety-net: parser contract
    try:
        assert_unique_ids(base_requirements)
    except ValueError as exc:
        print(f"parser contract violation: {exc}", file=sys.stderr)
        return 5

    # Post-processing: family name normalization
    norm_map = (
        config.get("post_processing", {})
        .get("family_name_normalization")
        or {}
    )
    normalize_family_names(base_requirements, norm_map)

    # Per-provider output
    print()
    for provider in config["providers"]:
        provider_json, counts = build_provider_json(
            config, provider, base_requirements
        )
        out_path = resolve_output_path(
            config["output"]["path_template"],
            config["framework"],
            provider["key"],
        )
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with open(out_path, "w") as f:
            json.dump(provider_json, f, indent=2, ensure_ascii=False)
            f.write("\n")
        validated = pydantic_validate(out_path)
        validated_msg = (
            f" pydantic_reqs={validated}" if validated >= 0 else " pydantic=skipped"
        )
        print(
            f"  {provider['key']}: total={len(provider_json['Requirements'])} "
            f"matched_primary={counts['primary']} "
            f"matched_fallback={counts['fallback']} "
            f"new_or_unmatched={counts['none']}{validated_msg}"
        )
        print(f"  wrote {out_path}")

    print("\nDone.")
    return 0


if __name__ == "__main__":
    sys.exit(main())