Mirror of https://github.com/prowler-cloud/prowler.git (synced 2026-04-14 16:50:04 +00:00)

Comparing 12 commits: aws-region...feat/PROWL
Commit SHAs (author and date columns were not captured in this view):

- ffd114f10c
- fad845669b
- b28d6a4fcc
- cc658fc958
- 86b2297a5b
- 58e5e5bb2a
- 8bde7b6eb9
- a8991f1232
- 4f0894dd92
- 5bf816ee42
- 42ab40d079
- 2ce706e474
@@ -8,7 +8,7 @@ Prowler supports multiple output formats, allowing users to tailor findings pres
 - Output Organization in Prowler
 
-Prowler outputs are managed within the `/lib/outputs` directory. Each format—such as JSON, CSV, HTML—is implemented as a Python class.
+Prowler outputs are managed within the `/lib/outputs` directory. Each format—such as JSON, CSV, HTML, SARIF—is implemented as a Python class.
 
 - Outputs are generated based on scan findings, which are stored as structured dictionaries containing details such as:
@@ -25,7 +25,12 @@ If you prefer the former verbose output, use: `--verbose`. This allows seeing mo
 ## Report Generation
 
-By default, Prowler generates CSV, JSON-OCSF, and HTML reports. To generate a JSON-ASFF report (used by AWS Security Hub), specify `-M` or `--output-modes`:
+By default, Prowler generates CSV, JSON-OCSF, and HTML reports. Additional provider-specific formats are available:
+
+* **JSON-ASFF** (AWS only): Used by AWS Security Hub
+* **SARIF** (IaC only): Used by GitHub Code Scanning
+
+To specify output formats, use the `-M` or `--output-modes` flag:
 
 ```console
 prowler <provider> -M csv json-asff json-ocsf html
 ```
@@ -61,6 +61,7 @@ Prowler natively supports the following reporting output formats:
 - JSON-OCSF
 - JSON-ASFF (AWS only)
 - HTML
+- SARIF (IaC only)
 
 Hereunder is the structure for each of the supported report formats by Prowler:
@@ -368,6 +369,29 @@ Each finding is a `json` object within a list.
 The following image is an example of the HTML output:
 
 <img src="/images/cli/reporting/html-output.png" />
 
+### SARIF (IaC Only)
+
+import { VersionBadge } from "/snippets/version-badge.mdx"
+
+<VersionBadge version="5.23.0" />
+
+The SARIF (Static Analysis Results Interchange Format) output generates a [SARIF 2.1.0](https://docs.oasis-open.org/sarif/sarif/v2.1.0/sarif-v2.1.0.html) document compatible with GitHub Code Scanning and other SARIF-compatible tools. This format is exclusively available for the IaC provider, as it is designed for static analysis results that reference specific files and line numbers.
+
+```console
+prowler iac --scan-repository-url https://github.com/user/repo -M sarif
+```
+
+<Note>
+The SARIF output format is only available when using the `iac` provider. Attempting to use it with other providers results in an error.
+</Note>
+
+The SARIF output includes:
+
+* **Rules:** Each unique check ID produces a rule entry with severity, description, remediation, and a markdown help panel.
+* **Results:** Only failed (non-muted) findings are included, with file paths and line numbers for precise annotation.
+* **Severity mapping:** Prowler severities map to SARIF levels (`critical`/`high` → `error`, `medium` → `warning`, `low`/`informational` → `note`).
+
 ## V4 Deprecations
 
 Some deprecations have been made to unify formats and improve outputs.
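The documented severity mapping can be sketched as a plain lookup table. This is an illustrative standalone snippet based on the mapping stated in the docs, not the Prowler implementation itself (the real mapping lives in `prowler/lib/outputs/sarif/sarif.py`):

```python
# Illustrative sketch of the documented severity mapping:
# critical/high -> error, medium -> warning, low/informational -> note.
SEVERITY_TO_SARIF_LEVEL = {
    "critical": "error",
    "high": "error",
    "medium": "warning",
    "low": "note",
    "informational": "note",
}

def sarif_level(severity: str) -> str:
    # Unknown severities fall back to "note", the least severe SARIF level.
    return SEVERITY_TO_SARIF_LEVEL.get(severity.lower(), "note")

print(sarif_level("HIGH"))    # error
print(sarif_level("medium"))  # warning
```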
@@ -29,7 +29,7 @@ Prowler IaC provider scans the following Infrastructure as Code configurations f
 - For remote repository scans, authentication can be provided via [git URL](https://git-scm.com/docs/git-clone#_git_urls), CLI flags or environment variables.
 - Check the [IaC Authentication](/user-guide/providers/iac/authentication) page for more details.
 - Mutelist logic ([filtering](https://trivy.dev/latest/docs/configuration/filtering/)) is handled by Trivy, not Prowler.
-- Results are output in the same formats as other Prowler providers (CSV, JSON, HTML, etc.).
+- Results are output in the same formats as other Prowler providers (CSV, JSON-OCSF, HTML), plus [SARIF](/user-guide/cli/tutorials/reporting#sarif-iac-only) for GitHub Code Scanning integration.
 
 ## Prowler App
@@ -140,8 +140,20 @@ prowler iac --scan-path ./my-iac-directory --exclude-path ./my-iac-directory/tes
 
 ### Output
 
-Use the standard Prowler output options, for example:
+Use the standard Prowler output options. The IaC provider also supports [SARIF](/user-guide/cli/tutorials/reporting#sarif-iac-only) output for GitHub Code Scanning integration:
 
 ```sh
-prowler iac --scan-path ./iac --output-formats csv json html
+prowler iac --scan-path ./iac --output-formats csv json-ocsf html
 ```
+
+#### SARIF Output
+
+<VersionBadge version="5.23.0" />
+
+To generate SARIF output for integration with SARIF-compatible tools:
+
+```sh
+prowler iac --scan-repository-url https://github.com/user/repo -M sarif
+```
+
+See the [SARIF reporting documentation](/user-guide/cli/tutorials/reporting#sarif-iac-only) for details on the format and severity mapping.
@@ -19,6 +19,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
 - `entra_conditional_access_policy_device_registration_mfa_required` check and `entra_intune_enrollment_sign_in_frequency_every_time` enhancement for M365 provider [(#10222)](https://github.com/prowler-cloud/prowler/pull/10222)
 - `entra_conditional_access_policy_block_elevated_insider_risk` check for M365 provider [(#10234)](https://github.com/prowler-cloud/prowler/pull/10234)
 - `Vercel` provider support with 30 checks [(#10189)](https://github.com/prowler-cloud/prowler/pull/10189)
+- SARIF output format for the IaC provider, enabling GitHub Code Scanning integration via `--output-formats sarif` [(#10626)](https://github.com/prowler-cloud/prowler/pull/10626)
 
 ### 🔄 Changed
@@ -18,6 +18,7 @@ from prowler.config.config import (
     json_asff_file_suffix,
     json_ocsf_file_suffix,
     orange_color,
+    sarif_file_suffix,
 )
 from prowler.lib.banner import print_banner
 from prowler.lib.check.check import (
@@ -69,11 +70,11 @@ from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
 from prowler.lib.outputs.compliance.cis.cis_github import GithubCIS
 from prowler.lib.outputs.compliance.cis.cis_googleworkspace import GoogleWorkspaceCIS
 from prowler.lib.outputs.compliance.cis.cis_kubernetes import KubernetesCIS
-from prowler.lib.outputs.compliance.cis.cis_m365 import M365CIS
-from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
 from prowler.lib.outputs.compliance.cisa_scuba.cisa_scuba_googleworkspace import (
     GoogleWorkspaceCISASCuBA,
 )
+from prowler.lib.outputs.compliance.cis.cis_m365 import M365CIS
+from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
 from prowler.lib.outputs.compliance.compliance import display_compliance_table
 from prowler.lib.outputs.compliance.csa.csa_alibabacloud import AlibabaCloudCSA
 from prowler.lib.outputs.compliance.csa.csa_aws import AWSCSA
@@ -122,6 +123,7 @@ from prowler.lib.outputs.html.html import HTML
 from prowler.lib.outputs.ocsf.ingestion import send_ocsf_to_api
 from prowler.lib.outputs.ocsf.ocsf import OCSF
 from prowler.lib.outputs.outputs import extract_findings_statistics, report
+from prowler.lib.outputs.sarif.sarif import SARIF
 from prowler.lib.outputs.slack.slack import Slack
 from prowler.lib.outputs.summary_table import display_summary_table
 from prowler.providers.alibabacloud.models import AlibabaCloudOutputOptions
@@ -546,6 +548,13 @@ def prowler():
                 html_output.batch_write_data_to_file(
                     provider=global_provider, stats=stats
                 )
+            if mode == "sarif":
+                sarif_output = SARIF(
+                    findings=finding_outputs,
+                    file_path=f"(unknown){sarif_file_suffix}",
+                )
+                generated_outputs["regular"].append(sarif_output)
+                sarif_output.batch_write_data_to_file()
 
         if getattr(args, "push_to_cloud", False):
             if not ocsf_output or not getattr(ocsf_output, "file_path", None):
@@ -110,6 +110,7 @@ json_file_suffix = ".json"
 json_asff_file_suffix = ".asff.json"
 json_ocsf_file_suffix = ".ocsf.json"
 html_file_suffix = ".html"
+sarif_file_suffix = ".sarif"
 default_config_file_path = (
     f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/config.yaml"
 )
@@ -120,7 +121,7 @@ default_redteam_config_file_path = (
     f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/llm_config.yaml"
 )
 encoding_format_utf_8 = "utf-8"
-available_output_formats = ["csv", "json-asff", "json-ocsf", "html"]
+available_output_formats = ["csv", "json-asff", "json-ocsf", "html", "sarif"]
 
 # Prowler Cloud API settings
 cloud_api_base_url = os.getenv("PROWLER_CLOUD_API_BASE_URL", "https://api.prowler.com")
@@ -1094,15 +1094,10 @@ class CheckReportIAC(Check_Report):
 
         self.resource = finding
         self.resource_name = file_path
-        self.resource_line_range = (
-            (
-                str(finding.get("CauseMetadata", {}).get("StartLine", ""))
-                + ":"
-                + str(finding.get("CauseMetadata", {}).get("EndLine", ""))
-            )
-            if finding.get("CauseMetadata", {}).get("StartLine", "")
-            else ""
-        )
+        cause = finding.get("CauseMetadata", {})
+        start = cause.get("StartLine") or finding.get("StartLine")
+        end = cause.get("EndLine") or finding.get("EndLine")
+        self.resource_line_range = f"{start}:{end}" if start else ""
 
 
 @dataclass
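The refactored line-range logic in the hunk above can be exercised in isolation. This standalone sketch mirrors the new behavior, including the fallback from `CauseMetadata` to top-level keys; the key names are copied from the diff, and the free function is a hypothetical stand-in for the `CheckReportIAC` attribute assignment:

```python
# Standalone sketch of the refactored resource_line_range computation:
# prefer CauseMetadata.StartLine/EndLine, fall back to top-level keys,
# and produce "" when no start line is available.
def resource_line_range(finding: dict) -> str:
    cause = finding.get("CauseMetadata", {})
    start = cause.get("StartLine") or finding.get("StartLine")
    end = cause.get("EndLine") or finding.get("EndLine")
    return f"{start}:{end}" if start else ""

print(resource_line_range({"CauseMetadata": {"StartLine": 10, "EndLine": 25}}))  # 10:25
print(resource_line_range({"StartLine": 3, "EndLine": 7}))                       # 3:7
print(resource_line_range({}))                                                   # (empty string)
```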
@@ -17,6 +17,7 @@ from prowler.providers.common.arguments import (
     init_providers_parser,
     validate_asff_usage,
     validate_provider_arguments,
+    validate_sarif_usage,
 )
 
 SENSITIVE_ARGUMENTS = frozenset({"--shodan"})
@@ -150,6 +151,12 @@ Detailed documentation at https://docs.prowler.com
         if not asff_is_valid:
             self.parser.error(asff_error)
 
+        sarif_is_valid, sarif_error = validate_sarif_usage(
+            args.provider, getattr(args, "output_formats", None)
+        )
+        if not sarif_is_valid:
+            self.parser.error(sarif_error)
+
         return args
 
     def __set_default_provider__(self, args: list) -> list:
@@ -354,6 +354,9 @@ class Finding(BaseModel):
                 check_output, "resource_line_range", ""
             )
             output_data["framework"] = check_output.check_metadata.ServiceName
+            output_data["raw"] = {
+                "resource_line_range": output_data.get("resource_line_range", ""),
+            }
 
         elif provider.type == "llm":
             output_data["auth_method"] = provider.auth_method
prowler/lib/outputs/sarif/__init__.py (new file, 0 lines)
prowler/lib/outputs/sarif/sarif.py (new file, 193 lines)
@@ -0,0 +1,193 @@
from json import dump
from typing import Optional

from prowler.config.config import prowler_version
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.output import Output

SARIF_SCHEMA_URL = "https://json.schemastore.org/sarif-2.1.0.json"
SARIF_VERSION = "2.1.0"

SEVERITY_TO_SARIF_LEVEL = {
    "critical": "error",
    "high": "error",
    "medium": "warning",
    "low": "note",
    "informational": "note",
}

SEVERITY_TO_SECURITY_SEVERITY = {
    "critical": "9.0",
    "high": "7.0",
    "medium": "4.0",
    "low": "2.0",
    "informational": "0.0",
}


class SARIF(Output):
    """Generates SARIF 2.1.0 output compatible with GitHub Code Scanning."""

    def transform(self, findings: list[Finding]) -> None:
        """Transform findings into a SARIF 2.1.0 document.

        Only FAIL findings that are not muted are included. Each unique
        check ID produces one rule entry; multiple findings for the same
        check share the rule via ruleIndex.

        Args:
            findings: List of Finding objects to transform.
        """
        rules = {}
        rule_indices = {}
        results = []

        for finding in findings:
            if finding.status != "FAIL" or finding.muted:
                continue

            check_id = finding.metadata.CheckID
            severity = finding.metadata.Severity.lower()

            if check_id not in rules:
                rule_indices[check_id] = len(rules)
                rule = {
                    "id": check_id,
                    "name": finding.metadata.CheckTitle,
                    "shortDescription": {"text": finding.metadata.CheckTitle},
                    "fullDescription": {
                        "text": finding.metadata.Description or check_id
                    },
                    "help": {
                        "text": finding.metadata.Remediation.Recommendation.Text
                        or finding.metadata.Description
                        or check_id,
                        "markdown": self._build_help_markdown(finding, severity),
                    },
                    "defaultConfiguration": {
                        "level": SEVERITY_TO_SARIF_LEVEL.get(severity, "note"),
                    },
                    "properties": {
                        "tags": [
                            "security",
                            f"prowler/{finding.metadata.Provider}",
                            f"severity/{severity}",
                        ],
                        "security-severity": SEVERITY_TO_SECURITY_SEVERITY.get(
                            severity, "0.0"
                        ),
                    },
                }
                if finding.metadata.RelatedUrl:
                    rule["helpUri"] = finding.metadata.RelatedUrl
                rules[check_id] = rule

            rule_index = rule_indices[check_id]
            result = {
                "ruleId": check_id,
                "ruleIndex": rule_index,
                "level": SEVERITY_TO_SARIF_LEVEL.get(severity, "note"),
                "message": {
                    "text": finding.status_extended or finding.metadata.CheckTitle
                },
            }

            location = self._build_location(finding)
            if location is not None:
                result["locations"] = [location]

            results.append(result)

        sarif_document = {
            "$schema": SARIF_SCHEMA_URL,
            "version": SARIF_VERSION,
            "runs": [
                {
                    "tool": {
                        "driver": {
                            "name": "Prowler",
                            "version": prowler_version,
                            "informationUri": "https://prowler.com",
                            "rules": list(rules.values()),
                        },
                    },
                    "results": results,
                },
            ],
        }

        self._data = [sarif_document]

    def batch_write_data_to_file(self) -> None:
        """Write the SARIF document to the output file as JSON."""
        try:
            if (
                getattr(self, "_file_descriptor", None)
                and not self._file_descriptor.closed
                and self._data
            ):
                dump(self._data[0], self._file_descriptor, indent=2)
                if self.close_file or self._from_cli:
                    self._file_descriptor.close()
        except Exception as error:
            logger.error(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )

    @staticmethod
    def _build_help_markdown(finding: Finding, severity: str) -> str:
        """Build a markdown help string for a SARIF rule."""
        remediation = (
            finding.metadata.Remediation.Recommendation.Text
            or finding.metadata.Description
            or finding.metadata.CheckID
        )
        lines = [
            f"**{finding.metadata.CheckTitle}**\n",
            "| Severity | Remediation |",
            "| --- | --- |",
            f"| {severity.upper()} | {remediation} |",
        ]
        if finding.metadata.RelatedUrl:
            lines.append(f"\n[More info]({finding.metadata.RelatedUrl})")
        return "\n".join(lines)

    @staticmethod
    def _build_location(finding: Finding) -> Optional[dict]:
        """Build a SARIF physicalLocation from a Finding.

        Uses resource_name as the artifact URI and resource_line_range
        (stored in finding.raw for IaC findings) for line range info.

        Returns:
            A SARIF location dict, or None if resource_name is empty.
        """
        if not finding.resource_name:
            return None

        location = {
            "physicalLocation": {
                "artifactLocation": {
                    "uri": finding.resource_name,
                },
            },
        }

        line_range = finding.raw.get("resource_line_range", "")
        if line_range and ":" in line_range:
            parts = line_range.split(":")
            try:
                start_line = int(parts[0])
                end_line = int(parts[1])
                if start_line >= 1 and end_line >= 1:
                    location["physicalLocation"]["region"] = {
                        "startLine": start_line,
                        "endLine": end_line,
                    }
            except (ValueError, IndexError):
                pass  # Malformed line range: skip region, keep location

        return location
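The rule-deduplication scheme in `SARIF.transform` above (one rule per unique check ID, with results pointing back via `ruleIndex`) can be shown in a minimal standalone sketch; here `(check_id, message)` tuples are simplified stand-ins for `Finding` objects:

```python
# Minimal sketch of SARIF rule deduplication: the first occurrence of a
# check ID registers a rule and records its index; every result for that
# check references the shared rule through ruleIndex.
def build_rules_and_results(findings):
    rules, rule_indices, results = {}, {}, []
    for check_id, message in findings:
        if check_id not in rules:
            rule_indices[check_id] = len(rules)
            rules[check_id] = {"id": check_id}
        results.append(
            {
                "ruleId": check_id,
                "ruleIndex": rule_indices[check_id],
                "message": {"text": message},
            }
        )
    return list(rules.values()), results

rules, results = build_rules_and_results(
    [("s3_check", "bucket A"), ("s3_check", "bucket B"), ("iam_check", "role C")]
)
# Two unique rules; the two s3_check results share ruleIndex 0.
```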
@@ -9,6 +9,7 @@ from prowler.config.config import (
     json_asff_file_suffix,
     json_ocsf_file_suffix,
     orange_color,
+    sarif_file_suffix,
 )
 from prowler.lib.logger import logger
 from prowler.providers.github.models import GithubAppIdentityInfo, GithubIdentityInfo
@@ -207,6 +208,10 @@ def display_summary_table(
             print(
                 f" - HTML: {output_directory}/{output_filename}{html_file_suffix}"
             )
+            if "sarif" in output_options.output_modes:
+                print(
+                    f" - SARIF: {output_directory}/{output_filename}{sarif_file_suffix}"
+                )
 
         else:
             print(
@@ -70,3 +70,19 @@ def validate_asff_usage(
         False,
         f"json-asff output format is only available for the aws provider, but {provider} was selected",
     )
+
+
+def validate_sarif_usage(
+    provider: Optional[str], output_formats: Optional[Sequence[str]]
+) -> tuple[bool, str]:
+    """Ensure sarif output is only requested for the IaC provider."""
+    if not output_formats or "sarif" not in output_formats:
+        return (True, "")
+
+    if provider == "iac":
+        return (True, "")
+
+    return (
+        False,
+        f"sarif output format is only available for the iac provider, but {provider} was selected",
+    )
tests/lib/outputs/sarif/sarif_test.py (new file, 312 lines)
@@ -0,0 +1,312 @@
import json
import os
import tempfile

import pytest

from prowler.lib.outputs.sarif.sarif import SARIF, SARIF_SCHEMA_URL, SARIF_VERSION
from tests.lib.outputs.fixtures.fixtures import generate_finding_output


class TestSARIF:
    def test_transform_fail_finding(self):
        finding = generate_finding_output(
            status="FAIL",
            status_extended="S3 bucket is not encrypted",
            severity="high",
            resource_name="main.tf",
            service_name="s3",
            check_id="s3_encryption_check",
            check_title="S3 Bucket Encryption",
        )
        sarif = SARIF(findings=[finding], file_path=None)

        assert sarif.data[0]["$schema"] == SARIF_SCHEMA_URL
        assert sarif.data[0]["version"] == SARIF_VERSION
        assert len(sarif.data[0]["runs"]) == 1

        run = sarif.data[0]["runs"][0]
        assert run["tool"]["driver"]["name"] == "Prowler"
        assert len(run["tool"]["driver"]["rules"]) == 1
        assert len(run["results"]) == 1

        rule = run["tool"]["driver"]["rules"][0]
        assert rule["id"] == "s3_encryption_check"
        assert rule["shortDescription"]["text"] == "S3 Bucket Encryption"
        assert rule["defaultConfiguration"]["level"] == "error"
        assert rule["properties"]["security-severity"] == "7.0"

        result = run["results"][0]
        assert result["ruleId"] == "s3_encryption_check"
        assert result["ruleIndex"] == 0
        assert result["level"] == "error"
        assert result["message"]["text"] == "S3 bucket is not encrypted"

    def test_transform_pass_finding_excluded(self):
        finding = generate_finding_output(status="PASS", severity="high")
        sarif = SARIF(findings=[finding], file_path=None)

        run = sarif.data[0]["runs"][0]
        assert len(run["results"]) == 0
        assert len(run["tool"]["driver"]["rules"]) == 0

    def test_transform_muted_finding_excluded(self):
        finding = generate_finding_output(status="FAIL", severity="high", muted=True)
        sarif = SARIF(findings=[finding], file_path=None)
        run = sarif.data[0]["runs"][0]
        assert len(run["results"]) == 0
        assert len(run["tool"]["driver"]["rules"]) == 0

    @pytest.mark.parametrize(
        "severity,expected_level,expected_security_severity",
        [
            ("critical", "error", "9.0"),
            ("high", "error", "7.0"),
            ("medium", "warning", "4.0"),
            ("low", "note", "2.0"),
            ("informational", "note", "0.0"),
        ],
    )
    def test_transform_severity_mapping(
        self, severity, expected_level, expected_security_severity
    ):
        finding = generate_finding_output(
            status="FAIL",
            severity=severity,
        )
        sarif = SARIF(findings=[finding], file_path=None)

        run = sarif.data[0]["runs"][0]
        result = run["results"][0]
        rule = run["tool"]["driver"]["rules"][0]

        assert result["level"] == expected_level
        assert rule["defaultConfiguration"]["level"] == expected_level
        assert rule["properties"]["security-severity"] == expected_security_severity

    def test_transform_multiple_findings_dedup_rules(self):
        findings = [
            generate_finding_output(
                status="FAIL",
                resource_name="file1.tf",
                status_extended="Finding in file1",
            ),
            generate_finding_output(
                status="FAIL",
                resource_name="file2.tf",
                status_extended="Finding in file2",
            ),
        ]
        sarif = SARIF(findings=findings, file_path=None)

        run = sarif.data[0]["runs"][0]
        assert len(run["tool"]["driver"]["rules"]) == 1
        assert len(run["results"]) == 2
        assert run["results"][0]["ruleIndex"] == 0
        assert run["results"][1]["ruleIndex"] == 0

    def test_transform_multiple_different_rules(self):
        findings = [
            generate_finding_output(
                status="FAIL",
                service_name="alpha",
                check_id="alpha_check_one",
                status_extended="Finding A",
            ),
            generate_finding_output(
                status="FAIL",
                service_name="beta",
                check_id="beta_check_two",
                status_extended="Finding B",
            ),
        ]
        sarif = SARIF(findings=findings, file_path=None)

        run = sarif.data[0]["runs"][0]
        assert len(run["tool"]["driver"]["rules"]) == 2
        assert run["results"][0]["ruleIndex"] == 0
        assert run["results"][1]["ruleIndex"] == 1

    def test_transform_location_with_line_range(self):
        finding = generate_finding_output(
            status="FAIL",
            resource_name="modules/s3/main.tf",
        )
        finding.raw = {"resource_line_range": "10:25"}

        sarif = SARIF(findings=[finding], file_path=None)

        result = sarif.data[0]["runs"][0]["results"][0]
        location = result["locations"][0]["physicalLocation"]
        assert location["artifactLocation"]["uri"] == "modules/s3/main.tf"
        assert location["region"]["startLine"] == 10
        assert location["region"]["endLine"] == 25

    def test_transform_location_without_line_range(self):
        finding = generate_finding_output(
            status="FAIL",
            resource_name="main.tf",
        )
        sarif = SARIF(findings=[finding], file_path=None)

        result = sarif.data[0]["runs"][0]["results"][0]
        location = result["locations"][0]["physicalLocation"]
        assert location["artifactLocation"]["uri"] == "main.tf"
        assert "region" not in location

    def test_transform_no_resource_name(self):
        finding = generate_finding_output(
            status="FAIL",
            resource_name="",
        )
        sarif = SARIF(findings=[finding], file_path=None)

        result = sarif.data[0]["runs"][0]["results"][0]
        assert "locations" not in result

    def test_batch_write_data_to_file(self):
        finding = generate_finding_output(
            status="FAIL",
            status_extended="test finding",
            resource_name="main.tf",
        )

        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".sarif", delete=False
        ) as tmp:
            tmp_path = tmp.name

        sarif = SARIF(
            findings=[finding],
            file_path=tmp_path,
        )
        sarif.batch_write_data_to_file()

        with open(tmp_path) as f:
            content = json.load(f)

        assert content["$schema"] == SARIF_SCHEMA_URL
        assert content["version"] == SARIF_VERSION
        assert len(content["runs"][0]["results"]) == 1

        os.unlink(tmp_path)

    def test_sarif_schema_structure(self):
        finding = generate_finding_output(
            status="FAIL",
            severity="critical",
            resource_name="infra/main.tf",
            service_name="iac",
            check_id="iac_misconfig_check",
            check_title="IaC Misconfiguration",
            description="Checks for misconfigurations",
            remediation_recommendation_text="Fix the configuration",
        )
        finding.raw = {"resource_line_range": "5:15"}

        sarif = SARIF(findings=[finding], file_path=None)
        doc = sarif.data[0]

        assert "$schema" in doc
        assert "version" in doc
        assert "runs" in doc

        run = doc["runs"][0]

        assert "tool" in run
        assert "driver" in run["tool"]
        driver = run["tool"]["driver"]
        assert "name" in driver
        assert "version" in driver
        assert "informationUri" in driver
        assert "rules" in driver

        rule = driver["rules"][0]
        assert "id" in rule
        assert "shortDescription" in rule
        assert "fullDescription" in rule
        assert "help" in rule
        assert "defaultConfiguration" in rule
        assert "properties" in rule
        assert "tags" in rule["properties"]
        assert "security-severity" in rule["properties"]

        result = run["results"][0]
        assert "ruleId" in result
        assert "ruleIndex" in result
        assert "level" in result
        assert "message" in result
        assert "locations" in result

        loc = result["locations"][0]["physicalLocation"]
        assert "artifactLocation" in loc
        assert "uri" in loc["artifactLocation"]
        assert "region" in loc
        assert "startLine" in loc["region"]
        assert "endLine" in loc["region"]

    def test_transform_helpuri_present_when_related_url_set(self):
        finding = generate_finding_output(
            status="FAIL",
            provider="iac",
            related_url="https://docs.example.com/check",
        )
        sarif = SARIF(findings=[finding], file_path=None)
        rule = sarif.data[0]["runs"][0]["tool"]["driver"]["rules"][0]
        assert rule["helpUri"] == "https://docs.example.com/check"

    def test_transform_helpuri_absent_when_related_url_empty(self):
        finding = generate_finding_output(
            status="FAIL",
            related_url="",
        )
        sarif = SARIF(findings=[finding], file_path=None)
        rule = sarif.data[0]["runs"][0]["tool"]["driver"]["rules"][0]
        assert "helpUri" not in rule

    def test_location_with_non_numeric_line_range(self):
        finding = generate_finding_output(
            status="FAIL",
            resource_name="main.tf",
        )
        finding.raw = {"resource_line_range": "abc:def"}
        sarif = SARIF(findings=[finding], file_path=None)
        location = sarif.data[0]["runs"][0]["results"][0]["locations"][0][
            "physicalLocation"
        ]
        assert "region" not in location

    def test_location_with_single_value_line_range(self):
        finding = generate_finding_output(
            status="FAIL",
            resource_name="main.tf",
        )
        finding.raw = {"resource_line_range": "10"}
        sarif = SARIF(findings=[finding], file_path=None)
        location = sarif.data[0]["runs"][0]["results"][0]["locations"][0][
            "physicalLocation"
        ]
        assert "region" not in location

    def test_location_with_zero_line_numbers(self):
        finding = generate_finding_output(
            status="FAIL",
            resource_name="main.tf",
        )
        finding.raw = {"resource_line_range": "0:0"}
        sarif = SARIF(findings=[finding], file_path=None)
        location = sarif.data[0]["runs"][0]["results"][0]["locations"][0][
            "physicalLocation"
        ]
        assert "region" not in location

    def test_only_pass_findings(self):
        findings = [
            generate_finding_output(status="PASS"),
            generate_finding_output(status="PASS"),
        ]
        sarif = SARIF(findings=findings, file_path=None)

        run = sarif.data[0]["runs"][0]
        assert len(run["results"]) == 0
        assert len(run["tool"]["driver"]["rules"]) == 0
tests/providers/common/arguments_test.py (new file, 53 lines)
@@ -0,0 +1,53 @@
from prowler.providers.common.arguments import (
    validate_asff_usage,
    validate_sarif_usage,
)


class TestValidateAsffUsage:
    def test_asff_with_aws_provider(self):
        valid, msg = validate_asff_usage("aws", ["json-asff"])
        assert valid is True
        assert msg == ""

    def test_asff_with_non_aws_provider(self):
        valid, msg = validate_asff_usage("gcp", ["json-asff"])
        assert valid is False
        assert "aws" in msg

    def test_no_asff_in_formats(self):
        valid, msg = validate_asff_usage("gcp", ["csv", "html"])
        assert valid is True

    def test_no_output_formats(self):
        valid, msg = validate_asff_usage("aws", None)
        assert valid is True


class TestValidateSarifUsage:
    def test_sarif_with_iac_provider(self):
        valid, msg = validate_sarif_usage("iac", ["sarif"])
        assert valid is True
        assert msg == ""

    def test_sarif_with_non_iac_provider(self):
        valid, msg = validate_sarif_usage("aws", ["sarif"])
        assert valid is False
        assert "iac" in msg

    def test_sarif_with_other_provider(self):
        valid, msg = validate_sarif_usage("gcp", ["csv", "sarif"])
        assert valid is False
        assert "gcp" in msg

    def test_no_sarif_in_formats(self):
        valid, msg = validate_sarif_usage("aws", ["csv", "html"])
        assert valid is True

    def test_no_output_formats(self):
        valid, msg = validate_sarif_usage("iac", None)
        assert valid is True

    def test_empty_output_formats(self):
        valid, msg = validate_sarif_usage("aws", [])
        assert valid is True