mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-01-25 02:08:11 +00:00
Compare commits
25 Commits
update-api
...
PRWLR-3773
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e3858cdd19 | ||
|
|
c026fe09d5 | ||
|
|
63e5318a2c | ||
|
|
31a770848a | ||
|
|
8c28962d12 | ||
|
|
c3975cf0e4 | ||
|
|
d7fbc80e50 | ||
|
|
77f58cbf68 | ||
|
|
ddc68e78ee | ||
|
|
02800185ba | ||
|
|
dd15e135e1 | ||
|
|
73cb656eb7 | ||
|
|
2053868914 | ||
|
|
3f1c4d9295 | ||
|
|
a55bd1462c | ||
|
|
59c946ac0b | ||
|
|
0b5650059e | ||
|
|
054035e335 | ||
|
|
b474247758 | ||
|
|
2cefd32b5d | ||
|
|
f459d58314 | ||
|
|
66c8d0dc40 | ||
|
|
96e38054bc | ||
|
|
fc7d9b5b20 | ||
|
|
573c0c3a74 |
27
cli/cli.md
27
cli/cli.md
@@ -2,5 +2,30 @@
|
||||
To show the banner, use:
|
||||
`python cli/cli.py banner`
|
||||
## Listing
|
||||
List services by provider.
|
||||
List services by provider
|
||||
|
||||
`python cli/cli.py <provider> list-services`
|
||||
|
||||
List fixers by provider
|
||||
|
||||
`python cli/cli.py <provider> list-services`
|
||||
|
||||
List categories by provider
|
||||
|
||||
`python cli/cli.py <provider> list-categories`
|
||||
|
||||
List compliance by provider
|
||||
|
||||
`python cli/cli.py <provider> list-compliance`
|
||||
|
||||
List compliance requirements by provider
|
||||
|
||||
`python cli/cli.py <provider> list-compliance-requirements [compliance(s)]`
|
||||
|
||||
List checks by provider
|
||||
|
||||
`python cli/cli.py <provider> list-checks`
|
||||
|
||||
List checks in JSON format by provider
|
||||
|
||||
`python cli/cli.py <provider> list-checks-json`
|
||||
|
||||
407
cli/cli.py
407
cli/cli.py
@@ -1,62 +1,387 @@
|
||||
from argparse import Namespace
|
||||
from typing import List, Optional
|
||||
|
||||
import typer
|
||||
|
||||
from prowler.lib.banner import print_banner
|
||||
from prowler.config.config import (
|
||||
available_compliance_frameworks,
|
||||
default_output_directory,
|
||||
finding_statuses,
|
||||
)
|
||||
from prowler.lib.check.check import (
|
||||
bulk_load_checks_metadata,
|
||||
bulk_load_compliance_frameworks,
|
||||
list_categories,
|
||||
list_checks_json,
|
||||
list_fixers,
|
||||
list_services,
|
||||
print_categories,
|
||||
print_checks,
|
||||
print_compliance_frameworks,
|
||||
print_compliance_requirements,
|
||||
print_fixers,
|
||||
print_services,
|
||||
)
|
||||
from prowler.lib.check.checks_loader import load_checks_to_execute
|
||||
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
|
||||
from prowler.lib.logger import logger, logging_levels, set_logging_config
|
||||
from prowler.lib.outputs.security_hub.security_hub import SecurityHub
|
||||
from prowler.lib.scan.scan import Scan
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
app = typer.Typer()
|
||||
aws = typer.Typer(name="aws")
|
||||
azure = typer.Typer(name="azure")
|
||||
gcp = typer.Typer(name="gcp")
|
||||
kubernetes = typer.Typer(name="kubernetes")
|
||||
|
||||
app.add_typer(aws, name="aws")
|
||||
app.add_typer(azure, name="azure")
|
||||
app.add_typer(gcp, name="gcp")
|
||||
app.add_typer(kubernetes, name="kubernetes")
|
||||
|
||||
|
||||
def list_resources(provider: str, resource_type: str):
|
||||
if resource_type == "services":
|
||||
print_services(list_services(provider))
|
||||
elif resource_type == "fixers":
|
||||
print_fixers(list_fixers(provider))
|
||||
def check_provider(provider: str):
|
||||
if provider not in ["aws", "azure", "gcp", "kubernetes"]:
|
||||
raise typer.BadParameter(
|
||||
"Invalid provider. Choose between aws, azure, gcp or kubernetes."
|
||||
)
|
||||
return provider
|
||||
|
||||
|
||||
def create_list_commands(provider_typer: typer.Typer):
|
||||
provider_name = provider_typer.info.name
|
||||
def check_compliance_framework(provider: str, compliance_framework: list):
|
||||
# From the available_compliance_frameworks, check if the compliance_framework is valid for the provider
|
||||
compliance_frameworks_provider = []
|
||||
valid_compliance_frameworks = []
|
||||
for provider_compliance_framework in available_compliance_frameworks:
|
||||
if provider in provider_compliance_framework:
|
||||
compliance_frameworks_provider.append(provider_compliance_framework)
|
||||
for compliance in compliance_framework:
|
||||
if compliance not in compliance_frameworks_provider:
|
||||
print(f"{compliance} is not a valid Compliance Framework\n")
|
||||
else:
|
||||
valid_compliance_frameworks.append(compliance)
|
||||
return valid_compliance_frameworks
|
||||
|
||||
@provider_typer.command(
|
||||
"list-services",
|
||||
help=f"List the {provider_name} services that are supported by Prowler.",
|
||||
|
||||
def validate_log_level(log_level: str):
|
||||
log_levels = list(logging_levels.keys())
|
||||
if log_level not in log_levels:
|
||||
raise typer.BadParameter(f"Log level must be one of {log_levels}")
|
||||
return log_level
|
||||
|
||||
|
||||
def split_space_separated_values(value: str) -> List[str]:
|
||||
output = []
|
||||
if value:
|
||||
for item in value:
|
||||
for input in item.split(" "):
|
||||
output.append(input)
|
||||
return output
|
||||
|
||||
|
||||
def validate_status(status: List[str]):
|
||||
valid_status = []
|
||||
for s in status:
|
||||
if s not in finding_statuses:
|
||||
raise typer.BadParameter(f"Status must be one of {finding_statuses}")
|
||||
valid_status.append(s)
|
||||
return valid_status
|
||||
|
||||
|
||||
def validate_output_formats(output_formats: List[str]):
|
||||
valid_formats = []
|
||||
valid_output_formats = ["csv", "json-ocsf", "html", "json-asff"]
|
||||
for output_format in output_formats:
|
||||
if output_format not in valid_output_formats:
|
||||
raise typer.BadParameter(
|
||||
f"Output format must be one of {valid_output_formats}"
|
||||
)
|
||||
else:
|
||||
valid_formats.append(output_format)
|
||||
return output_formats
|
||||
|
||||
|
||||
class CLI:
|
||||
def __init__(
|
||||
self,
|
||||
provider: str,
|
||||
list_services: bool,
|
||||
list_fixers: bool,
|
||||
list_categories: bool,
|
||||
list_compliance: bool,
|
||||
list_compliance_requirements: List[str],
|
||||
list_checks: bool,
|
||||
list_checks_json: bool,
|
||||
log_level: str,
|
||||
log_file: Optional[str],
|
||||
only_logs: bool,
|
||||
status: List[str],
|
||||
output_formats: List[str],
|
||||
output_filename: Optional[str],
|
||||
output_directory: Optional[str],
|
||||
verbose: bool,
|
||||
ignore_exit_code_3: bool,
|
||||
no_banner: bool,
|
||||
unix_timestamp: bool,
|
||||
profile: Optional[str],
|
||||
):
|
||||
self.provider = provider
|
||||
self.list_services = list_services
|
||||
self.list_fixers = list_fixers
|
||||
self.list_categories = list_categories
|
||||
self.list_compliance = list_compliance
|
||||
self.list_compliance_requirements = list_compliance_requirements
|
||||
self.list_checks = list_checks
|
||||
self.list_checks_json = list_checks_json
|
||||
self.log_level = log_level
|
||||
self.log_file = log_file
|
||||
self.only_logs = only_logs
|
||||
self.status = status
|
||||
self.output_formats = output_formats
|
||||
self.output_filename = output_filename
|
||||
self.output_directory = output_directory
|
||||
self.verbose = verbose
|
||||
self.ignore_exit_code_3 = ignore_exit_code_3
|
||||
self.no_banner = no_banner
|
||||
self.unix_timestamp = unix_timestamp
|
||||
self.profile = profile
|
||||
|
||||
|
||||
@app.command()
|
||||
def main(
|
||||
provider: str = typer.Argument(
|
||||
..., help="The provider to check", callback=check_provider
|
||||
),
|
||||
list_services_bool: bool = typer.Option(
|
||||
False, "--list-services", help="List the services of the provider"
|
||||
),
|
||||
list_fixers_bool: bool = typer.Option(
|
||||
False, "--list-fixers", help="List the fixers of the provider"
|
||||
),
|
||||
list_categories_bool: bool = typer.Option(
|
||||
False, "--list-categories", help="List the categories of the provider"
|
||||
),
|
||||
list_compliance_bool: bool = typer.Option(
|
||||
False,
|
||||
"--list-compliance",
|
||||
help="List the compliance frameworks of the provider",
|
||||
),
|
||||
list_compliance_requirements_value: List[str] = typer.Option(
|
||||
None,
|
||||
"--list-compliance-requirements",
|
||||
help="List the compliance requirements of the provider",
|
||||
callback=split_space_separated_values,
|
||||
),
|
||||
list_checks_bool: bool = typer.Option(
|
||||
False, "--list-checks", help="List the checks of the provider"
|
||||
),
|
||||
list_checks_json_bool: bool = typer.Option(
|
||||
False,
|
||||
"--list-checks-json",
|
||||
help="List the checks of the provider in JSON format",
|
||||
),
|
||||
log_level: str = typer.Option("INFO", "--log-level", help="Set the Log level"),
|
||||
log_file: str = typer.Option(None, "--log-file", help="Set the Log file"),
|
||||
only_logs: bool = typer.Option(False, "--only-logs", help="Only show logs"),
|
||||
status_value: List[str] = typer.Option(
|
||||
[],
|
||||
"--status",
|
||||
help=f"Filter by the status of the findings {finding_statuses}",
|
||||
callback=split_space_separated_values,
|
||||
),
|
||||
output_formats_value: List[str] = typer.Option(
|
||||
["csv json-ocsf html"],
|
||||
"--output-formats",
|
||||
help="Output format for the findings",
|
||||
callback=split_space_separated_values,
|
||||
),
|
||||
output_filename_value: str = typer.Option(
|
||||
None, "--output-filename", help="Output filename"
|
||||
),
|
||||
output_directory_value: str = typer.Option(
|
||||
None, "--output-directory", help="Output directory"
|
||||
),
|
||||
verbose: bool = typer.Option(False, "--verbose", help="Show verbose output"),
|
||||
ignore_exit_code_3: bool = typer.Option(
|
||||
False, "--ignore-exit-code-3", help="Ignore exit code 3"
|
||||
),
|
||||
no_banner: bool = typer.Option(False, "--no-banner", help="Do not show the banner"),
|
||||
unix_timestamp: bool = typer.Option(
|
||||
False, "--unix-timestamp", help="Use Unix timestamp"
|
||||
),
|
||||
profile: str = typer.Option(None, "--profile", help="The profile to use"),
|
||||
):
|
||||
# Make sure the values are valid
|
||||
if status_value:
|
||||
status_value = validate_status(status_value)
|
||||
if output_formats_value:
|
||||
output_formats_value = validate_output_formats(output_formats_value)
|
||||
if not output_directory_value:
|
||||
output_directory_value = default_output_directory
|
||||
options = CLI(
|
||||
provider,
|
||||
list_services_bool,
|
||||
list_fixers_bool,
|
||||
list_categories_bool,
|
||||
list_compliance_bool,
|
||||
list_compliance_requirements_value,
|
||||
list_checks_bool,
|
||||
list_checks_json_bool,
|
||||
log_level,
|
||||
log_file,
|
||||
only_logs,
|
||||
status_value,
|
||||
output_formats_value,
|
||||
output_filename_value,
|
||||
output_directory_value,
|
||||
verbose,
|
||||
ignore_exit_code_3,
|
||||
no_banner,
|
||||
unix_timestamp,
|
||||
profile,
|
||||
)
|
||||
def list_services_command():
|
||||
list_resources(provider_name, "services")
|
||||
|
||||
@provider_typer.command(
|
||||
"list-fixers",
|
||||
help=f"List the {provider_name} fixers that are supported by Prowler.",
|
||||
if options.list_services:
|
||||
services = list_services(options.provider)
|
||||
print_services(services)
|
||||
if options.list_fixers:
|
||||
fixers = list_fixers(options.provider)
|
||||
print_fixers(fixers)
|
||||
if options.list_categories:
|
||||
checks_metadata = bulk_load_checks_metadata(options.provider)
|
||||
categories = list_categories(checks_metadata)
|
||||
print_categories(categories)
|
||||
if options.list_compliance:
|
||||
compliance_frameworks = bulk_load_compliance_frameworks(options.provider)
|
||||
print_compliance_frameworks(compliance_frameworks)
|
||||
if options.list_compliance_requirements:
|
||||
valid_compliance = check_compliance_framework(
|
||||
options.provider, options.list_compliance_requirements
|
||||
)
|
||||
print_compliance_requirements(
|
||||
bulk_load_compliance_frameworks(options.provider),
|
||||
valid_compliance,
|
||||
)
|
||||
if options.list_checks:
|
||||
checks_metadata = bulk_load_checks_metadata(options.provider)
|
||||
checks = load_checks_to_execute(
|
||||
checks_metadata,
|
||||
bulk_load_compliance_frameworks(options.provider),
|
||||
None,
|
||||
[],
|
||||
[],
|
||||
[],
|
||||
[],
|
||||
[],
|
||||
options.provider,
|
||||
)
|
||||
print_checks(options.provider, sorted(checks), checks_metadata)
|
||||
if options.list_checks_json:
|
||||
checks_metadata = bulk_load_checks_metadata(options.provider)
|
||||
checks_to_execute = load_checks_to_execute(
|
||||
checks_metadata,
|
||||
bulk_load_compliance_frameworks(options.provider),
|
||||
None,
|
||||
[],
|
||||
[],
|
||||
[],
|
||||
[],
|
||||
[],
|
||||
options.provider,
|
||||
)
|
||||
print(list_checks_json(options.provider, sorted(checks_to_execute)))
|
||||
if options.log_level:
|
||||
set_logging_config(validate_log_level(options.log_level))
|
||||
logger.info(f"Log level set to {options.log_level}")
|
||||
if options.log_file:
|
||||
if options.log_level:
|
||||
set_logging_config(validate_log_level(options.log_level), options.log_file)
|
||||
else:
|
||||
set_logging_config("INFO", options.log_file)
|
||||
logger.info(f"Log file set to {options.log_file}")
|
||||
if options.only_logs:
|
||||
if options.log_level:
|
||||
set_logging_config(validate_log_level(options.log_level), only_logs=True)
|
||||
else:
|
||||
set_logging_config("INFO", only_logs=True)
|
||||
logger.info("Only logs are shown")
|
||||
if options.status:
|
||||
logger.info(f"Filtering by status: {options.status}")
|
||||
# TODO: Implement filtering by status in a class
|
||||
if options.output_formats:
|
||||
logger.info(f"Output formats: {options.output_formats}")
|
||||
# TODO: Implement output formats in a class
|
||||
if options.output_filename:
|
||||
logger.info(f"Output filename: {options.output_filename}")
|
||||
# TODO: Implement output filename in a class
|
||||
if options.output_directory:
|
||||
logger.info(f"Output directory: {options.output_directory}")
|
||||
# TODO: Implement output directory in a class
|
||||
if options.verbose:
|
||||
logger.info("Verbose output is enabled")
|
||||
if options.ignore_exit_code_3:
|
||||
logger.info("Ignoring exit code 3")
|
||||
if options.no_banner:
|
||||
logger.info("No banner is shown")
|
||||
if options.unix_timestamp:
|
||||
logger.info("Using Unix timestamp")
|
||||
if options.profile:
|
||||
logger.info(f"Using profile: {options.profile}")
|
||||
|
||||
run_scan(options)
|
||||
|
||||
return options
|
||||
|
||||
|
||||
def run_scan(options: CLI):
|
||||
# Execute Prowler
|
||||
checks_to_execute = ["s3_account_level_public_access_blocks"]
|
||||
# Create the provider
|
||||
args = Namespace
|
||||
args.provider = options.provider
|
||||
args.profile = options.profile
|
||||
args.verbose = options.verbose
|
||||
args.fixer = False
|
||||
args.only_logs = options.only_logs
|
||||
args.status = options.status
|
||||
args.output_formats = options.output_formats
|
||||
args.output_filename = options.output_filename
|
||||
args.unix_timestamp = options.unix_timestamp
|
||||
args.output_directory = options.output_directory
|
||||
args.shodan = None
|
||||
args.security_hub = False
|
||||
args.send_sh_only_fails = False
|
||||
args.ignore_exit_code_3 = options.ignore_exit_code_3
|
||||
args.no_banner = options.no_banner
|
||||
# args.region = ("eu-west-1")
|
||||
Provider.set_global_provider(args)
|
||||
provider = Provider.get_global_provider()
|
||||
bulk_checks_metadata = bulk_load_checks_metadata(provider.type)
|
||||
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider.type)
|
||||
bulk_checks_metadata = update_checks_metadata_with_compliance(
|
||||
bulk_compliance_frameworks, bulk_checks_metadata
|
||||
)
|
||||
def list_fixers_command():
|
||||
list_resources(provider_name, "fixers")
|
||||
|
||||
|
||||
create_list_commands(aws)
|
||||
create_list_commands(azure)
|
||||
create_list_commands(gcp)
|
||||
create_list_commands(kubernetes)
|
||||
|
||||
|
||||
@app.command("banner", help="Prints the banner of the tool.")
|
||||
def banner(show: bool = True):
|
||||
if show:
|
||||
print_banner(show)
|
||||
else:
|
||||
print("Banner is not shown.")
|
||||
provider.output_options = (args, bulk_checks_metadata)
|
||||
provider.output_options.bulk_checks_metadata = bulk_checks_metadata
|
||||
scan = Scan(provider, checks_to_execute)
|
||||
custom_checks_metadata = None
|
||||
scan_results = scan.scan(custom_checks_metadata)
|
||||
# Verify where AWS Security Hub is enabled
|
||||
aws_security_enabled_regions = []
|
||||
security_hub_regions = (
|
||||
provider.get_available_aws_service_regions("securityhub")
|
||||
if not provider.identity.audited_regions
|
||||
else provider.identity.audited_regions
|
||||
)
|
||||
security_hub = SecurityHub(provider)
|
||||
for region in security_hub_regions:
|
||||
# Save the regions where AWS Security Hub is enabled
|
||||
if security_hub.verify_security_hub_integration_enabled_per_region(
|
||||
region,
|
||||
):
|
||||
aws_security_enabled_regions.append(region)
|
||||
# Prepare the findings to be sent to Security Hub
|
||||
security_hub_findings_per_region = security_hub.prepare_security_hub_findings(
|
||||
scan_results,
|
||||
aws_security_enabled_regions,
|
||||
)
|
||||
# Send the findings to Security Hub
|
||||
findings_sent_to_security_hub = security_hub.batch_send_to_security_hub(
|
||||
security_hub_findings_per_region
|
||||
)
|
||||
print(findings_sent_to_security_hub)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -40,15 +40,10 @@ from prowler.lib.outputs.compliance.compliance import display_compliance_table
|
||||
from prowler.lib.outputs.html.html import add_html_footer, fill_html_overview_statistics
|
||||
from prowler.lib.outputs.json.json import close_json
|
||||
from prowler.lib.outputs.outputs import extract_findings_statistics
|
||||
from prowler.lib.outputs.security_hub.security_hub import SecurityHub
|
||||
from prowler.lib.outputs.slack.slack import Slack
|
||||
from prowler.lib.outputs.summary_table import display_summary_table
|
||||
from prowler.providers.aws.lib.s3.s3 import send_to_s3_bucket
|
||||
from prowler.providers.aws.lib.security_hub.security_hub import (
|
||||
batch_send_to_security_hub,
|
||||
prepare_security_hub_findings,
|
||||
resolve_security_hub_previous_findings,
|
||||
verify_security_hub_integration_enabled_per_region,
|
||||
)
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.common.quick_inventory import run_provider_quick_inventory
|
||||
|
||||
@@ -321,42 +316,43 @@ def prowler():
|
||||
if not global_provider.identity.audited_regions
|
||||
else global_provider.identity.audited_regions
|
||||
)
|
||||
security_hub = SecurityHub(global_provider)
|
||||
|
||||
for region in security_hub_regions:
|
||||
# Save the regions where AWS Security Hub is enabled
|
||||
if verify_security_hub_integration_enabled_per_region(
|
||||
global_provider.identity.partition,
|
||||
if security_hub.verify_security_hub_integration_enabled_per_region(
|
||||
region,
|
||||
global_provider.session.current_session,
|
||||
global_provider.identity.account,
|
||||
):
|
||||
aws_security_enabled_regions.append(region)
|
||||
|
||||
# Prepare the findings to be sent to Security Hub
|
||||
security_hub_findings_per_region = prepare_security_hub_findings(
|
||||
security_hub_findings_per_region = security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
global_provider,
|
||||
global_provider.output_options,
|
||||
aws_security_enabled_regions,
|
||||
)
|
||||
|
||||
# Send the findings to Security Hub
|
||||
findings_sent_to_security_hub = batch_send_to_security_hub(
|
||||
security_hub_findings_per_region, global_provider.session.current_session
|
||||
findings_sent_to_security_hub = security_hub.batch_send_to_security_hub(
|
||||
security_hub_findings_per_region
|
||||
)
|
||||
|
||||
# Refactor(CLI)
|
||||
print(
|
||||
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent_to_security_hub} findings sent to AWS Security Hub!{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
# Resolve previous fails of Security Hub
|
||||
if not args.skip_sh_update:
|
||||
# Refactor(CLI)
|
||||
print(
|
||||
f"{Style.BRIGHT}\nArchiving previous findings in AWS Security Hub, please wait...{Style.RESET_ALL}"
|
||||
)
|
||||
findings_archived_in_security_hub = resolve_security_hub_previous_findings(
|
||||
security_hub_findings_per_region,
|
||||
global_provider,
|
||||
findings_archived_in_security_hub = (
|
||||
security_hub.resolve_security_hub_previous_findings(
|
||||
security_hub_findings_per_region,
|
||||
)
|
||||
)
|
||||
# Refactor(CLI)
|
||||
print(
|
||||
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_archived_in_security_hub} findings archived in AWS Security Hub!{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
@@ -593,12 +593,17 @@ def execute_checks(
|
||||
service,
|
||||
check_name,
|
||||
global_provider,
|
||||
services_executed,
|
||||
checks_executed,
|
||||
custom_checks_metadata,
|
||||
)
|
||||
all_findings.extend(check_findings)
|
||||
|
||||
# Update Audit Status
|
||||
services_executed.add(service)
|
||||
checks_executed.add(check_name)
|
||||
global_provider.audit_metadata = update_audit_metadata(
|
||||
global_provider.audit_metadata, services_executed, checks_executed
|
||||
)
|
||||
|
||||
# If check does not exists in the provider or is from another provider
|
||||
except ModuleNotFoundError:
|
||||
logger.error(
|
||||
@@ -651,12 +656,19 @@ def execute_checks(
|
||||
service,
|
||||
check_name,
|
||||
global_provider,
|
||||
services_executed,
|
||||
checks_executed,
|
||||
custom_checks_metadata,
|
||||
)
|
||||
all_findings.extend(check_findings)
|
||||
|
||||
# Update Audit Status
|
||||
services_executed.add(service)
|
||||
checks_executed.add(check_name)
|
||||
global_provider.audit_metadata = update_audit_metadata(
|
||||
global_provider.audit_metadata,
|
||||
services_executed,
|
||||
checks_executed,
|
||||
)
|
||||
|
||||
# If check does not exists in the provider or is from another provider
|
||||
except ModuleNotFoundError:
|
||||
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
|
||||
@@ -677,8 +689,6 @@ def execute(
|
||||
service: str,
|
||||
check_name: str,
|
||||
global_provider: Any,
|
||||
services_executed: set,
|
||||
checks_executed: set,
|
||||
custom_checks_metadata: Any,
|
||||
):
|
||||
try:
|
||||
@@ -706,13 +716,6 @@ def execute(
|
||||
check_class, verbose, global_provider.output_options.only_logs
|
||||
)
|
||||
|
||||
# Update Audit Status
|
||||
services_executed.add(service)
|
||||
checks_executed.add(check_name)
|
||||
global_provider.audit_metadata = update_audit_metadata(
|
||||
global_provider.audit_metadata, services_executed, checks_executed
|
||||
)
|
||||
|
||||
# Mutelist findings
|
||||
if hasattr(global_provider, "mutelist") and global_provider.mutelist:
|
||||
check_findings = mutelist_findings(
|
||||
|
||||
235
prowler/lib/outputs/security_hub/security_hub.py
Normal file
235
prowler/lib/outputs/security_hub/security_hub.py
Normal file
@@ -0,0 +1,235 @@
|
||||
from boto3.session import Session
|
||||
from botocore.client import ClientError
|
||||
|
||||
from prowler.config.config import timestamp_utc
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.outputs.json_asff.json_asff import fill_json_asff
|
||||
from prowler.providers.aws.aws_provider import AwsProvider
|
||||
|
||||
SECURITY_HUB_INTEGRATION_NAME = "prowler/prowler"
|
||||
SECURITY_HUB_MAX_BATCH = 100
|
||||
|
||||
|
||||
class SecurityHub:
|
||||
_session: Session
|
||||
_provider: AwsProvider
|
||||
_account: str
|
||||
_partition: str
|
||||
|
||||
def __init__(self, provider: AwsProvider) -> "SecurityHub":
|
||||
self._provider = provider
|
||||
self._session = provider.session.current_session
|
||||
self._account = provider.identity.account
|
||||
self._partition = provider.identity.partition
|
||||
|
||||
@property
|
||||
def findings_per_region(self):
|
||||
return self._findings_per_region
|
||||
|
||||
def prepare_security_hub_findings(
|
||||
self, findings: list, enabled_regions: list
|
||||
) -> dict:
|
||||
security_hub_findings_per_region = {}
|
||||
|
||||
# Create a key per audited region
|
||||
for region in enabled_regions:
|
||||
security_hub_findings_per_region[region] = []
|
||||
|
||||
for finding in findings:
|
||||
# We don't send the MANUAL findings to AWS Security Hub
|
||||
if finding.status == "MANUAL":
|
||||
continue
|
||||
|
||||
# We don't send findings to not enabled regions
|
||||
if finding.region not in enabled_regions:
|
||||
continue
|
||||
|
||||
if (
|
||||
finding.status != "FAIL" or finding.muted
|
||||
) and self._provider.output_options.send_sh_only_fails:
|
||||
continue
|
||||
|
||||
if self._provider.output_options.status:
|
||||
if finding.status not in self._provider.output_options.status:
|
||||
continue
|
||||
|
||||
if finding.muted:
|
||||
continue
|
||||
|
||||
# Get the finding region
|
||||
region = finding.region
|
||||
|
||||
# Format the finding in the JSON ASFF format
|
||||
finding_json_asff = fill_json_asff(self._provider, finding)
|
||||
|
||||
# Include that finding within their region in the JSON format
|
||||
security_hub_findings_per_region[region].append(
|
||||
finding_json_asff.dict(exclude_none=True)
|
||||
)
|
||||
|
||||
return security_hub_findings_per_region
|
||||
|
||||
def verify_security_hub_integration_enabled_per_region(
|
||||
self,
|
||||
region: str,
|
||||
) -> bool:
|
||||
f"""
|
||||
verify_security_hub_integration_enabled returns True if the {SECURITY_HUB_INTEGRATION_NAME} is enabled for the given region. Otherwise returns false.
|
||||
"""
|
||||
prowler_integration_enabled = False
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
f"Checking if the {SECURITY_HUB_INTEGRATION_NAME} is enabled in the {region} region."
|
||||
)
|
||||
# Check if security hub is enabled in current region
|
||||
security_hub_client = self._session.client(
|
||||
"securityhub", region_name=region
|
||||
)
|
||||
security_hub_client.describe_hub()
|
||||
|
||||
# Check if Prowler integration is enabled in Security Hub
|
||||
security_hub_prowler_integration_arn = f"arn:{self._partition}:securityhub:{region}:{self._account}:product-subscription/{SECURITY_HUB_INTEGRATION_NAME}"
|
||||
if security_hub_prowler_integration_arn not in str(
|
||||
security_hub_client.list_enabled_products_for_import()
|
||||
):
|
||||
logger.warning(
|
||||
f"Security Hub is enabled in {region} but Prowler integration does not accept findings. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/"
|
||||
)
|
||||
else:
|
||||
prowler_integration_enabled = True
|
||||
|
||||
# Handle all the permissions / configuration errors
|
||||
except ClientError as client_error:
|
||||
# Check if Account is subscribed to Security Hub
|
||||
error_code = client_error.response["Error"]["Code"]
|
||||
error_message = client_error.response["Error"]["Message"]
|
||||
if (
|
||||
error_code == "InvalidAccessException"
|
||||
and f"Account {self._account} is not subscribed to AWS Security Hub"
|
||||
in error_message
|
||||
):
|
||||
logger.warning(
|
||||
f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
finally:
|
||||
return prowler_integration_enabled
|
||||
|
||||
def batch_send_to_security_hub(
|
||||
self,
|
||||
security_hub_findings_per_region: dict,
|
||||
) -> int:
|
||||
"""
|
||||
send_to_security_hub sends findings to Security Hub and returns the number of findings that were successfully sent.
|
||||
"""
|
||||
|
||||
success_count = 0
|
||||
try:
|
||||
# Iterate findings by region
|
||||
for region, findings in security_hub_findings_per_region.items():
|
||||
# Send findings to Security Hub
|
||||
logger.info(f"Sending findings to Security Hub in the region {region}")
|
||||
|
||||
security_hub_client = self._session.client(
|
||||
"securityhub", region_name=region
|
||||
)
|
||||
|
||||
success_count += self.__send_findings_to_security_hub__(
|
||||
findings, region, security_hub_client
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
|
||||
)
|
||||
return success_count
|
||||
|
||||
# Move previous Security Hub check findings to ARCHIVED (as prowler didn't re-detect them)
|
||||
def resolve_security_hub_previous_findings(
|
||||
self, security_hub_findings_per_region: dict
|
||||
) -> list:
|
||||
"""
|
||||
resolve_security_hub_previous_findings archives all the findings that does not appear in the current execution
|
||||
"""
|
||||
logger.info("Checking previous findings in Security Hub to archive them.")
|
||||
success_count = 0
|
||||
for region in security_hub_findings_per_region.keys():
|
||||
try:
|
||||
current_findings = security_hub_findings_per_region[region]
|
||||
# Get current findings IDs
|
||||
current_findings_ids = []
|
||||
for finding in current_findings:
|
||||
current_findings_ids.append(finding["Id"])
|
||||
# Get findings of that region
|
||||
security_hub_client = self._session.client(
|
||||
"securityhub", region_name=region
|
||||
)
|
||||
findings_filter = {
|
||||
"ProductName": [{"Value": "Prowler", "Comparison": "EQUALS"}],
|
||||
"RecordState": [{"Value": "ACTIVE", "Comparison": "EQUALS"}],
|
||||
"AwsAccountId": [{"Value": self._account, "Comparison": "EQUALS"}],
|
||||
"Region": [{"Value": region, "Comparison": "EQUALS"}],
|
||||
}
|
||||
get_findings_paginator = security_hub_client.get_paginator(
|
||||
"get_findings"
|
||||
)
|
||||
findings_to_archive = []
|
||||
for page in get_findings_paginator.paginate(Filters=findings_filter):
|
||||
# Archive findings that have not appear in this execution
|
||||
for finding in page["Findings"]:
|
||||
if finding["Id"] not in current_findings_ids:
|
||||
finding["RecordState"] = "ARCHIVED"
|
||||
finding["UpdatedAt"] = timestamp_utc.strftime(
|
||||
"%Y-%m-%dT%H:%M:%SZ"
|
||||
)
|
||||
|
||||
findings_to_archive.append(finding)
|
||||
logger.info(f"Archiving {len(findings_to_archive)} findings.")
|
||||
|
||||
# Send archive findings to SHub
|
||||
success_count += self.__send_findings_to_security_hub__(
|
||||
findings_to_archive, region, security_hub_client
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
|
||||
)
|
||||
return success_count
|
||||
|
||||
def __send_findings_to_security_hub__(
|
||||
self, findings: list[dict], region: str, security_hub_client
|
||||
):
|
||||
"""Private function send_findings_to_security_hub chunks the findings in groups of 100 findings and send them to AWS Security Hub. It returns the number of sent findings."""
|
||||
success_count = 0
|
||||
try:
|
||||
list_chunked = [
|
||||
findings[i : i + SECURITY_HUB_MAX_BATCH]
|
||||
for i in range(0, len(findings), SECURITY_HUB_MAX_BATCH)
|
||||
]
|
||||
|
||||
for findings in list_chunked:
|
||||
batch_import = security_hub_client.batch_import_findings(
|
||||
Findings=findings
|
||||
)
|
||||
if batch_import["FailedCount"] > 0:
|
||||
failed_import = batch_import["FailedFindings"][0]
|
||||
logger.error(
|
||||
f"Failed to send findings to AWS Security Hub -- {failed_import['ErrorCode']} -- {failed_import['ErrorMessage']}"
|
||||
)
|
||||
success_count += batch_import["SuccessCount"]
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
|
||||
)
|
||||
finally:
|
||||
return success_count
|
||||
@@ -1,60 +1,150 @@
|
||||
from typing import Any
|
||||
|
||||
from prowler.lib.check.check import execute
|
||||
from prowler.lib.check.check import execute, update_audit_metadata
|
||||
from prowler.lib.check.models import Check_Report
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.common.models import Audit_Metadata
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
|
||||
def scan(
|
||||
checks_to_execute: list,
|
||||
global_provider: Any,
|
||||
custom_checks_metadata: Any,
|
||||
) -> list[Check_Report]:
|
||||
try:
|
||||
# List to store all the check's findings
|
||||
all_findings = []
|
||||
# Services and checks executed for the Audit Status
|
||||
services_executed = set()
|
||||
checks_executed = set()
|
||||
class Scan:
|
||||
# Maybe not needed
|
||||
_provider: Provider
|
||||
# Refactor(Core): This should replace the Audit_Metadata
|
||||
_number_of_checks_to_execute: int = 0
|
||||
_number_of_checks_completed: int = 0
|
||||
# TODO: these should hold a list of Checks()
|
||||
_checks_to_execute: set[str]
|
||||
_service_checks_to_execute: dict[str, set[str]]
|
||||
_service_checks_completed: dict[str, set[str]]
|
||||
_progress: float = 0.0
|
||||
_findings: list = []
|
||||
|
||||
# Initialize the Audit Metadata
|
||||
# TODO: this should be done in the provider class
|
||||
# Refactor(Core): Audit manager?
|
||||
global_provider.audit_metadata = Audit_Metadata(
|
||||
services_scanned=0,
|
||||
expected_checks=checks_to_execute,
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
)
|
||||
def __init__(self, provider, checks_to_execute):
|
||||
self._provider = provider
|
||||
|
||||
for check_name in checks_to_execute:
|
||||
try:
|
||||
# Recover service from check name
|
||||
service = check_name.split("_")[0]
|
||||
self._number_of_checks_to_execute = len(checks_to_execute)
|
||||
|
||||
check_findings = execute(
|
||||
service,
|
||||
check_name,
|
||||
global_provider,
|
||||
services_executed,
|
||||
checks_executed,
|
||||
custom_checks_metadata,
|
||||
)
|
||||
all_findings.extend(check_findings)
|
||||
service_checks_to_execute = dict()
|
||||
service_checks_completed = dict()
|
||||
|
||||
# If check does not exists in the provider or is from another provider
|
||||
except ModuleNotFoundError:
|
||||
logger.error(
|
||||
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
for check in checks_to_execute:
|
||||
# check -> accessanalyzer_enabled
|
||||
# service -> accessanalyzer
|
||||
service = get_service_name_from_check_name(check)
|
||||
if service not in service_checks_to_execute:
|
||||
service_checks_to_execute[service] = set()
|
||||
service_checks_to_execute[service].add(check)
|
||||
|
||||
return all_findings
|
||||
self._service_checks_to_execute = service_checks_to_execute
|
||||
self._service_checks_completed = service_checks_completed
|
||||
self._checks_to_execute = checks_to_execute
|
||||
|
||||
@property
|
||||
def checks_to_execute(self) -> set[str]:
|
||||
return self._checks_to_execute
|
||||
|
||||
@property
|
||||
def service_checks_to_execute(self) -> dict[str, set[str]]:
|
||||
return self._service_checks_to_execute
|
||||
|
||||
@property
|
||||
def service_checks_completed(self) -> dict[str, set[str]]:
|
||||
return self._service_checks_completed
|
||||
|
||||
@property
|
||||
def provider(self) -> Provider:
|
||||
return self._provider
|
||||
|
||||
@property
|
||||
def progress(self) -> float:
|
||||
return self._number_of_checks_completed / self._number_of_checks_to_execute
|
||||
|
||||
@property
|
||||
def findings(self) -> list:
|
||||
return self._findings
|
||||
|
||||
def scan(
|
||||
self,
|
||||
custom_checks_metadata: Any,
|
||||
) -> list[Check_Report]:
|
||||
try:
|
||||
checks_to_execute = self.checks_to_execute
|
||||
# Initialize the Audit Metadata
|
||||
# TODO: this should be done in the provider class
|
||||
# Refactor(Core): Audit manager?
|
||||
self._provider.audit_metadata = Audit_Metadata(
|
||||
services_scanned=0, # Refactor(Core): This shouldn't be nee
|
||||
expected_checks=checks_to_execute,
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
)
|
||||
|
||||
for check_name in checks_to_execute:
|
||||
try:
|
||||
# Recover service from check name
|
||||
service = get_service_name_from_check_name(check_name)
|
||||
|
||||
# Execute the check
|
||||
check_findings = execute(
|
||||
service,
|
||||
check_name,
|
||||
self._provider,
|
||||
custom_checks_metadata,
|
||||
)
|
||||
# Store findings
|
||||
self._findings.extend(check_findings)
|
||||
|
||||
# Remove the executed check
|
||||
self._service_checks_to_execute[service].remove(check_name)
|
||||
if len(self._service_checks_to_execute[service]) == 0:
|
||||
self._service_checks_to_execute.pop(service, None)
|
||||
# Add the completed check
|
||||
if service not in self._service_checks_completed:
|
||||
self._service_checks_completed[service] = set()
|
||||
self._service_checks_completed[service].add(check_name)
|
||||
self._number_of_checks_completed += 1
|
||||
|
||||
# This should be done just once all the service's checks are completed
|
||||
# This metadata needs to get to the services not within the provider
|
||||
# since it is present in the Scan class
|
||||
self._provider.audit_metadata = update_audit_metadata(
|
||||
self._provider.audit_metadata,
|
||||
self.get_completed_services(),
|
||||
self.get_completed_checks(),
|
||||
)
|
||||
|
||||
# If check does not exists in the provider or is from another provider
|
||||
except ModuleNotFoundError:
|
||||
logger.error(
|
||||
f"Check '{check_name}' was not found for the {self._provider.type.upper()} provider"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
return self._findings
|
||||
|
||||
def get_completed_services(self):
|
||||
return self._service_checks_completed.keys()
|
||||
|
||||
def get_completed_checks(self):
|
||||
completed_checks = set()
|
||||
for checks in self._service_checks_completed.values():
|
||||
completed_checks.update(checks)
|
||||
return completed_checks
|
||||
|
||||
|
||||
def get_service_name_from_check_name(check_name: str) -> str:
|
||||
"""
|
||||
get_service_name_from_check_name returns the service name for a given check name.
|
||||
|
||||
Example:
|
||||
get_service_name_from_check_name("ec2_instance_public") -> "ec2"
|
||||
"""
|
||||
return check_name.split("_")[0]
|
||||
|
||||
@@ -1,216 +0,0 @@
|
||||
from boto3 import session
|
||||
from botocore.client import ClientError
|
||||
|
||||
from prowler.config.config import timestamp_utc
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.outputs.json_asff.json_asff import fill_json_asff
|
||||
|
||||
SECURITY_HUB_INTEGRATION_NAME = "prowler/prowler"
|
||||
SECURITY_HUB_MAX_BATCH = 100
|
||||
|
||||
|
||||
def prepare_security_hub_findings(
|
||||
findings: list, provider, output_options, enabled_regions: list
|
||||
) -> dict:
|
||||
security_hub_findings_per_region = {}
|
||||
|
||||
# Create a key per audited region
|
||||
for region in enabled_regions:
|
||||
security_hub_findings_per_region[region] = []
|
||||
|
||||
for finding in findings:
|
||||
# We don't send the MANUAL findings to AWS Security Hub
|
||||
if finding.status == "MANUAL":
|
||||
continue
|
||||
|
||||
# We don't send findings to not enabled regions
|
||||
if finding.region not in enabled_regions:
|
||||
continue
|
||||
|
||||
if (
|
||||
finding.status != "FAIL" or finding.muted
|
||||
) and output_options.send_sh_only_fails:
|
||||
continue
|
||||
|
||||
if output_options.status:
|
||||
if finding.status not in output_options.status:
|
||||
continue
|
||||
|
||||
if finding.muted:
|
||||
continue
|
||||
|
||||
# Get the finding region
|
||||
region = finding.region
|
||||
|
||||
# Format the finding in the JSON ASFF format
|
||||
finding_json_asff = fill_json_asff(provider, finding)
|
||||
|
||||
# Include that finding within their region in the JSON format
|
||||
security_hub_findings_per_region[region].append(
|
||||
finding_json_asff.dict(exclude_none=True)
|
||||
)
|
||||
|
||||
return security_hub_findings_per_region
|
||||
|
||||
|
||||
def verify_security_hub_integration_enabled_per_region(
|
||||
partition: str,
|
||||
region: str,
|
||||
session: session.Session,
|
||||
aws_account_number: str,
|
||||
) -> bool:
|
||||
f"""verify_security_hub_integration_enabled returns True if the {SECURITY_HUB_INTEGRATION_NAME} is enabled for the given region. Otherwise returns false."""
|
||||
prowler_integration_enabled = False
|
||||
|
||||
try:
|
||||
logger.info(
|
||||
f"Checking if the {SECURITY_HUB_INTEGRATION_NAME} is enabled in the {region} region."
|
||||
)
|
||||
# Check if security hub is enabled in current region
|
||||
security_hub_client = session.client("securityhub", region_name=region)
|
||||
security_hub_client.describe_hub()
|
||||
|
||||
# Check if Prowler integration is enabled in Security Hub
|
||||
security_hub_prowler_integration_arn = f"arn:{partition}:securityhub:{region}:{aws_account_number}:product-subscription/{SECURITY_HUB_INTEGRATION_NAME}"
|
||||
if security_hub_prowler_integration_arn not in str(
|
||||
security_hub_client.list_enabled_products_for_import()
|
||||
):
|
||||
logger.warning(
|
||||
f"Security Hub is enabled in {region} but Prowler integration does not accept findings. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/"
|
||||
)
|
||||
else:
|
||||
prowler_integration_enabled = True
|
||||
|
||||
# Handle all the permissions / configuration errors
|
||||
except ClientError as client_error:
|
||||
# Check if Account is subscribed to Security Hub
|
||||
error_code = client_error.response["Error"]["Code"]
|
||||
error_message = client_error.response["Error"]["Message"]
|
||||
if (
|
||||
error_code == "InvalidAccessException"
|
||||
and f"Account {aws_account_number} is not subscribed to AWS Security Hub"
|
||||
in error_message
|
||||
):
|
||||
logger.warning(
|
||||
f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
finally:
|
||||
return prowler_integration_enabled
|
||||
|
||||
|
||||
def batch_send_to_security_hub(
|
||||
security_hub_findings_per_region: dict,
|
||||
session: session.Session,
|
||||
) -> int:
|
||||
"""
|
||||
send_to_security_hub sends findings to Security Hub and returns the number of findings that were successfully sent.
|
||||
"""
|
||||
|
||||
success_count = 0
|
||||
try:
|
||||
# Iterate findings by region
|
||||
for region, findings in security_hub_findings_per_region.items():
|
||||
# Send findings to Security Hub
|
||||
logger.info(f"Sending findings to Security Hub in the region {region}")
|
||||
|
||||
security_hub_client = session.client("securityhub", region_name=region)
|
||||
|
||||
success_count = __send_findings_to_security_hub__(
|
||||
findings, region, security_hub_client
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
|
||||
)
|
||||
return success_count
|
||||
|
||||
|
||||
# Move previous Security Hub check findings to ARCHIVED (as prowler didn't re-detect them)
|
||||
def resolve_security_hub_previous_findings(
|
||||
security_hub_findings_per_region: dict, provider
|
||||
) -> list:
|
||||
"""
|
||||
resolve_security_hub_previous_findings archives all the findings that does not appear in the current execution
|
||||
"""
|
||||
logger.info("Checking previous findings in Security Hub to archive them.")
|
||||
success_count = 0
|
||||
for region in security_hub_findings_per_region.keys():
|
||||
try:
|
||||
current_findings = security_hub_findings_per_region[region]
|
||||
# Get current findings IDs
|
||||
current_findings_ids = []
|
||||
for finding in current_findings:
|
||||
current_findings_ids.append(finding["Id"])
|
||||
# Get findings of that region
|
||||
security_hub_client = provider.session.current_session.client(
|
||||
"securityhub", region_name=region
|
||||
)
|
||||
findings_filter = {
|
||||
"ProductName": [{"Value": "Prowler", "Comparison": "EQUALS"}],
|
||||
"RecordState": [{"Value": "ACTIVE", "Comparison": "EQUALS"}],
|
||||
"AwsAccountId": [
|
||||
{"Value": provider.identity.account, "Comparison": "EQUALS"}
|
||||
],
|
||||
"Region": [{"Value": region, "Comparison": "EQUALS"}],
|
||||
}
|
||||
get_findings_paginator = security_hub_client.get_paginator("get_findings")
|
||||
findings_to_archive = []
|
||||
for page in get_findings_paginator.paginate(Filters=findings_filter):
|
||||
# Archive findings that have not appear in this execution
|
||||
for finding in page["Findings"]:
|
||||
if finding["Id"] not in current_findings_ids:
|
||||
finding["RecordState"] = "ARCHIVED"
|
||||
finding["UpdatedAt"] = timestamp_utc.strftime(
|
||||
"%Y-%m-%dT%H:%M:%SZ"
|
||||
)
|
||||
|
||||
findings_to_archive.append(finding)
|
||||
logger.info(f"Archiving {len(findings_to_archive)} findings.")
|
||||
|
||||
# Send archive findings to SHub
|
||||
success_count += __send_findings_to_security_hub__(
|
||||
findings_to_archive, region, security_hub_client
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
|
||||
)
|
||||
return success_count
|
||||
|
||||
|
||||
def __send_findings_to_security_hub__(
|
||||
findings: list[dict], region: str, security_hub_client
|
||||
):
|
||||
"""Private function send_findings_to_security_hub chunks the findings in groups of 100 findings and send them to AWS Security Hub. It returns the number of sent findings."""
|
||||
success_count = 0
|
||||
try:
|
||||
list_chunked = [
|
||||
findings[i : i + SECURITY_HUB_MAX_BATCH]
|
||||
for i in range(0, len(findings), SECURITY_HUB_MAX_BATCH)
|
||||
]
|
||||
|
||||
for findings in list_chunked:
|
||||
batch_import = security_hub_client.batch_import_findings(Findings=findings)
|
||||
if batch_import["FailedCount"] > 0:
|
||||
failed_import = batch_import["FailedFindings"][0]
|
||||
logger.error(
|
||||
f"Failed to send findings to AWS Security Hub -- {failed_import['ErrorCode']} -- {failed_import['ErrorMessage']}"
|
||||
)
|
||||
success_count += batch_import["SuccessCount"]
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]:{error} in region {region}"
|
||||
)
|
||||
finally:
|
||||
return success_count
|
||||
@@ -7,6 +7,7 @@ from prowler.providers.common.provider import Provider
|
||||
|
||||
|
||||
# TODO: include this for all the providers
|
||||
# Rename to AuditMetadata or ScanMetadata
|
||||
class Audit_Metadata(BaseModel):
|
||||
services_scanned: int
|
||||
# We can't use a set in the expected
|
||||
|
||||
147
tests/cli/cli_test.py
Normal file
147
tests/cli/cli_test.py
Normal file
@@ -0,0 +1,147 @@
|
||||
import json
|
||||
|
||||
from typer.testing import CliRunner
|
||||
|
||||
from cli.cli import app
|
||||
|
||||
runner = CliRunner()
|
||||
|
||||
|
||||
class TestCLI:
|
||||
|
||||
def test_list_services_aws(self):
|
||||
result = runner.invoke(app, ["aws", "--list-services"])
|
||||
assert result.exit_code == 0
|
||||
assert "available services." in result.output
|
||||
|
||||
def test_list_fixers_aws(self):
|
||||
result = runner.invoke(app, ["aws", "--list-fixers"])
|
||||
assert result.exit_code == 0
|
||||
assert "available fixers." in result.output
|
||||
|
||||
def test_list_categories_aws(self):
|
||||
result = runner.invoke(app, ["aws", "--list-categories"])
|
||||
assert result.exit_code == 0
|
||||
assert "available categories." in result.output
|
||||
|
||||
def test_list_compliance_aws(self):
|
||||
result = runner.invoke(app, ["aws", "--list-compliance"])
|
||||
assert result.exit_code == 0
|
||||
assert "available Compliance Frameworks." in result.output
|
||||
|
||||
def test_list_compliance_requirements_aws(self):
|
||||
result = runner.invoke(
|
||||
app, ["aws", "--list-compliance-requirements", "cis_2.0_aws soc2_aws"]
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "Listing CIS 2.0 AWS Compliance Requirements:" in result.output
|
||||
assert "Listing SOC2 AWS Compliance Requirements:" in result.output
|
||||
|
||||
def test_list_compliance_requirements_no_compliance_aws(self):
|
||||
result = runner.invoke(app, ["aws", "--list-compliance-requirements"])
|
||||
assert result.exit_code == 2
|
||||
assert "requires an argument" in result.output
|
||||
|
||||
def test_list_compliance_requirements_one_invalid_aws(self):
|
||||
invalid_name = "invalid"
|
||||
result = runner.invoke(
|
||||
app,
|
||||
["aws", "--list-compliance-requirements", f"cis_2.0_aws {invalid_name}"],
|
||||
)
|
||||
assert result.exit_code == 0
|
||||
assert "Listing CIS 2.0 AWS Compliance Requirements:" in result.output
|
||||
assert f"{invalid_name} is not a valid Compliance Framework" in result.output
|
||||
|
||||
def test_list_checks_aws(self):
|
||||
result = runner.invoke(app, ["aws", "--list-checks"])
|
||||
assert result.exit_code == 0
|
||||
assert "available checks." in result.output
|
||||
|
||||
def test_list_checks_json_aws(self):
|
||||
result = runner.invoke(app, ["aws", "--list-checks-json"])
|
||||
assert result.exit_code == 0
|
||||
assert "aws" in result.output
|
||||
# validate the json output
|
||||
try:
|
||||
json.loads(result.output)
|
||||
except ValueError:
|
||||
assert False
|
||||
|
||||
def test_log_level(self):
|
||||
result = runner.invoke(app, ["aws", "--log-level", "ERROR"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_log_level_invalid(self):
|
||||
result = runner.invoke(app, ["aws", "--log-level", "INVALID"])
|
||||
assert result.exit_code == 2
|
||||
assert "Log level must be one of" in result.output
|
||||
|
||||
def test_log_level_no_value(self):
|
||||
result = runner.invoke(app, ["aws", "--log-level"])
|
||||
assert result.exit_code == 2
|
||||
assert "Option '--log-level' requires an argument." in result.output
|
||||
|
||||
def test_log_file(self):
|
||||
result = runner.invoke(app, ["aws", "--log-file", "test.log"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_log_file_no_value(self):
|
||||
result = runner.invoke(app, ["aws", "--log-file"])
|
||||
assert result.exit_code == 2
|
||||
assert "Option '--log-file' requires an argument." in result.output
|
||||
|
||||
def test_only_logs(self):
|
||||
result = runner.invoke(app, ["aws", "--only-logs"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_status(self):
|
||||
result = runner.invoke(app, ["aws", "--status", "PASS"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_status_invalid(self):
|
||||
result = runner.invoke(app, ["aws", "--status", "INVALID"])
|
||||
assert result.exit_code == 2
|
||||
assert "Status must be one of" in result.output
|
||||
|
||||
def test_status_no_value(self):
|
||||
result = runner.invoke(app, ["aws", "--status"])
|
||||
assert result.exit_code == 2
|
||||
assert "Option '--status' requires an argument." in result.output
|
||||
|
||||
def test_outputs_formats(self):
|
||||
result = runner.invoke(app, ["aws", "--output-filename", "csv html"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_outputs_formats_no_value(self):
|
||||
result = runner.invoke(app, ["aws", "--output-filename"])
|
||||
assert result.exit_code == 2
|
||||
assert "Option '--output-filename' requires an argument." in result.output
|
||||
|
||||
def test_output_directory(self):
|
||||
result = runner.invoke(app, ["aws", "--output-directory", "test"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_output_directory_no_value(self):
|
||||
result = runner.invoke(app, ["aws", "--output-directory"])
|
||||
assert result.exit_code == 2
|
||||
assert "Option '--output-directory' requires an argument." in result.output
|
||||
|
||||
def test_verbose(self):
|
||||
result = runner.invoke(app, ["aws", "--verbose"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_ignore_exit_code_3(self):
|
||||
result = runner.invoke(app, ["aws", "--ignore-exit-code-3"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_no_banner(self):
|
||||
result = runner.invoke(app, ["aws", "--no-banner"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_unix_timestamp(self):
|
||||
result = runner.invoke(app, ["aws", "--unix-timestamp"])
|
||||
assert result.exit_code == 0
|
||||
|
||||
def test_profile(self):
|
||||
result = runner.invoke(app, ["aws", "--profile", "test"])
|
||||
assert result.exit_code == 0
|
||||
@@ -1,21 +1,16 @@
|
||||
from argparse import Namespace
|
||||
from logging import ERROR, WARNING
|
||||
from os import path
|
||||
|
||||
import botocore
|
||||
from boto3 import session
|
||||
from botocore.client import ClientError
|
||||
from mock import MagicMock, patch
|
||||
from mock import patch
|
||||
|
||||
from prowler.config.config import prowler_version, timestamp_utc
|
||||
from prowler.lib.check.models import Check_Report, load_check_metadata
|
||||
from prowler.providers.aws.lib.security_hub.security_hub import (
|
||||
batch_send_to_security_hub,
|
||||
prepare_security_hub_findings,
|
||||
verify_security_hub_integration_enabled_per_region,
|
||||
)
|
||||
from prowler.lib.outputs.security_hub.security_hub import SecurityHub
|
||||
from tests.providers.aws.utils import (
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
AWS_COMMERCIAL_PARTITION,
|
||||
AWS_REGION_EU_WEST_1,
|
||||
AWS_REGION_EU_WEST_2,
|
||||
set_mocked_aws_provider,
|
||||
@@ -108,37 +103,25 @@ class Test_SecurityHub:
|
||||
|
||||
return finding
|
||||
|
||||
def set_mocked_output_options(
|
||||
self, status: list[str] = [], send_sh_only_fails: bool = False
|
||||
):
|
||||
output_options = MagicMock
|
||||
output_options.bulk_checks_metadata = {}
|
||||
output_options.status = status
|
||||
output_options.send_sh_only_fails = send_sh_only_fails
|
||||
|
||||
return output_options
|
||||
|
||||
def set_mocked_session(self, region):
|
||||
# Create mock session
|
||||
return session.Session(
|
||||
region_name=region,
|
||||
)
|
||||
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
def test_verify_security_hub_integration_enabled_per_region(self):
|
||||
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
|
||||
assert verify_security_hub_integration_enabled_per_region(
|
||||
AWS_COMMERCIAL_PARTITION, AWS_REGION_EU_WEST_1, session, AWS_ACCOUNT_NUMBER
|
||||
aws_provider = set_mocked_aws_provider()
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert security_hub.verify_security_hub_integration_enabled_per_region(
|
||||
AWS_REGION_EU_WEST_1
|
||||
)
|
||||
|
||||
def test_verify_security_hub_integration_enabled_per_region_security_hub_disabled(
|
||||
self, caplog
|
||||
):
|
||||
aws_provider = set_mocked_aws_provider()
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
caplog.set_level(WARNING)
|
||||
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
|
||||
|
||||
with patch(
|
||||
"prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
|
||||
"prowler.lib.outputs.security_hub.security_hub.Session.client",
|
||||
) as mock_security_hub:
|
||||
error_message = f"Account {AWS_ACCOUNT_NUMBER} is not subscribed to AWS Security Hub in region {AWS_REGION_EU_WEST_1}"
|
||||
error_code = "InvalidAccessException"
|
||||
@@ -151,37 +134,33 @@ class Test_SecurityHub:
|
||||
operation_name = "DescribeHub"
|
||||
mock_security_hub.side_effect = ClientError(error_response, operation_name)
|
||||
|
||||
assert not verify_security_hub_integration_enabled_per_region(
|
||||
AWS_COMMERCIAL_PARTITION,
|
||||
assert not security_hub.verify_security_hub_integration_enabled_per_region(
|
||||
AWS_REGION_EU_WEST_1,
|
||||
session,
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
)
|
||||
assert caplog.record_tuples == [
|
||||
(
|
||||
"root",
|
||||
WARNING,
|
||||
f"ClientError -- [70]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
|
||||
f"ClientError -- [86]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
|
||||
)
|
||||
]
|
||||
|
||||
def test_verify_security_hub_integration_enabled_per_region_prowler_not_subscribed(
|
||||
self, caplog
|
||||
):
|
||||
aws_provider = set_mocked_aws_provider()
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
caplog.set_level(WARNING)
|
||||
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
|
||||
|
||||
with patch(
|
||||
"prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
|
||||
"prowler.lib.outputs.security_hub.security_hub.Session.client",
|
||||
) as mock_security_hub:
|
||||
mock_security_hub.describe_hub.return_value = None
|
||||
mock_security_hub.list_enabled_products_for_import.return_value = []
|
||||
|
||||
assert not verify_security_hub_integration_enabled_per_region(
|
||||
AWS_COMMERCIAL_PARTITION,
|
||||
assert not security_hub.verify_security_hub_integration_enabled_per_region(
|
||||
AWS_REGION_EU_WEST_1,
|
||||
session,
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
)
|
||||
assert caplog.record_tuples == [
|
||||
(
|
||||
@@ -194,11 +173,13 @@ class Test_SecurityHub:
|
||||
def test_verify_security_hub_integration_enabled_per_region_another_ClientError(
|
||||
self, caplog
|
||||
):
|
||||
aws_provider = set_mocked_aws_provider()
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
caplog.set_level(WARNING)
|
||||
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
|
||||
|
||||
with patch(
|
||||
"prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
|
||||
"prowler.lib.outputs.security_hub.security_hub.Session.client",
|
||||
) as mock_security_hub:
|
||||
error_message = f"Another exception in region {AWS_REGION_EU_WEST_1}"
|
||||
error_code = "AnotherException"
|
||||
@@ -211,58 +192,52 @@ class Test_SecurityHub:
|
||||
operation_name = "DescribeHub"
|
||||
mock_security_hub.side_effect = ClientError(error_response, operation_name)
|
||||
|
||||
assert not verify_security_hub_integration_enabled_per_region(
|
||||
AWS_COMMERCIAL_PARTITION,
|
||||
assert not security_hub.verify_security_hub_integration_enabled_per_region(
|
||||
AWS_REGION_EU_WEST_1,
|
||||
session,
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
)
|
||||
assert caplog.record_tuples == [
|
||||
(
|
||||
"root",
|
||||
ERROR,
|
||||
f"ClientError -- [70]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
|
||||
f"ClientError -- [86]: An error occurred ({error_code}) when calling the {operation_name} operation: {error_message}",
|
||||
)
|
||||
]
|
||||
|
||||
def test_verify_security_hub_integration_enabled_per_region_another_Exception(
|
||||
self, caplog
|
||||
):
|
||||
aws_provider = set_mocked_aws_provider()
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
caplog.set_level(WARNING)
|
||||
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
|
||||
|
||||
with patch(
|
||||
"prowler.providers.aws.lib.security_hub.security_hub.session.Session.client",
|
||||
"prowler.lib.outputs.security_hub.security_hub.Session.client",
|
||||
) as mock_security_hub:
|
||||
error_message = f"Another exception in region {AWS_REGION_EU_WEST_1}"
|
||||
mock_security_hub.side_effect = Exception(error_message)
|
||||
|
||||
assert not verify_security_hub_integration_enabled_per_region(
|
||||
AWS_COMMERCIAL_PARTITION,
|
||||
assert not security_hub.verify_security_hub_integration_enabled_per_region(
|
||||
AWS_REGION_EU_WEST_1,
|
||||
session,
|
||||
AWS_ACCOUNT_NUMBER,
|
||||
)
|
||||
assert caplog.record_tuples == [
|
||||
(
|
||||
"root",
|
||||
ERROR,
|
||||
f"Exception -- [70]: {error_message}",
|
||||
f"Exception -- [86]: {error_message}",
|
||||
)
|
||||
]
|
||||
|
||||
def test_prepare_security_hub_findings_enabled_region_all_statuses(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options()
|
||||
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {
|
||||
AWS_REGION_EU_WEST_1: [get_security_hub_finding("PASSED")],
|
||||
@@ -270,104 +245,109 @@ class Test_SecurityHub:
|
||||
|
||||
def test_prepare_security_hub_findings_all_statuses_MANUAL_finding(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options()
|
||||
findings = [self.generate_finding("MANUAL", AWS_REGION_EU_WEST_1)]
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {AWS_REGION_EU_WEST_1: []}
|
||||
|
||||
def test_prepare_security_hub_findings_disabled_region(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options()
|
||||
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_2)]
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {AWS_REGION_EU_WEST_1: []}
|
||||
|
||||
def test_prepare_security_hub_findings_PASS_and_FAIL_statuses(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options(status=["FAIL"])
|
||||
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
|
||||
|
||||
args = Namespace()
|
||||
args.status = ["FAIL"]
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2], arguments=args
|
||||
)
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {AWS_REGION_EU_WEST_1: []}
|
||||
|
||||
def test_prepare_security_hub_findings_FAIL_and_FAIL_statuses(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options(status=["FAIL"])
|
||||
findings = [self.generate_finding("FAIL", AWS_REGION_EU_WEST_1)]
|
||||
|
||||
args = Namespace()
|
||||
args.status = ["FAIL"]
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2], arguments=args
|
||||
)
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {AWS_REGION_EU_WEST_1: [get_security_hub_finding("FAILED")]}
|
||||
|
||||
def test_prepare_security_hub_findings_send_sh_only_fails_PASS(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options(send_sh_only_fails=True)
|
||||
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
)
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
args = Namespace()
|
||||
args.send_sh_only_fails = True
|
||||
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2], arguments=args
|
||||
)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {AWS_REGION_EU_WEST_1: []}
|
||||
|
||||
def test_prepare_security_hub_findings_send_sh_only_fails_FAIL(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options(send_sh_only_fails=True)
|
||||
findings = [self.generate_finding("FAIL", AWS_REGION_EU_WEST_1)]
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
)
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
args = Namespace()
|
||||
args.send_sh_only_fails = True
|
||||
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2], arguments=args
|
||||
)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {AWS_REGION_EU_WEST_1: [get_security_hub_finding("FAILED")]}
|
||||
|
||||
def test_prepare_security_hub_findings_no_audited_regions(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options()
|
||||
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
|
||||
aws_provider = set_mocked_aws_provider()
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {
|
||||
AWS_REGION_EU_WEST_1: [get_security_hub_finding("PASSED")],
|
||||
@@ -375,20 +355,19 @@ class Test_SecurityHub:
|
||||
|
||||
def test_prepare_security_hub_findings_muted_fail_with_send_sh_only_fails(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options(
|
||||
send_sh_only_fails=True,
|
||||
)
|
||||
findings = [
|
||||
self.generate_finding(
|
||||
status="FAIL", region=AWS_REGION_EU_WEST_1, muted=True
|
||||
)
|
||||
]
|
||||
aws_provider = set_mocked_aws_provider()
|
||||
args = Namespace()
|
||||
args.send_sh_only_fails = True
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
aws_provider = set_mocked_aws_provider(arguments=args)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {
|
||||
AWS_REGION_EU_WEST_1: [],
|
||||
@@ -396,18 +375,18 @@ class Test_SecurityHub:
|
||||
|
||||
def test_prepare_security_hub_findings_muted_fail_with_status_FAIL(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options(status=["FAIL"])
|
||||
findings = [
|
||||
self.generate_finding(
|
||||
status="FAIL", region=AWS_REGION_EU_WEST_1, muted=True
|
||||
)
|
||||
]
|
||||
aws_provider = set_mocked_aws_provider()
|
||||
args = Namespace()
|
||||
args.status = ["FAIL"]
|
||||
aws_provider = set_mocked_aws_provider(arguments=args)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
assert prepare_security_hub_findings(
|
||||
assert security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
) == {
|
||||
AWS_REGION_EU_WEST_1: [],
|
||||
@@ -416,24 +395,21 @@ class Test_SecurityHub:
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
def test_batch_send_to_security_hub_one_finding(self):
|
||||
enabled_regions = [AWS_REGION_EU_WEST_1]
|
||||
output_options = self.set_mocked_output_options()
|
||||
findings = [self.generate_finding("PASS", AWS_REGION_EU_WEST_1)]
|
||||
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
audited_regions=[AWS_REGION_EU_WEST_1, AWS_REGION_EU_WEST_2]
|
||||
)
|
||||
session = self.set_mocked_session(AWS_REGION_EU_WEST_1)
|
||||
security_hub = SecurityHub(aws_provider)
|
||||
|
||||
security_hub_findings = prepare_security_hub_findings(
|
||||
security_hub_findings = security_hub.prepare_security_hub_findings(
|
||||
findings,
|
||||
aws_provider,
|
||||
output_options,
|
||||
enabled_regions,
|
||||
)
|
||||
|
||||
assert (
|
||||
batch_send_to_security_hub(
|
||||
security_hub.batch_send_to_security_hub(
|
||||
security_hub_findings,
|
||||
session,
|
||||
)
|
||||
== 1
|
||||
)
|
||||
@@ -153,8 +153,12 @@ def set_mocked_aws_provider(
|
||||
return provider
|
||||
|
||||
|
||||
def set_default_provider_arguments(arguments: Namespace) -> Namespace:
|
||||
def set_default_provider_arguments(input_arguments: Namespace) -> Namespace:
|
||||
|
||||
arguments = Namespace
|
||||
arguments.status = []
|
||||
if hasattr(input_arguments, "status") and input_arguments.status:
|
||||
arguments.status = input_arguments.status
|
||||
arguments.output_formats = []
|
||||
arguments.output_directory = ""
|
||||
arguments.verbose = False
|
||||
@@ -162,7 +166,14 @@ def set_default_provider_arguments(arguments: Namespace) -> Namespace:
|
||||
arguments.unix_timestamp = False
|
||||
arguments.shodan = None
|
||||
arguments.security_hub = False
|
||||
|
||||
arguments.send_sh_only_fails = False
|
||||
if (
|
||||
hasattr(input_arguments, "send_sh_only_fails")
|
||||
and input_arguments.send_sh_only_fails
|
||||
):
|
||||
arguments.send_sh_only_fails = input_arguments.send_sh_only_fails
|
||||
|
||||
arguments.config_file = default_config_file_path
|
||||
arguments.fixer_config = default_fixer_config_file_path
|
||||
|
||||
|
||||
Reference in New Issue
Block a user