Compare commits

...

1 Commit

Author SHA1 Message Date
pedrooot
82e501fc08 refactor(parser): remove --only-logs option 2024-09-12 11:02:05 +02:00
12 changed files with 84 additions and 170 deletions

View File

@@ -119,7 +119,7 @@ def prowler():
args.output_formats.extend(get_available_compliance_frameworks(provider))
# Set Logger configuration
set_logging_config(args.log_level, args.log_file, args.only_logs)
set_logging_config(args.log_level, args.log_file)
if args.list_services:
print_services(list_services(provider))
@@ -194,8 +194,7 @@ def prowler():
global_provider = Provider.get_global_provider()
# Print Provider Credentials
if not args.only_logs:
global_provider.print_credentials()
global_provider.print_credentials()
# Import custom checks from folder
if checks_folder:
@@ -616,37 +615,36 @@ def prowler():
)
# Display summary table
if not args.only_logs:
display_summary_table(
findings,
global_provider,
global_provider.output_options,
)
# Only display compliance table if there are findings (not all MANUAL) and it is a default execution
if (
findings and not all(finding.status == "MANUAL" for finding in findings)
) and default_execution:
compliance_overview = False
if not compliance_framework:
compliance_framework = get_available_compliance_frameworks(provider)
if (
compliance_framework
): # If there are compliance frameworks, print compliance overview
compliance_overview = True
for compliance in sorted(compliance_framework):
# Display compliance table
display_compliance_table(
findings,
bulk_checks_metadata,
compliance,
global_provider.output_options.output_filename,
global_provider.output_options.output_directory,
compliance_overview,
)
if compliance_overview:
print(
f"\nDetailed compliance results are in {Fore.YELLOW}{global_provider.output_options.output_directory}/compliance/{Style.RESET_ALL}\n"
)
display_summary_table(
findings,
global_provider,
global_provider.output_options,
)
# Only display compliance table if there are findings (not all MANUAL) and it is a default execution
if (
findings and not all(finding.status == "MANUAL" for finding in findings)
) and default_execution:
compliance_overview = False
if not compliance_framework:
compliance_framework = get_available_compliance_frameworks(provider)
if (
compliance_framework
): # If there are compliance frameworks, print compliance overview
compliance_overview = True
for compliance in sorted(compliance_framework):
# Display compliance table
display_compliance_table(
findings,
bulk_checks_metadata,
compliance,
global_provider.output_options.output_filename,
global_provider.output_options.output_directory,
compliance_overview,
)
if compliance_overview:
print(
f"\nDetailed compliance results are in {Fore.YELLOW}{global_provider.output_options.output_directory}/compliance/{Style.RESET_ALL}\n"
)
# If custom checks were passed, remove the modules
if checks_folder:

View File

@@ -325,12 +325,12 @@ def import_check(check_path: str) -> ModuleType:
return lib
def run_check(check: Check, verbose: bool = False, only_logs: bool = False) -> list:
def run_check(check: Check, verbose: bool = False) -> list:
"""
Run the check and return the findings
Args:
check (Check): check class
output_options (Any): output options
check (Check): check object
verbose (bool): verbose output
Returns:
list: list of findings
"""
@@ -343,10 +343,7 @@ def run_check(check: Check, verbose: bool = False, only_logs: bool = False) -> l
try:
findings = check.execute()
except Exception as error:
if not only_logs:
print(
f"Something went wrong in {check.CheckID}, please use --log-level ERROR"
)
print(f"Something went wrong in {check.CheckID}, please use --log-level ERROR")
logger.error(
f"{check.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}"
)
@@ -470,11 +467,39 @@ def execute_checks(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
# Execution with the --only-logs flag
if global_provider.output_options.only_logs:
# Prepare your messages
messages = [f"Config File: {Fore.YELLOW}{config_file}{Style.RESET_ALL}"]
if global_provider.mutelist.mutelist_file_path:
messages.append(
f"Mutelist File: {Fore.YELLOW}{global_provider.mutelist.mutelist_file_path}{Style.RESET_ALL}"
)
if global_provider.type == "aws":
messages.append(
f"Scanning unused services and resources: {Fore.YELLOW}{global_provider.scan_unused_services}{Style.RESET_ALL}"
)
report_title = f"{Style.BRIGHT}Using the following configuration:{Style.RESET_ALL}"
print_boxes(messages, report_title)
# Default execution
checks_num = len(checks_to_execute)
plural_string = "checks"
singular_string = "check"
check_noun = plural_string if checks_num > 1 else singular_string
print(
f"{Style.BRIGHT}Executing {checks_num} {check_noun}, please wait...{Style.RESET_ALL}"
)
with alive_bar(
total=len(checks_to_execute),
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
bar.title = f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service"
try:
check_findings = execute(
service,
@@ -483,90 +508,27 @@ def execute_checks(
custom_checks_metadata,
)
all_findings.extend(check_findings)
# Update Audit Status
services_executed.add(service)
checks_executed.add(check_name)
global_provider.audit_metadata = update_audit_metadata(
global_provider.audit_metadata, services_executed, checks_executed
global_provider.audit_metadata,
services_executed,
checks_executed,
)
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
logger.error(
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
)
except Exception as error:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
# Prepare your messages
messages = [f"Config File: {Fore.YELLOW}{config_file}{Style.RESET_ALL}"]
if global_provider.mutelist.mutelist_file_path:
messages.append(
f"Mutelist File: {Fore.YELLOW}{global_provider.mutelist.mutelist_file_path}{Style.RESET_ALL}"
)
if global_provider.type == "aws":
messages.append(
f"Scanning unused services and resources: {Fore.YELLOW}{global_provider.scan_unused_services}{Style.RESET_ALL}"
)
report_title = (
f"{Style.BRIGHT}Using the following configuration:{Style.RESET_ALL}"
)
print_boxes(messages, report_title)
# Default execution
checks_num = len(checks_to_execute)
plural_string = "checks"
singular_string = "check"
check_noun = plural_string if checks_num > 1 else singular_string
print(
f"{Style.BRIGHT}Executing {checks_num} {check_noun}, please wait...{Style.RESET_ALL}"
)
with alive_bar(
total=len(checks_to_execute),
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
bar.title = (
f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service"
)
try:
check_findings = execute(
service,
check_name,
global_provider,
custom_checks_metadata,
)
all_findings.extend(check_findings)
services_executed.add(service)
checks_executed.add(check_name)
global_provider.audit_metadata = update_audit_metadata(
global_provider.audit_metadata,
services_executed,
checks_executed,
)
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
logger.error(
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
)
except Exception as error:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
bar()
bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}"
bar()
bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}"
return all_findings
@@ -597,9 +559,7 @@ def execute(
global_provider.output_options.verbose
or global_provider.output_options.fixer
)
check_findings = run_check(
check_class, verbose, global_provider.output_options.only_logs
)
check_findings = run_check(check_class, verbose)
# Exclude findings per status
if global_provider.output_options.status:

View File

@@ -112,7 +112,7 @@ Detailed documentation at https://docs.prowler.com
)
# Only Logging Configuration
if args.provider != "dashboard" and (args.only_logs or args.list_checks_json):
if args.provider != "dashboard" and args.list_checks_json:
args.no_banner = True
# Extra validation for provider arguments
@@ -201,11 +201,6 @@ Detailed documentation at https://docs.prowler.com
nargs="?",
help="Set log file name",
)
common_logging_parser.add_argument(
"--only-logs",
action="store_true",
help="Print only Prowler logs by the stdout. This option sets --no-banner.",
)
def __init_exclude_checks_parser__(self):
# Exclude checks options

View File

@@ -11,7 +11,13 @@ logging_levels = {
}
def set_logging_config(log_level: str, log_file: str = None, only_logs: bool = False):
def set_logging_config(log_level: str, log_file: str = None):
"""
Set the logging configuration for the application
Args:
log_level (str): Log level
log_file (str): Log file path
"""
# Logs formatter
stream_formatter = logging.Formatter(
"\n%(asctime)s [File: %(filename)s:%(lineno)d] \t[Module: %(module)s]\t %(levelname)s: %(message)s"
@@ -23,12 +29,9 @@ def set_logging_config(log_level: str, log_file: str = None, only_logs: bool = F
# Where to put logs
logging_handlers = []
# Include stdout by default, if only_logs is set the log format is JSON
# Include stdout by default for local development
stream_handler = logging.StreamHandler()
if only_logs:
stream_handler.setFormatter(log_file_formatter)
else:
stream_handler.setFormatter(stream_formatter)
stream_handler.setFormatter(stream_formatter)
logging_handlers.append(stream_handler)
# Log to file configuration

View File

@@ -24,7 +24,6 @@ class ProviderOutputOptions:
bulk_checks_metadata: dict
verbose: str
output_filename: str
only_logs: bool
unix_timestamp: bool
def __init__(self, arguments, bulk_checks_metadata):
@@ -33,7 +32,6 @@ class ProviderOutputOptions:
self.output_directory = arguments.output_directory
self.verbose = arguments.verbose
self.bulk_checks_metadata = bulk_checks_metadata
self.only_logs = arguments.only_logs
self.unix_timestamp = arguments.unix_timestamp
self.shodan_api_key = arguments.shodan
self.fixer = getattr(arguments, "fixer", None)

View File

@@ -891,27 +891,6 @@ class TestCheck:
== f"\nCheck ID: {check.CheckID} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW} [{check.Severity}]{Style.RESET_ALL}\n"
)
def test_run_check_exception_only_logs(self, caplog):
caplog.set_level(ERROR)
findings = []
check = Mock()
check.CheckID = "test-check"
check.ServiceName = "test-service"
check.Severity = "test-severity"
error = Exception()
check.execute = Mock(side_effect=error)
with patch("prowler.lib.check.check.execute", return_value=findings):
assert run_check(check, only_logs=True) == findings
assert caplog.record_tuples == [
(
"root",
ERROR,
f"{check.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}",
)
]
def test_run_check_exception(self, caplog, capsys):
caplog.set_level(ERROR)

View File

@@ -56,7 +56,6 @@ class Test_Parser:
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.check
assert not parsed.checks_file
assert not parsed.checks_folder
@@ -104,7 +103,6 @@ class Test_Parser:
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.check
assert not parsed.checks_file
assert not parsed.checks_folder
@@ -144,7 +142,6 @@ class Test_Parser:
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.check
assert not parsed.checks_file
assert not parsed.checks_folder
@@ -179,7 +176,6 @@ class Test_Parser:
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.check
assert not parsed.checks_file
assert not parsed.checks_folder
@@ -365,12 +361,6 @@ class Test_Parser:
parsed = self.parser.parse(command)
assert parsed.unix_timestamp
def test_logging_parser_only_logs_set(self):
command = [prowler_command, "--only-logs"]
parsed = self.parser.parse(command)
assert parsed.only_logs
assert parsed.no_banner
def test_logging_parser_log_level_default(self):
log_level = "CRITICAL"
command = [prowler_command]

View File

@@ -1048,7 +1048,6 @@ aws:
arguments.verbose = True
arguments.security_hub = True
arguments.shodan = "test-api-key"
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.send_sh_only_fails = True
@@ -1097,7 +1096,6 @@ aws:
arguments.output_filename = "output_test_filename"
arguments.security_hub = True
arguments.shodan = "test-api-key"
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.send_sh_only_fails = True

View File

@@ -168,7 +168,6 @@ def set_default_provider_arguments(
arguments.output_formats = []
arguments.output_directory = ""
arguments.verbose = False
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = None
arguments.security_hub = False

View File

@@ -225,7 +225,6 @@ class TestAzureProvider:
output_directory = arguments.output_directory
arguments.status = []
arguments.verbose = True
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = "test-api-key"

View File

@@ -87,7 +87,6 @@ class TestGCPProvider:
arguments.output_formats = ["csv"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = "test-api-key"
@@ -180,7 +179,6 @@ class TestGCPProvider:
arguments.output_formats = ["csv"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = "test-api-key"

View File

@@ -49,7 +49,6 @@ class TestKubernetesProvider:
arguments = Namespace()
arguments.kubeconfig_file = "dummy_path"
arguments.context = None
arguments.only_logs = False
arguments.namespace = None
audit_config = load_and_validate_config_file(
"kubernetes", default_config_file_path
@@ -84,7 +83,6 @@ class TestKubernetesProvider:
arguments = Namespace()
arguments.kubeconfig_file = "dummy_path"
arguments.context = None
arguments.only_logs = False
arguments.namespace = None
arguments.config_file = default_config_file_path
arguments.fixer_config = default_fixer_config_file_path
@@ -93,7 +91,6 @@ class TestKubernetesProvider:
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.output_filename = "output_test_filename"
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = "test-api-key"