chore(CLI): add output functions (#4188)

This commit is contained in:
Pedro Martín
2024-06-07 09:09:45 +02:00
committed by GitHub
parent c026fe09d5
commit e3858cdd19
2 changed files with 316 additions and 96 deletions

View File

@@ -1,8 +1,13 @@
from argparse import Namespace
from typing import List, Optional
import typer
from prowler.config.config import available_compliance_frameworks
from prowler.config.config import (
available_compliance_frameworks,
default_output_directory,
finding_statuses,
)
from prowler.lib.check.check import (
bulk_load_checks_metadata,
bulk_load_compliance_frameworks,
@@ -32,6 +37,7 @@ def check_provider(provider: str):
raise typer.BadParameter(
"Invalid provider. Choose between aws, azure, gcp or kubernetes."
)
return provider
def check_compliance_framework(provider: str, compliance_framework: list):
@@ -56,9 +62,88 @@ def validate_log_level(log_level: str):
return log_level
def split_space_separated_values(value: str) -> List[str]:
output = []
if value:
for item in value:
for input in item.split(" "):
output.append(input)
return output
def validate_status(status: List[str]):
valid_status = []
for s in status:
if s not in finding_statuses:
raise typer.BadParameter(f"Status must be one of {finding_statuses}")
valid_status.append(s)
return valid_status
def validate_output_formats(output_formats: List[str]):
valid_formats = []
valid_output_formats = ["csv", "json-ocsf", "html", "json-asff"]
for output_format in output_formats:
if output_format not in valid_output_formats:
raise typer.BadParameter(
f"Output format must be one of {valid_output_formats}"
)
else:
valid_formats.append(output_format)
return output_formats
class CLI:
def __init__(
self,
provider: str,
list_services: bool,
list_fixers: bool,
list_categories: bool,
list_compliance: bool,
list_compliance_requirements: List[str],
list_checks: bool,
list_checks_json: bool,
log_level: str,
log_file: Optional[str],
only_logs: bool,
status: List[str],
output_formats: List[str],
output_filename: Optional[str],
output_directory: Optional[str],
verbose: bool,
ignore_exit_code_3: bool,
no_banner: bool,
unix_timestamp: bool,
profile: Optional[str],
):
self.provider = provider
self.list_services = list_services
self.list_fixers = list_fixers
self.list_categories = list_categories
self.list_compliance = list_compliance
self.list_compliance_requirements = list_compliance_requirements
self.list_checks = list_checks
self.list_checks_json = list_checks_json
self.log_level = log_level
self.log_file = log_file
self.only_logs = only_logs
self.status = status
self.output_formats = output_formats
self.output_filename = output_filename
self.output_directory = output_directory
self.verbose = verbose
self.ignore_exit_code_3 = ignore_exit_code_3
self.no_banner = no_banner
self.unix_timestamp = unix_timestamp
self.profile = profile
@app.command()
def main(
provider: str = typer.Argument(..., help="The provider to check"),
provider: str = typer.Argument(
..., help="The provider to check", callback=check_provider
),
list_services_bool: bool = typer.Option(
False, "--list-services", help="List the services of the provider"
),
@@ -73,10 +158,11 @@ def main(
"--list-compliance",
help="List the compliance frameworks of the provider",
),
list_compliance_requirements_value: str = typer.Option(
list_compliance_requirements_value: List[str] = typer.Option(
None,
"--list-compliance-requirements",
help="List the compliance requirements of the provider",
callback=split_space_separated_values,
),
list_checks_bool: bool = typer.Option(
False, "--list-checks", help="List the checks of the provider"
@@ -89,131 +175,213 @@ def main(
log_level: str = typer.Option("INFO", "--log-level", help="Set the Log level"),
log_file: str = typer.Option(None, "--log-file", help="Set the Log file"),
only_logs: bool = typer.Option(False, "--only-logs", help="Only show logs"),
status_value: List[str] = typer.Option(
[],
"--status",
help=f"Filter by the status of the findings {finding_statuses}",
callback=split_space_separated_values,
),
output_formats_value: List[str] = typer.Option(
["csv json-ocsf html"],
"--output-formats",
help="Output format for the findings",
callback=split_space_separated_values,
),
output_filename_value: str = typer.Option(
None, "--output-filename", help="Output filename"
),
output_directory_value: str = typer.Option(
None, "--output-directory", help="Output directory"
),
verbose: bool = typer.Option(False, "--verbose", help="Show verbose output"),
ignore_exit_code_3: bool = typer.Option(
False, "--ignore-exit-code-3", help="Ignore exit code 3"
),
no_banner: bool = typer.Option(False, "--no-banner", help="Do not show the banner"),
unix_timestamp: bool = typer.Option(
False, "--unix-timestamp", help="Use Unix timestamp"
),
profile: str = typer.Option(None, "--profile", help="The profile to use"),
):
check_provider(provider)
if list_services_bool:
services = list_services(provider)
# Make sure the values are valid
if status_value:
status_value = validate_status(status_value)
if output_formats_value:
output_formats_value = validate_output_formats(output_formats_value)
if not output_directory_value:
output_directory_value = default_output_directory
options = CLI(
provider,
list_services_bool,
list_fixers_bool,
list_categories_bool,
list_compliance_bool,
list_compliance_requirements_value,
list_checks_bool,
list_checks_json_bool,
log_level,
log_file,
only_logs,
status_value,
output_formats_value,
output_filename_value,
output_directory_value,
verbose,
ignore_exit_code_3,
no_banner,
unix_timestamp,
profile,
)
if options.list_services:
services = list_services(options.provider)
print_services(services)
if list_fixers_bool:
fixers = list_fixers(provider)
if options.list_fixers:
fixers = list_fixers(options.provider)
print_fixers(fixers)
if list_categories_bool:
checks_metadata = bulk_load_checks_metadata(provider)
if options.list_categories:
checks_metadata = bulk_load_checks_metadata(options.provider)
categories = list_categories(checks_metadata)
print_categories(categories)
if list_compliance_bool:
compliance_frameworks = bulk_load_compliance_frameworks(provider)
if options.list_compliance:
compliance_frameworks = bulk_load_compliance_frameworks(options.provider)
print_compliance_frameworks(compliance_frameworks)
if list_compliance_requirements_value:
list_compliance_requirements_value = list_compliance_requirements_value.split(
","
)
if options.list_compliance_requirements:
valid_compliance = check_compliance_framework(
provider, list_compliance_requirements_value
options.provider, options.list_compliance_requirements
)
print_compliance_requirements(
bulk_load_compliance_frameworks(provider),
bulk_load_compliance_frameworks(options.provider),
valid_compliance,
)
if list_checks_bool:
checks_metadata = bulk_load_checks_metadata(provider)
if options.list_checks:
checks_metadata = bulk_load_checks_metadata(options.provider)
checks = load_checks_to_execute(
checks_metadata,
bulk_load_compliance_frameworks(provider),
bulk_load_compliance_frameworks(options.provider),
None,
[],
[],
[],
[],
[],
provider,
options.provider,
)
print_checks(provider, sorted(checks), checks_metadata)
if list_checks_json_bool:
checks_metadata = bulk_load_checks_metadata(provider)
print_checks(options.provider, sorted(checks), checks_metadata)
if options.list_checks_json:
checks_metadata = bulk_load_checks_metadata(options.provider)
checks_to_execute = load_checks_to_execute(
checks_metadata,
bulk_load_compliance_frameworks(provider),
bulk_load_compliance_frameworks(options.provider),
None,
[],
[],
[],
[],
[],
provider,
options.provider,
)
print(list_checks_json(provider, sorted(checks_to_execute)))
if log_level:
set_logging_config(validate_log_level(log_level))
logger.info(f"Log level set to {log_level}")
if log_file:
if log_level:
set_logging_config(validate_log_level(log_level), log_file)
print(list_checks_json(options.provider, sorted(checks_to_execute)))
if options.log_level:
set_logging_config(validate_log_level(options.log_level))
logger.info(f"Log level set to {options.log_level}")
if options.log_file:
if options.log_level:
set_logging_config(validate_log_level(options.log_level), options.log_file)
else:
set_logging_config("INFO", log_file)
logger.info(f"Log file set to {log_file}")
if only_logs:
if log_level:
set_logging_config(validate_log_level(log_level), only_logs=True)
set_logging_config("INFO", options.log_file)
logger.info(f"Log file set to {options.log_file}")
if options.only_logs:
if options.log_level:
set_logging_config(validate_log_level(options.log_level), only_logs=True)
else:
set_logging_config("INFO", only_logs=True)
logger.info("Only logs are shown")
if profile:
# Execute Prowler
checks_to_execute = ["s3_account_level_public_access_blocks"]
# Create the provider
args = Namespace
args.provider = provider
args.profile = profile
args.verbose = False
args.fixer = False
args.only_logs = False
args.status = []
args.output_formats = []
args.output_filename = None
args.unix_timestamp = False
args.output_directory = None
args.shodan = None
args.security_hub = False
args.send_sh_only_fails = False
# args.region = ("eu-west-1")
Provider.set_global_provider(args)
provider = Provider.get_global_provider()
bulk_checks_metadata = bulk_load_checks_metadata(provider.type)
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider.type)
bulk_checks_metadata = update_checks_metadata_with_compliance(
bulk_compliance_frameworks, bulk_checks_metadata
)
provider.output_options = (args, bulk_checks_metadata)
provider.output_options.bulk_checks_metadata = bulk_checks_metadata
scan = Scan(provider, checks_to_execute)
custom_checks_metadata = None
scan_results = scan.scan(custom_checks_metadata)
# Verify where AWS Security Hub is enabled
aws_security_enabled_regions = []
security_hub_regions = (
provider.get_available_aws_service_regions("securityhub")
if not provider.identity.audited_regions
else provider.identity.audited_regions
)
security_hub = SecurityHub(provider)
for region in security_hub_regions:
# Save the regions where AWS Security Hub is enabled
if security_hub.verify_security_hub_integration_enabled_per_region(
region,
):
aws_security_enabled_regions.append(region)
# Prepare the findings to be sent to Security Hub
security_hub_findings_per_region = security_hub.prepare_security_hub_findings(
scan_results,
aws_security_enabled_regions,
)
# Send the findings to Security Hub
findings_sent_to_security_hub = security_hub.batch_send_to_security_hub(
security_hub_findings_per_region
)
print(findings_sent_to_security_hub)
if options.status:
logger.info(f"Filtering by status: {options.status}")
# TODO: Implement filtering by status in a class
if options.output_formats:
logger.info(f"Output formats: {options.output_formats}")
# TODO: Implement output formats in a class
if options.output_filename:
logger.info(f"Output filename: {options.output_filename}")
# TODO: Implement output filename in a class
if options.output_directory:
logger.info(f"Output directory: {options.output_directory}")
# TODO: Implement output directory in a class
if options.verbose:
logger.info("Verbose output is enabled")
if options.ignore_exit_code_3:
logger.info("Ignoring exit code 3")
if options.no_banner:
logger.info("No banner is shown")
if options.unix_timestamp:
logger.info("Using Unix timestamp")
if options.profile:
logger.info(f"Using profile: {options.profile}")
run_scan(options)
return options
def run_scan(options: CLI):
# Execute Prowler
checks_to_execute = ["s3_account_level_public_access_blocks"]
# Create the provider
args = Namespace
args.provider = options.provider
args.profile = options.profile
args.verbose = options.verbose
args.fixer = False
args.only_logs = options.only_logs
args.status = options.status
args.output_formats = options.output_formats
args.output_filename = options.output_filename
args.unix_timestamp = options.unix_timestamp
args.output_directory = options.output_directory
args.shodan = None
args.security_hub = False
args.send_sh_only_fails = False
args.ignore_exit_code_3 = options.ignore_exit_code_3
args.no_banner = options.no_banner
# args.region = ("eu-west-1")
Provider.set_global_provider(args)
provider = Provider.get_global_provider()
bulk_checks_metadata = bulk_load_checks_metadata(provider.type)
bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider.type)
bulk_checks_metadata = update_checks_metadata_with_compliance(
bulk_compliance_frameworks, bulk_checks_metadata
)
provider.output_options = (args, bulk_checks_metadata)
provider.output_options.bulk_checks_metadata = bulk_checks_metadata
scan = Scan(provider, checks_to_execute)
custom_checks_metadata = None
scan_results = scan.scan(custom_checks_metadata)
# Verify where AWS Security Hub is enabled
aws_security_enabled_regions = []
security_hub_regions = (
provider.get_available_aws_service_regions("securityhub")
if not provider.identity.audited_regions
else provider.identity.audited_regions
)
security_hub = SecurityHub(provider)
for region in security_hub_regions:
# Save the regions where AWS Security Hub is enabled
if security_hub.verify_security_hub_integration_enabled_per_region(
region,
):
aws_security_enabled_regions.append(region)
# Prepare the findings to be sent to Security Hub
security_hub_findings_per_region = security_hub.prepare_security_hub_findings(
scan_results,
aws_security_enabled_regions,
)
# Send the findings to Security Hub
findings_sent_to_security_hub = security_hub.batch_send_to_security_hub(
security_hub_findings_per_region
)
print(findings_sent_to_security_hub)
if __name__ == "__main__":

View File

@@ -31,7 +31,7 @@ class TestCLI:
def test_list_compliance_requirements_aws(self):
result = runner.invoke(
app, ["aws", "--list-compliance-requirements", "cis_2.0_aws,soc2_aws"]
app, ["aws", "--list-compliance-requirements", "cis_2.0_aws soc2_aws"]
)
assert result.exit_code == 0
assert "Listing CIS 2.0 AWS Compliance Requirements:" in result.output
@@ -46,7 +46,7 @@ class TestCLI:
invalid_name = "invalid"
result = runner.invoke(
app,
["aws", "--list-compliance-requirements", f"cis_2.0_aws,{invalid_name}"],
["aws", "--list-compliance-requirements", f"cis_2.0_aws {invalid_name}"],
)
assert result.exit_code == 0
assert "Listing CIS 2.0 AWS Compliance Requirements:" in result.output
@@ -93,3 +93,55 @@ class TestCLI:
def test_only_logs(self):
result = runner.invoke(app, ["aws", "--only-logs"])
assert result.exit_code == 0
def test_status(self):
result = runner.invoke(app, ["aws", "--status", "PASS"])
assert result.exit_code == 0
def test_status_invalid(self):
result = runner.invoke(app, ["aws", "--status", "INVALID"])
assert result.exit_code == 2
assert "Status must be one of" in result.output
def test_status_no_value(self):
result = runner.invoke(app, ["aws", "--status"])
assert result.exit_code == 2
assert "Option '--status' requires an argument." in result.output
def test_outputs_formats(self):
result = runner.invoke(app, ["aws", "--output-filename", "csv html"])
assert result.exit_code == 0
def test_outputs_formats_no_value(self):
result = runner.invoke(app, ["aws", "--output-filename"])
assert result.exit_code == 2
assert "Option '--output-filename' requires an argument." in result.output
def test_output_directory(self):
result = runner.invoke(app, ["aws", "--output-directory", "test"])
assert result.exit_code == 0
def test_output_directory_no_value(self):
result = runner.invoke(app, ["aws", "--output-directory"])
assert result.exit_code == 2
assert "Option '--output-directory' requires an argument." in result.output
def test_verbose(self):
result = runner.invoke(app, ["aws", "--verbose"])
assert result.exit_code == 0
def test_ignore_exit_code_3(self):
result = runner.invoke(app, ["aws", "--ignore-exit-code-3"])
assert result.exit_code == 0
def test_no_banner(self):
result = runner.invoke(app, ["aws", "--no-banner"])
assert result.exit_code == 0
def test_unix_timestamp(self):
result = runner.invoke(app, ["aws", "--unix-timestamp"])
assert result.exit_code == 0
def test_profile(self):
result = runner.invoke(app, ["aws", "--profile", "test"])
assert result.exit_code == 0