Compare commits

...

3 Commits

Author SHA1 Message Date
Andoni A.
5bf816ee42 fix(sdk): handle empty message and helpUri in SARIF output
Ensure result message.text is never empty (falls back to CheckTitle)
and omit helpUri when RelatedUrl is empty to avoid SARIF validation
warnings.
2026-04-02 16:06:11 +02:00
Andoni A.
42ab40d079 fix(sdk): drop partialFingerprints from SARIF output
Let github/codeql-action/upload-sarif compute fingerprints from file
content instead of using Prowler UIDs, which conflict with GitHub's
expected format.
2026-04-02 15:48:43 +02:00
Andoni A.
2ce706e474 feat(sdk): add SARIF 2.1.0 output format for GitHub Code Scanning
Add SARIF output class that converts Prowler findings into SARIF 2.1.0
format compatible with github/codeql-action/upload-sarif. Includes
severity mapping, rule deduplication, and file location support with
line ranges for IaC findings.
2026-04-02 14:51:38 +02:00
8 changed files with 433 additions and 3 deletions

View File

@@ -18,6 +18,7 @@ from prowler.config.config import (
json_asff_file_suffix,
json_ocsf_file_suffix,
orange_color,
sarif_file_suffix,
)
from prowler.lib.banner import print_banner
from prowler.lib.check.check import (
@@ -69,11 +70,11 @@ from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
from prowler.lib.outputs.compliance.cis.cis_github import GithubCIS
from prowler.lib.outputs.compliance.cis.cis_googleworkspace import GoogleWorkspaceCIS
from prowler.lib.outputs.compliance.cis.cis_kubernetes import KubernetesCIS
from prowler.lib.outputs.compliance.cis.cis_m365 import M365CIS
from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
from prowler.lib.outputs.compliance.cisa_scuba.cisa_scuba_googleworkspace import (
GoogleWorkspaceCISASCuBA,
)
from prowler.lib.outputs.compliance.cis.cis_m365 import M365CIS
from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
from prowler.lib.outputs.compliance.compliance import display_compliance_table
from prowler.lib.outputs.compliance.csa.csa_alibabacloud import AlibabaCloudCSA
from prowler.lib.outputs.compliance.csa.csa_aws import AWSCSA
@@ -122,6 +123,7 @@ from prowler.lib.outputs.html.html import HTML
from prowler.lib.outputs.ocsf.ingestion import send_ocsf_to_api
from prowler.lib.outputs.ocsf.ocsf import OCSF
from prowler.lib.outputs.outputs import extract_findings_statistics, report
from prowler.lib.outputs.sarif.sarif import SARIF
from prowler.lib.outputs.slack.slack import Slack
from prowler.lib.outputs.summary_table import display_summary_table
from prowler.providers.alibabacloud.models import AlibabaCloudOutputOptions
@@ -546,6 +548,13 @@ def prowler():
html_output.batch_write_data_to_file(
provider=global_provider, stats=stats
)
if mode == "sarif":
sarif_output = SARIF(
findings=finding_outputs,
file_path=f"{filename}{sarif_file_suffix}",
)
generated_outputs["regular"].append(sarif_output)
sarif_output.batch_write_data_to_file()
if getattr(args, "push_to_cloud", False):
if not ocsf_output or not getattr(ocsf_output, "file_path", None):

View File

@@ -110,6 +110,7 @@ json_file_suffix = ".json"
json_asff_file_suffix = ".asff.json"
json_ocsf_file_suffix = ".ocsf.json"
html_file_suffix = ".html"
sarif_file_suffix = ".sarif"
default_config_file_path = (
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/config.yaml"
)
@@ -120,7 +121,7 @@ default_redteam_config_file_path = (
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/llm_config.yaml"
)
encoding_format_utf_8 = "utf-8"
available_output_formats = ["csv", "json-asff", "json-ocsf", "html"]
available_output_formats = ["csv", "json-asff", "json-ocsf", "html", "sarif"]
# Prowler Cloud API settings
cloud_api_base_url = os.getenv("PROWLER_CLOUD_API_BASE_URL", "https://api.prowler.com")

View File

@@ -354,6 +354,9 @@ class Finding(BaseModel):
check_output, "resource_line_range", ""
)
output_data["framework"] = check_output.check_metadata.ServiceName
output_data["raw"] = {
"resource_line_range": output_data.get("resource_line_range", ""),
}
elif provider.type == "llm":
output_data["auth_method"] = provider.auth_method

View File

View File

@@ -0,0 +1,155 @@
from json import dump
from typing import List
from prowler.config.config import prowler_version
from prowler.lib.logger import logger
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.output import Output
SARIF_SCHEMA_URL = "https://json.schemastore.org/sarif-2.1.0.json"
SARIF_VERSION = "2.1.0"
SEVERITY_TO_SARIF_LEVEL = {
"critical": "error",
"high": "error",
"medium": "warning",
"low": "note",
"informational": "note",
}
SEVERITY_TO_SECURITY_SEVERITY = {
"critical": "9.0",
"high": "7.0",
"medium": "4.0",
"low": "2.0",
"informational": "0.0",
}
class SARIF(Output):
"""Generates SARIF 2.1.0 output compatible with GitHub Code Scanning."""
def transform(self, findings: List[Finding]) -> None:
rules = {}
results = []
for finding in findings:
if finding.status != "FAIL":
continue
check_id = finding.metadata.CheckID
severity = finding.metadata.Severity.lower()
if check_id not in rules:
rule = {
"id": check_id,
"name": check_id,
"shortDescription": {"text": finding.metadata.CheckTitle},
"fullDescription": {
"text": finding.metadata.Description or check_id
},
"help": {
"text": finding.metadata.Remediation.Recommendation.Text
or finding.metadata.Description
or check_id,
},
"defaultConfiguration": {
"level": SEVERITY_TO_SARIF_LEVEL.get(severity, "note"),
},
"properties": {
"tags": [
"security",
f"prowler/{finding.metadata.Provider}",
f"severity/{severity}",
],
"security-severity": SEVERITY_TO_SECURITY_SEVERITY.get(
severity, "0.0"
),
},
}
if finding.metadata.RelatedUrl:
rule["helpUri"] = finding.metadata.RelatedUrl
rules[check_id] = rule
rule_index = list(rules.keys()).index(check_id)
result = {
"ruleId": check_id,
"ruleIndex": rule_index,
"level": SEVERITY_TO_SARIF_LEVEL.get(severity, "note"),
"message": {
"text": finding.status_extended or finding.metadata.CheckTitle
},
}
location = self._build_location(finding)
if location:
result["locations"] = [location]
results.append(result)
sarif_document = {
"$schema": SARIF_SCHEMA_URL,
"version": SARIF_VERSION,
"runs": [
{
"tool": {
"driver": {
"name": "Prowler",
"version": prowler_version,
"informationUri": "https://prowler.com",
"rules": list(rules.values()),
},
},
"results": results,
},
],
}
self._data = sarif_document
def batch_write_data_to_file(self) -> None:
try:
if (
getattr(self, "_file_descriptor", None)
and not self._file_descriptor.closed
and self._data
):
dump(self._data, self._file_descriptor, indent=2)
self._file_descriptor.close()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@staticmethod
def _build_location(finding: Finding) -> dict:
"""Build a SARIF physicalLocation from a Finding.
Uses resource_name as the artifact URI and resource_line_range
(stored in finding.raw for IaC findings) for region info.
"""
if not finding.resource_name:
return {}
location = {
"physicalLocation": {
"artifactLocation": {
"uri": finding.resource_name,
},
},
}
line_range = finding.raw.get("resource_line_range", "")
if line_range and ":" in line_range:
parts = line_range.split(":")
try:
start_line = int(parts[0])
end_line = int(parts[1])
location["physicalLocation"]["region"] = {
"startLine": start_line,
"endLine": end_line,
}
except (ValueError, IndexError):
pass
return location

View File

@@ -9,6 +9,7 @@ from prowler.config.config import (
json_asff_file_suffix,
json_ocsf_file_suffix,
orange_color,
sarif_file_suffix,
)
from prowler.lib.logger import logger
from prowler.providers.github.models import GithubAppIdentityInfo, GithubIdentityInfo
@@ -207,6 +208,10 @@ def display_summary_table(
print(
f" - HTML: {output_directory}/{output_filename}{html_file_suffix}"
)
if "sarif" in output_options.output_modes:
print(
f" - SARIF: {output_directory}/{output_filename}{sarif_file_suffix}"
)
else:
print(

View File

View File

@@ -0,0 +1,257 @@
import json
import tempfile
import pytest
from prowler.lib.outputs.sarif.sarif import SARIF, SARIF_SCHEMA_URL, SARIF_VERSION
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
class TestSARIF:
def test_transform_fail_finding(self):
finding = generate_finding_output(
status="FAIL",
status_extended="S3 bucket is not encrypted",
severity="high",
resource_name="main.tf",
service_name="s3",
check_id="s3_encryption_check",
check_title="S3 Bucket Encryption",
)
sarif = SARIF(findings=[finding], file_path=None)
assert sarif.data["$schema"] == SARIF_SCHEMA_URL
assert sarif.data["version"] == SARIF_VERSION
assert len(sarif.data["runs"]) == 1
run = sarif.data["runs"][0]
assert run["tool"]["driver"]["name"] == "Prowler"
assert len(run["tool"]["driver"]["rules"]) == 1
assert len(run["results"]) == 1
rule = run["tool"]["driver"]["rules"][0]
assert rule["id"] == "s3_encryption_check"
assert rule["shortDescription"]["text"] == "S3 Bucket Encryption"
assert rule["defaultConfiguration"]["level"] == "error"
assert rule["properties"]["security-severity"] == "7.0"
result = run["results"][0]
assert result["ruleId"] == "s3_encryption_check"
assert result["ruleIndex"] == 0
assert result["level"] == "error"
assert result["message"]["text"] == "S3 bucket is not encrypted"
def test_transform_pass_finding_excluded(self):
finding = generate_finding_output(status="PASS", severity="high")
sarif = SARIF(findings=[finding], file_path=None)
run = sarif.data["runs"][0]
assert len(run["results"]) == 0
assert len(run["tool"]["driver"]["rules"]) == 0
def test_transform_muted_finding_excluded(self):
finding = generate_finding_output(status="FAIL", severity="high", muted=True)
sarif = SARIF(findings=[finding], file_path=None)
run = sarif.data["runs"][0]
assert len(run["results"]) == 1
@pytest.mark.parametrize(
"severity,expected_level,expected_security_severity",
[
("critical", "error", "9.0"),
("high", "error", "7.0"),
("medium", "warning", "4.0"),
("low", "note", "2.0"),
("informational", "note", "0.0"),
],
)
def test_transform_severity_mapping(
self, severity, expected_level, expected_security_severity
):
finding = generate_finding_output(
status="FAIL",
severity=severity,
)
sarif = SARIF(findings=[finding], file_path=None)
run = sarif.data["runs"][0]
result = run["results"][0]
rule = run["tool"]["driver"]["rules"][0]
assert result["level"] == expected_level
assert rule["defaultConfiguration"]["level"] == expected_level
assert rule["properties"]["security-severity"] == expected_security_severity
def test_transform_multiple_findings_dedup_rules(self):
findings = [
generate_finding_output(
status="FAIL",
resource_name="file1.tf",
status_extended="Finding in file1",
),
generate_finding_output(
status="FAIL",
resource_name="file2.tf",
status_extended="Finding in file2",
),
]
sarif = SARIF(findings=findings, file_path=None)
run = sarif.data["runs"][0]
assert len(run["tool"]["driver"]["rules"]) == 1
assert len(run["results"]) == 2
assert run["results"][0]["ruleIndex"] == 0
assert run["results"][1]["ruleIndex"] == 0
def test_transform_multiple_different_rules(self):
findings = [
generate_finding_output(
status="FAIL",
service_name="alpha",
check_id="alpha_check_one",
status_extended="Finding A",
),
generate_finding_output(
status="FAIL",
service_name="beta",
check_id="beta_check_two",
status_extended="Finding B",
),
]
sarif = SARIF(findings=findings, file_path=None)
run = sarif.data["runs"][0]
assert len(run["tool"]["driver"]["rules"]) == 2
assert run["results"][0]["ruleIndex"] == 0
assert run["results"][1]["ruleIndex"] == 1
def test_transform_location_with_line_range(self):
finding = generate_finding_output(
status="FAIL",
resource_name="modules/s3/main.tf",
)
finding.raw = {"resource_line_range": "10:25"}
sarif = SARIF(findings=[finding], file_path=None)
result = sarif.data["runs"][0]["results"][0]
location = result["locations"][0]["physicalLocation"]
assert location["artifactLocation"]["uri"] == "modules/s3/main.tf"
assert location["region"]["startLine"] == 10
assert location["region"]["endLine"] == 25
def test_transform_location_without_line_range(self):
finding = generate_finding_output(
status="FAIL",
resource_name="main.tf",
)
sarif = SARIF(findings=[finding], file_path=None)
result = sarif.data["runs"][0]["results"][0]
location = result["locations"][0]["physicalLocation"]
assert location["artifactLocation"]["uri"] == "main.tf"
assert "region" not in location
def test_transform_no_resource_name(self):
finding = generate_finding_output(
status="FAIL",
resource_name="",
)
sarif = SARIF(findings=[finding], file_path=None)
result = sarif.data["runs"][0]["results"][0]
assert "locations" not in result
def test_batch_write_data_to_file(self):
finding = generate_finding_output(
status="FAIL",
status_extended="test finding",
resource_name="main.tf",
)
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sarif", delete=False
) as tmp:
tmp_path = tmp.name
sarif = SARIF(
findings=[finding],
file_path=tmp_path,
)
sarif.batch_write_data_to_file()
with open(tmp_path) as f:
content = json.load(f)
assert content["$schema"] == SARIF_SCHEMA_URL
assert content["version"] == SARIF_VERSION
assert len(content["runs"][0]["results"]) == 1
def test_sarif_schema_structure(self):
finding = generate_finding_output(
status="FAIL",
severity="critical",
resource_name="infra/main.tf",
service_name="iac",
check_id="iac_misconfig_check",
check_title="IaC Misconfiguration",
description="Checks for misconfigurations",
remediation_recommendation_text="Fix the configuration",
)
finding.raw = {"resource_line_range": "5:15"}
sarif = SARIF(findings=[finding], file_path=None)
doc = sarif.data
assert "$schema" in doc
assert "version" in doc
assert "runs" in doc
run = doc["runs"][0]
assert "tool" in run
assert "driver" in run["tool"]
driver = run["tool"]["driver"]
assert "name" in driver
assert "version" in driver
assert "informationUri" in driver
assert "rules" in driver
rule = driver["rules"][0]
assert "id" in rule
assert "shortDescription" in rule
assert "fullDescription" in rule
assert "help" in rule
assert "defaultConfiguration" in rule
assert "properties" in rule
assert "tags" in rule["properties"]
assert "security-severity" in rule["properties"]
result = run["results"][0]
assert "ruleId" in result
assert "ruleIndex" in result
assert "level" in result
assert "message" in result
assert "locations" in result
loc = result["locations"][0]["physicalLocation"]
assert "artifactLocation" in loc
assert "uri" in loc["artifactLocation"]
assert "region" in loc
assert "startLine" in loc["region"]
assert "endLine" in loc["region"]
def test_empty_findings(self):
sarif = SARIF(findings=[], file_path=None)
assert sarif.data == []
def test_only_pass_findings(self):
findings = [
generate_finding_output(status="PASS"),
generate_finding_output(status="PASS"),
]
sarif = SARIF(findings=findings, file_path=None)
run = sarif.data["runs"][0]
assert len(run["results"]) == 0
assert len(run["tool"]["driver"]["rules"]) == 0