Compare commits

...

25 Commits

Author SHA1 Message Date
Pedro Martín
e3858cdd19 chore(CLI): add output functions (#4188) 2024-06-07 09:09:45 +02:00
Pedro Martín
c026fe09d5 Merge branch 'master' into PRWLR-3773-add-listing-functions-to-new-cli 2024-06-05 09:03:19 +02:00
Pedro Martín
63e5318a2c chore(CLI): run a Prowler scan (#4182)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2024-06-05 09:01:42 +02:00
Pedro Martín
31a770848a chore(CLI): add logging commands (#4114) 2024-06-04 11:58:17 +02:00
pedrooot
8c28962d12 chore(cli): improve tests 2024-06-04 10:43:04 +02:00
pedrooot
c3975cf0e4 chore(cli): improve CLI logic 2024-06-04 10:27:15 +02:00
pedrooot
d7fbc80e50 refactor CLI 2024-06-03 18:39:26 +02:00
pedrooot
77f58cbf68 resolve comments 2024-06-03 16:04:32 +02:00
pedrooot
ddc68e78ee improve tests 2024-06-03 14:11:39 +02:00
pedrooot
02800185ba resolve comments 2024-06-03 14:10:41 +02:00
pedrooot
dd15e135e1 fix tests 2024-06-03 13:13:10 +02:00
pedrooot
73cb656eb7 chore(cli): add Prowler banner 2024-06-03 12:44:30 +02:00
Pedro Martín
2053868914 fix(dependencies): ignore jinja vulnerability (#4154) 2024-06-03 12:44:05 +02:00
pedrooot
3f1c4d9295 resolve comments 2024-05-27 17:46:26 +02:00
pedrooot
a55bd1462c resolve comments 2024-05-27 17:25:59 +02:00
pedrooot
59c946ac0b chore(cli): update docs and add test class 2024-05-27 15:59:45 +02:00
pedrooot
0b5650059e update cli.md 2024-05-27 14:09:43 +02:00
pedrooot
054035e335 chore(cli): improve tests 2024-05-27 14:00:48 +02:00
pedrooot
b474247758 add tests still wip 2024-05-27 12:02:19 +02:00
pedrooot
2cefd32b5d improve code 2024-05-27 08:57:55 +02:00
pedrooot
f459d58314 chore(cli): add list-checks-json 2024-05-27 08:56:40 +02:00
pedrooot
66c8d0dc40 chore(cli): add list checks 2024-05-27 08:10:42 +02:00
pedrooot
96e38054bc chore(cli): add list-compliance-requirements [compliance(s)] 2024-05-24 14:06:22 +02:00
pedrooot
fc7d9b5b20 chore(CLI): add list compliance 2024-05-24 12:57:27 +02:00
pedrooot
573c0c3a74 chore(CLI): add list categories 2024-05-24 12:33:55 +02:00
14 changed files with 1045 additions and 452 deletions

View File

@@ -2,5 +2,30 @@
To show the banner, use:
`python cli/cli.py banner`
## Listing
List services by provider.
List services by provider
`python cli/cli.py <provider> list-services`
List fixers by provider
`python cli/cli.py <provider> list-fixers`
List categories by provider
`python cli/cli.py <provider> list-categories`
List compliance by provider
`python cli/cli.py <provider> list-compliance`
List compliance requirements by provider
`python cli/cli.py <provider> list-compliance-requirements [compliance(s)]`
List checks by provider
`python cli/cli.py <provider> list-checks`
List checks in JSON format by provider
`python cli/cli.py <provider> list-checks-json`

View File

@@ -1,62 +1,387 @@
from argparse import Namespace
from dataclasses import dataclass
from typing import List, Optional

import typer

from prowler.config.config import (
    available_compliance_frameworks,
    default_output_directory,
    finding_statuses,
)
from prowler.lib.banner import print_banner
from prowler.lib.check.check import (
    bulk_load_checks_metadata,
    bulk_load_compliance_frameworks,
    list_categories,
    list_checks_json,
    list_fixers,
    list_services,
    print_categories,
    print_checks,
    print_compliance_frameworks,
    print_compliance_requirements,
    print_fixers,
    print_services,
)
from prowler.lib.check.checks_loader import load_checks_to_execute
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
from prowler.lib.logger import logger, logging_levels, set_logging_config
from prowler.lib.outputs.security_hub.security_hub import SecurityHub
from prowler.lib.scan.scan import Scan
from prowler.providers.common.provider import Provider
app = typer.Typer()
aws = typer.Typer(name="aws")
azure = typer.Typer(name="azure")
gcp = typer.Typer(name="gcp")
kubernetes = typer.Typer(name="kubernetes")
app.add_typer(aws, name="aws")
app.add_typer(azure, name="azure")
app.add_typer(gcp, name="gcp")
app.add_typer(kubernetes, name="kubernetes")
def list_resources(provider: str, resource_type: str):
    """Print the provider's resources of the requested type.

    Supported resource types are "services" and "fixers"; any other value
    is silently ignored.
    """
    printers = {
        "services": lambda: print_services(list_services(provider)),
        "fixers": lambda: print_fixers(list_fixers(provider)),
    }
    printer = printers.get(resource_type)
    if printer is not None:
        printer()
def check_provider(provider: str):
    """Typer callback: validate that the provider argument is supported.

    Returns the provider unchanged, or raises typer.BadParameter when it is
    not one of the supported cloud providers.
    """
    supported_providers = ("aws", "azure", "gcp", "kubernetes")
    if provider in supported_providers:
        return provider
    raise typer.BadParameter(
        "Invalid provider. Choose between aws, azure, gcp or kubernetes."
    )
def create_list_commands(provider_typer: typer.Typer):
provider_name = provider_typer.info.name
def check_compliance_framework(provider: str, compliance_framework: list):
    """Return the subset of the requested compliance frameworks that are
    valid for the given provider.

    Invalid frameworks are reported on stdout and dropped from the result.
    """
    # Frameworks whose identifier mentions this provider are considered
    # available for it
    provider_frameworks = [
        framework
        for framework in available_compliance_frameworks
        if provider in framework
    ]
    valid_frameworks = []
    for requested in compliance_framework:
        if requested in provider_frameworks:
            valid_frameworks.append(requested)
        else:
            print(f"{requested} is not a valid Compliance Framework\n")
    return valid_frameworks
@provider_typer.command(
"list-services",
help=f"List the {provider_name} services that are supported by Prowler.",
def validate_log_level(log_level: str):
    """Typer callback: validate that log_level is a configured logging level.

    Returns the level unchanged, or raises typer.BadParameter otherwise.
    """
    valid_levels = list(logging_levels.keys())
    if log_level in valid_levels:
        return log_level
    raise typer.BadParameter(f"Log level must be one of {valid_levels}")
def split_space_separated_values(value: List[str]) -> List[str]:
    """Typer callback: flatten repeated option values that may each contain
    several space-separated items into a single flat list.

    Example: ["csv json", "html"] -> ["csv", "json", "html"]

    Returns an empty list when the option was not provided (value is falsy).
    """
    # Fixes: the parameter is the list typer collects for a repeatable
    # option, not a str; the old loop also shadowed the builtin `input`.
    if not value:
        return []
    return [part for item in value for part in item.split(" ")]
def validate_status(status: List[str]):
    """Typer callback: validate that every status is a known finding status.

    Returns a new list with the validated statuses; raises
    typer.BadParameter on the first unknown one.
    """
    for candidate in status:
        if candidate not in finding_statuses:
            raise typer.BadParameter(f"Status must be one of {finding_statuses}")
    return list(status)
def validate_output_formats(output_formats: List[str]) -> List[str]:
    """Typer callback: validate that every requested output format is
    supported.

    Returns the validated list unchanged; raises typer.BadParameter on the
    first unsupported format.
    """
    # Fixes: the old version accumulated a `valid_formats` list that was
    # never returned (dead code) and used a redundant `else` after raise.
    valid_output_formats = ["csv", "json-ocsf", "html", "json-asff"]
    for output_format in output_formats:
        if output_format not in valid_output_formats:
            raise typer.BadParameter(
                f"Output format must be one of {valid_output_formats}"
            )
    return output_formats
@dataclass
class CLI:
    """Container for every option parsed by the ``main`` command.

    Replaces a hand-written 20-assignment ``__init__`` with a dataclass:
    same field names and order, so both positional and keyword
    construction stay backward compatible (plus free ``repr``/``eq``).
    Do NOT reorder the fields — callers instantiate it positionally.
    """

    provider: str
    list_services: bool
    list_fixers: bool
    list_categories: bool
    list_compliance: bool
    list_compliance_requirements: List[str]
    list_checks: bool
    list_checks_json: bool
    log_level: str
    log_file: Optional[str]
    only_logs: bool
    status: List[str]
    output_formats: List[str]
    output_filename: Optional[str]
    output_directory: Optional[str]
    verbose: bool
    ignore_exit_code_3: bool
    no_banner: bool
    unix_timestamp: bool
    profile: Optional[str]
@app.command()
def main(
provider: str = typer.Argument(
..., help="The provider to check", callback=check_provider
),
list_services_bool: bool = typer.Option(
False, "--list-services", help="List the services of the provider"
),
list_fixers_bool: bool = typer.Option(
False, "--list-fixers", help="List the fixers of the provider"
),
list_categories_bool: bool = typer.Option(
False, "--list-categories", help="List the categories of the provider"
),
list_compliance_bool: bool = typer.Option(
False,
"--list-compliance",
help="List the compliance frameworks of the provider",
),
list_compliance_requirements_value: List[str] = typer.Option(
None,
"--list-compliance-requirements",
help="List the compliance requirements of the provider",
callback=split_space_separated_values,
),
list_checks_bool: bool = typer.Option(
False, "--list-checks", help="List the checks of the provider"
),
list_checks_json_bool: bool = typer.Option(
False,
"--list-checks-json",
help="List the checks of the provider in JSON format",
),
log_level: str = typer.Option("INFO", "--log-level", help="Set the Log level"),
log_file: str = typer.Option(None, "--log-file", help="Set the Log file"),
only_logs: bool = typer.Option(False, "--only-logs", help="Only show logs"),
status_value: List[str] = typer.Option(
[],
"--status",
help=f"Filter by the status of the findings {finding_statuses}",
callback=split_space_separated_values,
),
output_formats_value: List[str] = typer.Option(
["csv json-ocsf html"],
"--output-formats",
help="Output format for the findings",
callback=split_space_separated_values,
),
output_filename_value: str = typer.Option(
None, "--output-filename", help="Output filename"
),
output_directory_value: str = typer.Option(
None, "--output-directory", help="Output directory"
),
verbose: bool = typer.Option(False, "--verbose", help="Show verbose output"),
ignore_exit_code_3: bool = typer.Option(
False, "--ignore-exit-code-3", help="Ignore exit code 3"
),
no_banner: bool = typer.Option(False, "--no-banner", help="Do not show the banner"),
unix_timestamp: bool = typer.Option(
False, "--unix-timestamp", help="Use Unix timestamp"
),
profile: str = typer.Option(None, "--profile", help="The profile to use"),
):
# Make sure the values are valid
if status_value:
status_value = validate_status(status_value)
if output_formats_value:
output_formats_value = validate_output_formats(output_formats_value)
if not output_directory_value:
output_directory_value = default_output_directory
options = CLI(
provider,
list_services_bool,
list_fixers_bool,
list_categories_bool,
list_compliance_bool,
list_compliance_requirements_value,
list_checks_bool,
list_checks_json_bool,
log_level,
log_file,
only_logs,
status_value,
output_formats_value,
output_filename_value,
output_directory_value,
verbose,
ignore_exit_code_3,
no_banner,
unix_timestamp,
profile,
)
def list_services_command():
list_resources(provider_name, "services")
@provider_typer.command(
"list-fixers",
help=f"List the {provider_name} fixers that are supported by Prowler.",
if options.list_services:
services = list_services(options.provider)
print_services(services)
if options.list_fixers:
fixers = list_fixers(options.provider)
print_fixers(fixers)
if options.list_categories:
checks_metadata = bulk_load_checks_metadata(options.provider)
categories = list_categories(checks_metadata)
print_categories(categories)
if options.list_compliance:
compliance_frameworks = bulk_load_compliance_frameworks(options.provider)
print_compliance_frameworks(compliance_frameworks)
if options.list_compliance_requirements:
valid_compliance = check_compliance_framework(
options.provider, options.list_compliance_requirements
)
print_compliance_requirements(
bulk_load_compliance_frameworks(options.provider),
valid_compliance,
)
if options.list_checks:
checks_metadata = bulk_load_checks_metadata(options.provider)
checks = load_checks_to_execute(
checks_metadata,
bulk_load_compliance_frameworks(options.provider),
None,
[],
[],
[],
[],
[],
options.provider,
)
print_checks(options.provider, sorted(checks), checks_metadata)
if options.list_checks_json:
checks_metadata = bulk_load_checks_metadata(options.provider)
checks_to_execute = load_checks_to_execute(
checks_metadata,
bulk_load_compliance_frameworks(options.provider),
None,
[],
[],
[],
[],
[],
options.provider,
)
print(list_checks_json(options.provider, sorted(checks_to_execute)))
if options.log_level:
set_logging_config(validate_log_level(options.log_level))
logger.info(f"Log level set to {options.log_level}")
if options.log_file:
if options.log_level:
set_logging_config(validate_log_level(options.log_level), options.log_file)
else:
set_logging_config("INFO", options.log_file)
logger.info(f"Log file set to {options.log_file}")
if options.only_logs:
if options.log_level:
set_logging_config(validate_log_level(options.log_level), only_logs=True)
else:
set_logging_config("INFO", only_logs=True)
logger.info("Only logs are shown")
if options.status:
logger.info(f"Filtering by status: {options.status}")
# TODO: Implement filtering by status in a class
if options.output_formats:
logger.info(f"Output formats: {options.output_formats}")
# TODO: Implement output formats in a class
if options.output_filename:
logger.info(f"Output filename: {options.output_filename}")
# TODO: Implement output filename in a class
if options.output_directory:
logger.info(f"Output directory: {options.output_directory}")
# TODO: Implement output directory in a class
if options.verbose:
logger.info("Verbose output is enabled")
if options.ignore_exit_code_3:
logger.info("Ignoring exit code 3")
if options.no_banner:
logger.info("No banner is shown")
if options.unix_timestamp:
logger.info("Using Unix timestamp")
if options.profile:
logger.info(f"Using profile: {options.profile}")
run_scan(options)
return options
def run_scan(options: CLI):
# Execute Prowler
checks_to_execute = ["s3_account_level_public_access_blocks"]
# Create the provider
args = Namespace
args.provider = options.provider
args.profile = options.profile
args.verbose = options.verbose
args.fixer = False
args.only_logs = options.only_logs
args.status = options.status
args.output_formats = options.output_formats
args.output_filename = options.output_filename
args.unix_timestamp = options.unix_timestamp
args.output_directory = options.output_directory
args.shodan = None
args.security_hub = False
args.send_sh_only_fails = False
args.ignore_exit_code_3 = options.ignore_exit_code_3
args.no_banner = options.no_banner
# args.region = ("eu-west-1")
Provider.set_global_provider(args)
provider = Provider.get_global_provider()
bulk_checks_metadata = bulk_load_checks_metadata(provider.type)
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider.type)
bulk_checks_metadata = update_checks_metadata_with_compliance(
bulk_compliance_frameworks, bulk_checks_metadata
)
def list_fixers_command():
list_resources(provider_name, "fixers")
create_list_commands(aws)
create_list_commands(azure)
create_list_commands(gcp)
create_list_commands(kubernetes)
@app.command("banner", help="Prints the banner of the tool.")
def banner(show: bool = True):
if show:
print_banner(show)
else:
print("Banner is not shown.")
provider.output_options = (args, bulk_checks_metadata)
provider.output_options.bulk_checks_metadata = bulk_checks_metadata
scan = Scan(provider, checks_to_execute)
custom_checks_metadata = None
scan_results = scan.scan(custom_checks_metadata)
# Verify where AWS Security Hub is enabled
aws_security_enabled_regions = []
security_hub_regions = (
provider.get_available_aws_service_regions("securityhub")
if not provider.identity.audited_regions
else provider.identity.audited_regions
)
security_hub = SecurityHub(provider)
for region in security_hub_regions:
# Save the regions where AWS Security Hub is enabled
if security_hub.verify_security_hub_integration_enabled_per_region(
region,
):
aws_security_enabled_regions.append(region)
# Prepare the findings to be sent to Security Hub
security_hub_findings_per_region = security_hub.prepare_security_hub_findings(
scan_results,
aws_security_enabled_regions,
)
# Send the findings to Security Hub
findings_sent_to_security_hub = security_hub.batch_send_to_security_hub(
security_hub_findings_per_region
)
print(findings_sent_to_security_hub)
if __name__ == "__main__":

View File

@@ -40,15 +40,10 @@ from prowler.lib.outputs.compliance.compliance import display_compliance_table
from prowler.lib.outputs.html.html import add_html_footer, fill_html_overview_statistics
from prowler.lib.outputs.json.json import close_json
from prowler.lib.outputs.outputs import extract_findings_statistics
from prowler.lib.outputs.security_hub.security_hub import SecurityHub
from prowler.lib.outputs.slack.slack import Slack
from prowler.lib.outputs.summary_table import display_summary_table
from prowler.providers.aws.lib.s3.s3 import send_to_s3_bucket
from prowler.providers.aws.lib.security_hub.security_hub import (
batch_send_to_security_hub,
prepare_security_hub_findings,
resolve_security_hub_previous_findings,
verify_security_hub_integration_enabled_per_region,
)
from prowler.providers.common.provider import Provider
from prowler.providers.common.quick_inventory import run_provider_quick_inventory
@@ -321,42 +316,43 @@ def prowler():
if not global_provider.identity.audited_regions
else global_provider.identity.audited_regions
)
security_hub = SecurityHub(global_provider)
for region in security_hub_regions:
# Save the regions where AWS Security Hub is enabled
if verify_security_hub_integration_enabled_per_region(
global_provider.identity.partition,
if security_hub.verify_security_hub_integration_enabled_per_region(
region,
global_provider.session.current_session,
global_provider.identity.account,
):
aws_security_enabled_regions.append(region)
# Prepare the findings to be sent to Security Hub
security_hub_findings_per_region = prepare_security_hub_findings(
security_hub_findings_per_region = security_hub.prepare_security_hub_findings(
findings,
global_provider,
global_provider.output_options,
aws_security_enabled_regions,
)
# Send the findings to Security Hub
findings_sent_to_security_hub = batch_send_to_security_hub(
security_hub_findings_per_region, global_provider.session.current_session
findings_sent_to_security_hub = security_hub.batch_send_to_security_hub(
security_hub_findings_per_region
)
# Refactor(CLI)
print(
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent_to_security_hub} findings sent to AWS Security Hub!{Style.RESET_ALL}"
)
# Resolve previous fails of Security Hub
if not args.skip_sh_update:
# Refactor(CLI)
print(
f"{Style.BRIGHT}\nArchiving previous findings in AWS Security Hub, please wait...{Style.RESET_ALL}"
)
findings_archived_in_security_hub = resolve_security_hub_previous_findings(
security_hub_findings_per_region,
global_provider,
findings_archived_in_security_hub = (
security_hub.resolve_security_hub_previous_findings(
security_hub_findings_per_region,
)
)
# Refactor(CLI)
print(
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_archived_in_security_hub} findings archived in AWS Security Hub!{Style.RESET_ALL}"
)

View File

@@ -593,12 +593,17 @@ def execute_checks(
service,
check_name,
global_provider,
services_executed,
checks_executed,
custom_checks_metadata,
)
all_findings.extend(check_findings)
# Update Audit Status
services_executed.add(service)
checks_executed.add(check_name)
global_provider.audit_metadata = update_audit_metadata(
global_provider.audit_metadata, services_executed, checks_executed
)
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
logger.error(
@@ -651,12 +656,19 @@ def execute_checks(
service,
check_name,
global_provider,
services_executed,
checks_executed,
custom_checks_metadata,
)
all_findings.extend(check_findings)
# Update Audit Status
services_executed.add(service)
checks_executed.add(check_name)
global_provider.audit_metadata = update_audit_metadata(
global_provider.audit_metadata,
services_executed,
checks_executed,
)
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
@@ -677,8 +689,6 @@ def execute(
service: str,
check_name: str,
global_provider: Any,
services_executed: set,
checks_executed: set,
custom_checks_metadata: Any,
):
try:
@@ -706,13 +716,6 @@ def execute(
check_class, verbose, global_provider.output_options.only_logs
)
# Update Audit Status
services_executed.add(service)
checks_executed.add(check_name)
global_provider.audit_metadata = update_audit_metadata(
global_provider.audit_metadata, services_executed, checks_executed
)
# Mutelist findings
if hasattr(global_provider, "mutelist") and global_provider.mutelist:
check_findings = mutelist_findings(

View File

@@ -0,0 +1,235 @@
from boto3.session import Session
from botocore.client import ClientError
from prowler.config.config import timestamp_utc
from prowler.lib.logger import logger
from prowler.lib.outputs.json_asff.json_asff import fill_json_asff
from prowler.providers.aws.aws_provider import AwsProvider
SECURITY_HUB_INTEGRATION_NAME = "prowler/prowler"
SECURITY_HUB_MAX_BATCH = 100
class SecurityHub:
    """AWS Security Hub integration for a given AWS provider.

    Responsibilities: verify that the Prowler product integration is
    enabled per region, format findings as ASFF, send them in batches of
    SECURITY_HUB_MAX_BATCH, and archive findings from previous runs that
    were not re-detected.
    """

    _session: Session
    _provider: AwsProvider
    _account: str
    _partition: str
    # Findings prepared by prepare_security_hub_findings, keyed by region.
    _findings_per_region: dict

    def __init__(self, provider: AwsProvider) -> None:
        # Fix: __init__ was annotated as returning "SecurityHub"; it
        # returns None like every __init__.
        self._provider = provider
        self._session = provider.session.current_session
        self._account = provider.identity.account
        self._partition = provider.identity.partition
        # Fix: the findings_per_region property used to read an attribute
        # that was never assigned, raising AttributeError on every access.
        self._findings_per_region = {}

    @property
    def findings_per_region(self):
        """Findings from the last prepare_security_hub_findings call,
        keyed by region ({} until it has been called)."""
        return self._findings_per_region

    def prepare_security_hub_findings(
        self, findings: list, enabled_regions: list
    ) -> dict:
        """Group the given findings by region in ASFF format.

        Findings are skipped when: their status is MANUAL, their region is
        not Security Hub-enabled, they are muted, they are filtered out by
        the configured --status filter, or --send-sh-only-fails is set and
        they are not an active FAIL.

        Returns (and caches on the instance) a dict mapping each enabled
        region to a list of ASFF finding dicts.
        """
        security_hub_findings_per_region = {
            region: [] for region in enabled_regions
        }
        for finding in findings:
            # We don't send the MANUAL findings to AWS Security Hub
            if finding.status == "MANUAL":
                continue
            # We don't send findings to not enabled regions
            if finding.region not in enabled_regions:
                continue
            # With --send-sh-only-fails, drop anything that is not an
            # active (non-muted) FAIL
            if (
                finding.status != "FAIL" or finding.muted
            ) and self._provider.output_options.send_sh_only_fails:
                continue
            # Honour the --status filter when present
            if self._provider.output_options.status:
                if finding.status not in self._provider.output_options.status:
                    continue
            # Muted findings are never sent
            if finding.muted:
                continue
            # Format the finding in the JSON ASFF format and file it under
            # its region
            finding_json_asff = fill_json_asff(self._provider, finding)
            security_hub_findings_per_region[finding.region].append(
                finding_json_asff.dict(exclude_none=True)
            )
        self._findings_per_region = security_hub_findings_per_region
        return security_hub_findings_per_region

    def verify_security_hub_integration_enabled_per_region(
        self,
        region: str,
    ) -> bool:
        """Return True if the Prowler integration
        (SECURITY_HUB_INTEGRATION_NAME, i.e. "prowler/prowler") accepts
        findings in the given region; False otherwise.

        Fix: this used to be an f-string expression, not a real docstring,
        re-evaluated on every call and invisible to help()/__doc__.
        """
        prowler_integration_enabled = False
        try:
            logger.info(
                f"Checking if the {SECURITY_HUB_INTEGRATION_NAME} is enabled in the {region} region."
            )
            # Check if security hub is enabled in current region
            security_hub_client = self._session.client(
                "securityhub", region_name=region
            )
            security_hub_client.describe_hub()
            # Check if Prowler integration is enabled in Security Hub
            security_hub_prowler_integration_arn = f"arn:{self._partition}:securityhub:{region}:{self._account}:product-subscription/{SECURITY_HUB_INTEGRATION_NAME}"
            if security_hub_prowler_integration_arn not in str(
                security_hub_client.list_enabled_products_for_import()
            ):
                logger.warning(
                    f"Security Hub is enabled in {region} but Prowler integration does not accept findings. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/"
                )
            else:
                prowler_integration_enabled = True
        # Handle all the permissions / configuration errors
        except ClientError as client_error:
            # An InvalidAccessException about this account means Security
            # Hub is simply not subscribed — expected, so only a warning
            error_code = client_error.response["Error"]["Code"]
            error_message = client_error.response["Error"]["Message"]
            if (
                error_code == "InvalidAccessException"
                and f"Account {self._account} is not subscribed to AWS Security Hub"
                in error_message
            ):
                logger.warning(
                    f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
                )
            else:
                logger.error(
                    f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
                )
        except Exception as error:
            logger.error(
                f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
            )
        finally:
            # Deliberate best-effort: any failure means "not enabled"
            return prowler_integration_enabled

    def batch_send_to_security_hub(
        self,
        security_hub_findings_per_region: dict,
    ) -> int:
        """Send findings to Security Hub region by region and return the
        number of findings that were successfully sent.

        Fix: errors are now handled per region — one failing region no
        longer aborts the remaining ones (matching
        resolve_security_hub_previous_findings), and `region` can no
        longer be unbound inside the error message.
        """
        success_count = 0
        for region, findings in security_hub_findings_per_region.items():
            try:
                # Send findings to Security Hub
                logger.info(f"Sending findings to Security Hub in the region {region}")
                security_hub_client = self._session.client(
                    "securityhub", region_name=region
                )
                success_count += self.__send_findings_to_security_hub__(
                    findings, region, security_hub_client
                )
            except Exception as error:
                logger.error(
                    f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
                )
        return success_count

    # Move previous Security Hub check findings to ARCHIVED (as prowler didn't re-detect them)
    def resolve_security_hub_previous_findings(
        self, security_hub_findings_per_region: dict
    ) -> list:
        """Archive every ACTIVE Prowler finding in Security Hub that does
        not appear in the current execution; return the number archived.

        NOTE(review): annotated as returning list but actually returns an
        int counter — kept as-is since callers may rely on the value.
        """
        logger.info("Checking previous findings in Security Hub to archive them.")
        success_count = 0
        for region in security_hub_findings_per_region.keys():
            try:
                current_findings = security_hub_findings_per_region[region]
                # Get current findings IDs
                current_findings_ids = []
                for finding in current_findings:
                    current_findings_ids.append(finding["Id"])
                # Get findings of that region
                security_hub_client = self._session.client(
                    "securityhub", region_name=region
                )
                findings_filter = {
                    "ProductName": [{"Value": "Prowler", "Comparison": "EQUALS"}],
                    "RecordState": [{"Value": "ACTIVE", "Comparison": "EQUALS"}],
                    "AwsAccountId": [{"Value": self._account, "Comparison": "EQUALS"}],
                    "Region": [{"Value": region, "Comparison": "EQUALS"}],
                }
                get_findings_paginator = security_hub_client.get_paginator(
                    "get_findings"
                )
                findings_to_archive = []
                for page in get_findings_paginator.paginate(Filters=findings_filter):
                    # Archive findings that have not appeared in this execution
                    for finding in page["Findings"]:
                        if finding["Id"] not in current_findings_ids:
                            finding["RecordState"] = "ARCHIVED"
                            finding["UpdatedAt"] = timestamp_utc.strftime(
                                "%Y-%m-%dT%H:%M:%SZ"
                            )
                            findings_to_archive.append(finding)
                logger.info(f"Archiving {len(findings_to_archive)} findings.")
                # Send archive findings to SHub
                success_count += self.__send_findings_to_security_hub__(
                    findings_to_archive, region, security_hub_client
                )
            except Exception as error:
                logger.error(
                    f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
                )
        return success_count

    def __send_findings_to_security_hub__(
        self, findings: list[dict], region: str, security_hub_client
    ):
        """Send findings to AWS Security Hub in chunks of
        SECURITY_HUB_MAX_BATCH (the BatchImportFindings limit) and return
        the number successfully imported.

        Fix: the chunk loop variable used to shadow the `findings`
        parameter.
        """
        success_count = 0
        try:
            chunks = [
                findings[i : i + SECURITY_HUB_MAX_BATCH]
                for i in range(0, len(findings), SECURITY_HUB_MAX_BATCH)
            ]
            for chunk in chunks:
                batch_import = security_hub_client.batch_import_findings(
                    Findings=chunk
                )
                if batch_import["FailedCount"] > 0:
                    failed_import = batch_import["FailedFindings"][0]
                    logger.error(
                        f"Failed to send findings to AWS Security Hub -- {failed_import['ErrorCode']} -- {failed_import['ErrorMessage']}"
                    )
                success_count += batch_import["SuccessCount"]
        except Exception as error:
            logger.error(
                f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
            )
        finally:
            # Best-effort: report however many findings made it through
            return success_count

View File

@@ -1,60 +1,150 @@
from typing import Any
from prowler.lib.check.check import execute
from prowler.lib.check.check import execute, update_audit_metadata
from prowler.lib.check.models import Check_Report
from prowler.lib.logger import logger
from prowler.providers.common.models import Audit_Metadata
from prowler.providers.common.provider import Provider
def scan(
checks_to_execute: list,
global_provider: Any,
custom_checks_metadata: Any,
) -> list[Check_Report]:
try:
# List to store all the check's findings
all_findings = []
# Services and checks executed for the Audit Status
services_executed = set()
checks_executed = set()
class Scan:
# Maybe not needed
_provider: Provider
# Refactor(Core): This should replace the Audit_Metadata
_number_of_checks_to_execute: int = 0
_number_of_checks_completed: int = 0
# TODO: these should hold a list of Checks()
_checks_to_execute: set[str]
_service_checks_to_execute: dict[str, set[str]]
_service_checks_completed: dict[str, set[str]]
_progress: float = 0.0
_findings: list = []
# Initialize the Audit Metadata
# TODO: this should be done in the provider class
# Refactor(Core): Audit manager?
global_provider.audit_metadata = Audit_Metadata(
services_scanned=0,
expected_checks=checks_to_execute,
completed_checks=0,
audit_progress=0,
)
def __init__(self, provider, checks_to_execute):
self._provider = provider
for check_name in checks_to_execute:
try:
# Recover service from check name
service = check_name.split("_")[0]
self._number_of_checks_to_execute = len(checks_to_execute)
check_findings = execute(
service,
check_name,
global_provider,
services_executed,
checks_executed,
custom_checks_metadata,
)
all_findings.extend(check_findings)
service_checks_to_execute = dict()
service_checks_completed = dict()
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
logger.error(
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
)
except Exception as error:
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
for check in checks_to_execute:
# check -> accessanalyzer_enabled
# service -> accessanalyzer
service = get_service_name_from_check_name(check)
if service not in service_checks_to_execute:
service_checks_to_execute[service] = set()
service_checks_to_execute[service].add(check)
return all_findings
self._service_checks_to_execute = service_checks_to_execute
self._service_checks_completed = service_checks_completed
self._checks_to_execute = checks_to_execute
@property
def checks_to_execute(self) -> set[str]:
return self._checks_to_execute
@property
def service_checks_to_execute(self) -> dict[str, set[str]]:
return self._service_checks_to_execute
@property
def service_checks_completed(self) -> dict[str, set[str]]:
return self._service_checks_completed
@property
def provider(self) -> Provider:
return self._provider
@property
def progress(self) -> float:
return self._number_of_checks_completed / self._number_of_checks_to_execute
@property
def findings(self) -> list:
return self._findings
    def scan(
        self,
        custom_checks_metadata: Any,
    ) -> list[Check_Report]:
        """Execute every check in ``self.checks_to_execute`` against the
        provider and return the accumulated findings.

        Per check: runs ``execute``, stores its findings, moves the check
        from the to-execute bookkeeping to the completed bookkeeping, and
        refreshes the provider's audit metadata. Failing checks are logged
        and skipped; the scan continues with the next check.

        :param custom_checks_metadata: custom metadata forwarded to
            ``execute`` for each check.
        :return: list of Check_Report findings gathered across all checks.
        """
        try:
            checks_to_execute = self.checks_to_execute
            # Initialize the Audit Metadata
            # TODO: this should be done in the provider class
            # Refactor(Core): Audit manager?
            self._provider.audit_metadata = Audit_Metadata(
                services_scanned=0,  # Refactor(Core): this shouldn't be needed
                expected_checks=checks_to_execute,
                completed_checks=0,
                audit_progress=0,
            )
            for check_name in checks_to_execute:
                try:
                    # Recover service from check name
                    service = get_service_name_from_check_name(check_name)
                    # Execute the check
                    check_findings = execute(
                        service,
                        check_name,
                        self._provider,
                        custom_checks_metadata,
                    )
                    # Store findings
                    self._findings.extend(check_findings)
                    # Remove the executed check; drop the service entry once
                    # its last pending check is done
                    self._service_checks_to_execute[service].remove(check_name)
                    if len(self._service_checks_to_execute[service]) == 0:
                        self._service_checks_to_execute.pop(service, None)
                    # Add the completed check
                    if service not in self._service_checks_completed:
                        self._service_checks_completed[service] = set()
                    self._service_checks_completed[service].add(check_name)
                    self._number_of_checks_completed += 1
                    # This should be done just once all the service's checks are completed
                    # This metadata needs to get to the services not within the provider
                    # since it is present in the Scan class
                    self._provider.audit_metadata = update_audit_metadata(
                        self._provider.audit_metadata,
                        self.get_completed_services(),
                        self.get_completed_checks(),
                    )
                # If check does not exists in the provider or is from another provider
                except ModuleNotFoundError:
                    logger.error(
                        f"Check '{check_name}' was not found for the {self._provider.type.upper()} provider"
                    )
                except Exception as error:
                    logger.error(
                        f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
                    )
        except Exception as error:
            # NOTE(review): check_name may be unbound here if the failure
            # happens before the loop starts — confirm and guard if so
            logger.error(
                f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
        return self._findings
def get_completed_services(self):
    """Return a view of the names of services that have completed checks recorded."""
    completed_by_service = self._service_checks_completed
    return completed_by_service.keys()
def get_completed_checks(self):
    """Return the union of all completed check names across every service."""
    # set().union(*...) with no per-service sets yields an empty set, matching
    # the accumulate-with-update behavior of the original implementation.
    return set().union(*self._service_checks_completed.values())
def get_service_name_from_check_name(check_name: str) -> str:
    """Return the service name for a given check name.

    The service is the token before the first underscore; a name with no
    underscore is returned unchanged.

    Example:
        get_service_name_from_check_name("ec2_instance_public") -> "ec2"
    """
    service_name, _sep, _rest = check_name.partition("_")
    return service_name

View File

@@ -1,216 +0,0 @@
from boto3 import session
from botocore.client import ClientError
from prowler.config.config import timestamp_utc
from prowler.lib.logger import logger
from prowler.lib.outputs.json_asff.json_asff import fill_json_asff
# Product suffix identifying the Prowler integration ARN in Security Hub.
SECURITY_HUB_INTEGRATION_NAME = "prowler/prowler"
# Chunk size used when importing findings; Security Hub's BatchImportFindings
# API accepts at most 100 findings per request.
SECURITY_HUB_MAX_BATCH = 100
def prepare_security_hub_findings(
    findings: list, provider, output_options, enabled_regions: list
) -> dict:
    """Group findings by enabled region in ASFF format, applying the Security Hub filters.

    Skips MANUAL findings, findings in non-enabled regions, non-FAIL/muted
    findings when only failures should be sent, and findings filtered out by
    the configured status list (muted findings are dropped there too).
    """
    # One bucket per audited/enabled region, even if it ends up empty.
    security_hub_findings_per_region = {region: [] for region in enabled_regions}
    for finding in findings:
        # We don't send the MANUAL findings to AWS Security Hub
        if finding.status == "MANUAL":
            continue
        # We don't send findings to not enabled regions
        if finding.region not in enabled_regions:
            continue
        # When only failures are sent, drop anything that is not an unmuted FAIL
        if output_options.send_sh_only_fails and (
            finding.status != "FAIL" or finding.muted
        ):
            continue
        if output_options.status:
            if finding.status not in output_options.status:
                continue
            if finding.muted:
                continue
        # Format the finding in the JSON ASFF format and file it under its region
        asff_finding = fill_json_asff(provider, finding)
        security_hub_findings_per_region[finding.region].append(
            asff_finding.dict(exclude_none=True)
        )
    return security_hub_findings_per_region
def verify_security_hub_integration_enabled_per_region(
    partition: str,
    region: str,
    session: session.Session,
    aws_account_number: str,
) -> bool:
    """Return True if the Prowler integration accepts findings in the given region.

    Verifies that Security Hub is enabled in ``region`` and that the
    ``SECURITY_HUB_INTEGRATION_NAME`` product subscription is present.
    Any error is logged and treated as "not enabled".

    BUG FIX: the previous version used an f-string literal as the "docstring";
    an f-string is not a docstring (it is re-evaluated on every call and leaves
    ``__doc__`` as None), so it is now a regular docstring.
    """
    prowler_integration_enabled = False
    try:
        logger.info(
            f"Checking if the {SECURITY_HUB_INTEGRATION_NAME} is enabled in the {region} region."
        )
        # Check if security hub is enabled in current region
        security_hub_client = session.client("securityhub", region_name=region)
        security_hub_client.describe_hub()
        # Check if Prowler integration is enabled in Security Hub
        security_hub_prowler_integration_arn = f"arn:{partition}:securityhub:{region}:{aws_account_number}:product-subscription/{SECURITY_HUB_INTEGRATION_NAME}"
        if security_hub_prowler_integration_arn not in str(
            security_hub_client.list_enabled_products_for_import()
        ):
            logger.warning(
                f"Security Hub is enabled in {region} but Prowler integration does not accept findings. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/"
            )
        else:
            prowler_integration_enabled = True
    # Handle all the permissions / configuration errors
    except ClientError as client_error:
        # Check if Account is subscribed to Security Hub
        error_code = client_error.response["Error"]["Code"]
        error_message = client_error.response["Error"]["Message"]
        if (
            error_code == "InvalidAccessException"
            and f"Account {aws_account_number} is not subscribed to AWS Security Hub"
            in error_message
        ):
            # Not being subscribed is an expected condition -> warning, not error
            logger.warning(
                f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
            )
        else:
            logger.error(
                f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
            )
    except Exception as error:
        logger.error(
            f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
        )
    finally:
        # Deliberate best-effort: always return a boolean, never raise.
        return prowler_integration_enabled
def batch_send_to_security_hub(
    security_hub_findings_per_region: dict,
    session: session.Session,
) -> int:
    """Send findings to Security Hub, region by region.

    Returns the total number of findings that were successfully sent across
    all regions.
    """
    success_count = 0
    # Pre-bind so the except handler's message cannot hit an unbound name if
    # the failure happens before the first iteration.
    region = None
    try:
        # Iterate findings by region
        for region, findings in security_hub_findings_per_region.items():
            # Send findings to Security Hub
            logger.info(f"Sending findings to Security Hub in the region {region}")
            security_hub_client = session.client("securityhub", region_name=region)
            # BUG FIX: accumulate counts (+=) instead of overwriting, so the
            # returned total covers every region, not just the last one — as
            # the docstring promises and as resolve_security_hub_previous_findings
            # already does.
            success_count += __send_findings_to_security_hub__(
                findings, region, security_hub_client
            )
    except Exception as error:
        logger.error(
            f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
        )
    return success_count
# Move previous Security Hub check findings to ARCHIVED (as prowler didn't re-detect them)
def resolve_security_hub_previous_findings(
    security_hub_findings_per_region: dict, provider
) -> int:
    """Archive Security Hub findings that do not appear in the current execution.

    Returns the number of findings successfully archived.
    (Return annotation fixed from ``list`` to ``int``: the function returns a count.)
    """
    logger.info("Checking previous findings in Security Hub to archive them.")
    success_count = 0
    for region in security_hub_findings_per_region.keys():
        try:
            current_findings = security_hub_findings_per_region[region]
            # Collect current finding IDs; a set gives O(1) membership tests below
            current_findings_ids = {finding["Id"] for finding in current_findings}
            # Get findings of that region
            security_hub_client = provider.session.current_session.client(
                "securityhub", region_name=region
            )
            findings_filter = {
                "ProductName": [{"Value": "Prowler", "Comparison": "EQUALS"}],
                "RecordState": [{"Value": "ACTIVE", "Comparison": "EQUALS"}],
                "AwsAccountId": [
                    {"Value": provider.identity.account, "Comparison": "EQUALS"}
                ],
                "Region": [{"Value": region, "Comparison": "EQUALS"}],
            }
            get_findings_paginator = security_hub_client.get_paginator("get_findings")
            findings_to_archive = []
            for page in get_findings_paginator.paginate(Filters=findings_filter):
                # Archive findings that have not appeared in this execution
                for finding in page["Findings"]:
                    if finding["Id"] not in current_findings_ids:
                        finding["RecordState"] = "ARCHIVED"
                        finding["UpdatedAt"] = timestamp_utc.strftime(
                            "%Y-%m-%dT%H:%M:%SZ"
                        )
                        findings_to_archive.append(finding)
            logger.info(f"Archiving {len(findings_to_archive)} findings.")
            # Send archive findings to SHub
            success_count += __send_findings_to_security_hub__(
                findings_to_archive, region, security_hub_client
            )
        except Exception as error:
            logger.error(
                f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
            )
    return success_count
def __send_findings_to_security_hub__(
    findings: list[dict], region: str, security_hub_client
) -> int:
    """Send findings to AWS Security Hub in chunks of SECURITY_HUB_MAX_BATCH.

    Returns the number of findings successfully imported. Errors are logged
    and the count gathered so far is still returned.
    """
    success_count = 0
    try:
        # Chunk to respect the BatchImportFindings per-request limit.
        chunks = [
            findings[i : i + SECURITY_HUB_MAX_BATCH]
            for i in range(0, len(findings), SECURITY_HUB_MAX_BATCH)
        ]
        # FIX: use a distinct loop variable; the original reused `findings`,
        # shadowing the parameter inside the loop.
        for chunk in chunks:
            batch_import = security_hub_client.batch_import_findings(Findings=chunk)
            if batch_import["FailedCount"] > 0:
                failed_import = batch_import["FailedFindings"][0]
                logger.error(
                    f"Failed to send findings to AWS Security Hub -- {failed_import['ErrorCode']} -- {failed_import['ErrorMessage']}"
                )
            success_count += batch_import["SuccessCount"]
    except Exception as error:
        logger.error(
            f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
        )
    finally:
        # Best-effort: always return the count, even after an error.
        return success_count

View File

@@ -7,6 +7,7 @@ from prowler.providers.common.provider import Provider
# TODO: include this for all the providers
# Rename to AuditMetadata or ScanMetadata
class Audit_Metadata(BaseModel):
services_scanned: int
# We can't use a set in the expected

0
test.log Normal file
View File

147
tests/cli/cli_test.py Normal file
View File

@@ -0,0 +1,147 @@
import json
from typer.testing import CliRunner
from cli.cli import app
# Single Typer CliRunner shared by every test in TestCLI below.
runner = CliRunner()
class TestCLI:
    """Smoke tests for the Prowler Typer CLI: listing options, logging flags,
    output flags and misc switches of the ``aws`` sub-command. Each test
    invokes the app through the shared ``runner`` and checks the exit code
    and (where applicable) the emitted output."""

    def test_list_services_aws(self):
        result = runner.invoke(app, ["aws", "--list-services"])
        assert result.exit_code == 0
        assert "available services." in result.output

    def test_list_fixers_aws(self):
        result = runner.invoke(app, ["aws", "--list-fixers"])
        assert result.exit_code == 0
        assert "available fixers." in result.output

    def test_list_categories_aws(self):
        result = runner.invoke(app, ["aws", "--list-categories"])
        assert result.exit_code == 0
        assert "available categories." in result.output

    def test_list_compliance_aws(self):
        result = runner.invoke(app, ["aws", "--list-compliance"])
        assert result.exit_code == 0
        assert "available Compliance Frameworks." in result.output

    def test_list_compliance_requirements_aws(self):
        result = runner.invoke(
            app, ["aws", "--list-compliance-requirements", "cis_2.0_aws soc2_aws"]
        )
        assert result.exit_code == 0
        assert "Listing CIS 2.0 AWS Compliance Requirements:" in result.output
        assert "Listing SOC2 AWS Compliance Requirements:" in result.output

    def test_list_compliance_requirements_no_compliance_aws(self):
        # The option requires a value; Typer/Click exits with code 2 on usage errors.
        result = runner.invoke(app, ["aws", "--list-compliance-requirements"])
        assert result.exit_code == 2
        assert "requires an argument" in result.output

    def test_list_compliance_requirements_one_invalid_aws(self):
        invalid_name = "invalid"
        result = runner.invoke(
            app,
            ["aws", "--list-compliance-requirements", f"cis_2.0_aws {invalid_name}"],
        )
        assert result.exit_code == 0
        assert "Listing CIS 2.0 AWS Compliance Requirements:" in result.output
        assert f"{invalid_name} is not a valid Compliance Framework" in result.output

    def test_list_checks_aws(self):
        result = runner.invoke(app, ["aws", "--list-checks"])
        assert result.exit_code == 0
        assert "available checks." in result.output

    def test_list_checks_json_aws(self):
        result = runner.invoke(app, ["aws", "--list-checks-json"])
        assert result.exit_code == 0
        assert "aws" in result.output
        # FIX: let json.loads raise on invalid JSON instead of the
        # `try/except ValueError: assert False` anti-pattern — the test still
        # fails on bad output, but with the actual decode error as the reason.
        json.loads(result.output)

    def test_log_level(self):
        result = runner.invoke(app, ["aws", "--log-level", "ERROR"])
        assert result.exit_code == 0

    def test_log_level_invalid(self):
        result = runner.invoke(app, ["aws", "--log-level", "INVALID"])
        assert result.exit_code == 2
        assert "Log level must be one of" in result.output

    def test_log_level_no_value(self):
        result = runner.invoke(app, ["aws", "--log-level"])
        assert result.exit_code == 2
        assert "Option '--log-level' requires an argument." in result.output

    def test_log_file(self):
        result = runner.invoke(app, ["aws", "--log-file", "test.log"])
        assert result.exit_code == 0

    def test_log_file_no_value(self):
        result = runner.invoke(app, ["aws", "--log-file"])
        assert result.exit_code == 2
        assert "Option '--log-file' requires an argument." in result.output

    def test_only_logs(self):
        result = runner.invoke(app, ["aws", "--only-logs"])
        assert result.exit_code == 0

    def test_status(self):
        result = runner.invoke(app, ["aws", "--status", "PASS"])
        assert result.exit_code == 0

    def test_status_invalid(self):
        result = runner.invoke(app, ["aws", "--status", "INVALID"])
        assert result.exit_code == 2
        assert "Status must be one of" in result.output

    def test_status_no_value(self):
        result = runner.invoke(app, ["aws", "--status"])
        assert result.exit_code == 2
        assert "Option '--status' requires an argument." in result.output

    # NOTE(review): the two tests below exercise --output-filename although their
    # names say "formats" — consider renaming or switching to --output-formats.
    def test_outputs_formats(self):
        result = runner.invoke(app, ["aws", "--output-filename", "csv html"])
        assert result.exit_code == 0

    def test_outputs_formats_no_value(self):
        result = runner.invoke(app, ["aws", "--output-filename"])
        assert result.exit_code == 2
        assert "Option '--output-filename' requires an argument." in result.output

    def test_output_directory(self):
        result = runner.invoke(app, ["aws", "--output-directory", "test"])
        assert result.exit_code == 0

    def test_output_directory_no_value(self):
        result = runner.invoke(app, ["aws", "--output-directory"])
        assert result.exit_code == 2
        assert "Option '--output-directory' requires an argument." in result.output

    def test_verbose(self):
        result = runner.invoke(app, ["aws", "--verbose"])
        assert result.exit_code == 0

    def test_ignore_exit_code_3(self):
        result = runner.invoke(app, ["aws", "--ignore-exit-code-3"])
        assert result.exit_code == 0

    def test_no_banner(self):
        result = runner.invoke(app, ["aws", "--no-banner"])
        assert result.exit_code == 0

    def test_unix_timestamp(self):
        result = runner.invoke(app, ["aws", "--unix-timestamp"])
        assert result.exit_code == 0

    def test_profile(self):
        result = runner.invoke(app, ["aws", "--profile", "test"])
        assert result.exit_code == 0

View File

@@ -1,21 +1,16 @@
from argparse import Namespace
from logging import ERROR, WARNING
from os import path
import botocore
from boto3 import session
from botocore.client import ClientError
from mock import MagicMock, patch
from mock import patch
from prowler.config.config import prowler_version, timestamp_utc
from prowler.lib.check.models import Check_Report, load_check_metadata
from prowler.providers.aws.lib.security_hub.security_hub import (
batch_send_to_security_hub,
prepare_security_hub_findings,
verify_security_hub_integration_enabled_per_region,
)
from prowler.lib.outputs.security_hub.security_hub import SecurityHub
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_COMMERCIAL_PARTITION,
AWS_REGION_EU_WEST_1,
AWS_REGION_EU_WEST_2,
set_mocked_aws_provider,
@@ -108,37 +103,25 @@ class Test_SecurityHub:
return finding
def set_mocked_output_options(
self, status: list[str] = [], send_sh_only_fails: bool = False
):
output_options = MagicMock
output_options.bulk_checks_metadata = {}
output_options.status = status
output_options.send_sh_only_fails = send_sh_only_fails
return output_options
def set_mocked_session(self, region):
# Create mock session
return session.Session(
region_name=region,
)
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_verify_security_hub_integration_enabled_per_region(self):
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
assert verify_security_hub_integration_enabled_per_region(
AWS_COMMERCIAL_PARTITION, AWS_REGION_EU_WEST_1, session, AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider()
security_hub = SecurityHub(aws_provider)
assert security_hub.verify_security_hub_integration_enabled_per_region(
AWS_REGION_EU_WEST_1
)
def test_verify_security_hub_integration_enabled_per_region_security_hub_disabled(
self, caplog
):
aws_provider = set_mocked_aws_provider()
security_hub = SecurityHub(aws_provider)
caplog.set_level(WARNING)
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
with patch(
"prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
"prowler.lib.outputs.security_hub.security_hub.Session.client",
) as mock_security_hub:
error_message = f"Account {AWS_ACCOUNT_NUMBER} is not subscribed to AWS Security Hub in region {AWS_REGION_EU_WEST_1}"
error_code = "InvalidAccessException"
@@ -151,37 +134,33 @@ class Test_SecurityHub:
operation_name = "DescribeHub"
mock_security_hub.side_effect = ClientError(error_response, operation_name)
assert not verify_security_hub_integration_enabled_per_region(
AWS_COMMERCIAL_PARTITION,
assert not security_hub.verify_security_hub_integration_enabled_per_region(
AWS_REGION_EU_WEST_1,
session,
AWS_ACCOUNT_NUMBER,
)
assert caplog.record_tuples == [
(
"root",
WARNING,
f"ClientError -- [70]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
f"ClientError -- [86]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
)
]
def test_verify_security_hub_integration_enabled_per_region_prowler_not_subscribed(
self, caplog
):
aws_provider = set_mocked_aws_provider()
security_hub = SecurityHub(aws_provider)
caplog.set_level(WARNING)
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
with patch(
"prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
"prowler.lib.outputs.security_hub.security_hub.Session.client",
) as mock_security_hub:
mock_security_hub.describe_hub.return_value = None
mock_security_hub.list_enabled_products_for_import.return_value = []
assert not verify_security_hub_integration_enabled_per_region(
AWS_COMMERCIAL_PARTITION,
assert not security_hub.verify_security_hub_integration_enabled_per_region(
AWS_REGION_EU_WEST_1,
session,
AWS_ACCOUNT_NUMBER,
)
assert caplog.record_tuples == [
(
@@ -194,11 +173,13 @@ class Test_SecurityHub:
def test_verify_security_hub_integration_enabled_per_region_another_ClientError(
self, caplog
):
aws_provider = set_mocked_aws_provider()
security_hub = SecurityHub(aws_provider)
caplog.set_level(WARNING)
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
with patch(
"prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
"prowler.lib.outputs.security_hub.security_hub.Session.client",
) as mock_security_hub:
error_message = f"Another exception in region {AWS_REGION_EU_WEST_1}"
error_code = "AnotherException"
@@ -211,58 +192,52 @@ class Test_SecurityHub:
operation_name = "DescribeHub"
mock_security_hub.side_effect = ClientError(error_response, operation_name)
assert not verify_security_hub_integration_enabled_per_region(
AWS_COMMERCIAL_PARTITION,
assert not security_hub.verify_security_hub_integration_enabled_per_region(
AWS_REGION_EU_WEST_1,
session,
AWS_ACCOUNT_NUMBER,
)
assert caplog.record_tuples == [
(
"root",
ERROR,
f"ClientError -- [70]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
f"ClientError -- [86]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
)
]
def test_verify_security_hub_integration_enabled_per_region_another_Exception(
self, caplog
):
aws_provider = set_mocked_aws_provider()
security_hub = SecurityHub(aws_provider)
caplog.set_level(WARNING)
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
with patch(
"prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
"prowler.lib.outputs.security_hub.security_hub.Session.client",
) as mock_security_hub:
error_message = f"Another exception in region {AWS_REGION_EU_WEST_1}"
mock_security_hub.side_effect = Exception(error_message)
assert not verify_security_hub_integration_enabled_per_region(
AWS_COMMERCIAL_PARTITION,
assert not security_hub.verify_security_hub_integration_enabled_per_region(
AWS_REGION_EU_WEST_1,
session,
AWS_ACCOUNT_NUMBER,
)
assert caplog.record_tuples == [
(
"root",
ERROR,
f"Exception -- [70]: {error_message}",
f"Exception -- [86]: {error_message}",
)
]
def test_prepare_security_hub_findings_enabled_region_all_statuses(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options()
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
)
security_hub = SecurityHub(aws_provider)
assert prepare_security_hub_findings(
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {
AWS_REGION_EU_WEST_1: [get_security_hub_finding("PASSED")],
@@ -270,104 +245,109 @@ class Test_SecurityHub:
def test_prepare_security_hub_findings_all_statuses_MANUAL_finding(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options()
findings = [self.generate_finding("MANUAL", AWS_REGION_EU_WEST_1)]
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
)
security_hub = SecurityHub(aws_provider)
assert prepare_security_hub_findings(
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {AWS_REGION_EU_WEST_1: []}
def test_prepare_security_hub_findings_disabled_region(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options()
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_2)]
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
)
security_hub = SecurityHub(aws_provider)
assert prepare_security_hub_findings(
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {AWS_REGION_EU_WEST_1: []}
def test_prepare_security_hub_findings_PASS_and_FAIL_statuses(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options(status=["FAIL"])
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
args = Namespace()
args.status = ["FAIL"]
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2], arguments=args
)
assert prepare_security_hub_findings(
security_hub = SecurityHub(aws_provider)
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {AWS_REGION_EU_WEST_1: []}
def test_prepare_security_hub_findings_FAIL_and_FAIL_statuses(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options(status=["FAIL"])
findings = [self.generate_finding("FAIL", AWS_REGION_EU_WEST_1)]
args = Namespace()
args.status = ["FAIL"]
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2], arguments=args
)
assert prepare_security_hub_findings(
security_hub = SecurityHub(aws_provider)
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {AWS_REGION_EU_WEST_1: [get_security_hub_finding("FAILED")]}
def test_prepare_security_hub_findings_send_sh_only_fails_PASS(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options(send_sh_only_fails=True)
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
)
assert prepare_security_hub_findings(
args = Namespace()
args.send_sh_only_fails = True
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2], arguments=args
)
security_hub = SecurityHub(aws_provider)
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {AWS_REGION_EU_WEST_1: []}
def test_prepare_security_hub_findings_send_sh_only_fails_FAIL(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options(send_sh_only_fails=True)
findings = [self.generate_finding("FAIL", AWS_REGION_EU_WEST_1)]
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
)
assert prepare_security_hub_findings(
args = Namespace()
args.send_sh_only_fails = True
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2], arguments=args
)
security_hub = SecurityHub(aws_provider)
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {AWS_REGION_EU_WEST_1: [get_security_hub_finding("FAILED")]}
def test_prepare_security_hub_findings_no_audited_regions(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options()
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
aws_provider = set_mocked_aws_provider()
assert prepare_security_hub_findings(
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
)
security_hub = SecurityHub(aws_provider)
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {
AWS_REGION_EU_WEST_1: [get_security_hub_finding("PASSED")],
@@ -375,20 +355,19 @@ class Test_SecurityHub:
def test_prepare_security_hub_findings_muted_fail_with_send_sh_only_fails(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options(
send_sh_only_fails=True,
)
findings = [
self.generate_finding(
status="FAIL", region=AWS_REGION_EU_WEST_1, muted=True
)
]
aws_provider = set_mocked_aws_provider()
args = Namespace()
args.send_sh_only_fails = True
assert prepare_security_hub_findings(
aws_provider = set_mocked_aws_provider(arguments=args)
security_hub = SecurityHub(aws_provider)
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {
AWS_REGION_EU_WEST_1: [],
@@ -396,18 +375,18 @@ class Test_SecurityHub:
def test_prepare_security_hub_findings_muted_fail_with_status_FAIL(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options(status=["FAIL"])
findings = [
self.generate_finding(
status="FAIL", region=AWS_REGION_EU_WEST_1, muted=True
)
]
aws_provider = set_mocked_aws_provider()
args = Namespace()
args.status = ["FAIL"]
aws_provider = set_mocked_aws_provider(arguments=args)
security_hub = SecurityHub(aws_provider)
assert prepare_security_hub_findings(
assert security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
) == {
AWS_REGION_EU_WEST_1: [],
@@ -416,24 +395,21 @@ class Test_SecurityHub:
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_batch_send_to_security_hub_one_finding(self):
enabled_regions = [AWS_REGION_EU_WEST_1]
output_options = self.set_mocked_output_options()
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
aws_provider = set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
)
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
security_hub = SecurityHub(aws_provider)
security_hub_findings = prepare_security_hub_findings(
security_hub_findings = security_hub.prepare_security_hub_findings(
findings,
aws_provider,
output_options,
enabled_regions,
)
assert (
batch_send_to_security_hub(
security_hub.batch_send_to_security_hub(
security_hub_findings,
session,
)
== 1
)

View File

@@ -153,8 +153,12 @@ def set_mocked_aws_provider(
return provider
def set_default_provider_arguments(arguments: Namespace) -> Namespace:
def set_default_provider_arguments(input_arguments: Namespace) -> Namespace:
arguments = Namespace
arguments.status = []
if hasattr(input_arguments, "status") and input_arguments.status:
arguments.status = input_arguments.status
arguments.output_formats = []
arguments.output_directory = ""
arguments.verbose = False
@@ -162,7 +166,14 @@ def set_default_provider_arguments(arguments: Namespace) -> Namespace:
arguments.unix_timestamp = False
arguments.shodan = None
arguments.security_hub = False
arguments.send_sh_only_fails = False
if (
hasattr(input_arguments, "send_sh_only_fails")
and input_arguments.send_sh_only_fails
):
arguments.send_sh_only_fails = input_arguments.send_sh_only_fails
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path