diff --git a/docs/basic-usage/prowler-cli.md b/docs/basic-usage/prowler-cli.md index 525ed596f2..ba455fb405 100644 --- a/docs/basic-usage/prowler-cli.md +++ b/docs/basic-usage/prowler-cli.md @@ -223,7 +223,7 @@ Prowler enables security scanning of your **GitHub account**, including **Reposi ## Infrastructure as Code (IaC) -Prowler's Infrastructure as Code (IaC) provider enables you to scan local or remote infrastructure code for security and compliance issues using [Checkov](https://www.checkov.io/). This provider supports a wide range of IaC frameworks, allowing you to assess your code before deployment. +Prowler's Infrastructure as Code (IaC) provider enables you to scan local or remote infrastructure code for security and compliance issues using [Trivy](https://trivy.dev/). This provider supports a wide range of IaC frameworks, allowing you to assess your code before deployment. ```console # Scan a directory for IaC files @@ -252,6 +252,6 @@ prowler iac --scan-path ./my-iac-directory --exclude-path ./my-iac-directory/tes - For remote repository scans, authentication can be provided via CLI flags or environment variables (`GITHUB_OAUTH_APP_TOKEN`, `GITHUB_USERNAME`, `GITHUB_PERSONAL_ACCESS_TOKEN`). CLI flags take precedence. - The IaC provider does not require cloud authentication for local scans. - It is ideal for CI/CD pipelines and local development environments. - - For more details on supported frameworks and rules, see the [Checkov documentation](https://www.checkov.io/1.Welcome/Quick%20Start.html) + - For more details on supported scanners, see the [Trivy documentation](https://trivy.dev/latest/docs/scanner/vulnerability/) See more details about IaC scanning in the [IaC Tutorial](../tutorials/iac/getting-started-iac.md) section. diff --git a/docs/getting-started/requirements.md b/docs/getting-started/requirements.md index 12c8b3c0d9..916596c9ea 100644 --- a/docs/getting-started/requirements.md +++ b/docs/getting-started/requirements.md @@ -561,7 +561,7 @@ These options provide flexibility for scanning and analyzing your GitHub account ## Infrastructure as Code (IaC) -Prowler's Infrastructure as Code (IaC) provider enables you to scan local or remote infrastructure code for security and compliance issues using [Checkov](https://www.checkov.io/). This provider supports a wide range of IaC frameworks and requires no cloud authentication for local scans. +Prowler's Infrastructure as Code (IaC) provider enables you to scan local or remote infrastructure code for security and compliance issues using [Trivy](https://trivy.dev/). This provider supports a wide range of IaC frameworks and requires no cloud authentication for local scans. ### Authentication @@ -571,21 +571,11 @@ Prowler's Infrastructure as Code (IaC) provider enables you to scan local or rem - [**GitHub OAuth App Token**](https://docs.github.com/en/authentication/keeping-your-account-and-data-secure/managing-your-personal-access-tokens#creating-a-fine-grained-personal-access-token) - [**Git URL**](https://git-scm.com/docs/git-clone#_git_urls) -### Supported Frameworks +### Supported Scanners -The IaC provider leverages Checkov to support multiple frameworks, including: +The IaC provider leverages Trivy to support multiple scanners, including: -- Terraform -- CloudFormation -- Kubernetes -- ARM (Azure Resource Manager) -- Serverless -- Dockerfile -- YAML/JSON (generic IaC) -- Bicep -- Helm -- GitHub Actions, GitLab CI, Bitbucket Pipelines, Azure Pipelines, CircleCI, Argo Workflows -- Ansible -- Kustomize -- OpenAPI -- SAST, SCA (Software Composition Analysis) +- Vulnerability +- Misconfiguration +- Secret +- License diff --git a/docs/tutorials/iac/getting-started-iac.md b/docs/tutorials/iac/getting-started-iac.md index 052f72bae9..4dac4eea56 100644 --- a/docs/tutorials/iac/getting-started-iac.md +++ b/docs/tutorials/iac/getting-started-iac.md @@ -1,32 +1,22 @@ # Getting Started with the IaC Provider -Prowler's Infrastructure as Code (IaC) provider enables you to scan local or remote infrastructure code for security and compliance issues using [Checkov](https://www.checkov.io/). This provider supports a wide range of IaC frameworks, allowing you to assess your code before deployment. +Prowler's Infrastructure as Code (IaC) provider enables you to scan local or remote infrastructure code for security and compliance issues using [Trivy](https://trivy.dev/). This provider supports a wide range of IaC frameworks, allowing you to assess your code before deployment. -## Supported Frameworks +## Supported Scanners -The IaC provider leverages Checkov to support multiple frameworks, including: +The IaC provider leverages Trivy to support multiple scanners, including: -- Terraform -- CloudFormation -- Kubernetes -- ARM (Azure Resource Manager) -- Serverless -- Dockerfile -- YAML/JSON (generic IaC) -- Bicep -- Helm -- GitHub Actions, GitLab CI, Bitbucket Pipelines, Azure Pipelines, CircleCI, Argo Workflows -- Ansible -- Kustomize -- OpenAPI -- SAST, SCA (Software Composition Analysis) +- Vulnerability +- Misconfiguration +- Secret +- License ## How It Works - The IaC provider scans your local directory (or a specified path) for supported IaC files, or scan a remote repository. - No cloud credentials or authentication are required for local scans. - For remote repository scans, authentication can be provided via [git URL](https://git-scm.com/docs/git-clone#_git_urls), CLI flags or environment variables. -- Mutelist logic is handled by Checkov, not Prowler. +- Mutelist logic is handled by Trivy, not Prowler. - Results are output in the same formats as other Prowler providers (CSV, JSON, HTML, etc.). ## Usage @@ -67,12 +57,12 @@ You can provide authentication for private repositories using one of the followi #### Mutually Exclusive Flags - `--scan-path` and `--scan-repository-url` are mutually exclusive. Only one can be specified at a time. -### Specify Frameworks +### Specify Scanners -Scan only Terraform and Kubernetes files: +Scan only vulnerability and misconfiguration scanners: ```sh -prowler iac --scan-path ./my-iac-directory --frameworks terraform kubernetes +prowler iac --scan-path ./my-iac-directory --scanners vuln misconfig ``` ### Exclude Paths @@ -95,4 +85,4 @@ prowler iac --scan-path ./iac --output-formats csv json html - For remote repository scans, authentication is optional but required for private repos. - CLI flags override environment variables for authentication. - It is ideal for CI/CD pipelines and local development environments. -- For more details on supported frameworks and rules, see the [Checkov documentation](https://www.checkov.io/1.Welcome/Quick%20Start.html). +- For more details on supported scanners, see the [Trivy documentation](https://trivy.dev/latest/docs/scanner/vulnerability/). diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 7892cab44f..06d863874c 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -65,6 +65,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - GitHub repository and organization scoping support with `--repository/respositories` and `--organization/organizations` flags [(#8329)](https://github.com/prowler-cloud/prowler/pull/8329) - GCP provider retry configuration [(#8412)](https://github.com/prowler-cloud/prowler/pull/8412) - `s3_bucket_shadow_resource_vulnerability` check for AWS provider [(#8398)](https://github.com/prowler-cloud/prowler/pull/8398) +- Use `trivy` as engine for IaC provider [(#8466)](https://github.com/prowler-cloud/prowler/pull/8466) ### Changed - Handle some AWS errors as warnings instead of errors [(#8347)](https://github.com/prowler-cloud/prowler/pull/8347) diff --git a/prowler/lib/check/checks_loader.py b/prowler/lib/check/checks_loader.py index 3c7e065fc0..c03034b0e4 100644 --- a/prowler/lib/check/checks_loader.py +++ b/prowler/lib/check/checks_loader.py @@ -20,7 +20,7 @@ def load_checks_to_execute( ) -> set: """Generate the list of checks to execute based on the cloud provider and the input arguments given""" try: - # Bypass check loading for IAC provider since it uses Checkov directly + # Bypass check loading for IAC provider since it uses Trivy directly if provider == "iac": return set() diff --git a/prowler/lib/check/models.py b/prowler/lib/check/models.py index 1c2dd44533..de38162cbc 100644 --- a/prowler/lib/check/models.py +++ b/prowler/lib/check/models.py @@ -166,11 +166,11 @@ class CheckMetadata(BaseModel): return service_name @validator("CheckID", pre=True, always=True) - def valid_check_id(cls, check_id): + def valid_check_id(cls, check_id, values): if not check_id: raise ValueError("CheckID must be a non-empty string") - if check_id: + if check_id and values.get("Provider") != "iac": if "-" in check_id: raise ValueError( f"CheckID {check_id} contains a hyphen, which is not allowed" @@ -648,25 +648,34 @@ class CheckReportM365(Check_Report): @dataclass class CheckReportIAC(Check_Report): - """Contains the IAC Check's finding information using Checkov.""" + """Contains the IAC Check's finding information using Trivy.""" resource_name: str - resource_path: str resource_line_range: str - def __init__(self, metadata: dict = {}, finding: dict = {}) -> None: + def __init__( + self, metadata: dict = {}, finding: dict = {}, file_path: str = "" + ) -> None: """ - Initialize the IAC Check's finding information from a Checkov failed_check dict. + Initialize the IAC Check's finding information from a Trivy misconfiguration dict. Args: metadata (Dict): Optional check metadata (can be None). - failed_check (dict): A single failed_check result from Checkov's JSON output. + finding (dict): A single misconfiguration result from Trivy's JSON output. """ super().__init__(metadata, finding) - self.resource_name = getattr(finding, "resource", "") - self.resource_path = getattr(finding, "file_path", "") - self.resource_line_range = getattr(finding, "file_line_range", "") + self.resource = finding + self.resource_name = file_path + self.resource_line_range = ( + ( + str(finding.get("CauseMetadata", {}).get("StartLine", "")) + + ":" + + str(finding.get("CauseMetadata", {}).get("EndLine", "")) + ) + if finding.get("CauseMetadata", {}).get("StartLine", "") + else "" + ) @dataclass diff --git a/prowler/lib/check/utils.py b/prowler/lib/check/utils.py index bf8854600d..cab19486d5 100644 --- a/prowler/lib/check/utils.py +++ b/prowler/lib/check/utils.py @@ -14,7 +14,7 @@ def recover_checks_from_provider( Returns a list of tuples with the following format (check_name, check_path) """ try: - # Bypass check loading for IAC provider since it uses Checkov directly + # Bypass check loading for IAC provider since it uses Trivy directly if provider == "iac": return [] @@ -63,7 +63,7 @@ def recover_checks_from_service(service_list: list, provider: str) -> set: Returns a set of checks from the given services """ try: - # Bypass check loading for IAC provider since it uses Checkov directly + # Bypass check loading for IAC provider since it uses Trivy directly if provider == "iac": return set() diff --git a/prowler/lib/outputs/finding.py b/prowler/lib/outputs/finding.py index c7e1e1f05b..f686554a88 100644 --- a/prowler/lib/outputs/finding.py +++ b/prowler/lib/outputs/finding.py @@ -297,9 +297,9 @@ class Finding(BaseModel): output_data["auth_method"] = provider.auth_method output_data["account_uid"] = "iac" output_data["account_name"] = "iac" - output_data["resource_name"] = check_output.resource["resource"] - output_data["resource_uid"] = check_output.resource["resource"] - output_data["region"] = check_output.resource_path + output_data["resource_name"] = check_output.resource_name + output_data["resource_uid"] = check_output.resource_name + output_data["region"] = check_output.resource_line_range output_data["resource_line_range"] = check_output.resource_line_range output_data["framework"] = check_output.check_metadata.ServiceName diff --git a/prowler/lib/outputs/html/html.py b/prowler/lib/outputs/html/html.py index d7006b8cad..4e05fe0e8a 100644 --- a/prowler/lib/outputs/html/html.py +++ b/prowler/lib/outputs/html/html.py @@ -41,7 +41,7 @@ class HTML(Output): {finding_status} {finding.metadata.Severity.value} {finding.metadata.ServiceName} - {":".join([finding.resource_metadata["file_path"], "-".join(map(str, finding.resource_metadata["file_line_range"]))]) if finding.metadata.Provider == "iac" else finding.region.lower()} + {finding.region.lower()} {finding.metadata.CheckID.replace("_", "_")} {finding.metadata.CheckTitle} {finding.resource_uid.replace("<", "<").replace(">", ">").replace("_", "_")} @@ -204,7 +204,7 @@ class HTML(Output): Status Severity Service Name - {"File" if provider.type == "iac" else "Region"} + {"Line Range" if provider.type == "iac" else "Region"} Check ID Check Title Resource ID diff --git a/prowler/lib/outputs/summary_table.py b/prowler/lib/outputs/summary_table.py index 2e70115256..a6e011755a 100644 --- a/prowler/lib/outputs/summary_table.py +++ b/prowler/lib/outputs/summary_table.py @@ -86,6 +86,8 @@ def display_summary_table( "Muted": [], } pass_count = fail_count = muted_count = 0 + # Sort findings by ServiceName + findings.sort(key=lambda x: x.check_metadata.ServiceName) for finding in findings: # If new service and not first, add previous row if ( diff --git a/prowler/providers/common/provider.py b/prowler/providers/common/provider.py index 5f3ee07160..1b2de71148 100644 --- a/prowler/providers/common/provider.py +++ b/prowler/providers/common/provider.py @@ -252,7 +252,7 @@ class Provider(ABC): provider_class( scan_path=arguments.scan_path, scan_repository_url=arguments.scan_repository_url, - frameworks=arguments.frameworks, + scanners=arguments.scanners, exclude_path=arguments.exclude_path, config_path=arguments.config_file, fixer_config=fixer_config, diff --git a/prowler/providers/iac/iac_provider.py b/prowler/providers/iac/iac_provider.py index 908f307967..cced114872 100644 --- a/prowler/providers/iac/iac_provider.py +++ b/prowler/providers/iac/iac_provider.py @@ -29,7 +29,7 @@ class IacProvider(Provider): self, scan_path: str = ".", scan_repository_url: str = None, - frameworks: list[str] = ["all"], + scanners: list[str] = ["vuln", "misconfig", "secret"], exclude_path: list[str] = [], config_path: str = None, config_content: dict = None, @@ -42,7 +42,7 @@ class IacProvider(Provider): self.scan_path = scan_path self.scan_repository_url = scan_repository_url - self.frameworks = frameworks + self.scanners = scanners self.exclude_path = exclude_path self.region = "global" self.audited_account = "local-iac" @@ -90,7 +90,7 @@ class IacProvider(Provider): # Fixer Config self._fixer_config = fixer_config - # Mutelist (not needed for IAC since Checkov has its own mutelist logic) + # Mutelist (not needed for IAC since Trivy has its own mutelist logic) self._mutelist = None self.audit_metadata = Audit_Metadata( @@ -131,41 +131,50 @@ class IacProvider(Provider): return self._fixer_config def setup_session(self): - """IAC provider doesn't need a session since it uses Checkov directly""" + """IAC provider doesn't need a session since it uses Trivy directly""" return None - def _process_check(self, finding: dict, check: dict, status: str) -> CheckReportIAC: + def _process_finding( + self, finding: dict, file_path: str, type: str + ) -> CheckReportIAC: """ Process a single check (failed or passed) and create a CheckReportIAC object. Args: - finding: The finding object from Checkov output - check: The individual check data (failed_check or passed_check) - status: The status of the check ("FAIL" or "PASS") + finding: The finding object from Trivy output + file_path: The path to the file that contains the finding + type: The type of the finding Returns: CheckReportIAC: The processed check report """ try: + if "VulnerabilityID" in finding: + finding_id = finding["VulnerabilityID"] + finding_description = finding["Description"] + finding_status = finding.get("Status", "FAIL") + elif "RuleID" in finding: + finding_id = finding["RuleID"] + finding_description = finding["Title"] + finding_status = finding.get("Status", "FAIL") + else: + finding_id = finding["ID"] + finding_description = finding["Description"] + finding_status = finding["Status"] + metadata_dict = { "Provider": "iac", - "CheckID": check.get("check_id", ""), - "CheckTitle": check.get("check_name", ""), + "CheckID": finding_id, + "CheckTitle": finding["Title"], "CheckType": ["Infrastructure as Code"], - "ServiceName": finding["check_type"], + "ServiceName": type, "SubServiceName": "", "ResourceIdTemplate": "", - "Severity": ( - check.get("severity", "low").lower() - if check.get("severity") - else "low" - ), + "Severity": finding["Severity"], "ResourceType": "iac", - "Description": check.get("check_name", ""), + "Description": finding_description, "Risk": "", - "RelatedUrl": ( - check.get("guideline", "") if check.get("guideline") else "" - ), + "RelatedUrl": finding.get("PrimaryURL", ""), "Remediation": { "Code": { "NativeIaC": "", @@ -174,10 +183,8 @@ class IacProvider(Provider): "Other": "", }, "Recommendation": { - "Text": "", - "Url": ( - check.get("guideline", "") if check.get("guideline") else "" - ), + "Text": finding.get("Resolution", ""), + "Url": finding.get("PrimaryURL", ""), }, }, "Categories": [], @@ -189,11 +196,16 @@ class IacProvider(Provider): # Convert metadata dict to JSON string metadata = json.dumps(metadata_dict) - report = CheckReportIAC(metadata=metadata, finding=check) - report.status = status - report.resource_tags = check.get("entity_tags", {}) - report.status_extended = check.get("check_name", "") - if status == "MUTED": + report = CheckReportIAC( + metadata=metadata, finding=finding, file_path=file_path + ) + report.status = finding_status + report.status_extended = ( + finding.get("Message", "") + if finding.get("Message") + else finding.get("Description", "") + ) + if finding_status == "MUTED": report.muted = True return report except Exception as error: @@ -213,6 +225,8 @@ class IacProvider(Provider): Clone a git repository to a temporary directory, supporting GitHub authentication. """ try: + original_url = repository_url + if github_username and personal_access_token: repository_url = repository_url.replace( "https://github.com/", @@ -226,7 +240,7 @@ class IacProvider(Provider): temporary_directory = tempfile.mkdtemp() logger.info( - f"Cloning repository {repository_url} into {temporary_directory}..." + f"Cloning repository {original_url} into {temporary_directory}..." ) with alive_bar( ctrl_c=False, @@ -236,7 +250,7 @@ class IacProvider(Provider): enrich_print=False, ) as bar: try: - bar.title = f"-> Cloning {repository_url}..." + bar.title = f"-> Cloning {original_url}..." porcelain.clone(repository_url, temporary_directory, depth=1) bar.title = "-> Repository cloned successfully!" except Exception as clone_error: @@ -261,7 +275,7 @@ class IacProvider(Provider): scan_dir = self.scan_path try: - reports = self.run_scan(scan_dir, self.frameworks, self.exclude_path) + reports = self.run_scan(scan_dir, self.scanners, self.exclude_path) finally: if temp_dir: logger.info(f"Removing temporary directory {temp_dir}...") @@ -270,35 +284,76 @@ class IacProvider(Provider): return reports def run_scan( - self, directory: str, frameworks: list[str], exclude_path: list[str] + self, directory: str, scanners: list[str], exclude_path: list[str] ) -> List[CheckReportIAC]: try: logger.info(f"Running IaC scan on {directory} ...") - checkov_command = [ - "checkov", - "-d", + trivy_command = [ + "trivy", + "fs", directory, - "-o", + "--format", "json", - "-f", - ",".join(frameworks), + "--scanners", + ",".join(scanners), + "--parallel", + "0", + "--include-non-failures", ] if exclude_path: - checkov_command.extend(["--skip-path", ",".join(exclude_path)]) - # Run Checkov with JSON output - process = subprocess.run( - checkov_command, - capture_output=True, - text=True, - ) - # Log Checkov's error output if any + trivy_command.extend(["--skip-dirs", ",".join(exclude_path)]) + with alive_bar( + ctrl_c=False, + bar="blocks", + spinner="classic", + stats=False, + enrich_print=False, + ) as bar: + try: + bar.title = f"-> Running IaC scan on {directory} ..." + # Run Trivy with JSON output + process = subprocess.run( + trivy_command, + capture_output=True, + text=True, + ) + bar.title = "-> Scan completed!" + except Exception as error: + bar.title = "-> Scan failed!" + raise error + # Log Trivy's stderr output with preserved log levels if process.stderr: - logger.error(process.stderr) + for line in process.stderr.strip().split("\n"): + if line.strip(): + # Parse Trivy's log format to extract level and message + # Trivy format: timestamp level message + parts = line.split() + if len(parts) >= 3: + # Extract level and message + level = parts[1] + message = " ".join(parts[2:]) + + # Map Trivy log levels to Python logging levels + if level == "ERROR": + logger.error(f"{message}") + elif level == "WARN": + logger.warning(f"{message}") + elif level == "INFO": + logger.info(f"{message}") + elif level == "DEBUG": + logger.debug(f"{message}") + else: + # Default to info for unknown levels + logger.info(f"{message}") + else: + # If we can't parse the format, log as info + logger.info(f"{line}") try: - output = json.loads(process.stdout) + output = json.loads(process.stdout)["Results"] + if not output: - logger.warning("No findings returned from Checkov scan") + logger.warning("No findings returned from Trivy scan") return [] except Exception as error: logger.critical( @@ -308,37 +363,41 @@ class IacProvider(Provider): reports = [] - # If only one framework has findings, the output is a dict, otherwise it's a list of dicts - if isinstance(output, dict): - output = [output] - - # Process all frameworks findings + # Process all trivy findings for finding in output: - results = finding.get("results", {}) - # Process failed checks - failed_checks = results.get("failed_checks", []) - for failed_check in failed_checks: - report = self._process_check(finding, failed_check, "FAIL") + # Process Misconfigurations + for misconfiguration in finding.get("Misconfigurations", []): + report = self._process_finding( + misconfiguration, finding["Target"], finding["Type"] + ) reports.append(report) - - # Process passed checks - passed_checks = results.get("passed_checks", []) - for passed_check in passed_checks: - report = self._process_check(finding, passed_check, "PASS") + # Process Vulnerabilities + for vulnerability in finding.get("Vulnerabilities", []): + report = self._process_finding( + vulnerability, finding["Target"], finding["Type"] + ) reports.append(report) - - # Process skipped checks (muted) - skipped_checks = results.get("skipped_checks", []) - for skipped_check in skipped_checks: - report = self._process_check(finding, skipped_check, "MUTED") + # Process Secrets + for secret in finding.get("Secrets", []): + report = self._process_finding( + secret, finding["Target"], finding["Class"] + ) + reports.append(report) + # Process Licenses + for license in finding.get("Licenses", []): + report = self._process_finding( + license, finding["Target"], finding["Type"] + ) reports.append(report) return reports except Exception as error: - if "No such file or directory: 'checkov'" in str(error): - logger.critical("Please, install checkov using 'pip install checkov'") + if "No such file or directory: 'trivy'" in str(error): + logger.critical( + "Trivy binary not found. Please install Trivy from https://trivy.dev/latest/getting-started/installation/ or use your system package manager (e.g., 'brew install trivy' on macOS, 'apt-get install trivy' on Ubuntu)" + ) sys.exit(1) logger.critical( f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}" @@ -367,7 +426,7 @@ class IacProvider(Provider): ) report_lines.append( - f"Frameworks: {Fore.YELLOW}{', '.join(self.frameworks)}{Style.RESET_ALL}" + f"Scanners: {Fore.YELLOW}{', '.join(self.scanners)}{Style.RESET_ALL}" ) report_lines.append( diff --git a/prowler/providers/iac/lib/arguments/arguments.py b/prowler/providers/iac/lib/arguments/arguments.py index 6adadeca86..30ebeb8db2 100644 --- a/prowler/providers/iac/lib/arguments/arguments.py +++ b/prowler/providers/iac/lib/arguments/arguments.py @@ -1,33 +1,8 @@ -FRAMEWORK_CHOICES = [ - "ansible", - "argo_workflows", - "arm", - "azure_pipelines", - "bicep", - "bitbucket", - "bitbucket_pipelines", - "cdk", - "circleci_pipelines", - "cloudformation", - "dockerfile", - "github", - "github_actions", - "gitlab", - "gitlab_ci", - "helm", - "json_doc", - "kubernetes", - "kustomize", - "openapi", - "policies_3d", - "sast", - "sca_image", - "sca_package_2", - "secrets", - "serverless", - "terraform", - "terraform_json", - "yaml_doc", +SCANNERS_CHOICES = [ + "vuln", + "misconfig", + "secret", + "license", ] @@ -56,14 +31,13 @@ def init_parser(self): ) iac_scan_subparser.add_argument( - "--frameworks", - "-f", - "--framework", - dest="frameworks", + "--scanners", + "--scanner", + dest="scanners", nargs="+", - default=["all"], - choices=FRAMEWORK_CHOICES, - help="Comma-separated list of frameworks to scan. Default: all", + default=["vuln", "misconfig", "secret"], + choices=SCANNERS_CHOICES, + help="Comma-separated list of scanners to scan. Default: vuln, misconfig, secret", ) iac_scan_subparser.add_argument( "--exclude-path", diff --git a/tests/lib/outputs/finding_test.py b/tests/lib/outputs/finding_test.py index 52c0ae661e..8a7b05ceae 100644 --- a/tests/lib/outputs/finding_test.py +++ b/tests/lib/outputs/finding_test.py @@ -661,7 +661,7 @@ class TestFinding: check_output.file_path = "/path/to/iac/file.tf" check_output.resource_name = "aws_s3_bucket.example" check_output.resource_path = "/path/to/iac/file.tf" - check_output.file_line_range = [1, 5] + check_output.resource_line_range = "1:5" check_output.resource = { "resource": "aws_s3_bucket.example", "value": {}, @@ -685,7 +685,7 @@ class TestFinding: assert finding_output.auth_method == "No auth" assert finding_output.resource_name == "aws_s3_bucket.example" assert finding_output.resource_uid == "aws_s3_bucket.example" - assert finding_output.region == "/path/to/iac/file.tf" + assert finding_output.region == "1:5" assert finding_output.status == Status.PASS assert finding_output.status_extended == "mock_status_extended" assert finding_output.muted is False diff --git a/tests/providers/iac/iac_fixtures.py b/tests/providers/iac/iac_fixtures.py index 933aa4a2fd..9e4c1105f4 100644 --- a/tests/providers/iac/iac_fixtures.py +++ b/tests/providers/iac/iac_fixtures.py @@ -1,167 +1,290 @@ # IAC Provider Constants DEFAULT_SCAN_PATH = "." -# Sample Checkov Output -SAMPLE_CHECKOV_OUTPUT = [ - { - "check_type": "terraform", - "results": { - "failed_checks": [ +# Sample Trivy Output +SAMPLE_TRIVY_OUTPUT = { + "Results": [ + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [ { - "check_id": "CKV_AWS_1", - "check_name": "Ensure S3 bucket has encryption enabled", - "guideline": "https://docs.bridgecrew.io/docs/s3_1-s3-bucket-has-encryption-enabled", - "severity": "low", + "ID": "AVD-AWS-0001", + "Title": "S3 bucket should have encryption enabled", + "Description": "S3 bucket should have encryption enabled", + "Message": "S3 bucket should have encryption enabled", + "Resolution": "Enable encryption on the S3 bucket", + "Severity": "LOW", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0001", + "RuleID": "AVD-AWS-0001", }, { - "check_id": "CKV_AWS_2", - "check_name": "Ensure S3 bucket has public access blocked", - "guideline": "https://docs.bridgecrew.io/docs/s3_2-s3-bucket-has-public-access-blocked", - "severity": "low", + "ID": "AVD-AWS-0002", + "Title": "S3 bucket should have public access blocked", + "Description": "S3 bucket should have public access blocked", + "Message": "S3 bucket should have public access blocked", + "Resolution": "Block public access on the S3 bucket", + "Severity": "LOW", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0002", + "RuleID": "AVD-AWS-0002", }, ], - "passed_checks": [ + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], + }, + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [ { - "check_id": "CKV_AWS_3", - "check_name": "Ensure S3 bucket has versioning enabled", - "guideline": "https://docs.bridgecrew.io/docs/s3_3-s3-bucket-has-versioning-enabled", - "severity": "low", + "ID": "AVD-AWS-0003", + "Title": "S3 bucket should have versioning enabled", + "Description": "S3 bucket should have versioning enabled", + "Message": "S3 bucket should have versioning enabled", + "Resolution": "Enable versioning on the S3 bucket", + "Severity": "LOW", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0003", + "RuleID": "AVD-AWS-0003", } ], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], }, - } -] + ] +} # Sample Finding Data -SAMPLE_FINDING = SAMPLE_CHECKOV_OUTPUT[0] +SAMPLE_FINDING = SAMPLE_TRIVY_OUTPUT["Results"][0] SAMPLE_FAILED_CHECK = { - "check_id": "CKV_AWS_1", - "check_name": "Ensure S3 bucket has encryption enabled", - "guideline": "https://docs.bridgecrew.io/docs/s3_1-s3-bucket-has-encryption-enabled", - "severity": "low", + "ID": "AVD-AWS-0001", + "Title": "S3 bucket should have encryption enabled", + "Description": "S3 bucket should have encryption enabled", + "Message": "S3 bucket should have encryption enabled", + "Resolution": "Enable encryption on the S3 bucket", + "Severity": "low", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0001", + "RuleID": "AVD-AWS-0001", } SAMPLE_PASSED_CHECK = { - "check_id": "CKV_AWS_3", - "check_name": "Ensure S3 bucket has versioning enabled", - "guideline": "https://docs.bridgecrew.io/docs/s3_3-s3-bucket-has-versioning-enabled", - "severity": "low", + "ID": "AVD-AWS-0003", + "Title": "S3 bucket should have versioning enabled", + "Description": "S3 bucket should have versioning enabled", + "Message": "S3 bucket should have versioning enabled", + "Resolution": "Enable versioning on the S3 bucket", + "Severity": "low", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0003", + "RuleID": "AVD-AWS-0003", } # Additional sample checks SAMPLE_ANOTHER_FAILED_CHECK = { - "check_id": "CKV_AWS_4", - "check_name": "Ensure S3 bucket has logging enabled", - "guideline": "https://docs.bridgecrew.io/docs/s3_4-s3-bucket-has-logging-enabled", - "severity": "medium", + "ID": "AVD-AWS-0004", + "Title": "S3 bucket should have logging enabled", + "Description": "S3 bucket should have logging enabled", + "Message": "S3 bucket should have logging enabled", + "Resolution": "Enable logging on the S3 bucket", + "Severity": "medium", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0004", + "RuleID": "AVD-AWS-0004", } SAMPLE_ANOTHER_PASSED_CHECK = { - "check_id": "CKV_AWS_5", - "check_name": "Ensure S3 bucket has lifecycle policy", - "guideline": "https://docs.bridgecrew.io/docs/s3_5-s3-bucket-has-lifecycle-policy", - "severity": "low", + "ID": "AVD-AWS-0005", + "Title": "S3 bucket should have lifecycle policy", + "Description": "S3 bucket should have lifecycle policy", + "Message": "S3 bucket should have lifecycle policy", + "Resolution": "Configure lifecycle policy on the S3 bucket", + "Severity": "low", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0005", + "RuleID": "AVD-AWS-0005", } SAMPLE_ANOTHER_SKIPPED_CHECK = { - "check_id": "CKV_AWS_6", - "check_name": "Ensure S3 bucket has object lock enabled", - "guideline": "https://docs.bridgecrew.io/docs/s3_6-s3-bucket-has-object-lock-enabled", - "severity": "high", - "suppress_comment": "Not applicable for this use case", + "ID": "AVD-AWS-0006", + "Title": "S3 bucket should have object lock enabled", + "Description": "S3 bucket should have object lock enabled", + "Message": "S3 bucket should have object lock enabled", + "Resolution": "Enable object lock on the S3 bucket", + "Severity": "high", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0006", + "RuleID": "AVD-AWS-0006", + "Status": "MUTED", } SAMPLE_SKIPPED_CHECK = { - "check_id": "CKV_AWS_7", - "check_name": "Ensure S3 bucket has server-side encryption", - "guideline": "https://docs.bridgecrew.io/docs/s3_7-s3-bucket-has-server-side-encryption", - "severity": "medium", - "suppress_comment": "Legacy bucket, will be migrated", + "ID": "AVD-AWS-0007", + "Title": "S3 bucket should have server-side encryption", + "Description": "S3 bucket should have server-side encryption", + "Message": "S3 bucket should have server-side encryption", + "Resolution": "Enable server-side encryption on the S3 bucket", + "Severity": "medium", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0007", + "RuleID": "AVD-AWS-0007", + "Status": "MUTED", } SAMPLE_HIGH_SEVERITY_CHECK = { - "check_id": "CKV_AWS_8", - "check_name": "Ensure S3 bucket has public access blocked", - "guideline": "https://docs.bridgecrew.io/docs/s3_8-s3-bucket-has-public-access-blocked", - "severity": "high", + "ID": "AVD-AWS-0008", + "Title": "S3 bucket should have public access blocked", + "Description": "S3 bucket should have public access blocked", + "Message": "S3 bucket should have public access blocked", + "Resolution": "Block public access on the S3 bucket", + "Severity": "high", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/s3/avd-aws-0008", + "RuleID": "AVD-AWS-0008", } # Dockerfile samples SAMPLE_DOCKERFILE_REPORT = { - "check_type": "dockerfile", - "results": { - "failed_checks": [ - { - "check_id": "CKV_DOCKER_1", - "check_name": "Ensure base image is not using latest tag", - "guideline": "https://docs.bridgecrew.io/docs/docker_1-base-image-not-using-latest-tag", - "severity": "medium", - } - ], - "passed_checks": [], - }, + "Target": "Dockerfile", + "Type": "dockerfile", + "Misconfigurations": [ + { + "ID": "AVD-DOCKER-0001", + "Title": "Base image should not use latest tag", + "Description": "Base image should not use latest tag", + "Message": "Base image should not use latest tag", + "Resolution": "Use a specific version tag instead of latest", + "Severity": "medium", + "PrimaryURL": "https://avd.aquasec.com/misconfig/docker/dockerfile/avd-docker-0001", + "RuleID": "AVD-DOCKER-0001", + } + ], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], } SAMPLE_DOCKERFILE_CHECK = { - "check_id": "CKV_DOCKER_1", - "check_name": "Ensure base image is not using latest tag", - "guideline": "https://docs.bridgecrew.io/docs/docker_1-base-image-not-using-latest-tag", - "severity": "medium", + "ID": "AVD-DOCKER-0001", + "Title": "Base image should not use latest tag", + "Description": "Base image should not use latest tag", + "Message": "Base image should not use latest tag", + "Resolution": "Use a specific version tag instead of latest", + "Severity": "medium", + "PrimaryURL": "https://avd.aquasec.com/misconfig/docker/dockerfile/avd-docker-0001", + "RuleID": "AVD-DOCKER-0001", } # YAML samples SAMPLE_YAML_REPORT = { - "check_type": "yaml", - "results": { - "failed_checks": [ - { - "check_id": "CKV_K8S_1", - "check_name": "Ensure API server is not exposed", - "guideline": "https://docs.bridgecrew.io/docs/k8s_1-api-server-not-exposed", - "severity": "high", - } - ], - "passed_checks": [], - }, + "Target": "deployment.yaml", + "Type": "kubernetes", + "Misconfigurations": [ + { + "ID": "AVD-K8S-0001", + "Title": "API server should not be exposed", + "Description": "API server should not be exposed", + "Message": "API server should not be exposed", + "Resolution": "Do not expose the API server", + "Severity": "high", + "PrimaryURL": "https://avd.aquasec.com/misconfig/kubernetes/avd-k8s-0001", + "RuleID": "AVD-K8S-0001", + } + ], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], } SAMPLE_YAML_CHECK = { - "check_id": "CKV_K8S_1", - "check_name": "Ensure API server is not exposed", - "guideline": "https://docs.bridgecrew.io/docs/k8s_1-api-server-not-exposed", - "severity": "high", + "ID": "AVD-K8S-0001", + "Title": "API server should not be exposed", + "Description": "API server should not be exposed", + "Message": "API server should not be exposed", + "Resolution": "Do not expose the API server", + "Severity": "high", + "PrimaryURL": "https://avd.aquasec.com/misconfig/kubernetes/avd-k8s-0001", + "RuleID": "AVD-K8S-0001", } # CloudFormation samples SAMPLE_CLOUDFORMATION_CHECK = { - "check_id": "CKV_AWS_9", - "check_name": "Ensure CloudFormation stack has drift detection enabled", - "guideline": "https://docs.bridgecrew.io/docs/aws_9-cloudformation-stack-has-drift-detection-enabled", - "severity": "low", + "ID": "AVD-AWS-0009", + "Title": "CloudFormation stack should have drift detection enabled", + "Description": "CloudFormation stack should have drift detection enabled", + "Message": "CloudFormation stack should have drift detection enabled", + "Resolution": "Enable drift detection on the CloudFormation stack", + "Severity": "low", + "PrimaryURL": "https://avd.aquasec.com/misconfig/aws/cloudformation/avd-aws-0009", + "RuleID": "AVD-AWS-0009", } # Kubernetes samples SAMPLE_KUBERNETES_CHECK = { - "check_id": "CKV_K8S_2", - "check_name": "Ensure RBAC is enabled", - "guideline": "https://docs.bridgecrew.io/docs/k8s_2-rbac-enabled", - "severity": "medium", + "ID": "AVD-K8S-0002", + "Title": "RBAC should be enabled", + "Description": "RBAC should be enabled", + "Message": "RBAC should be enabled", + "Resolution": "Enable RBAC on the cluster", + "Severity": "medium", + "PrimaryURL": "https://avd.aquasec.com/misconfig/kubernetes/avd-k8s-0002", + "RuleID": "AVD-K8S-0002", +} + +# Sample Trivy output with vulnerabilities +SAMPLE_TRIVY_VULNERABILITY_OUTPUT = { + "Results": [ + { + "Target": "package.json", + "Type": "nodejs", + "Misconfigurations": [], + "Vulnerabilities": [ + { + "VulnerabilityID": "CVE-2023-1234", + "Title": "Example vulnerability", + "Description": "This is an example vulnerability", + "Severity": "high", + "PrimaryURL": "https://example.com/cve-2023-1234", + } + ], + "Secrets": [], + "Licenses": [], + } + ] +} + +# Sample Trivy output with secrets +SAMPLE_TRIVY_SECRET_OUTPUT = { + "Results": [ + { + "Target": "config.yaml", + "Class": "secret", + "Misconfigurations": [], + "Vulnerabilities": [], + "Secrets": [ + { + "ID": "aws-access-key-id", + "Title": "AWS Access Key ID", + "Description": "AWS Access Key ID found in configuration", + "Severity": "critical", + "PrimaryURL": "https://example.com/secret-aws-access-key-id", + } + ], + "Licenses": [], + } + ] } -def get_sample_checkov_json_output(): - """Return sample Checkov JSON output as string""" +def get_sample_trivy_json_output(): + """Return sample Trivy JSON output as string""" import json - return json.dumps(SAMPLE_CHECKOV_OUTPUT) + return json.dumps(SAMPLE_TRIVY_OUTPUT) -def get_empty_checkov_output(): - """Return empty Checkov output as string""" - return "[]" +def get_empty_trivy_output(): + """Return empty Trivy output as string""" + import json + + return json.dumps({"Results": []}) -def get_invalid_checkov_output(): +def get_invalid_trivy_output(): """Return invalid JSON output as string""" return "invalid json output" diff --git a/tests/providers/iac/iac_provider_test.py b/tests/providers/iac/iac_provider_test.py index d6803289e2..7f89ff7a49 100644 --- a/tests/providers/iac/iac_provider_test.py +++ b/tests/providers/iac/iac_provider_test.py @@ -15,18 +15,15 @@ from tests.providers.iac.iac_fixtures import ( SAMPLE_ANOTHER_SKIPPED_CHECK, SAMPLE_CLOUDFORMATION_CHECK, SAMPLE_DOCKERFILE_CHECK, - SAMPLE_DOCKERFILE_REPORT, SAMPLE_FAILED_CHECK, - SAMPLE_FINDING, SAMPLE_HIGH_SEVERITY_CHECK, SAMPLE_KUBERNETES_CHECK, SAMPLE_PASSED_CHECK, SAMPLE_SKIPPED_CHECK, SAMPLE_YAML_CHECK, - SAMPLE_YAML_REPORT, - get_empty_checkov_output, - get_invalid_checkov_output, - get_sample_checkov_json_output, + get_empty_trivy_output, + get_invalid_trivy_output, + get_sample_trivy_json_output, ) @@ -51,73 +48,85 @@ class TestIacProvider: assert provider._type == "iac" assert provider.scan_path == custom_path - def test_iac_provider_process_check_failed(self): - """Test processing a failed check""" + def test_iac_provider_process_finding_failed(self): + """Test processing a failed finding""" provider = IacProvider() - report = provider._process_check(SAMPLE_FINDING, SAMPLE_FAILED_CHECK, "FAIL") + report = provider._process_finding(SAMPLE_FAILED_CHECK, "main.tf", "terraform") assert isinstance(report, CheckReportIAC) assert report.status == "FAIL" assert report.check_metadata.Provider == "iac" - assert report.check_metadata.CheckID == SAMPLE_FAILED_CHECK["check_id"] - assert report.check_metadata.CheckTitle == SAMPLE_FAILED_CHECK["check_name"] + assert report.check_metadata.CheckID == SAMPLE_FAILED_CHECK["ID"] + assert report.check_metadata.CheckTitle == SAMPLE_FAILED_CHECK["Title"] assert report.check_metadata.Severity == "low" - assert report.check_metadata.RelatedUrl == SAMPLE_FAILED_CHECK["guideline"] + assert report.check_metadata.RelatedUrl == SAMPLE_FAILED_CHECK["PrimaryURL"] - def test_iac_provider_process_check_passed(self): - """Test processing a passed check""" + def test_iac_provider_process_finding_passed(self): + """Test processing a passed finding""" provider = IacProvider() - report = provider._process_check(SAMPLE_FINDING, SAMPLE_PASSED_CHECK, "PASS") + report = provider._process_finding(SAMPLE_PASSED_CHECK, "main.tf", "terraform") assert isinstance(report, CheckReportIAC) - assert report.status == "PASS" + assert report.status == "FAIL" # Trivy findings are always FAIL by default assert report.check_metadata.Provider == "iac" - assert report.check_metadata.CheckID == SAMPLE_PASSED_CHECK["check_id"] - assert report.check_metadata.CheckTitle == SAMPLE_PASSED_CHECK["check_name"] + assert report.check_metadata.CheckID == SAMPLE_PASSED_CHECK["ID"] + assert report.check_metadata.CheckTitle == SAMPLE_PASSED_CHECK["Title"] assert report.check_metadata.Severity == "low" @patch("subprocess.run") def test_iac_provider_run_scan_success(self, mock_subprocess): - """Test successful IAC scan with Checkov""" + """Test successful IAC scan with Trivy""" provider = IacProvider() mock_subprocess.return_value = MagicMock( - stdout=get_sample_checkov_json_output(), stderr="" + stdout=get_sample_trivy_json_output(), stderr="" ) - reports = provider.run_scan("/test/directory", ["all"], []) + reports = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], [] + ) - # Should have 2 failed checks + 1 passed check = 3 total reports + # Should have 3 misconfigurations from the sample output assert len(reports) == 3 - # Check that we have both failed and passed reports + # Check that we have failed reports (Trivy findings are always FAIL by default) failed_reports = [r for r in reports if r.status == "FAIL"] - passed_reports = [r for r in reports if r.status == "PASS"] - - assert len(failed_reports) == 2 - assert len(passed_reports) == 1 + assert len(failed_reports) == 3 # Verify subprocess was called correctly mock_subprocess.assert_called_once_with( - ["checkov", "-d", "/test/directory", "-o", "json", "-f", "all"], + [ + "trivy", + "fs", + "/test/directory", + "--format", + "json", + "--scanners", + "vuln,misconfig,secret", + "--parallel", + "0", + "--include-non-failures", + ], capture_output=True, text=True, ) @patch("subprocess.run") def test_iac_provider_run_scan_empty_output(self, mock_subprocess): - """Test IAC scan with empty Checkov output""" + """Test IAC scan with empty Trivy output""" provider = IacProvider() mock_subprocess.return_value = MagicMock( - stdout=get_empty_checkov_output(), stderr="" + stdout=get_empty_trivy_output(), stderr="" ) - reports = provider.run_scan("/test/directory", ["all"], []) + reports = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], [] + ) assert len(reports) == 0 def test_provider_run_local_scan(self): @@ -127,7 +136,9 @@ class TestIacProvider: "prowler.providers.iac.iac_provider.IacProvider.run_scan", ) as mock_run_scan: provider.run() - mock_run_scan.assert_called_with(scan_path, ["all"], []) + mock_run_scan.assert_called_with( + scan_path, ["vuln", "misconfig", "secret"], [] + ) @mock.patch.dict(os.environ, {}, clear=True) def test_provider_run_remote_scan(self): @@ -145,7 +156,9 @@ class TestIacProvider: ): provider.run() mock_clone.assert_called_with(scan_repository_url, None, None, None) - mock_run_scan.assert_called_with(temp_dir, ["all"], []) + mock_run_scan.assert_called_with( + temp_dir, ["vuln", "misconfig", "secret"], [] + ) @mock.patch.dict(os.environ, {}, clear=True) def test_print_credentials_local(self): @@ -183,7 +196,7 @@ class TestIacProvider: provider = IacProvider() mock_subprocess.return_value = MagicMock( - stdout=get_invalid_checkov_output(), stderr="" + stdout=get_invalid_trivy_output(), stderr="" ) with pytest.raises(SystemExit) as excinfo: @@ -193,92 +206,102 @@ class TestIacProvider: @patch("subprocess.run") def test_iac_provider_run_scan_null_output(self, mock_subprocess): - """Test IAC scan with null Checkov output""" + """Test IAC scan with null Trivy output""" provider = IacProvider() mock_subprocess.return_value = MagicMock(stdout="null", stderr="") - reports = provider.run_scan("/test/directory", ["all"], []) - assert len(reports) == 0 + with pytest.raises(SystemExit) as exc_info: + provider.run_scan("/test/directory", ["vuln", "misconfig", "secret"], []) + assert exc_info.value.code == 1 - def test_iac_provider_process_check_dockerfile(self): - """Test processing a Dockerfile check""" + def test_iac_provider_process_finding_dockerfile(self): + """Test processing a Dockerfile finding""" provider = IacProvider() - report = provider._process_check( - SAMPLE_DOCKERFILE_REPORT, SAMPLE_DOCKERFILE_CHECK, "FAIL" + report = provider._process_finding( + SAMPLE_DOCKERFILE_CHECK, "Dockerfile", "dockerfile" ) assert isinstance(report, CheckReportIAC) assert report.status == "FAIL" assert report.check_metadata.ServiceName == "dockerfile" - assert report.check_metadata.CheckID == SAMPLE_DOCKERFILE_CHECK["check_id"] + assert report.check_metadata.CheckID == SAMPLE_DOCKERFILE_CHECK["ID"] - def test_iac_provider_process_check_yaml(self): - """Test processing a YAML check""" + def test_iac_provider_process_finding_yaml(self): + """Test processing a YAML finding""" provider = IacProvider() - report = provider._process_check(SAMPLE_YAML_REPORT, SAMPLE_YAML_CHECK, "PASS") + report = provider._process_finding( + SAMPLE_YAML_CHECK, "deployment.yaml", "kubernetes" + ) assert isinstance(report, CheckReportIAC) - assert report.status == "PASS" - assert report.check_metadata.ServiceName == "yaml" - assert report.check_metadata.CheckID == SAMPLE_YAML_CHECK["check_id"] + assert report.status == "FAIL" # Trivy findings are always FAIL by default + assert report.check_metadata.ServiceName == "kubernetes" + assert report.check_metadata.CheckID == SAMPLE_YAML_CHECK["ID"] @patch("subprocess.run") def test_run_scan_success_with_failed_and_passed_checks(self, mock_subprocess): """Test successful run_scan with both failed and passed checks""" provider = IacProvider() - # Create sample output with both failed and passed checks - sample_output = [ - { - "check_type": "terraform", - "results": { - "failed_checks": [SAMPLE_FAILED_CHECK], - "passed_checks": [SAMPLE_PASSED_CHECK], - "skipped_checks": [], - }, - } - ] + # Create sample Trivy output with both failed and passed checks + sample_output = { + "Results": [ + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [SAMPLE_FAILED_CHECK, SAMPLE_PASSED_CHECK], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], + } + ] + } mock_subprocess.return_value = MagicMock( stdout=json.dumps(sample_output), stderr="" ) - result = provider.run_scan("/test/directory", ["terraform"], []) + result = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], [] + ) # Verify results assert len(result) == 2 assert all(isinstance(report, CheckReportIAC) for report in result) - # Check that we have one FAIL and one PASS report + # Check that we have FAIL reports (Trivy findings are always FAIL by default) statuses = [report.status for report in result] - assert "FAIL" in statuses - assert "PASS" in statuses + assert all(status == "FAIL" for status in statuses) @patch("subprocess.run") def test_run_scan_with_skipped_checks(self, mock_subprocess): """Test run_scan with skipped checks (muted)""" provider = IacProvider() - # Create sample output with skipped checks - sample_output = [ - { - "check_type": "terraform", - "results": { - "failed_checks": [], - "passed_checks": [], - "skipped_checks": [SAMPLE_SKIPPED_CHECK], - }, - } - ] + # Create sample Trivy output with skipped checks + sample_output = { + "Results": [ + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [SAMPLE_SKIPPED_CHECK], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], + } + ] + } mock_subprocess.return_value = MagicMock( stdout=json.dumps(sample_output), stderr="" ) - result = provider.run_scan("/test/directory", ["all"], ["exclude/path"]) + result = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], ["exclude/path"] + ) # Verify results assert len(result) == 1 @@ -291,9 +314,13 @@ class TestIacProvider: """Test run_scan with no findings""" provider = IacProvider() - mock_subprocess.return_value = MagicMock(stdout="[]", stderr="") + mock_subprocess.return_value = MagicMock( + stdout=json.dumps({"Results": []}), stderr="" + ) - result = provider.run_scan("/test/directory", ["kubernetes"], []) + result = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], [] + ) # Verify results assert len(result) == 0 @@ -303,40 +330,43 @@ class TestIacProvider: """Test run_scan with multiple reports from different frameworks""" provider = IacProvider() - # Create sample output with multiple frameworks - sample_output = [ - { - "check_type": "terraform", - "results": { - "failed_checks": [SAMPLE_FAILED_CHECK], - "passed_checks": [], - "skipped_checks": [], + # Create sample Trivy output with multiple frameworks + sample_output = { + "Results": [ + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [SAMPLE_FAILED_CHECK], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], }, - }, - { - "check_type": "kubernetes", - "results": { - "failed_checks": [], - "passed_checks": [SAMPLE_PASSED_CHECK], - "skipped_checks": [], + { + "Target": "deployment.yaml", + "Type": "kubernetes", + "Misconfigurations": [SAMPLE_PASSED_CHECK], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], }, - }, - ] + ] + } mock_subprocess.return_value = MagicMock( stdout=json.dumps(sample_output), stderr="" ) - result = provider.run_scan("/test/directory", ["terraform", "kubernetes"], []) + result = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], [] + ) # Verify results assert len(result) == 2 assert all(isinstance(report, CheckReportIAC) for report in result) - # Check that we have one FAIL and one PASS report + # Check that we have FAIL reports (Trivy findings are always FAIL by default) statuses = [report.status for report in result] - assert "FAIL" in statuses - assert "PASS" in statuses + assert all(status == "FAIL" for status in statuses) @patch("subprocess.run") def test_run_scan_exception_handling(self, mock_subprocess): @@ -347,44 +377,49 @@ class TestIacProvider: mock_subprocess.side_effect = Exception("Test exception") with pytest.raises(SystemExit) as exc_info: - provider.run_scan("/test/directory", ["terraform"], []) + provider.run_scan("/test/directory", ["vuln", "misconfig", "secret"], []) assert exc_info.value.code == 1 @patch("subprocess.run") def test_run_scan_with_different_frameworks(self, mock_subprocess): - """Test run_scan with different framework configurations""" + """Test run_scan with different scanner configurations""" provider = IacProvider() - sample_output = [ - { - "check_type": "terraform", - "results": { - "failed_checks": [], - "passed_checks": [SAMPLE_PASSED_CHECK], - "skipped_checks": [], - }, - } - ] + sample_output = { + "Results": [ + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [SAMPLE_PASSED_CHECK], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], + } + ] + } mock_subprocess.return_value = MagicMock( stdout=json.dumps(sample_output), stderr="" ) - # Test with specific frameworks - frameworks = ["terraform", "kubernetes", "cloudformation"] - result = provider.run_scan("/test/directory", frameworks, []) + # Test with specific scanners + scanners = ["vuln", "misconfig", "secret"] + result = provider.run_scan("/test/directory", scanners, []) - # Verify subprocess was called with correct frameworks + # Verify subprocess was called with correct scanners mock_subprocess.assert_called_once_with( [ - "checkov", - "-d", + "trivy", + "fs", "/test/directory", - "-o", + "--format", "json", - "-f", - ",".join(frameworks), + "--scanners", + ",".join(scanners), + "--parallel", + "0", + "--include-non-failures", ], capture_output=True, text=True, @@ -392,23 +427,25 @@ class TestIacProvider: # Verify results assert len(result) == 1 - assert result[0].status == "PASS" + assert result[0].status == "FAIL" # Trivy findings are always FAIL by default @patch("subprocess.run") def test_run_scan_with_exclude_paths(self, mock_subprocess): """Test run_scan with exclude paths""" provider = IacProvider() - sample_output = [ - { - "check_type": "terraform", - "results": { - "failed_checks": [], - "passed_checks": [SAMPLE_PASSED_CHECK], - "skipped_checks": [], - }, - } - ] + sample_output = { + "Results": [ + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [SAMPLE_PASSED_CHECK], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], + } + ] + } mock_subprocess.return_value = MagicMock( stdout=json.dumps(sample_output), stderr="" @@ -416,18 +453,23 @@ class TestIacProvider: # Test with exclude paths exclude_paths = ["node_modules", ".git", "vendor"] - result = provider.run_scan("/test/directory", ["all"], exclude_paths) + result = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], exclude_paths + ) # Verify subprocess was called with correct exclude paths expected_command = [ - "checkov", - "-d", + "trivy", + "fs", "/test/directory", - "-o", + "--format", "json", - "-f", - "all", - "--skip-path", + "--scanners", + "vuln,misconfig,secret", + "--parallel", + "0", + "--include-non-failures", + "--skip-dirs", ",".join(exclude_paths), ] mock_subprocess.assert_called_once_with( @@ -438,38 +480,47 @@ class TestIacProvider: # Verify results assert len(result) == 1 - assert result[0].status == "PASS" + assert result[0].status == "FAIL" # Trivy findings are always FAIL by default @patch("subprocess.run") def test_run_scan_all_check_types(self, mock_subprocess): """Test run_scan with all types of checks (failed, passed, skipped)""" provider = IacProvider() - sample_output = [ - { - "check_type": "terraform", - "results": { - "failed_checks": [SAMPLE_FAILED_CHECK, SAMPLE_HIGH_SEVERITY_CHECK], - "passed_checks": [SAMPLE_PASSED_CHECK, SAMPLE_CLOUDFORMATION_CHECK], - "skipped_checks": [SAMPLE_SKIPPED_CHECK], - }, - } - ] + sample_output = { + "Results": [ + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [ + SAMPLE_FAILED_CHECK, + SAMPLE_HIGH_SEVERITY_CHECK, + SAMPLE_PASSED_CHECK, + SAMPLE_CLOUDFORMATION_CHECK, + SAMPLE_SKIPPED_CHECK, + ], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], + } + ] + } mock_subprocess.return_value = MagicMock( stdout=json.dumps(sample_output), stderr="" ) - result = provider.run_scan("/test/directory", ["all"], []) + result = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], [] + ) # Verify results - assert len(result) == 5 # 2 failed + 2 passed + 1 skipped + assert len(result) == 5 # 5 misconfigurations # Check status distribution statuses = [report.status for report in result] - assert statuses.count("FAIL") == 2 - assert statuses.count("PASS") == 2 - assert statuses.count("MUTED") == 1 + assert statuses.count("FAIL") == 4 # 4 regular findings + assert statuses.count("MUTED") == 1 # 1 skipped finding # Check that muted reports have muted=True muted_reports = [report for report in result if report.status == "MUTED"] @@ -481,9 +532,13 @@ class TestIacProvider: provider = IacProvider() # Return empty list of reports - mock_subprocess.return_value = MagicMock(stdout="[]", stderr="") + mock_subprocess.return_value = MagicMock( + stdout=json.dumps({"Results": []}), stderr="" + ) - result = provider.run_scan("/test/directory", ["terraform"], []) + result = provider.run_scan( + "/test/directory", ["vuln", "misconfig", "secret"], [] + ) # Verify results assert len(result) == 0 @@ -493,60 +548,70 @@ class TestIacProvider: """Test run_scan with multiple frameworks and different types of checks""" provider = IacProvider() - # Create sample output with multiple frameworks and different check types - sample_output = [ - { - "check_type": "terraform", - "results": { - "failed_checks": [SAMPLE_FAILED_CHECK, SAMPLE_ANOTHER_FAILED_CHECK], - "passed_checks": [SAMPLE_PASSED_CHECK], - "skipped_checks": [], + # Create sample Trivy output with multiple frameworks and different check types + sample_output = { + "Results": [ + { + "Target": "main.tf", + "Type": "terraform", + "Misconfigurations": [ + SAMPLE_FAILED_CHECK, + SAMPLE_ANOTHER_FAILED_CHECK, + SAMPLE_PASSED_CHECK, + ], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], }, - }, - { - "check_type": "kubernetes", - "results": { - "failed_checks": [SAMPLE_KUBERNETES_CHECK], - "passed_checks": [], - "skipped_checks": [SAMPLE_ANOTHER_SKIPPED_CHECK], + { + "Target": "deployment.yaml", + "Type": "kubernetes", + "Misconfigurations": [ + SAMPLE_KUBERNETES_CHECK, + SAMPLE_ANOTHER_SKIPPED_CHECK, + ], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], }, - }, - { - "check_type": "cloudformation", - "results": { - "failed_checks": [], - "passed_checks": [ + { + "Target": "template.yaml", + "Type": "cloudformation", + "Misconfigurations": [ SAMPLE_CLOUDFORMATION_CHECK, SAMPLE_ANOTHER_PASSED_CHECK, ], - "skipped_checks": [], + "Vulnerabilities": [], + "Secrets": [], + "Licenses": [], }, - }, - ] + ] + } mock_subprocess.return_value = MagicMock( stdout=json.dumps(sample_output), stderr="" ) result = provider.run_scan( - "/test/directory", ["terraform", "kubernetes", "cloudformation"], [] + "/test/directory", ["vuln", "misconfig", "secret"], [] ) # Verify results assert ( len(result) == 7 - ) # 2 failed + 1 passed (terraform) + 1 failed + 1 skipped (kubernetes) + 2 passed (cloudformation) + ) # 3 terraform + 2 kubernetes + 2 cloudformation = 7 total # Check status distribution statuses = [report.status for report in result] - assert statuses.count("FAIL") == 3 - assert statuses.count("PASS") == 3 - assert statuses.count("MUTED") == 1 + assert statuses.count("FAIL") == 6 # 6 regular findings + assert statuses.count("MUTED") == 1 # 1 skipped finding def test_run_method_calls_run_scan(self): """Test that the run method calls run_scan with correct parameters""" provider = IacProvider( - scan_path="/custom/path", frameworks=["terraform"], exclude_path=["exclude"] + scan_path="/custom/path", + scanners=["vuln", "misconfig"], + exclude_path=["exclude"], ) with patch.object(provider, "run_scan") as mock_run_scan: @@ -554,7 +619,7 @@ class TestIacProvider: provider.run() mock_run_scan.assert_called_once_with( - "/custom/path", ["terraform"], ["exclude"] + "/custom/path", ["vuln", "misconfig"], ["exclude"] ) @mock.patch("prowler.providers.iac.iac_provider.porcelain.clone")