From 104a4a92c3afa7cc28ec79f00440ade414b533e7 Mon Sep 17 00:00:00 2001
From: "mintlify[bot]" <109931778+mintlify[bot]@users.noreply.github.com>
Date: Wed, 4 Mar 2026 11:59:22 +0100
Subject: [PATCH] docs: Add OCSF field requirements for Prowler Cloud integration (#10245)

Co-authored-by: mintlify[bot] <109931778+mintlify[bot]@users.noreply.github.com>
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
---
Reviewer note: line structure restored and validate_ocsf_output.py hardened
against malformed input. Hunk sizes and the diffstat are updated; the blob id
on the script's "index" line has NOT been recomputed.

 docs/developer-guide/provider.mdx |  34 ++++++
 scripts/validate_ocsf_output.py   | 219 ++++++++++++++++++++++++++++++
 2 files changed, 253 insertions(+)
 create mode 100755 scripts/validate_ocsf_output.py

diff --git a/docs/developer-guide/provider.mdx b/docs/developer-guide/provider.mdx
index 867dff14d8..acc46102cb 100644
--- a/docs/developer-guide/provider.mdx
+++ b/docs/developer-guide/provider.mdx
@@ -3406,6 +3406,40 @@ Use existing providers as templates, this will help you to understand better the
 
 - **Use Rules**: Use rules to ensure the code generated by AI is following the way of working in Prowler.
 
+---
+
+## OCSF Field Requirements for Prowler Cloud Integration
+
+When implementing a new provider that supports the `--push-to-cloud` feature, specific OCSF fields must be correctly populated to ensure proper findings ingestion into Prowler Cloud.
+
+### Required OCSF Fields
+
+The following fields in the OCSF output are critical for successful ingestion:
+
+| Field | Requirement | Description |
+|-------|-------------|-------------|
+| `provider_uid` | Must match the UID used when registering the provider in the API | This identifier links findings to the correct provider in Prowler Cloud |
+| `provider` | Must be the provider name | The name of the provider (e.g., `aws`, `azure`, `gcp`, `googleworkspace`) |
+| `finding_info.uid` | Must be unique | Each finding must have a unique identifier to avoid duplicates |
+| `resources.uid` | Must have a value | The resource UID cannot be empty; it identifies the specific resource being assessed |
+
+### Implementation Reference
+
+These fields are set in the OCSF output generation. See the [OCSF output implementation](https://github.com/prowler-cloud/prowler/blob/master/prowler/lib/outputs/ocsf/ocsf.py) for reference.
+
+### Validation Checklist
+
+Before releasing a new provider with `--push-to-cloud` support:
+
+- [ ] Verify `provider_uid` matches the UID used in the API to register the provider
+- [ ] Confirm `provider` field contains the correct provider name
+- [ ] Ensure all `finding_info.uid` values are unique across findings
+- [ ] Validate that `resources.uid` is populated for every finding
+
+<Note>
+  Use `python scripts/validate_ocsf_output.py output/*.ocsf.json` to automate the `finding_info.uid` and `resources.uid` checks; `provider_uid` and `provider` must still be verified manually.
+</Note>
+
 ## Checklist for New Providers
 
 ### CLI Integration Only
diff --git a/scripts/validate_ocsf_output.py b/scripts/validate_ocsf_output.py
new file mode 100755
index 0000000000..f97de65db6
--- /dev/null
+++ b/scripts/validate_ocsf_output.py
@@ -0,0 +1,219 @@
+#!/usr/bin/env python3
+"""
+OCSF Output Validator
+
+Validates OCSF JSON output files for Prowler Cloud integration requirements:
+- finding_info.uid uniqueness across all findings
+- resources[*].uid populated for every resource
+
+Usage:
+    python validate_ocsf_output.py FILE [FILE ...]
+
+Example:
+    python validate_ocsf_output.py output/*.ocsf.json
+"""
+
+import glob
+import json
+import sys
+from argparse import ArgumentParser
+from pathlib import Path
+
+
+def load_ocsf_file(path: str) -> list[dict]:
+    """Load and parse an OCSF JSON file containing an array of findings.
+
+    Raises:
+        FileNotFoundError: If ``path`` does not exist.
+        ValueError: If the name does not end in ``.json`` or the top-level
+            JSON value is not an array.
+        json.JSONDecodeError: If the content is not valid JSON.
+    """
+    file_path = Path(path)
+    if not file_path.exists():
+        raise FileNotFoundError(f"File not found: {path}")
+    # Path.suffix is the last extension only, so "x.ocsf.json" passes too.
+    if file_path.suffix != ".json":
+        raise ValueError(f"Expected .ocsf.json file, got: {path}")
+
+    # Explicit encoding: the platform default is not UTF-8 everywhere.
+    with open(file_path, encoding="utf-8") as f:
+        data = json.load(f)
+
+    if not isinstance(data, list):
+        raise ValueError(f"Expected JSON array, got {type(data).__name__}")
+
+    return data
+
+
+def validate_unique_finding_uids(findings: list[dict]) -> list[str]:
+    """Check that finding_info.uid is present and unique across all findings.
+
+    Returns one message per problem; an empty list means the check passed.
+    """
+    errors = []
+    seen = {}  # uid -> index of the first finding that used it
+
+    for idx, finding in enumerate(findings):
+        # Report array items that are not objects instead of crashing.
+        if not isinstance(finding, dict):
+            errors.append(f"Finding [{idx}]: expected a JSON object")
+            continue
+
+        finding_info = finding.get("finding_info")
+        if not finding_info or not isinstance(finding_info, dict):
+            errors.append(f"Finding [{idx}]: missing 'finding_info' object")
+            continue
+
+        uid = finding_info.get("uid")
+        if not uid or (isinstance(uid, str) and not uid.strip()):
+            errors.append(f"Finding [{idx}]: missing 'finding_info.uid'")
+            continue
+
+        # Lists/objects are unhashable and would raise TypeError below.
+        if isinstance(uid, (list, dict)):
+            errors.append(f"Finding [{idx}]: 'finding_info.uid' is not a scalar")
+            continue
+
+        if uid in seen:
+            errors.append(
+                f"Finding [{idx}]: duplicate 'finding_info.uid' = '{uid}' "
+                f"(first seen at index {seen[uid]})"
+            )
+        else:
+            seen[uid] = idx
+
+    return errors
+
+
+def validate_resources_uid(findings: list[dict]) -> list[str]:
+    """Check that every resource in every finding has a non-empty uid.
+
+    Returns one message per problem; an empty list means the check passed.
+    """
+    errors = []
+
+    for idx, finding in enumerate(findings):
+        # A finding that is not an object cannot carry a resources array.
+        resources = finding.get("resources") if isinstance(finding, dict) else None
+        if not resources:
+            errors.append(f"Finding [{idx}]: missing or empty 'resources' array")
+            continue
+
+        if not isinstance(resources, list):
+            errors.append(f"Finding [{idx}]: 'resources' is not an array")
+            continue
+
+        for res_idx, resource in enumerate(resources):
+            # A resource that is not an object has no uid to read.
+            uid = resource.get("uid") if isinstance(resource, dict) else None
+            if not uid or (isinstance(uid, str) and not uid.strip()):
+                errors.append(
+                    f"Finding [{idx}], resource [{res_idx}]: "
+                    f"missing or empty 'resources[].uid'"
+                )
+
+    return errors
+
+
+def validate_file(path: str) -> dict:
+    """Run all validations on a single OCSF file.
+
+    Returns a dict with keys ``file``, ``valid``, ``errors`` and
+    ``finding_count``; load failures are reported in ``errors``.
+    """
+    result = {"file": path, "valid": True, "errors": [], "finding_count": 0}
+
+    try:
+        findings = load_ocsf_file(path)
+    # OSError also covers directories and unreadable files; ValueError also
+    # covers json.JSONDecodeError and UnicodeDecodeError.
+    except (OSError, ValueError) as e:
+        result["valid"] = False
+        result["errors"].append(str(e))
+        return result
+
+    result["finding_count"] = len(findings)
+
+    if not findings:
+        return result
+
+    uid_errors = validate_unique_finding_uids(findings)
+    resource_errors = validate_resources_uid(findings)
+
+    all_errors = uid_errors + resource_errors
+    if all_errors:
+        result["valid"] = False
+        result["errors"] = all_errors
+
+    return result
+
+
+def print_report(results: list[dict]):
+    """Print a formatted validation report."""
+    print("\n" + "=" * 60)
+    print("OCSF OUTPUT VALIDATION REPORT")
+    print("=" * 60)
+
+    total_files = len(results)
+    passed = sum(1 for r in results if r["valid"])
+    failed = total_files - passed
+    total_findings = sum(r["finding_count"] for r in results)
+
+    for result in results:
+        print(f"\nFile: {result['file']}")
+        print(f" Findings: {result['finding_count']}")
+
+        if result["valid"]:
+            print(" Status: PASS")
+        else:
+            print(" Status: FAIL")
+            for error in result["errors"]:
+                print(f" [X] {error}")
+
+    print("\n" + "-" * 60)
+    print(f"Files: {total_files} | Findings: {total_findings}")
+    print(f"Passed: {passed} | Failed: {failed}")
+    print("-" * 60)
+
+    if failed == 0:
+        print("RESULT: PASS")
+    else:
+        print("RESULT: FAIL")
+    print("=" * 60 + "\n")
+
+
+def main():
+    """Parse CLI arguments, validate each file, and exit 0 only if all pass."""
+    parser = ArgumentParser(
+        description="Validate OCSF output files for Prowler Cloud integration"
+    )
+    parser.add_argument(
+        "files",
+        nargs="+",
+        help="OCSF JSON file path(s) or glob pattern(s)",
+    )
+    args = parser.parse_args()
+
+    # Expand glob patterns; a pattern with no match is kept verbatim so the
+    # report shows it as "File not found" instead of silently dropping it.
+    file_paths = []
+    for pattern in args.files:
+        expanded = glob.glob(pattern)
+        if expanded:
+            file_paths.extend(sorted(expanded))
+        else:
+            file_paths.append(pattern)
+
+    if not file_paths:
+        print("Error: No files matched the provided pattern(s).")
+        sys.exit(1)
+
+    results = [validate_file(path) for path in file_paths]
+    print_report(results)
+
+    sys.exit(0 if all(r["valid"] for r in results) else 1)
+
+
+if __name__ == "__main__":
+    main()