refactor(iac): revert importing checkov as python library (#8385)

Sergio Garcia
2025-07-29 15:55:28 +08:00
committed by GitHub
parent 92a804bf88
commit 1bdcf2c7f1
8 changed files with 986 additions and 1969 deletions

poetry.lock (generated), 1807 lines changed: file diff suppressed because it is too large

View File

@@ -14,6 +14,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
### Changed
- Handle some AWS errors as warnings instead of errors [(#8347)](https://github.com/prowler-cloud/prowler/pull/8347)
- Revert import of `checkov` python library [(#8385)](https://github.com/prowler-cloud/prowler/pull/8385)
- Updated policy mapping in ISMS-P compliance file for improved alignment [(#8367)](https://github.com/prowler-cloud/prowler/pull/8367)
### Fixed

View File

@@ -7,7 +7,6 @@ from dataclasses import asdict, dataclass, is_dataclass
from enum import Enum
from typing import Any, Dict, Optional, Set
from checkov.common.output.record import Record
from pydantic.v1 import BaseModel, ValidationError, validator
from prowler.config.config import Provider
@@ -155,7 +154,7 @@ class CheckMetadata(BaseModel):
raise ValueError("ServiceName must be a non-empty string")
check_id = values.get("CheckID")
if check_id:
if check_id and values.get("Provider") != "iac":
service_from_check_id = check_id.split("_")[0]
if service_name != service_from_check_id:
raise ValueError(
@@ -472,8 +471,6 @@ class Check_Report:
self.resource = resource.to_dict()
elif is_dataclass(resource):
self.resource = asdict(resource)
elif hasattr(resource, "__dict__"):
self.resource = resource.__dict__
else:
logger.error(
f"Resource metadata {type(resource)} in {self.check_metadata.CheckID} could not be converted to dict"
@@ -659,7 +656,7 @@ class CheckReportIAC(Check_Report):
resource_path: str
resource_line_range: str
def __init__(self, metadata: dict = {}, resource: Record = None) -> None:
def __init__(self, metadata: dict = {}, finding: dict = {}) -> None:
"""
Initialize the IAC Check's finding information from a Checkov failed_check dict.
@@ -667,10 +664,11 @@ class CheckReportIAC(Check_Report):
metadata (Dict): Optional check metadata (can be None).
failed_check (dict): A single failed_check result from Checkov's JSON output.
"""
super().__init__(metadata, resource)
self.resource_name = resource.resource
self.resource_path = resource.file_path
self.resource_line_range = resource.file_line_range
super().__init__(metadata, finding)
self.resource_name = getattr(finding, "resource", "")
self.resource_path = getattr(finding, "file_path", "")
self.resource_line_range = getattr(finding, "file_line_range", "")
@dataclass

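For context on the CheckMetadata hunk above, a minimal, illustrative sketch (not the pydantic validator itself, and the check IDs are made up) of the rule that the `values.get("Provider") != "iac"` guard relaxes: for every other provider the ServiceName must match the CheckID prefix, while IaC checks keep Checkov IDs such as CKV_AWS_1 and use ServiceName for the framework.

def service_matches_check_id(provider: str, service_name: str, check_id: str) -> bool:
    # IaC checks keep Checkov's own IDs, so the prefix rule does not apply
    if provider == "iac":
        return True
    return service_name == check_id.split("_")[0]

assert service_matches_check_id("aws", "s3", "s3_bucket_default_encryption")
assert service_matches_check_id("iac", "terraform", "CKV_AWS_1")
assert not service_matches_check_id("aws", "ec2", "s3_bucket_default_encryption")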
View File

@@ -286,8 +286,8 @@ class Finding(BaseModel):
output_data["auth_method"] = provider.auth_method
output_data["account_uid"] = "iac"
output_data["account_name"] = "iac"
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_name
output_data["resource_name"] = check_output.resource["resource"]
output_data["resource_uid"] = check_output.resource["resource"]
output_data["region"] = check_output.resource_path
output_data["resource_line_range"] = check_output.resource_line_range
output_data["framework"] = check_output.check_metadata.ServiceName

View File

@@ -1,43 +1,12 @@
import json
import shutil
import subprocess
import sys
import tempfile
from os import environ
from typing import List
from alive_progress import alive_bar
from checkov.ansible.runner import Runner as AnsibleRunner
from checkov.argo_workflows.runner import Runner as ArgoWorkflowsRunner
from checkov.arm.runner import Runner as ArmRunner
from checkov.azure_pipelines.runner import Runner as AzurePipelinesRunner
from checkov.bicep.runner import Runner as BicepRunner
from checkov.bitbucket.runner import Runner as BitbucketRunner
from checkov.bitbucket_pipelines.runner import Runner as BitbucketPipelinesRunner
from checkov.cdk.runner import CdkRunner
from checkov.circleci_pipelines.runner import Runner as CircleciPipelinesRunner
from checkov.cloudformation.runner import Runner as CfnRunner
from checkov.common.output.record import Record
from checkov.common.output.report import Report
from checkov.common.runners.runner_registry import RunnerRegistry
from checkov.dockerfile.runner import Runner as DockerfileRunner
from checkov.github.runner import Runner as GithubRunner
from checkov.github_actions.runner import Runner as GithubActionsRunner
from checkov.gitlab.runner import Runner as GitlabRunner
from checkov.gitlab_ci.runner import Runner as GitlabCiRunner
from checkov.helm.runner import Runner as HelmRunner
from checkov.json_doc.runner import Runner as JsonDocRunner
from checkov.kubernetes.runner import Runner as K8sRunner
from checkov.kustomize.runner import Runner as KustomizeRunner
from checkov.openapi.runner import Runner as OpenapiRunner
from checkov.runner_filter import RunnerFilter
from checkov.sast.runner import Runner as SastRunner
from checkov.sca_image.runner import Runner as ScaImageRunner
from checkov.sca_package_2.runner import Runner as ScaPackage2Runner
from checkov.secrets.runner import Runner as SecretsRunner
from checkov.serverless.runner import Runner as ServerlessRunner
from checkov.terraform.runner import Runner as TerraformRunner
from checkov.terraform_json.runner import TerraformJsonRunner
from checkov.yaml_doc.runner import Runner as YamlDocRunner
from colorama import Fore, Style
from dulwich import porcelain
@@ -165,9 +134,7 @@ class IacProvider(Provider):
"""IAC provider doesn't need a session since it uses Checkov directly"""
return None
def _process_check(
self, finding: Report, check: Record, status: str
) -> CheckReportIAC:
def _process_check(self, finding: dict, check: dict, status: str) -> CheckReportIAC:
"""
Process a single check (failed or passed) and create a CheckReportIAC object.
@@ -182,17 +149,23 @@ class IacProvider(Provider):
try:
metadata_dict = {
"Provider": "iac",
"CheckID": check.check_id,
"CheckTitle": check.check_name,
"CheckID": check.get("check_id", ""),
"CheckTitle": check.get("check_name", ""),
"CheckType": ["Infrastructure as Code"],
"ServiceName": finding.check_type,
"ServiceName": finding["check_type"],
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": (check.severity.lower() if check.severity else "low"),
"ResourceType": finding.check_type,
"Description": check.check_name,
"Severity": (
check.get("severity", "low").lower()
if check.get("severity")
else "low"
),
"ResourceType": "iac",
"Description": check.get("check_name", ""),
"Risk": "",
"RelatedUrl": (check.guideline if check.guideline else ""),
"RelatedUrl": (
check.get("guideline", "") if check.get("guideline") else ""
),
"Remediation": {
"Code": {
"NativeIaC": "",
@@ -202,7 +175,9 @@ class IacProvider(Provider):
},
"Recommendation": {
"Text": "",
"Url": (check.guideline if check.guideline else ""),
"Url": (
check.get("guideline", "") if check.get("guideline") else ""
),
},
},
"Categories": [],
@@ -214,10 +189,10 @@ class IacProvider(Provider):
# Convert metadata dict to JSON string
metadata = json.dumps(metadata_dict)
report = CheckReportIAC(metadata=metadata, resource=check)
report = CheckReportIAC(metadata=metadata, finding=check)
report.status = status
report.resource_tags = check.entity_tags
report.status_extended = check.check_name
report.resource_tags = check.get("entity_tags", {})
report.status_extended = check.get("check_name", "")
if status == "MUTED":
report.muted = True
return report
@@ -298,59 +273,73 @@ class IacProvider(Provider):
self, directory: str, frameworks: list[str], exclude_path: list[str]
) -> List[CheckReportIAC]:
try:
logger.info(f"Running IaC scan on {directory}...")
runners = [
TerraformRunner(),
CfnRunner(),
K8sRunner(),
ArmRunner(),
ServerlessRunner(),
DockerfileRunner(),
YamlDocRunner(),
OpenapiRunner(),
SastRunner(),
ScaImageRunner(),
ScaPackage2Runner(),
SecretsRunner(),
AnsibleRunner(),
ArgoWorkflowsRunner(),
BitbucketRunner(),
BitbucketPipelinesRunner(),
CdkRunner(),
CircleciPipelinesRunner(),
GithubRunner(),
GithubActionsRunner(),
GitlabRunner(),
GitlabCiRunner(),
HelmRunner(),
JsonDocRunner(),
TerraformJsonRunner(),
KustomizeRunner(),
AzurePipelinesRunner(),
BicepRunner(),
logger.info(f"Running IaC scan on {directory} ...")
checkov_command = [
"checkov",
"-d",
directory,
"-o",
"json",
"-f",
",".join(frameworks),
]
runner_filter = RunnerFilter(
framework=frameworks, excluded_paths=exclude_path
if exclude_path:
checkov_command.extend(["--skip-path", ",".join(exclude_path)])
# Run Checkov with JSON output
process = subprocess.run(
checkov_command,
capture_output=True,
text=True,
)
# Log Checkov's error output if any
if process.stderr:
logger.error(process.stderr)
registry = RunnerRegistry("", runner_filter, *runners)
checkov_reports = registry.run(root_folder=directory)
try:
output = json.loads(process.stdout)
if not output:
logger.warning("No findings returned from Checkov scan")
return []
except Exception as error:
logger.critical(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
sys.exit(1)
reports: List[CheckReportIAC] = []
for report in checkov_reports:
reports = []
for failed in report.failed_checks:
reports.append(self._process_check(report, failed, "FAIL"))
# If only one framework has findings, the output is a dict, otherwise it's a list of dicts
if isinstance(output, dict):
output = [output]
for passed in report.passed_checks:
reports.append(self._process_check(report, passed, "PASS"))
# Process all frameworks findings
for finding in output:
results = finding.get("results", {})
for skipped in report.skipped_checks:
reports.append(self._process_check(report, skipped, "MUTED"))
# Process failed checks
failed_checks = results.get("failed_checks", [])
for failed_check in failed_checks:
report = self._process_check(finding, failed_check, "FAIL")
reports.append(report)
# Process passed checks
passed_checks = results.get("passed_checks", [])
for passed_check in passed_checks:
report = self._process_check(finding, passed_check, "PASS")
reports.append(report)
# Process skipped checks (muted)
skipped_checks = results.get("skipped_checks", [])
for skipped_check in skipped_checks:
report = self._process_check(finding, skipped_check, "MUTED")
reports.append(report)
return reports
except Exception as error:
if "No such file or directory: 'checkov'" in str(error):
logger.critical("Please, install checkov using 'pip install checkov'")
sys.exit(1)
logger.critical(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)

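For reference, a standalone sketch of the CLI-based flow this hunk restores. It assumes the checkov binary is installed and on PATH; the flags simply mirror the command assembled in run_scan above, and the JSON handling mirrors its parsing loop.

import json
import subprocess

def run_checkov(directory: str, frameworks: list[str], exclude_paths: list[str]) -> list[dict]:
    # Same command shape as run_scan: JSON output for one or more frameworks
    command = ["checkov", "-d", directory, "-o", "json", "-f", ",".join(frameworks)]
    if exclude_paths:
        command.extend(["--skip-path", ",".join(exclude_paths)])
    process = subprocess.run(command, capture_output=True, text=True)
    output = json.loads(process.stdout)
    if not output:
        return []
    # A single framework with findings yields a dict; several yield a list of dicts
    return [output] if isinstance(output, dict) else output

if __name__ == "__main__":
    for finding in run_checkov(".", ["terraform"], []):
        for failed in finding.get("results", {}).get("failed_checks", []):
            print("FAIL", failed.get("check_id"), failed.get("resource"))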
View File

@@ -12,7 +12,7 @@ classifiers = [
]
dependencies = [
"awsipranges==0.3.3",
"alive-progress==3.2.0",
"alive-progress==3.3.0",
"azure-identity==1.21.0",
"azure-keyvault-keys==4.10.0",
"azure-mgmt-applicationinsights==4.1.0",
@@ -36,8 +36,8 @@ dependencies = [
"azure-mgmt-subscription==3.1.1",
"azure-mgmt-web==8.0.0",
"azure-storage-blob==12.24.1",
"boto3==1.35.49",
"botocore==1.35.99",
"boto3==1.39.14",
"botocore==1.39.14",
"colorama==0.4.6",
"cryptography==44.0.1",
"dash==3.1.1",
@@ -62,7 +62,6 @@ dependencies = [
"slack-sdk==3.34.0",
"tabulate==0.9.0",
"tzlocal==5.3.1",
"checkov==3.2.445",
"py-iam-expand==0.1.0"
]
description = "Prowler is an Open Source security tool to perform AWS, GCP and Azure security best practices assessments, audits, incident response, continuous monitoring, hardening and forensics readiness. It contains hundreds of controls covering CIS, NIST 800, NIST CSF, CISA, RBI, FedRAMP, PCI-DSS, GDPR, HIPAA, FFIEC, SOC2, GXP, AWS Well-Architected Framework Security Pillar, AWS Foundational Technical Review (FTR), ENS (Spanish National Security Scheme) and your custom security frameworks."

View File

@@ -1,257 +1,167 @@
from checkov.common.models.enums import CheckResult
from checkov.common.output.record import Record
from checkov.common.output.report import Report
# IAC Provider Constants
DEFAULT_SCAN_PATH = "."
# Sample Checkov Output
SAMPLE_CHECKOV_OUTPUT = [
{
"check_type": "terraform",
"results": {
"failed_checks": [
{
"check_id": "CKV_AWS_1",
"check_name": "Ensure S3 bucket has encryption enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_1-s3-bucket-has-encryption-enabled",
"severity": "low",
},
{
"check_id": "CKV_AWS_2",
"check_name": "Ensure S3 bucket has public access blocked",
"guideline": "https://docs.bridgecrew.io/docs/s3_2-s3-bucket-has-public-access-blocked",
"severity": "low",
},
],
"passed_checks": [
{
"check_id": "CKV_AWS_3",
"check_name": "Ensure S3 bucket has versioning enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_3-s3-bucket-has-versioning-enabled",
"severity": "low",
}
],
},
}
]
# Sample Finding Data
SAMPLE_FINDING = Report(check_type="terraform")
SAMPLE_FAILED_CHECK = Record(
check_id="CKV_AWS_1",
check_name="Ensure S3 bucket has encryption enabled",
severity="low",
file_path="test.tf",
file_line_range=[1, 2],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.FAILED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_FAILED_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_1-s3-bucket-has-encryption-enabled"
)
SAMPLE_FINDING = SAMPLE_CHECKOV_OUTPUT[0]
SAMPLE_PASSED_CHECK = Record(
check_id="CKV_AWS_3",
check_name="Ensure S3 bucket has versioning enabled",
severity="low",
file_path="test.tf",
file_line_range=[1, 2],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.PASSED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_PASSED_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_3-s3-bucket-has-versioning-enabled"
)
SAMPLE_FAILED_CHECK = {
"check_id": "CKV_AWS_1",
"check_name": "Ensure S3 bucket has encryption enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_1-s3-bucket-has-encryption-enabled",
"severity": "low",
}
# Additional test fixtures for comprehensive testing
SAMPLE_SKIPPED_CHECK = Record(
check_id="CKV_AWS_2",
check_name="Ensure S3 bucket has public access blocked",
severity="high",
file_path="test.tf",
file_line_range=[3, 4],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.SKIPPED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_SKIPPED_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_2-s3-bucket-has-public-access-blocked"
)
SAMPLE_PASSED_CHECK = {
"check_id": "CKV_AWS_3",
"check_name": "Ensure S3 bucket has versioning enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_3-s3-bucket-has-versioning-enabled",
"severity": "low",
}
SAMPLE_HIGH_SEVERITY_CHECK = Record(
check_id="CKV_AWS_4",
check_name="Ensure S3 bucket has logging enabled",
severity="HIGH",
file_path="test.tf",
file_line_range=[5, 6],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.FAILED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_HIGH_SEVERITY_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_4-s3-bucket-has-logging-enabled"
)
# Additional sample checks
SAMPLE_ANOTHER_FAILED_CHECK = {
"check_id": "CKV_AWS_4",
"check_name": "Ensure S3 bucket has logging enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_4-s3-bucket-has-logging-enabled",
"severity": "medium",
}
SAMPLE_KUBERNETES_CHECK = Record(
check_id="CKV_K8S_1",
check_name="Ensure API server has audit logging enabled",
severity="medium",
file_path="deployment.yaml",
file_line_range=[1, 10],
resource="kubernetes_deployment.test_deployment",
evaluations=[],
check_class="kubernetes",
check_result=CheckResult.FAILED,
code_block=[],
file_abs_path="deployment.yaml",
)
SAMPLE_KUBERNETES_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/k8s_1-api-server-has-audit-logging-enabled"
)
SAMPLE_ANOTHER_PASSED_CHECK = {
"check_id": "CKV_AWS_5",
"check_name": "Ensure S3 bucket has lifecycle policy",
"guideline": "https://docs.bridgecrew.io/docs/s3_5-s3-bucket-has-lifecycle-policy",
"severity": "low",
}
SAMPLE_CLOUDFORMATION_CHECK = Record(
check_id="CKV_AWS_5",
check_name="Ensure CloudFormation stacks are not publicly accessible",
severity="critical",
file_path="template.yaml",
file_line_range=[1, 20],
resource="AWS::CloudFormation::Stack",
evaluations=[],
check_class="cloudformation",
check_result=CheckResult.PASSED,
code_block=[],
file_abs_path="template.yaml",
)
SAMPLE_CLOUDFORMATION_CHECK.guideline = "https://docs.bridgecrew.io/docs/cfn_1-cloudformation-stacks-are-not-publicly-accessible"
SAMPLE_ANOTHER_SKIPPED_CHECK = {
"check_id": "CKV_AWS_6",
"check_name": "Ensure S3 bucket has object lock enabled",
"guideline": "https://docs.bridgecrew.io/docs/s3_6-s3-bucket-has-object-lock-enabled",
"severity": "high",
"suppress_comment": "Not applicable for this use case",
}
# Sample findings for different frameworks
SAMPLE_KUBERNETES_FINDING = Report(check_type="kubernetes")
SAMPLE_CLOUDFORMATION_FINDING = Report(check_type="cloudformation")
SAMPLE_SKIPPED_CHECK = {
"check_id": "CKV_AWS_7",
"check_name": "Ensure S3 bucket has server-side encryption",
"guideline": "https://docs.bridgecrew.io/docs/s3_7-s3-bucket-has-server-side-encryption",
"severity": "medium",
"suppress_comment": "Legacy bucket, will be migrated",
}
# Additional fixtures for different test scenarios
SAMPLE_CHECK_WITHOUT_GUIDELINE = Record(
check_id="CKV_AWS_6",
check_name="Test check without guideline",
severity="low",
file_path="test.tf",
file_line_range=[1, 2],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.FAILED,
code_block=[],
file_abs_path="test.tf",
)
# Note: No guideline attribute set
SAMPLE_HIGH_SEVERITY_CHECK = {
"check_id": "CKV_AWS_8",
"check_name": "Ensure S3 bucket has public access blocked",
"guideline": "https://docs.bridgecrew.io/docs/s3_8-s3-bucket-has-public-access-blocked",
"severity": "high",
}
SAMPLE_MEDIUM_SEVERITY_CHECK = Record(
check_id="CKV_AWS_7",
check_name="Ensure S3 bucket has proper access controls",
severity="MEDIUM",
file_path="test.tf",
file_line_range=[7, 8],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.FAILED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_MEDIUM_SEVERITY_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_7-s3-bucket-has-proper-access-controls"
)
# Dockerfile samples
SAMPLE_DOCKERFILE_REPORT = {
"check_type": "dockerfile",
"results": {
"failed_checks": [
{
"check_id": "CKV_DOCKER_1",
"check_name": "Ensure base image is not using latest tag",
"guideline": "https://docs.bridgecrew.io/docs/docker_1-base-image-not-using-latest-tag",
"severity": "medium",
}
],
"passed_checks": [],
},
}
SAMPLE_CRITICAL_SEVERITY_CHECK = Record(
check_id="CKV_AWS_8",
check_name="Ensure S3 bucket has encryption at rest",
severity="CRITICAL",
file_path="test.tf",
file_line_range=[9, 10],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.FAILED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_CRITICAL_SEVERITY_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_8-s3-bucket-has-encryption-at-rest"
)
SAMPLE_DOCKERFILE_CHECK = {
"check_id": "CKV_DOCKER_1",
"check_name": "Ensure base image is not using latest tag",
"guideline": "https://docs.bridgecrew.io/docs/docker_1-base-image-not-using-latest-tag",
"severity": "medium",
}
# Sample reports for different frameworks
SAMPLE_TERRAFORM_REPORT = Report(check_type="terraform")
SAMPLE_KUBERNETES_REPORT = Report(check_type="kubernetes")
SAMPLE_CLOUDFORMATION_REPORT = Report(check_type="cloudformation")
SAMPLE_DOCKERFILE_REPORT = Report(check_type="dockerfile")
SAMPLE_YAML_REPORT = Report(check_type="yaml")
# YAML samples
SAMPLE_YAML_REPORT = {
"check_type": "yaml",
"results": {
"failed_checks": [
{
"check_id": "CKV_K8S_1",
"check_name": "Ensure API server is not exposed",
"guideline": "https://docs.bridgecrew.io/docs/k8s_1-api-server-not-exposed",
"severity": "high",
}
],
"passed_checks": [],
},
}
# Sample checks for different frameworks
SAMPLE_DOCKERFILE_CHECK = Record(
check_id="CKV_DOCKER_1",
check_name="Ensure base image is not using latest tag",
severity="high",
file_path="Dockerfile",
file_line_range=[1, 1],
resource="Dockerfile",
evaluations=[],
check_class="dockerfile",
check_result=CheckResult.FAILED,
code_block=[],
file_abs_path="Dockerfile",
)
SAMPLE_DOCKERFILE_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/docker_1-base-image-not-using-latest-tag"
)
SAMPLE_YAML_CHECK = {
"check_id": "CKV_K8S_1",
"check_name": "Ensure API server is not exposed",
"guideline": "https://docs.bridgecrew.io/docs/k8s_1-api-server-not-exposed",
"severity": "high",
}
SAMPLE_YAML_CHECK = Record(
check_id="CKV_YAML_1",
check_name="Ensure YAML file has proper indentation",
severity="low",
file_path="config.yaml",
file_line_range=[1, 5],
resource="config.yaml",
evaluations=[],
check_class="yaml",
check_result=CheckResult.PASSED,
code_block=[],
file_abs_path="config.yaml",
)
SAMPLE_YAML_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/yaml_1-proper-indentation"
)
# CloudFormation samples
SAMPLE_CLOUDFORMATION_CHECK = {
"check_id": "CKV_AWS_9",
"check_name": "Ensure CloudFormation stack has drift detection enabled",
"guideline": "https://docs.bridgecrew.io/docs/aws_9-cloudformation-stack-has-drift-detection-enabled",
"severity": "low",
}
# Sample checks with different statuses for comprehensive testing
SAMPLE_ANOTHER_FAILED_CHECK = Record(
check_id="CKV_AWS_9",
check_name="Ensure S3 bucket has lifecycle policy",
severity="medium",
file_path="test.tf",
file_line_range=[11, 12],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.FAILED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_ANOTHER_FAILED_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_9-s3-bucket-has-lifecycle-policy"
)
# Kubernetes samples
SAMPLE_KUBERNETES_CHECK = {
"check_id": "CKV_K8S_2",
"check_name": "Ensure RBAC is enabled",
"guideline": "https://docs.bridgecrew.io/docs/k8s_2-rbac-enabled",
"severity": "medium",
}
SAMPLE_ANOTHER_PASSED_CHECK = Record(
check_id="CKV_AWS_10",
check_name="Ensure S3 bucket has proper tags",
severity="low",
file_path="test.tf",
file_line_range=[13, 14],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.PASSED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_ANOTHER_PASSED_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_10-s3-bucket-has-proper-tags"
)
SAMPLE_ANOTHER_SKIPPED_CHECK = Record(
check_id="CKV_AWS_11",
check_name="Ensure S3 bucket has cross-region replication",
severity="high",
file_path="test.tf",
file_line_range=[15, 16],
resource="aws_s3_bucket.test_bucket",
evaluations=[],
check_class="terraform",
check_result=CheckResult.SKIPPED,
code_block=[],
file_abs_path="test.tf",
)
SAMPLE_ANOTHER_SKIPPED_CHECK.guideline = (
"https://docs.bridgecrew.io/docs/s3_11-s3-bucket-has-cross-region-replication"
)
def get_sample_checkov_json_output():
"""Return sample Checkov JSON output as string"""
import json
return json.dumps(SAMPLE_CHECKOV_OUTPUT)
def get_empty_checkov_output():
"""Return empty Checkov output as string"""
return "[]"
def get_invalid_checkov_output():
"""Return invalid JSON output as string"""
return "invalid json output"
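A quick, illustrative round-trip over these fixtures (module path taken from the test imports below; assumes the repository test layout):

import json
from tests.providers.iac.iac_fixtures import (
    SAMPLE_CHECKOV_OUTPUT,
    get_sample_checkov_json_output,
)

# The helper is json.dumps over SAMPLE_CHECKOV_OUTPUT, so loading it back
# restores the structure that run_scan expects on Checkov's stdout.
assert json.loads(get_sample_checkov_json_output()) == SAMPLE_CHECKOV_OUTPUT
failed = SAMPLE_CHECKOV_OUTPUT[0]["results"]["failed_checks"]
assert [check["check_id"] for check in failed] == ["CKV_AWS_1", "CKV_AWS_2"]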

View File

@@ -1,7 +1,8 @@
import json
import os
import tempfile
from unittest import mock
from unittest.mock import Mock, patch
from unittest.mock import MagicMock, patch
import pytest
@@ -12,21 +13,20 @@ from tests.providers.iac.iac_fixtures import (
SAMPLE_ANOTHER_FAILED_CHECK,
SAMPLE_ANOTHER_PASSED_CHECK,
SAMPLE_ANOTHER_SKIPPED_CHECK,
SAMPLE_CHECK_WITHOUT_GUIDELINE,
SAMPLE_CLOUDFORMATION_CHECK,
SAMPLE_CRITICAL_SEVERITY_CHECK,
SAMPLE_DOCKERFILE_CHECK,
SAMPLE_DOCKERFILE_REPORT,
SAMPLE_FAILED_CHECK,
SAMPLE_FINDING,
SAMPLE_HIGH_SEVERITY_CHECK,
SAMPLE_KUBERNETES_CHECK,
SAMPLE_KUBERNETES_FINDING,
SAMPLE_MEDIUM_SEVERITY_CHECK,
SAMPLE_PASSED_CHECK,
SAMPLE_SKIPPED_CHECK,
SAMPLE_YAML_CHECK,
SAMPLE_YAML_REPORT,
get_empty_checkov_output,
get_invalid_checkov_output,
get_sample_checkov_json_output,
)
@@ -61,10 +61,10 @@ class TestIacProvider:
assert report.status == "FAIL"
assert report.check_metadata.Provider == "iac"
assert report.check_metadata.CheckID == SAMPLE_FAILED_CHECK.check_id
assert report.check_metadata.CheckTitle == SAMPLE_FAILED_CHECK.check_name
assert report.check_metadata.CheckID == SAMPLE_FAILED_CHECK["check_id"]
assert report.check_metadata.CheckTitle == SAMPLE_FAILED_CHECK["check_name"]
assert report.check_metadata.Severity == "low"
assert report.check_metadata.RelatedUrl == SAMPLE_FAILED_CHECK.guideline
assert report.check_metadata.RelatedUrl == SAMPLE_FAILED_CHECK["guideline"]
def test_iac_provider_process_check_passed(self):
"""Test processing a passed check"""
@@ -76,63 +76,49 @@ class TestIacProvider:
assert report.status == "PASS"
assert report.check_metadata.Provider == "iac"
assert report.check_metadata.CheckID == SAMPLE_PASSED_CHECK.check_id
assert report.check_metadata.CheckTitle == SAMPLE_PASSED_CHECK.check_name
assert report.check_metadata.CheckID == SAMPLE_PASSED_CHECK["check_id"]
assert report.check_metadata.CheckTitle == SAMPLE_PASSED_CHECK["check_name"]
assert report.check_metadata.Severity == "low"
assert report.check_metadata.RelatedUrl == SAMPLE_PASSED_CHECK.guideline
def test_iac_provider_process_check_skipped(self):
"""Test processing a skipped check"""
@patch("subprocess.run")
def test_iac_provider_run_scan_success(self, mock_subprocess):
"""Test successful IAC scan with Checkov"""
provider = IacProvider()
report = provider._process_check(SAMPLE_FINDING, SAMPLE_SKIPPED_CHECK, "MUTED")
assert isinstance(report, CheckReportIAC)
assert report.status == "MUTED"
assert report.muted is True
assert report.check_metadata.Provider == "iac"
assert report.check_metadata.CheckID == SAMPLE_SKIPPED_CHECK.check_id
assert report.check_metadata.CheckTitle == SAMPLE_SKIPPED_CHECK.check_name
assert report.check_metadata.Severity == "high"
assert report.check_metadata.RelatedUrl == SAMPLE_SKIPPED_CHECK.guideline
def test_iac_provider_process_check_high_severity(self):
"""Test processing a high severity check"""
provider = IacProvider()
report = provider._process_check(
SAMPLE_FINDING, SAMPLE_HIGH_SEVERITY_CHECK, "FAIL"
mock_subprocess.return_value = MagicMock(
stdout=get_sample_checkov_json_output(), stderr=""
)
assert isinstance(report, CheckReportIAC)
assert report.status == "FAIL"
assert report.check_metadata.Severity == "high"
reports = provider.run_scan("/test/directory", ["all"], [])
def test_iac_provider_process_check_different_framework(self):
"""Test processing a check from a different framework (Kubernetes)"""
provider = IacProvider()
# Should have 2 failed checks + 1 passed check = 3 total reports
assert len(reports) == 3
report = provider._process_check(
SAMPLE_KUBERNETES_FINDING, SAMPLE_KUBERNETES_CHECK, "FAIL"
# Check that we have both failed and passed reports
failed_reports = [r for r in reports if r.status == "FAIL"]
passed_reports = [r for r in reports if r.status == "PASS"]
assert len(failed_reports) == 2
assert len(passed_reports) == 1
# Verify subprocess was called correctly
mock_subprocess.assert_called_once_with(
["checkov", "-d", "/test/directory", "-o", "json", "-f", "all"],
capture_output=True,
text=True,
)
assert isinstance(report, CheckReportIAC)
assert report.status == "FAIL"
assert report.check_metadata.ServiceName == "kubernetes"
assert report.check_metadata.CheckID == SAMPLE_KUBERNETES_CHECK.check_id
def test_iac_provider_process_check_no_guideline(self):
"""Test processing a check without guideline URL"""
@patch("subprocess.run")
def test_iac_provider_run_scan_empty_output(self, mock_subprocess):
"""Test IAC scan with empty Checkov output"""
provider = IacProvider()
report = provider._process_check(
SAMPLE_FINDING, SAMPLE_CHECK_WITHOUT_GUIDELINE, "FAIL"
mock_subprocess.return_value = MagicMock(
stdout=get_empty_checkov_output(), stderr=""
)
assert isinstance(report, CheckReportIAC)
assert report.status == "FAIL"
assert report.check_metadata.RelatedUrl == ""
reports = provider.run_scan("/test/directory", ["all"], [])
assert len(reports) == 0
def test_provider_run_local_scan(self):
scan_path = "."
@@ -191,29 +177,29 @@ class TestIacProvider:
for call in mock_print.call_args_list
)
def test_iac_provider_process_check_medium_severity(self):
@patch("subprocess.run")
def test_iac_provider_process_check_medium_severity(self, mock_subprocess):
"""Test processing a medium severity check"""
provider = IacProvider()
report = provider._process_check(
SAMPLE_FINDING, SAMPLE_MEDIUM_SEVERITY_CHECK, "FAIL"
mock_subprocess.return_value = MagicMock(
stdout=get_invalid_checkov_output(), stderr=""
)
assert isinstance(report, CheckReportIAC)
assert report.status == "FAIL"
assert report.check_metadata.Severity == "medium"
with pytest.raises(SystemExit) as excinfo:
provider.run_scan("/test/directory", ["all"], [])
def test_iac_provider_process_check_critical_severity(self):
"""Test processing a critical severity check"""
assert excinfo.value.code == 1
@patch("subprocess.run")
def test_iac_provider_run_scan_null_output(self, mock_subprocess):
"""Test IAC scan with null Checkov output"""
provider = IacProvider()
report = provider._process_check(
SAMPLE_FINDING, SAMPLE_CRITICAL_SEVERITY_CHECK, "FAIL"
)
mock_subprocess.return_value = MagicMock(stdout="null", stderr="")
assert isinstance(report, CheckReportIAC)
assert report.status == "FAIL"
assert report.check_metadata.Severity == "critical"
reports = provider.run_scan("/test/directory", ["all"], [])
assert len(reports) == 0
def test_iac_provider_process_check_dockerfile(self):
"""Test processing a Dockerfile check"""
@@ -226,7 +212,7 @@ class TestIacProvider:
assert isinstance(report, CheckReportIAC)
assert report.status == "FAIL"
assert report.check_metadata.ServiceName == "dockerfile"
assert report.check_metadata.CheckID == SAMPLE_DOCKERFILE_CHECK.check_id
assert report.check_metadata.CheckID == SAMPLE_DOCKERFILE_CHECK["check_id"]
def test_iac_provider_process_check_yaml(self):
"""Test processing a YAML check"""
@@ -237,42 +223,30 @@ class TestIacProvider:
assert isinstance(report, CheckReportIAC)
assert report.status == "PASS"
assert report.check_metadata.ServiceName == "yaml"
assert report.check_metadata.CheckID == SAMPLE_YAML_CHECK.check_id
assert report.check_metadata.CheckID == SAMPLE_YAML_CHECK["check_id"]
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_success_with_failed_and_passed_checks(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
@patch("subprocess.run")
def test_run_scan_success_with_failed_and_passed_checks(self, mock_subprocess):
"""Test successful run_scan with both failed and passed checks"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
# Create mock reports with failed and passed checks
mock_report = Mock()
mock_report.check_type = "terraform" # Set the check_type attribute
mock_report.failed_checks = [SAMPLE_FAILED_CHECK]
mock_report.passed_checks = [SAMPLE_PASSED_CHECK]
mock_report.skipped_checks = []
mock_registry_instance.run.return_value = [mock_report]
provider = IacProvider()
result = provider.run_scan("/test/directory", ["terraform"], [])
# Verify logger was called
mock_logger.info.assert_called_with("Running IaC scan on /test/directory...")
# Create sample output with both failed and passed checks
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [SAMPLE_FAILED_CHECK],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
},
}
]
# Verify RunnerFilter was created with correct parameters
mock_runner_filter.assert_called_with(
framework=["terraform"], excluded_paths=[]
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
# Verify RunnerRegistry was created and run was called
mock_runner_registry.assert_called_once()
mock_registry_instance.run.assert_called_with(root_folder="/test/directory")
result = provider.run_scan("/test/directory", ["terraform"], [])
# Verify results
assert len(result) == 2
@@ -283,93 +257,76 @@ class TestIacProvider:
assert "FAIL" in statuses
assert "PASS" in statuses
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_with_skipped_checks(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
@patch("subprocess.run")
def test_run_scan_with_skipped_checks(self, mock_subprocess):
"""Test run_scan with skipped checks (muted)"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
# Create mock report with skipped checks
mock_report = Mock()
mock_report.check_type = "terraform" # Set the check_type attribute
mock_report.failed_checks = []
mock_report.passed_checks = []
mock_report.skipped_checks = [SAMPLE_SKIPPED_CHECK]
mock_registry_instance.run.return_value = [mock_report]
provider = IacProvider()
result = provider.run_scan("/test/directory", ["all"], ["exclude/path"])
# Verify RunnerFilter was created with correct parameters
mock_runner_filter.assert_called_with(
framework=["all"], excluded_paths=["exclude/path"]
# Create sample output with skipped checks
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [],
"passed_checks": [],
"skipped_checks": [SAMPLE_SKIPPED_CHECK],
},
}
]
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan("/test/directory", ["all"], ["exclude/path"])
# Verify results
assert len(result) == 1
assert isinstance(result[0], CheckReportIAC)
assert result[0].status == "MUTED"
assert result[0].muted is True
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_empty_results(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
@patch("subprocess.run")
def test_run_scan_empty_results(self, mock_subprocess):
"""Test run_scan with no findings"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
# Create mock report with no checks
mock_report = Mock()
mock_report.check_type = "terraform" # Set the check_type attribute
mock_report.failed_checks = []
mock_report.passed_checks = []
mock_report.skipped_checks = []
mock_registry_instance.run.return_value = [mock_report]
provider = IacProvider()
mock_subprocess.return_value = MagicMock(stdout="[]", stderr="")
result = provider.run_scan("/test/directory", ["kubernetes"], [])
# Verify results
assert len(result) == 0
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_multiple_reports(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
@patch("subprocess.run")
def test_run_scan_multiple_reports(self, mock_subprocess):
"""Test run_scan with multiple reports from different frameworks"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
# Create multiple mock reports
mock_report1 = Mock()
mock_report1.check_type = "terraform" # Set the check_type attribute
mock_report1.failed_checks = [SAMPLE_FAILED_CHECK]
mock_report1.passed_checks = []
mock_report1.skipped_checks = []
mock_report2 = Mock()
mock_report2.check_type = "kubernetes" # Set the check_type attribute
mock_report2.failed_checks = []
mock_report2.passed_checks = [SAMPLE_PASSED_CHECK]
mock_report2.skipped_checks = []
mock_registry_instance.run.return_value = [mock_report1, mock_report2]
provider = IacProvider()
# Create sample output with multiple frameworks
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [SAMPLE_FAILED_CHECK],
"passed_checks": [],
"skipped_checks": [],
},
},
{
"check_type": "kubernetes",
"results": {
"failed_checks": [],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
},
},
]
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan("/test/directory", ["terraform", "kubernetes"], [])
# Verify results
@@ -381,122 +338,128 @@ class TestIacProvider:
assert "FAIL" in statuses
assert "PASS" in statuses
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
@patch("prowler.providers.iac.iac_provider.sys")
def test_run_scan_exception_handling(
self, mock_sys, mock_logger, mock_runner_filter, mock_runner_registry
):
@patch("subprocess.run")
def test_run_scan_exception_handling(self, mock_subprocess):
"""Test run_scan exception handling"""
# Setup mocks to raise an exception
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
mock_registry_instance.run.side_effect = Exception("Test exception")
# Configure sys.exit to raise SystemExit
mock_sys.exit.side_effect = SystemExit(1)
provider = IacProvider()
# The function should call sys.exit(1) when an exception occurs
# Make subprocess.run raise an exception
mock_subprocess.side_effect = Exception("Test exception")
with pytest.raises(SystemExit) as exc_info:
provider.run_scan("/test/directory", ["terraform"], [])
assert exc_info.value.code == 1
# Verify logger was called with error information
mock_logger.critical.assert_called_once()
critical_call_args = mock_logger.critical.call_args[0][0]
assert "Exception" in critical_call_args
assert "Test exception" in critical_call_args
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_with_different_frameworks(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
@patch("subprocess.run")
def test_run_scan_with_different_frameworks(self, mock_subprocess):
"""Test run_scan with different framework configurations"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
mock_report = Mock()
mock_report.check_type = "terraform" # Set the check_type attribute
mock_report.failed_checks = []
mock_report.passed_checks = [SAMPLE_PASSED_CHECK]
mock_report.skipped_checks = []
mock_registry_instance.run.return_value = [mock_report]
provider = IacProvider()
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
},
}
]
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
# Test with specific frameworks
frameworks = ["terraform", "kubernetes", "cloudformation"]
result = provider.run_scan("/test/directory", frameworks, [])
# Verify RunnerFilter was created with correct frameworks
mock_runner_filter.assert_called_with(framework=frameworks, excluded_paths=[])
# Verify results
assert len(result) == 1
assert result[0].status == "PASS"
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_with_exclude_paths(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
"""Test run_scan with exclude paths"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
mock_report = Mock()
mock_report.check_type = "terraform" # Set the check_type attribute
mock_report.failed_checks = []
mock_report.passed_checks = [SAMPLE_PASSED_CHECK]
mock_report.skipped_checks = []
mock_registry_instance.run.return_value = [mock_report]
provider = IacProvider()
# Test with exclude paths
exclude_paths = ["node_modules", ".git", "vendor"]
result = provider.run_scan("/test/directory", ["all"], exclude_paths)
# Verify RunnerFilter was created with correct exclude paths
mock_runner_filter.assert_called_with(
framework=["all"], excluded_paths=exclude_paths
# Verify subprocess was called with correct frameworks
mock_subprocess.assert_called_once_with(
[
"checkov",
"-d",
"/test/directory",
"-o",
"json",
"-f",
",".join(frameworks),
],
capture_output=True,
text=True,
)
# Verify results
assert len(result) == 1
assert result[0].status == "PASS"
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_all_check_types(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
"""Test run_scan with all types of checks (failed, passed, skipped)"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
mock_report = Mock()
mock_report.check_type = "terraform" # Set the check_type attribute
mock_report.failed_checks = [SAMPLE_FAILED_CHECK, SAMPLE_HIGH_SEVERITY_CHECK]
mock_report.passed_checks = [SAMPLE_PASSED_CHECK, SAMPLE_CLOUDFORMATION_CHECK]
mock_report.skipped_checks = [SAMPLE_SKIPPED_CHECK]
mock_registry_instance.run.return_value = [mock_report]
@patch("subprocess.run")
def test_run_scan_with_exclude_paths(self, mock_subprocess):
"""Test run_scan with exclude paths"""
provider = IacProvider()
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
},
}
]
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
# Test with exclude paths
exclude_paths = ["node_modules", ".git", "vendor"]
result = provider.run_scan("/test/directory", ["all"], exclude_paths)
# Verify subprocess was called with correct exclude paths
expected_command = [
"checkov",
"-d",
"/test/directory",
"-o",
"json",
"-f",
"all",
"--skip-path",
",".join(exclude_paths),
]
mock_subprocess.assert_called_once_with(
expected_command,
capture_output=True,
text=True,
)
# Verify results
assert len(result) == 1
assert result[0].status == "PASS"
@patch("subprocess.run")
def test_run_scan_all_check_types(self, mock_subprocess):
"""Test run_scan with all types of checks (failed, passed, skipped)"""
provider = IacProvider()
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [SAMPLE_FAILED_CHECK, SAMPLE_HIGH_SEVERITY_CHECK],
"passed_checks": [SAMPLE_PASSED_CHECK, SAMPLE_CLOUDFORMATION_CHECK],
"skipped_checks": [SAMPLE_SKIPPED_CHECK],
},
}
]
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan("/test/directory", ["all"], [])
# Verify results
@@ -512,69 +475,59 @@ class TestIacProvider:
muted_reports = [report for report in result if report.status == "MUTED"]
assert all(report.muted for report in muted_reports)
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_no_reports_returned(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
@patch("subprocess.run")
def test_run_scan_no_reports_returned(self, mock_subprocess):
"""Test run_scan when no reports are returned from registry"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
provider = IacProvider()
# Return empty list of reports
mock_registry_instance.run.return_value = []
mock_subprocess.return_value = MagicMock(stdout="[]", stderr="")
provider = IacProvider()
result = provider.run_scan("/test/directory", ["terraform"], [])
# Verify results
assert len(result) == 0
@patch("prowler.providers.iac.iac_provider.RunnerRegistry")
@patch("prowler.providers.iac.iac_provider.RunnerFilter")
@patch("prowler.providers.iac.iac_provider.logger")
def test_run_scan_multiple_frameworks_with_different_checks(
self, mock_logger, mock_runner_filter, mock_runner_registry
):
@patch("subprocess.run")
def test_run_scan_multiple_frameworks_with_different_checks(self, mock_subprocess):
"""Test run_scan with multiple frameworks and different types of checks"""
# Setup mocks
mock_registry_instance = Mock()
mock_runner_registry.return_value = mock_registry_instance
# Create reports for different frameworks
terraform_report = Mock()
terraform_report.check_type = "terraform"
terraform_report.failed_checks = [
SAMPLE_FAILED_CHECK,
SAMPLE_ANOTHER_FAILED_CHECK,
]
terraform_report.passed_checks = [SAMPLE_PASSED_CHECK]
terraform_report.skipped_checks = []
kubernetes_report = Mock()
kubernetes_report.check_type = "kubernetes"
kubernetes_report.failed_checks = [SAMPLE_KUBERNETES_CHECK]
kubernetes_report.passed_checks = []
kubernetes_report.skipped_checks = [SAMPLE_ANOTHER_SKIPPED_CHECK]
cloudformation_report = Mock()
cloudformation_report.check_type = "cloudformation"
cloudformation_report.failed_checks = []
cloudformation_report.passed_checks = [
SAMPLE_CLOUDFORMATION_CHECK,
SAMPLE_ANOTHER_PASSED_CHECK,
]
cloudformation_report.skipped_checks = []
mock_registry_instance.run.return_value = [
terraform_report,
kubernetes_report,
cloudformation_report,
]
provider = IacProvider()
# Create sample output with multiple frameworks and different check types
sample_output = [
{
"check_type": "terraform",
"results": {
"failed_checks": [SAMPLE_FAILED_CHECK, SAMPLE_ANOTHER_FAILED_CHECK],
"passed_checks": [SAMPLE_PASSED_CHECK],
"skipped_checks": [],
},
},
{
"check_type": "kubernetes",
"results": {
"failed_checks": [SAMPLE_KUBERNETES_CHECK],
"passed_checks": [],
"skipped_checks": [SAMPLE_ANOTHER_SKIPPED_CHECK],
},
},
{
"check_type": "cloudformation",
"results": {
"failed_checks": [],
"passed_checks": [
SAMPLE_CLOUDFORMATION_CHECK,
SAMPLE_ANOTHER_PASSED_CHECK,
],
"skipped_checks": [],
},
},
]
mock_subprocess.return_value = MagicMock(
stdout=json.dumps(sample_output), stderr=""
)
result = provider.run_scan(
"/test/directory", ["terraform", "kubernetes", "cloudformation"], []
)