chore(iac): change engine to trivy (#8466)

Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
This commit is contained in:
Sergio Garcia
2025-08-22 10:17:51 +02:00
committed by GitHub
parent cefa708322
commit c3a2d79234
16 changed files with 670 additions and 457 deletions

View File

@@ -661,7 +661,7 @@ class TestFinding:
check_output.file_path = "/path/to/iac/file.tf"
check_output.resource_name = "aws_s3_bucket.example"
check_output.resource_path = "/path/to/iac/file.tf"
check_output.file_line_range = [1, 5]
check_output.resource_line_range = "1:5"
check_output.resource = {
"resource": "aws_s3_bucket.example",
"value": {},
@@ -685,7 +685,7 @@ class TestFinding:
assert finding_output.auth_method == "No auth"
assert finding_output.resource_name == "aws_s3_bucket.example"
assert finding_output.resource_uid == "aws_s3_bucket.example"
assert finding_output.region == "/path/to/iac/file.tf"
assert finding_output.region == "1:5"
assert finding_output.status == Status.PASS
assert finding_output.status_extended == "mock_status_extended"
assert finding_output.muted is False

View File

@@ -1,167 +1,290 @@
# IAC Provider Constants
DEFAULT_SCAN_PATH = "."
# Sample Checkov Output
SAMPLE_CHECKOV_OUTPUT = [
{
"check_type": "terraform",
"results": {
"failed_checks": [
# Sample Trivy Output
SAMPLE_TRIVY_OUTPUT = {
"Results": [
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [
{
"check_id": "CKV_AWS_1",
"check_name": "Ensure S3 bucket has encryption enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_1-s3-bucket-has-encryption-enabled",
"severity": "low",
"ID": "AVD-AWS-0001",
"Title": "S3 bucket should have encryption enabled",
"Description": "S3 bucket should have encryption enabled",
"Message": "S3 bucket should have encryption enabled",
"Resolution": "Enable encryption on the S3 bucket",
"Severity": "LOW",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0001",
"RuleID": "AVD-AWS-0001",
},
{
"check_id": "CKV_AWS_2",
"check_name": "Ensure S3 bucket has public access blocked",
"guideline": "https://docs.bridgecrew.io/docs/s3_2-s3-bucket-has-public-access-blocked",
"severity": "low",
"ID": "AVD-AWS-0002",
"Title": "S3 bucket should have public access blocked",
"Description": "S3 bucket should have public access blocked",
"Message": "S3 bucket should have public access blocked",
"Resolution": "Block public access on the S3 bucket",
"Severity": "LOW",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0002",
"RuleID": "AVD-AWS-0002",
},
],
"passed_checks": [
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
},
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [
{
"check_id": "CKV_AWS_3",
"check_name": "Ensure S3 bucket has versioning enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_3-s3-bucket-has-versioning-enabled",
"severity": "low",
"ID": "AVD-AWS-0003",
"Title": "S3 bucket should have versioning enabled",
"Description": "S3 bucket should have versioning enabled",
"Message": "S3 bucket should have versioning enabled",
"Resolution": "Enable versioning on the S3 bucket",
"Severity": "LOW",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0003",
"RuleID": "AVD-AWS-0003",
}
],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
},
}
]
]
}
# Sample Finding Data
SAMPLE_FINDING = SAMPLE_CHECKOV_OUTPUT[0]
SAMPLE_FINDING = SAMPLE_TRIVY_OUTPUT["Results"][0]
SAMPLE_FAILED_CHECK = {
"check_id": "CKV_AWS_1",
"check_name": "Ensure S3 bucket has encryption enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_1-s3-bucket-has-encryption-enabled",
"severity": "low",
"ID": "AVD-AWS-0001",
"Title": "S3 bucket should have encryption enabled",
"Description": "S3 bucket should have encryption enabled",
"Message": "S3 bucket should have encryption enabled",
"Resolution": "Enable encryption on the S3 bucket",
"Severity": "low",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0001",
"RuleID": "AVD-AWS-0001",
}
SAMPLE_PASSED_CHECK = {
"check_id": "CKV_AWS_3",
"check_name": "Ensure S3 bucket has versioning enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_3-s3-bucket-has-versioning-enabled",
"severity": "low",
"ID": "AVD-AWS-0003",
"Title": "S3 bucket should have versioning enabled",
"Description": "S3 bucket should have versioning enabled",
"Message": "S3 bucket should have versioning enabled",
"Resolution": "Enable versioning on the S3 bucket",
"Severity": "low",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0003",
"RuleID": "AVD-AWS-0003",
}
# Additional sample checks
SAMPLE_ANOTHER_FAILED_CHECK = {
"check_id": "CKV_AWS_4",
"check_name": "Ensure S3 bucket has logging enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_4-s3-bucket-has-logging-enabled",
"severity": "medium",
"ID": "AVD-AWS-0004",
"Title": "S3 bucket should have logging enabled",
"Description": "S3 bucket should have logging enabled",
"Message": "S3 bucket should have logging enabled",
"Resolution": "Enable logging on the S3 bucket",
"Severity": "medium",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0004",
"RuleID": "AVD-AWS-0004",
}
SAMPLE_ANOTHER_PASSED_CHECK = {
"check_id": "CKV_AWS_5",
"check_name": "Ensure S3 bucket has lifecycle policy",
"guideline": "https://docs.bridgecrew.io/docs/s3_5-s3-bucket-has-lifecycle-policy",
"severity": "low",
"ID": "AVD-AWS-0005",
"Title": "S3 bucket should have lifecycle policy",
"Description": "S3 bucket should have lifecycle policy",
"Message": "S3 bucket should have lifecycle policy",
"Resolution": "Configure lifecycle policy on the S3 bucket",
"Severity": "low",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0005",
"RuleID": "AVD-AWS-0005",
}
SAMPLE_ANOTHER_SKIPPED_CHECK = {
"check_id": "CKV_AWS_6",
"check_name": "Ensure S3 bucket has object lock enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_6-s3-bucket-has-object-lock-enabled",
"severity": "high",
"suppress_comment": "Not applicable for this use case",
"ID": "AVD-AWS-0006",
"Title": "S3 bucket should have object lock enabled",
"Description": "S3 bucket should have object lock enabled",
"Message": "S3 bucket should have object lock enabled",
"Resolution": "Enable object lock on the S3 bucket",
"Severity": "high",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0006",
"RuleID": "AVD-AWS-0006",
"Status": "MUTED",
}
SAMPLE_SKIPPED_CHECK = {
"check_id": "CKV_AWS_7",
"check_name": "Ensure S3 bucket has server-side encryption",
"guideline": "https://docs.bridgecrew.io/docs/s3_7-s3-bucket-has-server-side-encryption",
"severity": "medium",
"suppress_comment": "Legacy bucket, will be migrated",
"ID": "AVD-AWS-0007",
"Title": "S3 bucket should have server-side encryption",
"Description": "S3 bucket should have server-side encryption",
"Message": "S3 bucket should have server-side encryption",
"Resolution": "Enable server-side encryption on the S3 bucket",
"Severity": "medium",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0007",
"RuleID": "AVD-AWS-0007",
"Status": "MUTED",
}
SAMPLE_HIGH_SEVERITY_CHECK = {
"check_id": "CKV_AWS_8",
"check_name": "Ensure S3 bucket has public access blocked",
"guideline": "https://docs.bridgecrew.io/docs/s3_8-s3-bucket-has-public-access-blocked",
"severity": "high",
"ID": "AVD-AWS-0008",
"Title": "S3 bucket should have public access blocked",
"Description": "S3 bucket should have public access blocked",
"Message": "S3 bucket should have public access blocked",
"Resolution": "Block public access on the S3 bucket",
"Severity": "high",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0008",
"RuleID": "AVD-AWS-0008",
}
# Dockerfile samples
SAMPLE_DOCKERFILE_REPORT = {
"check_type": "dockerfile",
"results": {
"failed_checks": [
{
"check_id": "CKV_DOCKER_1",
"check_name": "Ensure base image is not using latest tag",
"guideline": "https://docs.bridgecrew.io/docs/docker_1-base-image-not-using-latest-tag",
"severity": "medium",
}
],
"passed_checks": [],
},
"Target": "Dockerfile",
"Type": "dockerfile",
"Misconfigurations": [
{
"ID": "AVD-DOCKER-0001",
"Title": "Base image should not use latest tag",
"Description": "Base image should not use latest tag",
"Message": "Base image should not use latest tag",
"Resolution": "Use a specific version tag instead of latest",
"Severity": "medium",
"PrimaryURL": "https://avd.aquasec.com/misconfig/docker/dockerfile/avd-docker-0001",
"RuleID": "AVD-DOCKER-0001",
}
],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
}
SAMPLE_DOCKERFILE_CHECK = {
"check_id": "CKV_DOCKER_1",
"check_name": "Ensure base image is not using latest tag",
"guideline": "https://docs.bridgecrew.io/docs/docker_1-base-image-not-using-latest-tag",
"severity": "medium",
"ID": "AVD-DOCKER-0001",
"Title": "Base image should not use latest tag",
"Description": "Base image should not use latest tag",
"Message": "Base image should not use latest tag",
"Resolution": "Use a specific version tag instead of latest",
"Severity": "medium",
"PrimaryURL": "https://avd.aquasec.com/misconfig/docker/dockerfile/avd-docker-0001",
"RuleID": "AVD-DOCKER-0001",
}
# YAML samples
SAMPLE_YAML_REPORT = {
"check_type": "yaml",
"results": {
"failed_checks": [
{
"check_id": "CKV_K8S_1",
"check_name": "Ensure API server is not exposed",
"guideline": "https://docs.bridgecrew.io/docs/k8s_1-api-server-not-exposed",
"severity": "high",
}
],
"passed_checks": [],
},
"Target": "deployment.yaml",
"Type": "kubernetes",
"Misconfigurations": [
{
"ID": "AVD-K8S-0001",
"Title": "API server should not be exposed",
"Description": "API server should not be exposed",
"Message": "API server should not be exposed",
"Resolution": "Do not expose the API server",
"Severity": "high",
"PrimaryURL": "https://avd.aquasec.com/misconfig/kubernetes/avd-k8s-0001",
"RuleID": "AVD-K8S-0001",
}
],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
}
SAMPLE_YAML_CHECK = {
"check_id": "CKV_K8S_1",
"check_name": "Ensure API server is not exposed",
"guideline": "https://docs.bridgecrew.io/docs/k8s_1-api-server-not-exposed",
"severity": "high",
"ID": "AVD-K8S-0001",
"Title": "API server should not be exposed",
"Description": "API server should not be exposed",
"Message": "API server should not be exposed",
"Resolution": "Do not expose the API server",
"Severity": "high",
"PrimaryURL": "https://avd.aquasec.com/misconfig/kubernetes/avd-k8s-0001",
"RuleID": "AVD-K8S-0001",
}
# CloudFormation samples
SAMPLE_CLOUDFORMATION_CHECK = {
"check_id": "CKV_AWS_9",
"check_name": "Ensure CloudFormation stack has drift detection enabled",
"guideline": "https://docs.bridgecrew.io/docs/aws_9-cloudformation-stack-has-drift-detection-enabled",
"severity": "low",
"ID": "AVD-AWS-0009",
"Title": "CloudFormation stack should have drift detection enabled",
"Description": "CloudFormation stack should have drift detection enabled",
"Message": "CloudFormation stack should have drift detection enabled",
"Resolution": "Enable drift detection on the CloudFormation stack",
"Severity": "low",
"PrimaryURL": "https://avd.aquasec.com/misconfig/aws/cloudformation/avd-aws-0009",
"RuleID": "AVD-AWS-0009",
}
# Kubernetes samples
SAMPLE_KUBERNETES_CHECK = {
"check_id": "CKV_K8S_2",
"check_name": "Ensure RBAC is enabled",
"guideline": "https://docs.bridgecrew.io/docs/k8s_2-rbac-enabled",
"severity": "medium",
"ID": "AVD-K8S-0002",
"Title": "RBAC should be enabled",
"Description": "RBAC should be enabled",
"Message": "RBAC should be enabled",
"Resolution": "Enable RBAC on the cluster",
"Severity": "medium",
"PrimaryURL": "https://avd.aquasec.com/misconfig/kubernetes/avd-k8s-0002",
"RuleID": "AVD-K8S-0002",
}
# Sample Trivy output with vulnerabilities
SAMPLE_TRIVY_VULNERABILITY_OUTPUT = {
"Results": [
{
"Target": "package.json",
"Type": "nodejs",
"Misconfigurations": [],
"Vulnerabilities": [
{
"VulnerabilityID": "CVE-2023-1234",
"Title": "Example vulnerability",
"Description": "This is an example vulnerability",
"Severity": "high",
"PrimaryURL": "https://example.com/cve-2023-1234",
}
],
"Secrets": [],
"Licenses": [],
}
]
}
# Sample Trivy output with secrets
SAMPLE_TRIVY_SECRET_OUTPUT = {
"Results": [
{
"Target": "config.yaml",
"Class": "secret",
"Misconfigurations": [],
"Vulnerabilities": [],
"Secrets": [
{
"ID": "aws-access-key-id",
"Title": "AWS Access Key ID",
"Description": "AWS Access Key ID found in configuration",
"Severity": "critical",
"PrimaryURL": "https://example.com/secret-aws-access-key-id",
}
],
"Licenses": [],
}
]
}
def get_sample_checkov_json_output():
"""Return sample Checkov JSON output as string"""
def get_sample_trivy_json_output():
"""Return sample Trivy JSON output as string"""
import json
return json.dumps(SAMPLE_CHECKOV_OUTPUT)
return json.dumps(SAMPLE_TRIVY_OUTPUT)
def get_empty_checkov_output():
"""Return empty Checkov output as string"""
return "[]"
def get_empty_trivy_output():
"""Return empty Trivy output as string"""
import json
return json.dumps({"Results": []})
def get_invalid_checkov_output():
def get_invalid_trivy_output():
"""Return invalid JSON output as string"""
return "invalid json output"

View File

@@ -15,18 +15,15 @@ from tests.providers.iac.iac_fixtures import (
SAMPLE_ANOTHER_SKIPPED_CHECK,
SAMPLE_CLOUDFORMATION_CHECK,
SAMPLE_DOCKERFILE_CHECK,
SAMPLE_DOCKERFILE_REPORT,
SAMPLE_FAILED_CHECK,
SAMPLE_FINDING,
SAMPLE_HIGH_SEVERITY_CHECK,
SAMPLE_KUBERNETES_CHECK,
SAMPLE_PASSED_CHECK,
SAMPLE_SKIPPED_CHECK,
SAMPLE_YAML_CHECK,
SAMPLE_YAML_REPORT,
get_empty_checkov_output,
get_invalid_checkov_output,
get_sample_checkov_json_output,
get_empty_trivy_output,
get_invalid_trivy_output,
get_sample_trivy_json_output,
)
@@ -51,73 +48,85 @@ class TestIacProvider:
assert provider._type == "iac"
assert provider.scan_path == custom_path
def test_iac_provider_process_check_failed(self):
"""Test processing a failed check"""
def test_iac_provider_process_finding_failed(self):
"""Test processing a failed finding"""
provider = IacProvider()
report = provider._process_check(SAMPLE_FINDING, SAMPLE_FAILED_CHECK, "FAIL")
report = provider._process_finding(SAMPLE_FAILED_CHECK, "main.tf", "terraform")
assert isinstance(report, CheckReportIAC)
assert report.status == "FAIL"
assert report.check_metadata.Provider == "iac"
assert report.check_metadata.CheckID == SAMPLE_FAILED_CHECK["check_id"]
assert report.check_metadata.CheckTitle == SAMPLE_FAILED_CHECK["check_name"]
assert report.check_metadata.CheckID == SAMPLE_FAILED_CHECK["ID"]
assert report.check_metadata.CheckTitle == SAMPLE_FAILED_CHECK["Title"]
assert report.check_metadata.Severity == "low"
assert report.check_metadata.RelatedUrl == SAMPLE_FAILED_CHECK["guideline"]
assert report.check_metadata.RelatedUrl == SAMPLE_FAILED_CHECK["PrimaryURL"]
def test_iac_provider_process_check_passed(self):
"""Test processing a passed check"""
def test_iac_provider_process_finding_passed(self):
"""Test processing a passed finding"""
provider = IacProvider()
report = provider._process_check(SAMPLE_FINDING, SAMPLE_PASSED_CHECK, "PASS")
report = provider._process_finding(SAMPLE_PASSED_CHECK, "main.tf", "terraform")
assert isinstance(report, CheckReportIAC)
assert report.status == "PASS"
assert report.status == "FAIL" # Trivy findings are always FAIL by default
assert report.check_metadata.Provider == "iac"
assert report.check_metadata.CheckID == SAMPLE_PASSED_CHECK["check_id"]
assert report.check_metadata.CheckTitle == SAMPLE_PASSED_CHECK["check_name"]
assert report.check_metadata.CheckID == SAMPLE_PASSED_CHECK["ID"]
assert report.check_metadata.CheckTitle == SAMPLE_PASSED_CHECK["Title"]
assert report.check_metadata.Severity == "low"
@patch("subprocess.run")
def test_iac_provider_run_scan_success(self, mock_subprocess):
"""Test successful IAC scan with Checkov"""
"""Test successful IAC scan with Trivy"""
provider = IacProvider()
mock_subprocess.return_value = MagicMock(
stdout=get_sample_checkov_json_output(), stderr=""
stdout=get_sample_trivy_json_output(), stderr=""
)
reports = provider.run_scan("/test/directory", ["all"], [])
reports = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], []
)
# Should have 2 failed checks + 1 passed check = 3 total reports
# Should have 3 misconfigurations from the sample output
assert len(reports) == 3
# Check that we have both failed and passed reports
# Check that we have failed reports (Trivy findings are always FAIL by default)
failed_reports = [r for r in reports if r.status == "FAIL"]
passed_reports = [r for r in reports if r.status == "PASS"]
assert len(failed_reports) == 2
assert len(passed_reports) == 1
assert len(failed_reports) == 3
# Verify subprocess was called correctly
mock_subprocess.assert_called_once_with(
["checkov", "-d", "/test/directory", "-o", "json", "-f", "all"],
[
"trivy",
"fs",
"/test/directory",
"--format",
"json",
"--scanners",
"vuln,misconfig,secret",
"--parallel",
"0",
"--include-non-failures",
],
capture_output=True,
text=True,
)
@patch("subprocess.run")
def test_iac_provider_run_scan_empty_output(self, mock_subprocess):
"""Test IAC scan with empty Checkov output"""
"""Test IAC scan with empty Trivy output"""
provider = IacProvider()
mock_subprocess.return_value = MagicMock(
stdout=get_empty_checkov_output(), stderr=""
stdout=get_empty_trivy_output(), stderr=""
)
reports = provider.run_scan("/test/directory", ["all"], [])
reports = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], []
)
assert len(reports) == 0
def test_provider_run_local_scan(self):
@@ -127,7 +136,9 @@ class TestIacProvider:
"prowler.providers.iac.iac_provider.IacProvider.run_scan",
) as mock_run_scan:
provider.run()
mock_run_scan.assert_called_with(scan_path, ["all"], [])
mock_run_scan.assert_called_with(
scan_path, ["vuln", "misconfig", "secret"], []
)
@mock.patch.dict(os.environ, {}, clear=True)
def test_provider_run_remote_scan(self):
@@ -145,7 +156,9 @@ class TestIacProvider:
):
provider.run()
mock_clone.assert_called_with(scan_repository_url, None, None, None)
mock_run_scan.assert_called_with(temp_dir, ["all"], [])
mock_run_scan.assert_called_with(
temp_dir, ["vuln", "misconfig", "secret"], []
)
@mock.patch.dict(os.environ, {}, clear=True)
def test_print_credentials_local(self):
@@ -183,7 +196,7 @@ class TestIacProvider:
provider = IacProvider()
mock_subprocess.return_value = MagicMock(
stdout=get_invalid_checkov_output(), stderr=""
stdout=get_invalid_trivy_output(), stderr=""
)
with pytest.raises(SystemExit) as excinfo:
@@ -193,92 +206,102 @@ class TestIacProvider:
@patch("subprocess.run")
def test_iac_provider_run_scan_null_output(self, mock_subprocess):
"""Test IAC scan with null Checkov output"""
"""Test IAC scan with null Trivy output"""
provider = IacProvider()
mock_subprocess.return_value = MagicMock(stdout="null", stderr="")
reports = provider.run_scan("/test/directory", ["all"], [])
assert len(reports) == 0
with pytest.raises(SystemExit) as exc_info:
provider.run_scan("/test/directory", ["vuln", "misconfig", "secret"], [])
assert exc_info.value.code == 1
def test_iac_provider_process_check_dockerfile(self):
"""Test processing a Dockerfile check"""
def test_iac_provider_process_finding_dockerfile(self):
"""Test processing a Dockerfile finding"""
provider = IacProvider()
report = provider._process_check(
SAMPLE_DOCKERFILE_REPORT, SAMPLE_DOCKERFILE_CHECK, "FAIL"
report = provider._process_finding(
SAMPLE_DOCKERFILE_CHECK, "Dockerfile", "dockerfile"
)
assert isinstance(report, CheckReportIAC)
assert report.status == "FAIL"
assert report.check_metadata.ServiceName == "dockerfile"
assert report.check_metadata.CheckID == SAMPLE_DOCKERFILE_CHECK["check_id"]
assert report.check_metadata.CheckID == SAMPLE_DOCKERFILE_CHECK["ID"]
def test_iac_provider_process_check_yaml(self):
"""Test processing a YAML check"""
def test_iac_provider_process_finding_yaml(self):
"""Test processing a YAML finding"""
provider = IacProvider()
report = provider._process_check(SAMPLE_YAML_REPORT, SAMPLE_YAML_CHECK, "PASS")
report = provider._process_finding(
SAMPLE_YAML_CHECK, "deployment.yaml", "kubernetes"
)
assert isinstance(report, CheckReportIAC)
assert report.status == "PASS"
assert report.check_metadata.ServiceName == "yaml"
assert report.check_metadata.CheckID == SAMPLE_YAML_CHECK["check_id"]
assert report.status == "FAIL" # Trivy findings are always FAIL by default
assert report.check_metadata.ServiceName == "kubernetes"
assert report.check_metadata.CheckID == SAMPLE_YAML_CHECK["ID"]
@patch("subprocess.run")
def test_run_scan_success_with_failed_and_passed_checks(self, mock_subprocess):
"""Test successful run_scan with both failed and passed checks"""
provider = IacProvider()
# Create sample output with both failed and passed checks
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [SAMPLE_FAILED_CHECK],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
},
}
]
# Create sample Trivy output with both failed and passed checks
sample_output = {
"Results": [
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [SAMPLE_FAILED_CHECK, SAMPLE_PASSED_CHECK],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
}
]
}
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan("/test/directory", ["terraform"], [])
result = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], []
)
# Verify results
assert len(result) == 2
assert all(isinstance(report, CheckReportIAC) for report in result)
# Check that we have one FAIL and one PASS report
# Check that we have FAIL reports (Trivy findings are always FAIL by default)
statuses = [report.status for report in result]
assert "FAIL" in statuses
assert "PASS" in statuses
assert all(status == "FAIL" for status in statuses)
@patch("subprocess.run")
def test_run_scan_with_skipped_checks(self, mock_subprocess):
"""Test run_scan with skipped checks (muted)"""
provider = IacProvider()
# Create sample output with skipped checks
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [],
"passed_checks": [],
"skipped_checks": [SAMPLE_SKIPPED_CHECK],
},
}
]
# Create sample Trivy output with skipped checks
sample_output = {
"Results": [
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [SAMPLE_SKIPPED_CHECK],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
}
]
}
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan("/test/directory", ["all"], ["exclude/path"])
result = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], ["exclude/path"]
)
# Verify results
assert len(result) == 1
@@ -291,9 +314,13 @@ class TestIacProvider:
"""Test run_scan with no findings"""
provider = IacProvider()
mock_subprocess.return_value = MagicMock(stdout="[]", stderr="")
mock_subprocess.return_value = MagicMock(
stdout=json.dumps({"Results": []}), stderr=""
)
result = provider.run_scan("/test/directory", ["kubernetes"], [])
result = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], []
)
# Verify results
assert len(result) == 0
@@ -303,40 +330,43 @@ class TestIacProvider:
"""Test run_scan with multiple reports from different frameworks"""
provider = IacProvider()
# Create sample output with multiple frameworks
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [SAMPLE_FAILED_CHECK],
"passed_checks": [],
"skipped_checks": [],
# Create sample Trivy output with multiple frameworks
sample_output = {
"Results": [
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [SAMPLE_FAILED_CHECK],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
},
},
{
"check_type": "kubernetes",
"results": {
"failed_checks": [],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
{
"Target": "deployment.yaml",
"Type": "kubernetes",
"Misconfigurations": [SAMPLE_PASSED_CHECK],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
},
},
]
]
}
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan("/test/directory", ["terraform", "kubernetes"], [])
result = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], []
)
# Verify results
assert len(result) == 2
assert all(isinstance(report, CheckReportIAC) for report in result)
# Check that we have one FAIL and one PASS report
# Check that we have FAIL reports (Trivy findings are always FAIL by default)
statuses = [report.status for report in result]
assert "FAIL" in statuses
assert "PASS" in statuses
assert all(status == "FAIL" for status in statuses)
@patch("subprocess.run")
def test_run_scan_exception_handling(self, mock_subprocess):
@@ -347,44 +377,49 @@ class TestIacProvider:
mock_subprocess.side_effect = Exception("Test exception")
with pytest.raises(SystemExit) as exc_info:
provider.run_scan("/test/directory", ["terraform"], [])
provider.run_scan("/test/directory", ["vuln", "misconfig", "secret"], [])
assert exc_info.value.code == 1
@patch("subprocess.run")
def test_run_scan_with_different_frameworks(self, mock_subprocess):
"""Test run_scan with different framework configurations"""
"""Test run_scan with different scanner configurations"""
provider = IacProvider()
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
},
}
]
sample_output = {
"Results": [
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [SAMPLE_PASSED_CHECK],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
}
]
}
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
# Test with specific frameworks
frameworks = ["terraform", "kubernetes", "cloudformation"]
result = provider.run_scan("/test/directory", frameworks, [])
# Test with specific scanners
scanners = ["vuln", "misconfig", "secret"]
result = provider.run_scan("/test/directory", scanners, [])
# Verify subprocess was called with correct frameworks
# Verify subprocess was called with correct scanners
mock_subprocess.assert_called_once_with(
[
"checkov",
"-d",
"trivy",
"fs",
"/test/directory",
"-o",
"--format",
"json",
"-f",
",".join(frameworks),
"--scanners",
",".join(scanners),
"--parallel",
"0",
"--include-non-failures",
],
capture_output=True,
text=True,
@@ -392,23 +427,25 @@ class TestIacProvider:
# Verify results
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status == "FAIL" # Trivy findings are always FAIL by default
@patch("subprocess.run")
def test_run_scan_with_exclude_paths(self, mock_subprocess):
"""Test run_scan with exclude paths"""
provider = IacProvider()
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
},
}
]
sample_output = {
"Results": [
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [SAMPLE_PASSED_CHECK],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
}
]
}
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
@@ -416,18 +453,23 @@ class TestIacProvider:
# Test with exclude paths
exclude_paths = ["node_modules", ".git", "vendor"]
result = provider.run_scan("/test/directory", ["all"], exclude_paths)
result = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], exclude_paths
)
# Verify subprocess was called with correct exclude paths
expected_command = [
"checkov",
"-d",
"trivy",
"fs",
"/test/directory",
"-o",
"--format",
"json",
"-f",
"all",
"--skip-path",
"--scanners",
"vuln,misconfig,secret",
"--parallel",
"0",
"--include-non-failures",
"--skip-dirs",
",".join(exclude_paths),
]
mock_subprocess.assert_called_once_with(
@@ -438,38 +480,47 @@ class TestIacProvider:
# Verify results
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status == "FAIL" # Trivy findings are always FAIL by default
@patch("subprocess.run")
def test_run_scan_all_check_types(self, mock_subprocess):
"""Test run_scan with all types of checks (failed, passed, skipped)"""
provider = IacProvider()
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [SAMPLE_FAILED_CHECK, SAMPLE_HIGH_SEVERITY_CHECK],
"passed_checks": [SAMPLE_PASSED_CHECK, SAMPLE_CLOUDFORMATION_CHECK],
"skipped_checks": [SAMPLE_SKIPPED_CHECK],
},
}
]
sample_output = {
"Results": [
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [
SAMPLE_FAILED_CHECK,
SAMPLE_HIGH_SEVERITY_CHECK,
SAMPLE_PASSED_CHECK,
SAMPLE_CLOUDFORMATION_CHECK,
SAMPLE_SKIPPED_CHECK,
],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
}
]
}
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan("/test/directory", ["all"], [])
result = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], []
)
# Verify results
assert len(result) == 5 # 2 failed + 2 passed + 1 skipped
assert len(result) == 5 # 5 misconfigurations
# Check status distribution
statuses = [report.status for report in result]
assert statuses.count("FAIL") == 2
assert statuses.count("PASS") == 2
assert statuses.count("MUTED") == 1
assert statuses.count("FAIL") == 4 # 4 regular findings
assert statuses.count("MUTED") == 1 # 1 skipped finding
# Check that muted reports have muted=True
muted_reports = [report for report in result if report.status == "MUTED"]
@@ -481,9 +532,13 @@ class TestIacProvider:
provider = IacProvider()
# Return empty list of reports
mock_subprocess.return_value = MagicMock(stdout="[]", stderr="")
mock_subprocess.return_value = MagicMock(
stdout=json.dumps({"Results": []}), stderr=""
)
result = provider.run_scan("/test/directory", ["terraform"], [])
result = provider.run_scan(
"/test/directory", ["vuln", "misconfig", "secret"], []
)
# Verify results
assert len(result) == 0
@@ -493,60 +548,70 @@ class TestIacProvider:
"""Test run_scan with multiple frameworks and different types of checks"""
provider = IacProvider()
# Create sample output with multiple frameworks and different check types
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [SAMPLE_FAILED_CHECK, SAMPLE_ANOTHER_FAILED_CHECK],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
# Create sample Trivy output with multiple frameworks and different check types
sample_output = {
"Results": [
{
"Target": "main.tf",
"Type": "terraform",
"Misconfigurations": [
SAMPLE_FAILED_CHECK,
SAMPLE_ANOTHER_FAILED_CHECK,
SAMPLE_PASSED_CHECK,
],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
},
},
{
"check_type": "kubernetes",
"results": {
"failed_checks": [SAMPLE_KUBERNETES_CHECK],
"passed_checks": [],
"skipped_checks": [SAMPLE_ANOTHER_SKIPPED_CHECK],
{
"Target": "deployment.yaml",
"Type": "kubernetes",
"Misconfigurations": [
SAMPLE_KUBERNETES_CHECK,
SAMPLE_ANOTHER_SKIPPED_CHECK,
],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
},
},
{
"check_type": "cloudformation",
"results": {
"failed_checks": [],
"passed_checks": [
{
"Target": "template.yaml",
"Type": "cloudformation",
"Misconfigurations": [
SAMPLE_CLOUDFORMATION_CHECK,
SAMPLE_ANOTHER_PASSED_CHECK,
],
"skipped_checks": [],
"Vulnerabilities": [],
"Secrets": [],
"Licenses": [],
},
},
]
]
}
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan(
"/test/directory", ["terraform", "kubernetes", "cloudformation"], []
"/test/directory", ["vuln", "misconfig", "secret"], []
)
# Verify results
assert (
len(result) == 7
) # 2 failed + 1 passed (terraform) + 1 failed + 1 skipped (kubernetes) + 2 passed (cloudformation)
) # 3 terraform + 2 kubernetes + 2 cloudformation = 7 total
# Check status distribution
statuses = [report.status for report in result]
assert statuses.count("FAIL") == 3
assert statuses.count("PASS") == 3
assert statuses.count("MUTED") == 1
assert statuses.count("FAIL") == 6 # 6 regular findings
assert statuses.count("MUTED") == 1 # 1 skipped finding
def test_run_method_calls_run_scan(self):
"""Test that the run method calls run_scan with correct parameters"""
provider = IacProvider(
scan_path="/custom/path", frameworks=["terraform"], exclude_path=["exclude"]
scan_path="/custom/path",
scanners=["vuln", "misconfig"],
exclude_path=["exclude"],
)
with patch.object(provider, "run_scan") as mock_run_scan:
@@ -554,7 +619,7 @@ class TestIacProvider:
provider.run()
mock_run_scan.assert_called_once_with(
"/custom/path", ["terraform"], ["exclude"]
"/custom/path", ["vuln", "misconfig"], ["exclude"]
)
@mock.patch("prowler.providers.iac.iac_provider.porcelain.clone")