mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-04-03 22:17:03 +00:00
Compare commits
5 Commits
poc-gha-ia
...
wanr-sensi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
16b96cf7a2 | ||
|
|
246a29bd39 | ||
|
|
d59abbba6e | ||
|
|
1940cdc674 | ||
|
|
78db8b6ce5 |
@@ -750,6 +750,35 @@ def init_parser(self):
|
||||
# More arguments for the provider.
|
||||
```
|
||||
|
||||
##### Sensitive CLI Arguments
|
||||
|
||||
CLI flags that accept secrets (tokens, passwords, API keys) require special handling to protect credentials from leaking in HTML output and process listings:
|
||||
|
||||
1. **Use `nargs="?"` with `default=None`** so the flag works both with and without an inline value. This allows the provider to fall back to an environment variable when no value is passed.
|
||||
2. **Add a `SENSITIVE_ARGUMENTS` frozenset** at the top of the `arguments.py` file listing every flag that accepts secret values:
|
||||
|
||||
```python
|
||||
SENSITIVE_ARGUMENTS = frozenset({"--your-provider-password", "--your-provider-token"})
|
||||
```
|
||||
|
||||
Prowler automatically discovers these frozensets and uses them to redact values in HTML output and warn users who pass secrets directly on the command line.
|
||||
|
||||
3. **Document the environment variable** in the `help` text so users know the recommended alternative:
|
||||
|
||||
```python
|
||||
<provider_name>_parser.add_argument(
|
||||
"--your-provider-password",
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="PASSWORD",
|
||||
help="Password for authentication. We recommend using the YOUR_PROVIDER_PASSWORD environment variable instead.",
|
||||
)
|
||||
```
|
||||
|
||||
<Warning>
|
||||
Do not add new arguments that require passing secrets as CLI values without an environment variable fallback. Prowler CLI warns users when sensitive flags receive explicit values on the command line.
|
||||
</Warning>
|
||||
|
||||
#### Step 5: Implement Mutelist
|
||||
|
||||
**Explanation:**
|
||||
|
||||
@@ -66,22 +66,38 @@ prowler <provider> --categories internet-exposed
|
||||
|
||||
### Shodan
|
||||
|
||||
Prowler allows you check if any public IPs in your Cloud environments are exposed in Shodan with the `-N`/`--shodan <shodan_api_key>` option:
|
||||
Prowler can check whether any public IPs in cloud environments are exposed in Shodan using the `-N`/`--shodan` option.
|
||||
|
||||
For example, you can check if any of your AWS Elastic Compute Cloud (EC2) instances has an elastic IP exposed in Shodan:
|
||||
#### Using the Environment Variable (Recommended)
|
||||
|
||||
Set the `SHODAN_API_KEY` environment variable to avoid exposing the API key in process listings and shell history:
|
||||
|
||||
```console
|
||||
prowler aws -N/--shodan <shodan_api_key> -c ec2_elastic_ip_shodan
|
||||
export SHODAN_API_KEY=<shodan_api_key>
|
||||
```
|
||||
|
||||
Also, you can check if any of your Azure Subscription has an public IP exposed in Shodan:
|
||||
Then run Prowler with the `--shodan` flag (no value needed):
|
||||
|
||||
```console
|
||||
prowler azure -N/--shodan <shodan_api_key> -c network_public_ip_shodan
|
||||
prowler aws --shodan -c ec2_elastic_ip_shodan
|
||||
```
|
||||
|
||||
And finally, you can check if any of your GCP projects has an public IP address exposed in Shodan:
|
||||
|
||||
```console
|
||||
prowler gcp -N/--shodan <shodan_api_key> -c compute_public_address_shodan
|
||||
prowler azure --shodan -c network_public_ip_shodan
|
||||
```
|
||||
|
||||
```console
|
||||
prowler gcp --shodan -c compute_public_address_shodan
|
||||
```
|
||||
|
||||
#### Using the CLI Flag
|
||||
|
||||
Alternatively, pass the API key directly on the command line:
|
||||
|
||||
```console
|
||||
prowler aws --shodan <shodan_api_key> -c ec2_elastic_ip_shodan
|
||||
```
|
||||
|
||||
<Warning>
|
||||
Passing secret values directly on the command line exposes them in process listings and shell history. Prowler CLI displays a warning when this pattern is detected. Use the `SHODAN_API_KEY` environment variable instead.
|
||||
</Warning>
|
||||
|
||||
@@ -24,6 +24,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
|
||||
- Added `internet-exposed` category to 13 AWS checks (CloudFront, CodeArtifact, EC2, EFS, RDS, SageMaker, Shield, VPC) [(#10502)](https://github.com/prowler-cloud/prowler/pull/10502)
|
||||
- Minimum Python version from 3.9 to 3.10 and updated classifiers to reflect supported versions (3.10, 3.11, 3.12) [(#10464)](https://github.com/prowler-cloud/prowler/pull/10464)
|
||||
- Sensitive CLI flags now warn when values are passed directly, recommending environment variables instead [(#10532)](https://github.com/prowler-cloud/prowler/pull/10532)
|
||||
|
||||
### 🐞 Fixed
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ from prowler.config.config import (
|
||||
json_asff_file_suffix,
|
||||
json_ocsf_file_suffix,
|
||||
orange_color,
|
||||
sarif_file_suffix,
|
||||
)
|
||||
from prowler.lib.banner import print_banner
|
||||
from prowler.lib.check.check import (
|
||||
@@ -70,11 +69,11 @@ from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_github import GithubCIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_googleworkspace import GoogleWorkspaceCIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_kubernetes import KubernetesCIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_m365 import M365CIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
|
||||
from prowler.lib.outputs.compliance.cisa_scuba.cisa_scuba_googleworkspace import (
|
||||
GoogleWorkspaceCISASCuBA,
|
||||
)
|
||||
from prowler.lib.outputs.compliance.cis.cis_m365 import M365CIS
|
||||
from prowler.lib.outputs.compliance.cis.cis_oraclecloud import OracleCloudCIS
|
||||
from prowler.lib.outputs.compliance.compliance import display_compliance_table
|
||||
from prowler.lib.outputs.compliance.csa.csa_alibabacloud import AlibabaCloudCSA
|
||||
from prowler.lib.outputs.compliance.csa.csa_aws import AWSCSA
|
||||
@@ -123,7 +122,6 @@ from prowler.lib.outputs.html.html import HTML
|
||||
from prowler.lib.outputs.ocsf.ingestion import send_ocsf_to_api
|
||||
from prowler.lib.outputs.ocsf.ocsf import OCSF
|
||||
from prowler.lib.outputs.outputs import extract_findings_statistics, report
|
||||
from prowler.lib.outputs.sarif.sarif import SARIF
|
||||
from prowler.lib.outputs.slack.slack import Slack
|
||||
from prowler.lib.outputs.summary_table import display_summary_table
|
||||
from prowler.providers.alibabacloud.models import AlibabaCloudOutputOptions
|
||||
@@ -548,13 +546,6 @@ def prowler():
|
||||
html_output.batch_write_data_to_file(
|
||||
provider=global_provider, stats=stats
|
||||
)
|
||||
if mode == "sarif":
|
||||
sarif_output = SARIF(
|
||||
findings=finding_outputs,
|
||||
file_path=f"{filename}{sarif_file_suffix}",
|
||||
)
|
||||
generated_outputs["regular"].append(sarif_output)
|
||||
sarif_output.batch_write_data_to_file()
|
||||
|
||||
if getattr(args, "push_to_cloud", False):
|
||||
if not ocsf_output or not getattr(ocsf_output, "file_path", None):
|
||||
|
||||
@@ -110,7 +110,6 @@ json_file_suffix = ".json"
|
||||
json_asff_file_suffix = ".asff.json"
|
||||
json_ocsf_file_suffix = ".ocsf.json"
|
||||
html_file_suffix = ".html"
|
||||
sarif_file_suffix = ".sarif"
|
||||
default_config_file_path = (
|
||||
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/config.yaml"
|
||||
)
|
||||
@@ -121,7 +120,7 @@ default_redteam_config_file_path = (
|
||||
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/llm_config.yaml"
|
||||
)
|
||||
encoding_format_utf_8 = "utf-8"
|
||||
available_output_formats = ["csv", "json-asff", "json-ocsf", "html", "sarif"]
|
||||
available_output_formats = ["csv", "json-asff", "json-ocsf", "html"]
|
||||
|
||||
# Prowler Cloud API settings
|
||||
cloud_api_base_url = os.getenv("PROWLER_CLOUD_API_BASE_URL", "https://api.prowler.com")
|
||||
|
||||
@@ -12,6 +12,7 @@ from prowler.config.config import (
|
||||
default_output_directory,
|
||||
)
|
||||
from prowler.lib.check.models import Severity
|
||||
from prowler.lib.cli.redact import warn_sensitive_argument_values
|
||||
from prowler.lib.outputs.common import Status
|
||||
from prowler.providers.common.arguments import (
|
||||
init_providers_parser,
|
||||
@@ -19,8 +20,6 @@ from prowler.providers.common.arguments import (
|
||||
validate_provider_arguments,
|
||||
)
|
||||
|
||||
SENSITIVE_ARGUMENTS = frozenset({"--shodan"})
|
||||
|
||||
|
||||
class ProwlerArgumentParser:
|
||||
# Set the default parser
|
||||
@@ -126,6 +125,10 @@ Detailed documentation at https://docs.prowler.com
|
||||
elif sys.argv[1] == "oci":
|
||||
sys.argv[1] = "oraclecloud"
|
||||
|
||||
# Warn about sensitive flags passed with explicit values
|
||||
# Snapshot argv before parse_args() which may exit on errors
|
||||
warn_sensitive_argument_values(list(sys.argv[1:]))
|
||||
|
||||
# Parse arguments
|
||||
args = self.parser.parse_args()
|
||||
|
||||
@@ -434,7 +437,7 @@ Detailed documentation at https://docs.prowler.com
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="SHODAN_API_KEY",
|
||||
help="Check if any public IPs in your Cloud environments are exposed in Shodan.",
|
||||
help="Check if any public IPs in your Cloud environments are exposed in Shodan. We recommend to use the SHODAN_API_KEY environment variable to provide the API key.",
|
||||
)
|
||||
third_party_subparser.add_argument(
|
||||
"--slack",
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
from functools import lru_cache
|
||||
from importlib import import_module
|
||||
|
||||
from colorama import Fore, Style
|
||||
|
||||
from prowler.lib.cli.sensitive import SENSITIVE_ARGUMENTS as COMMON_SENSITIVE_ARGUMENTS
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.common.provider import Provider, providers_path
|
||||
|
||||
@@ -13,11 +16,7 @@ def get_sensitive_arguments() -> frozenset:
|
||||
sensitive: set[str] = set()
|
||||
|
||||
# Common parser sensitive arguments (e.g., --shodan)
|
||||
try:
|
||||
parser_module = import_module("prowler.lib.cli.parser")
|
||||
sensitive.update(getattr(parser_module, "SENSITIVE_ARGUMENTS", frozenset()))
|
||||
except Exception as error:
|
||||
logger.debug(f"Could not load SENSITIVE_ARGUMENTS from parser: {error}")
|
||||
sensitive.update(COMMON_SENSITIVE_ARGUMENTS)
|
||||
|
||||
# Provider-specific sensitive arguments
|
||||
for provider in Provider.get_available_providers():
|
||||
@@ -66,3 +65,49 @@ def redact_argv(argv: list[str]) -> str:
|
||||
result.append(arg)
|
||||
|
||||
return " ".join(result)
|
||||
|
||||
|
||||
def warn_sensitive_argument_values(argv: list[str]) -> None:
|
||||
"""Log a warning for each sensitive CLI flag that was passed with an explicit value.
|
||||
|
||||
Scans the raw argv list (not parsed args) to detect when users pass
|
||||
secret values directly on the command line instead of using environment
|
||||
variables. Handles both ``--flag value`` and ``--flag=value`` syntax.
|
||||
|
||||
Args:
|
||||
argv: The argument list to check (typically ``sys.argv[1:]``).
|
||||
"""
|
||||
sensitive = get_sensitive_arguments()
|
||||
if not sensitive:
|
||||
return
|
||||
|
||||
use_color = "--no-color" not in argv
|
||||
flags_with_values: list[str] = []
|
||||
|
||||
for i, arg in enumerate(argv):
|
||||
# --flag=value syntax
|
||||
if "=" in arg:
|
||||
flag = arg.split("=", 1)[0]
|
||||
if flag in sensitive:
|
||||
flags_with_values.append(flag)
|
||||
continue
|
||||
|
||||
# --flag value syntax
|
||||
if arg in sensitive:
|
||||
if i + 1 < len(argv) and not argv[i + 1].startswith("-"):
|
||||
flags_with_values.append(arg)
|
||||
|
||||
for flag in flags_with_values:
|
||||
if use_color:
|
||||
logger.warning(
|
||||
f"{Fore.YELLOW}{Style.BRIGHT}WARNING:{Style.RESET_ALL}{Fore.YELLOW} "
|
||||
f"Passing a value directly to {flag} is not recommended. "
|
||||
f"Use the corresponding environment variable instead to avoid "
|
||||
f"exposing secrets in process listings and shell history.{Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Passing a value directly to {flag} is not recommended. "
|
||||
f"Use the corresponding environment variable instead to avoid "
|
||||
f"exposing secrets in process listings and shell history."
|
||||
)
|
||||
|
||||
8
prowler/lib/cli/sensitive.py
Normal file
8
prowler/lib/cli/sensitive.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""Common parser sensitive arguments.
|
||||
|
||||
This module is kept dependency-free (no prowler-internal imports) so that
|
||||
``prowler.lib.cli.redact`` and any provider argument module can import it
|
||||
without circular-import risk.
|
||||
"""
|
||||
|
||||
SENSITIVE_ARGUMENTS = frozenset({"--shodan"})
|
||||
@@ -354,9 +354,6 @@ class Finding(BaseModel):
|
||||
check_output, "resource_line_range", ""
|
||||
)
|
||||
output_data["framework"] = check_output.check_metadata.ServiceName
|
||||
output_data["raw"] = {
|
||||
"resource_line_range": output_data.get("resource_line_range", ""),
|
||||
}
|
||||
|
||||
elif provider.type == "llm":
|
||||
output_data["auth_method"] = provider.auth_method
|
||||
|
||||
@@ -1,155 +0,0 @@
|
||||
from json import dump
|
||||
from typing import List
|
||||
|
||||
from prowler.config.config import prowler_version
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.outputs.finding import Finding
|
||||
from prowler.lib.outputs.output import Output
|
||||
|
||||
SARIF_SCHEMA_URL = "https://json.schemastore.org/sarif-2.1.0.json"
|
||||
SARIF_VERSION = "2.1.0"
|
||||
|
||||
SEVERITY_TO_SARIF_LEVEL = {
|
||||
"critical": "error",
|
||||
"high": "error",
|
||||
"medium": "warning",
|
||||
"low": "note",
|
||||
"informational": "note",
|
||||
}
|
||||
|
||||
SEVERITY_TO_SECURITY_SEVERITY = {
|
||||
"critical": "9.0",
|
||||
"high": "7.0",
|
||||
"medium": "4.0",
|
||||
"low": "2.0",
|
||||
"informational": "0.0",
|
||||
}
|
||||
|
||||
|
||||
class SARIF(Output):
|
||||
"""Generates SARIF 2.1.0 output compatible with GitHub Code Scanning."""
|
||||
|
||||
def transform(self, findings: List[Finding]) -> None:
|
||||
rules = {}
|
||||
results = []
|
||||
|
||||
for finding in findings:
|
||||
if finding.status != "FAIL":
|
||||
continue
|
||||
|
||||
check_id = finding.metadata.CheckID
|
||||
severity = finding.metadata.Severity.lower()
|
||||
|
||||
if check_id not in rules:
|
||||
rule = {
|
||||
"id": check_id,
|
||||
"name": check_id,
|
||||
"shortDescription": {"text": finding.metadata.CheckTitle},
|
||||
"fullDescription": {
|
||||
"text": finding.metadata.Description or check_id
|
||||
},
|
||||
"help": {
|
||||
"text": finding.metadata.Remediation.Recommendation.Text
|
||||
or finding.metadata.Description
|
||||
or check_id,
|
||||
},
|
||||
"defaultConfiguration": {
|
||||
"level": SEVERITY_TO_SARIF_LEVEL.get(severity, "note"),
|
||||
},
|
||||
"properties": {
|
||||
"tags": [
|
||||
"security",
|
||||
f"prowler/{finding.metadata.Provider}",
|
||||
f"severity/{severity}",
|
||||
],
|
||||
"security-severity": SEVERITY_TO_SECURITY_SEVERITY.get(
|
||||
severity, "0.0"
|
||||
),
|
||||
},
|
||||
}
|
||||
if finding.metadata.RelatedUrl:
|
||||
rule["helpUri"] = finding.metadata.RelatedUrl
|
||||
rules[check_id] = rule
|
||||
|
||||
rule_index = list(rules.keys()).index(check_id)
|
||||
result = {
|
||||
"ruleId": check_id,
|
||||
"ruleIndex": rule_index,
|
||||
"level": SEVERITY_TO_SARIF_LEVEL.get(severity, "note"),
|
||||
"message": {
|
||||
"text": finding.status_extended or finding.metadata.CheckTitle
|
||||
},
|
||||
}
|
||||
|
||||
location = self._build_location(finding)
|
||||
if location:
|
||||
result["locations"] = [location]
|
||||
|
||||
results.append(result)
|
||||
|
||||
sarif_document = {
|
||||
"$schema": SARIF_SCHEMA_URL,
|
||||
"version": SARIF_VERSION,
|
||||
"runs": [
|
||||
{
|
||||
"tool": {
|
||||
"driver": {
|
||||
"name": "Prowler",
|
||||
"version": prowler_version,
|
||||
"informationUri": "https://prowler.com",
|
||||
"rules": list(rules.values()),
|
||||
},
|
||||
},
|
||||
"results": results,
|
||||
},
|
||||
],
|
||||
}
|
||||
|
||||
self._data = sarif_document
|
||||
|
||||
def batch_write_data_to_file(self) -> None:
|
||||
try:
|
||||
if (
|
||||
getattr(self, "_file_descriptor", None)
|
||||
and not self._file_descriptor.closed
|
||||
and self._data
|
||||
):
|
||||
dump(self._data, self._file_descriptor, indent=2)
|
||||
self._file_descriptor.close()
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _build_location(finding: Finding) -> dict:
|
||||
"""Build a SARIF physicalLocation from a Finding.
|
||||
|
||||
Uses resource_name as the artifact URI and resource_line_range
|
||||
(stored in finding.raw for IaC findings) for region info.
|
||||
"""
|
||||
if not finding.resource_name:
|
||||
return {}
|
||||
|
||||
location = {
|
||||
"physicalLocation": {
|
||||
"artifactLocation": {
|
||||
"uri": finding.resource_name,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
line_range = finding.raw.get("resource_line_range", "")
|
||||
if line_range and ":" in line_range:
|
||||
parts = line_range.split(":")
|
||||
try:
|
||||
start_line = int(parts[0])
|
||||
end_line = int(parts[1])
|
||||
location["physicalLocation"]["region"] = {
|
||||
"startLine": start_line,
|
||||
"endLine": end_line,
|
||||
}
|
||||
except (ValueError, IndexError):
|
||||
pass
|
||||
|
||||
return location
|
||||
@@ -9,7 +9,6 @@ from prowler.config.config import (
|
||||
json_asff_file_suffix,
|
||||
json_ocsf_file_suffix,
|
||||
orange_color,
|
||||
sarif_file_suffix,
|
||||
)
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.github.models import GithubAppIdentityInfo, GithubIdentityInfo
|
||||
@@ -208,10 +207,6 @@ def display_summary_table(
|
||||
print(
|
||||
f" - HTML: {output_directory}/{output_filename}{html_file_suffix}"
|
||||
)
|
||||
if "sarif" in output_options.output_modes:
|
||||
print(
|
||||
f" - SARIF: {output_directory}/{output_filename}{sarif_file_suffix}"
|
||||
)
|
||||
|
||||
else:
|
||||
print(
|
||||
|
||||
@@ -13,7 +13,11 @@ def init_parser(self):
|
||||
"--nhn-username", nargs="?", default=None, help="NHN API Username"
|
||||
)
|
||||
nhn_auth_subparser.add_argument(
|
||||
"--nhn-password", nargs="?", default=None, help="NHN API Password"
|
||||
"--nhn-password",
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="NHN_PASSWORD",
|
||||
help="NHN API Password",
|
||||
)
|
||||
nhn_auth_subparser.add_argument(
|
||||
"--nhn-tenant-id", nargs="?", default=None, help="NHN Tenant ID"
|
||||
|
||||
@@ -46,6 +46,7 @@ def init_parser(self):
|
||||
"--os-password",
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="OS_PASSWORD",
|
||||
help="OpenStack password for authentication. Can also be set via OS_PASSWORD environment variable",
|
||||
)
|
||||
openstack_explicit_subparser.add_argument(
|
||||
|
||||
@@ -45,6 +45,34 @@ prowler/providers/{provider}/
|
||||
└── {check_name}.metadata.json
|
||||
```
|
||||
|
||||
## Sensitive CLI Arguments
|
||||
|
||||
Flags that accept secrets (tokens, passwords, API keys) MUST follow these rules:
|
||||
|
||||
1. **Use `nargs="?"` with `default=None`** — the flag accepts an optional value for backward compatibility; the recommended path is environment variables.
|
||||
2. **Set `metavar` to the environment variable name** users should use (e.g., `metavar="GITHUB_PERSONAL_ACCESS_TOKEN"`).
|
||||
3. **Add the flag to the `SENSITIVE_ARGUMENTS` frozenset** at the top of the provider's `arguments.py`. This set is used to redact values in HTML output and warn users who pass secrets directly.
|
||||
4. **Do not add new arguments that require passing secrets as CLI values** — secrets should come from environment variables. The flag accepts a value for backward compatibility, but CLI warns users to prefer env vars.
|
||||
|
||||
### Pattern
|
||||
|
||||
```python
|
||||
# prowler/providers/{provider}/lib/arguments/arguments.py
|
||||
|
||||
SENSITIVE_ARGUMENTS = frozenset({"--my-api-key", "--my-password"})
|
||||
|
||||
|
||||
def init_parser(self):
|
||||
auth_subparser = parser.add_argument_group("Authentication Modes")
|
||||
auth_subparser.add_argument(
|
||||
"--my-api-key",
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="MY_API_KEY",
|
||||
help="API key for authentication. Use MY_API_KEY env var instead of passing directly.",
|
||||
)
|
||||
```
|
||||
|
||||
## Provider Class Template
|
||||
|
||||
```python
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
import logging
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from prowler.lib.cli.redact import REDACTED_VALUE, get_sensitive_arguments, redact_argv
|
||||
from prowler.lib.cli.redact import (
|
||||
REDACTED_VALUE,
|
||||
get_sensitive_arguments,
|
||||
redact_argv,
|
||||
warn_sensitive_argument_values,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -87,6 +93,62 @@ class TestRedactArgv:
|
||||
assert redact_argv(argv) == "aws --region=us-east-1"
|
||||
|
||||
|
||||
class TestWarnSensitiveArgumentValues:
|
||||
def test_no_warning_without_sensitive_flags(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(["aws", "--region", "eu-west-1"])
|
||||
assert caplog.text == ""
|
||||
|
||||
def test_no_warning_flag_without_value(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(["github", "--personal-access-token"])
|
||||
assert caplog.text == ""
|
||||
|
||||
def test_no_warning_flag_followed_by_another_flag(
|
||||
self, caplog, mock_sensitive_args
|
||||
):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(
|
||||
["github", "--personal-access-token", "--region", "eu-west-1"]
|
||||
)
|
||||
assert caplog.text == ""
|
||||
|
||||
def test_warning_flag_with_value(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(
|
||||
["github", "--personal-access-token", "ghp_secret"]
|
||||
)
|
||||
assert "--personal-access-token" in caplog.text
|
||||
assert "not recommended" in caplog.text
|
||||
|
||||
def test_warning_flag_with_equals_syntax(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(["aws", "--shodan=key123"])
|
||||
assert "--shodan" in caplog.text
|
||||
assert "not recommended" in caplog.text
|
||||
|
||||
def test_warning_multiple_flags(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(
|
||||
[
|
||||
"github",
|
||||
"--personal-access-token",
|
||||
"ghp_secret",
|
||||
"--shodan",
|
||||
"key",
|
||||
]
|
||||
)
|
||||
assert "--personal-access-token" in caplog.text
|
||||
assert "--shodan" in caplog.text
|
||||
|
||||
def test_no_color_output(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(["--no-color", "aws", "--shodan", "key123"])
|
||||
assert "not recommended" in caplog.text
|
||||
# Should not contain ANSI escape codes
|
||||
assert "\033[" not in caplog.text
|
||||
|
||||
|
||||
class TestGetSensitiveArguments:
|
||||
def test_discovers_known_sensitive_arguments(self):
|
||||
"""Integration test: verify the discovery mechanism finds flags from provider modules."""
|
||||
|
||||
@@ -1,257 +0,0 @@
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
import pytest
|
||||
|
||||
from prowler.lib.outputs.sarif.sarif import SARIF, SARIF_SCHEMA_URL, SARIF_VERSION
|
||||
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
|
||||
|
||||
|
||||
class TestSARIF:
|
||||
def test_transform_fail_finding(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
status_extended="S3 bucket is not encrypted",
|
||||
severity="high",
|
||||
resource_name="main.tf",
|
||||
service_name="s3",
|
||||
check_id="s3_encryption_check",
|
||||
check_title="S3 Bucket Encryption",
|
||||
)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
assert sarif.data["$schema"] == SARIF_SCHEMA_URL
|
||||
assert sarif.data["version"] == SARIF_VERSION
|
||||
assert len(sarif.data["runs"]) == 1
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert run["tool"]["driver"]["name"] == "Prowler"
|
||||
assert len(run["tool"]["driver"]["rules"]) == 1
|
||||
assert len(run["results"]) == 1
|
||||
|
||||
rule = run["tool"]["driver"]["rules"][0]
|
||||
assert rule["id"] == "s3_encryption_check"
|
||||
assert rule["shortDescription"]["text"] == "S3 Bucket Encryption"
|
||||
assert rule["defaultConfiguration"]["level"] == "error"
|
||||
assert rule["properties"]["security-severity"] == "7.0"
|
||||
|
||||
result = run["results"][0]
|
||||
assert result["ruleId"] == "s3_encryption_check"
|
||||
assert result["ruleIndex"] == 0
|
||||
assert result["level"] == "error"
|
||||
assert result["message"]["text"] == "S3 bucket is not encrypted"
|
||||
|
||||
def test_transform_pass_finding_excluded(self):
|
||||
finding = generate_finding_output(status="PASS", severity="high")
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["results"]) == 0
|
||||
assert len(run["tool"]["driver"]["rules"]) == 0
|
||||
|
||||
def test_transform_muted_finding_excluded(self):
|
||||
finding = generate_finding_output(status="FAIL", severity="high", muted=True)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["results"]) == 1
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"severity,expected_level,expected_security_severity",
|
||||
[
|
||||
("critical", "error", "9.0"),
|
||||
("high", "error", "7.0"),
|
||||
("medium", "warning", "4.0"),
|
||||
("low", "note", "2.0"),
|
||||
("informational", "note", "0.0"),
|
||||
],
|
||||
)
|
||||
def test_transform_severity_mapping(
|
||||
self, severity, expected_level, expected_security_severity
|
||||
):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
severity=severity,
|
||||
)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
result = run["results"][0]
|
||||
rule = run["tool"]["driver"]["rules"][0]
|
||||
|
||||
assert result["level"] == expected_level
|
||||
assert rule["defaultConfiguration"]["level"] == expected_level
|
||||
assert rule["properties"]["security-severity"] == expected_security_severity
|
||||
|
||||
def test_transform_multiple_findings_dedup_rules(self):
|
||||
findings = [
|
||||
generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="file1.tf",
|
||||
status_extended="Finding in file1",
|
||||
),
|
||||
generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="file2.tf",
|
||||
status_extended="Finding in file2",
|
||||
),
|
||||
]
|
||||
sarif = SARIF(findings=findings, file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["tool"]["driver"]["rules"]) == 1
|
||||
assert len(run["results"]) == 2
|
||||
assert run["results"][0]["ruleIndex"] == 0
|
||||
assert run["results"][1]["ruleIndex"] == 0
|
||||
|
||||
def test_transform_multiple_different_rules(self):
|
||||
findings = [
|
||||
generate_finding_output(
|
||||
status="FAIL",
|
||||
service_name="alpha",
|
||||
check_id="alpha_check_one",
|
||||
status_extended="Finding A",
|
||||
),
|
||||
generate_finding_output(
|
||||
status="FAIL",
|
||||
service_name="beta",
|
||||
check_id="beta_check_two",
|
||||
status_extended="Finding B",
|
||||
),
|
||||
]
|
||||
sarif = SARIF(findings=findings, file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["tool"]["driver"]["rules"]) == 2
|
||||
assert run["results"][0]["ruleIndex"] == 0
|
||||
assert run["results"][1]["ruleIndex"] == 1
|
||||
|
||||
def test_transform_location_with_line_range(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="modules/s3/main.tf",
|
||||
)
|
||||
finding.raw = {"resource_line_range": "10:25"}
|
||||
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
result = sarif.data["runs"][0]["results"][0]
|
||||
location = result["locations"][0]["physicalLocation"]
|
||||
assert location["artifactLocation"]["uri"] == "modules/s3/main.tf"
|
||||
assert location["region"]["startLine"] == 10
|
||||
assert location["region"]["endLine"] == 25
|
||||
|
||||
def test_transform_location_without_line_range(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="main.tf",
|
||||
)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
result = sarif.data["runs"][0]["results"][0]
|
||||
location = result["locations"][0]["physicalLocation"]
|
||||
assert location["artifactLocation"]["uri"] == "main.tf"
|
||||
assert "region" not in location
|
||||
|
||||
def test_transform_no_resource_name(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
resource_name="",
|
||||
)
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
|
||||
result = sarif.data["runs"][0]["results"][0]
|
||||
assert "locations" not in result
|
||||
|
||||
def test_batch_write_data_to_file(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
status_extended="test finding",
|
||||
resource_name="main.tf",
|
||||
)
|
||||
|
||||
with tempfile.NamedTemporaryFile(
|
||||
mode="w", suffix=".sarif", delete=False
|
||||
) as tmp:
|
||||
tmp_path = tmp.name
|
||||
|
||||
sarif = SARIF(
|
||||
findings=[finding],
|
||||
file_path=tmp_path,
|
||||
)
|
||||
sarif.batch_write_data_to_file()
|
||||
|
||||
with open(tmp_path) as f:
|
||||
content = json.load(f)
|
||||
|
||||
assert content["$schema"] == SARIF_SCHEMA_URL
|
||||
assert content["version"] == SARIF_VERSION
|
||||
assert len(content["runs"][0]["results"]) == 1
|
||||
|
||||
def test_sarif_schema_structure(self):
|
||||
finding = generate_finding_output(
|
||||
status="FAIL",
|
||||
severity="critical",
|
||||
resource_name="infra/main.tf",
|
||||
service_name="iac",
|
||||
check_id="iac_misconfig_check",
|
||||
check_title="IaC Misconfiguration",
|
||||
description="Checks for misconfigurations",
|
||||
remediation_recommendation_text="Fix the configuration",
|
||||
)
|
||||
finding.raw = {"resource_line_range": "5:15"}
|
||||
|
||||
sarif = SARIF(findings=[finding], file_path=None)
|
||||
doc = sarif.data
|
||||
|
||||
assert "$schema" in doc
|
||||
assert "version" in doc
|
||||
assert "runs" in doc
|
||||
|
||||
run = doc["runs"][0]
|
||||
|
||||
assert "tool" in run
|
||||
assert "driver" in run["tool"]
|
||||
driver = run["tool"]["driver"]
|
||||
assert "name" in driver
|
||||
assert "version" in driver
|
||||
assert "informationUri" in driver
|
||||
assert "rules" in driver
|
||||
|
||||
rule = driver["rules"][0]
|
||||
assert "id" in rule
|
||||
assert "shortDescription" in rule
|
||||
assert "fullDescription" in rule
|
||||
assert "help" in rule
|
||||
assert "defaultConfiguration" in rule
|
||||
assert "properties" in rule
|
||||
assert "tags" in rule["properties"]
|
||||
assert "security-severity" in rule["properties"]
|
||||
|
||||
result = run["results"][0]
|
||||
assert "ruleId" in result
|
||||
assert "ruleIndex" in result
|
||||
assert "level" in result
|
||||
assert "message" in result
|
||||
assert "locations" in result
|
||||
|
||||
loc = result["locations"][0]["physicalLocation"]
|
||||
assert "artifactLocation" in loc
|
||||
assert "uri" in loc["artifactLocation"]
|
||||
assert "region" in loc
|
||||
assert "startLine" in loc["region"]
|
||||
assert "endLine" in loc["region"]
|
||||
|
||||
def test_empty_findings(self):
|
||||
sarif = SARIF(findings=[], file_path=None)
|
||||
assert sarif.data == []
|
||||
|
||||
def test_only_pass_findings(self):
|
||||
findings = [
|
||||
generate_finding_output(status="PASS"),
|
||||
generate_finding_output(status="PASS"),
|
||||
]
|
||||
sarif = SARIF(findings=findings, file_path=None)
|
||||
|
||||
run = sarif.data["runs"][0]
|
||||
assert len(run["results"]) == 0
|
||||
assert len(run["tool"]["driver"]["rules"]) == 0
|
||||
Reference in New Issue
Block a user