Compare commits

...

2 Commits

Author SHA1 Message Date
pedrooot
b0141aad21 feat(only_logs): remove from parser 2024-11-28 15:37:33 +01:00
pedrooot
6f60d80b2c feat(only_logs): deprecate only_logs tag 2024-11-28 14:37:52 +01:00
11 changed files with 92 additions and 199 deletions

View File

@@ -130,7 +130,7 @@ def prowler():
args.output_formats.extend(get_available_compliance_frameworks(provider))
# Set Logger configuration
set_logging_config(args.log_level, args.log_file, args.only_logs)
set_logging_config(args.log_level, args.log_file)
if args.list_services:
print_services(list_services(provider))
@@ -205,8 +205,7 @@ def prowler():
global_provider = Provider.get_global_provider()
# Print Provider Credentials
if not args.only_logs:
global_provider.print_credentials()
global_provider.print_credentials()
# Import custom checks from folder
if checks_folder:
@@ -687,37 +686,36 @@ def prowler():
)
# Display summary table
if not args.only_logs:
display_summary_table(
findings,
global_provider,
output_options,
)
# Only display compliance table if there are findings (not all MANUAL) and it is a default execution
if (
findings and not all(finding.status == "MANUAL" for finding in findings)
) and default_execution:
compliance_overview = False
if not compliance_framework:
compliance_framework = get_available_compliance_frameworks(provider)
if (
compliance_framework
): # If there are compliance frameworks, print compliance overview
compliance_overview = True
for compliance in sorted(compliance_framework):
# Display compliance table
display_compliance_table(
findings,
bulk_checks_metadata,
compliance,
output_options.output_filename,
output_options.output_directory,
compliance_overview,
)
if compliance_overview:
print(
f"\nDetailed compliance results are in {Fore.YELLOW}{output_options.output_directory}/compliance/{Style.RESET_ALL}\n"
)
display_summary_table(
findings,
global_provider,
output_options,
)
# Only display compliance table if there are findings (not all MANUAL) and it is a default execution
if (
findings and not all(finding.status == "MANUAL" for finding in findings)
) and default_execution:
compliance_overview = False
if not compliance_framework:
compliance_framework = get_available_compliance_frameworks(provider)
if (
compliance_framework
): # If there are compliance frameworks, print compliance overview
compliance_overview = True
for compliance in sorted(compliance_framework):
# Display compliance table
display_compliance_table(
findings,
bulk_checks_metadata,
compliance,
output_options.output_filename,
output_options.output_directory,
compliance_overview,
)
if compliance_overview:
print(
f"\nDetailed compliance results are in {Fore.YELLOW}{output_options.output_directory}/compliance/{Style.RESET_ALL}\n"
)
# If custom checks were passed, remove the modules
if checks_folder:

View File

@@ -422,11 +422,39 @@ def execute_checks(
elif hasattr(output_options, "fixer"):
verbose = output_options.fixer
# Execution with the --only-logs flag
if output_options.only_logs:
# Prepare your messages
messages = [f"Config File: {Fore.YELLOW}{config_file}{Style.RESET_ALL}"]
if global_provider.mutelist.mutelist_file_path:
messages.append(
f"Mutelist File: {Fore.YELLOW}{global_provider.mutelist.mutelist_file_path}{Style.RESET_ALL}"
)
if global_provider.type == "aws":
messages.append(
f"Scanning unused services and resources: {Fore.YELLOW}{global_provider.scan_unused_services}{Style.RESET_ALL}"
)
report_title = f"{Style.BRIGHT}Using the following configuration:{Style.RESET_ALL}"
print_boxes(messages, report_title)
# Default execution
checks_num = len(checks_to_execute)
plural_string = "checks"
singular_string = "check"
check_noun = plural_string if checks_num > 1 else singular_string
print(
f"{Style.BRIGHT}Executing {checks_num} {check_noun}, please wait...{Style.RESET_ALL}"
)
with alive_bar(
total=len(checks_to_execute),
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
bar.title = f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service"
try:
try:
# Import check module
@@ -450,111 +478,31 @@ def execute_checks(
custom_checks_metadata,
output_options,
)
report(check_findings, global_provider, output_options)
all_findings.extend(check_findings)
# Update Audit Status
report(check_findings, global_provider, output_options)
all_findings.extend(check_findings)
services_executed.add(service)
checks_executed.add(check_name)
global_provider.audit_metadata = update_audit_metadata(
global_provider.audit_metadata, services_executed, checks_executed
global_provider.audit_metadata,
services_executed,
checks_executed,
)
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
logger.error(
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
)
except Exception as error:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
# Prepare your messages
messages = [f"Config File: {Fore.YELLOW}{config_file}{Style.RESET_ALL}"]
if global_provider.mutelist.mutelist_file_path:
messages.append(
f"Mutelist File: {Fore.YELLOW}{global_provider.mutelist.mutelist_file_path}{Style.RESET_ALL}"
)
if global_provider.type == "aws":
messages.append(
f"Scanning unused services and resources: {Fore.YELLOW}{global_provider.scan_unused_services}{Style.RESET_ALL}"
)
report_title = (
f"{Style.BRIGHT}Using the following configuration:{Style.RESET_ALL}"
)
print_boxes(messages, report_title)
# Default execution
checks_num = len(checks_to_execute)
plural_string = "checks"
singular_string = "check"
check_noun = plural_string if checks_num > 1 else singular_string
print(
f"{Style.BRIGHT}Executing {checks_num} {check_noun}, please wait...{Style.RESET_ALL}"
)
with alive_bar(
total=len(checks_to_execute),
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
bar.title = (
f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service"
)
try:
try:
# Import check module
check_module_path = f"prowler.providers.{global_provider.type}.services.{service}.{check_name}.{check_name}"
lib = import_check(check_module_path)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
check = check_to_execute()
except ModuleNotFoundError:
logger.error(
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
)
continue
if verbose:
print(
f"\nCheck ID: {check.CheckID} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW} [{check.Severity.value}]{Style.RESET_ALL}"
)
check_findings = execute(
check,
global_provider,
custom_checks_metadata,
output_options,
)
report(check_findings, global_provider, output_options)
all_findings.extend(check_findings)
services_executed.add(service)
checks_executed.add(check_name)
global_provider.audit_metadata = update_audit_metadata(
global_provider.audit_metadata,
services_executed,
checks_executed,
)
# If check does not exists in the provider or is from another provider
except ModuleNotFoundError:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
logger.error(
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
)
except Exception as error:
# TODO: add more loggin here, we need the original exception -- traceback.print_last()
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
bar()
bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}"
bar()
bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}"
# Custom report interface
if os.environ.get("PROWLER_REPORT_LIB_PATH"):
@@ -600,20 +548,15 @@ def execute(
check, custom_checks_metadata["Checks"][check.CheckID]
)
only_logs = False
if hasattr(output_options, "only_logs"):
only_logs = output_options.only_logs
# Execute the check
check_findings = []
logger.debug(f"Executing check: {check.CheckID}")
try:
check_findings = check.execute()
except Exception as error:
if not only_logs:
print(
f"Something went wrong in {check.CheckID}, please use --log-level ERROR"
)
print(
f"Something went wrong in {check.CheckID}, please use --log-level ERROR"
)
logger.error(
f"{check.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}"
)

View File

@@ -112,7 +112,7 @@ Detailed documentation at https://docs.prowler.com
)
# Only Logging Configuration
if args.provider != "dashboard" and (args.only_logs or args.list_checks_json):
if args.provider != "dashboard" and args.list_checks_json:
args.no_banner = True
# Extra validation for provider arguments
@@ -207,11 +207,6 @@ Detailed documentation at https://docs.prowler.com
nargs="?",
help="Set log file name",
)
common_logging_parser.add_argument(
"--only-logs",
action="store_true",
help="Print only Prowler logs by the stdout. This option sets --no-banner.",
)
def __init_exclude_checks_parser__(self):
# Exclude checks options

View File

@@ -11,7 +11,20 @@ logging_levels = {
}
def set_logging_config(log_level: str, log_file: str = None, only_logs: bool = False):
def set_logging_config(log_level: str, log_file: str = None):
"""
Set the logging configuration
Args:
log_level (str): Log level
log_file (str): Log file path
Returns:
None
Example:
>>> set_logging_config("DEBUG", "prowler.log")
"""
# Logs formatter
stream_formatter = logging.Formatter(
"\n%(asctime)s [File: %(filename)s:%(lineno)d] \t[Module: %(module)s]\t %(levelname)s: %(message)s"
@@ -23,12 +36,9 @@ def set_logging_config(log_level: str, log_file: str = None, only_logs: bool = F
# Where to put logs
logging_handlers = []
# Include stdout by default, if only_logs is set the log format is JSON
# Include stdout by default
stream_handler = logging.StreamHandler()
if only_logs:
stream_handler.setFormatter(log_file_formatter)
else:
stream_handler.setFormatter(stream_formatter)
stream_handler.setFormatter(stream_formatter)
logging_handlers.append(stream_handler)
# Log to file configuration

View File

@@ -24,7 +24,6 @@ class ProviderOutputOptions:
bulk_checks_metadata: dict
verbose: str
output_filename: str
only_logs: bool
unix_timestamp: bool
def __init__(self, arguments, bulk_checks_metadata):
@@ -33,7 +32,6 @@ class ProviderOutputOptions:
self.output_directory = arguments.output_directory
self.verbose = arguments.verbose
self.bulk_checks_metadata = bulk_checks_metadata
self.only_logs = arguments.only_logs
self.unix_timestamp = arguments.unix_timestamp
self.shodan_api_key = arguments.shodan
self.fixer = getattr(arguments, "fixer", None)

View File

@@ -2,7 +2,6 @@ import json
import os
import pathlib
from importlib.machinery import FileFinder
from logging import ERROR
from pkgutil import ModuleInfo
from unittest import mock
@@ -14,7 +13,6 @@ from prowler.lib.check.check import (
exclude_checks_to_run,
exclude_services_to_run,
execute,
execute_checks,
list_categories,
list_checks_json,
list_services,
@@ -940,33 +938,3 @@ class TestCheck:
assert (
check_id == check_dir
), f"CheckID in metadata does not match the check name in {check_directory}. Found CheckID: {check_id}"
def test_execute_check_exception_only_logs(self, caplog):
caplog.set_level(ERROR)
findings = []
check = Mock()
checks = ["test-check"]
provider = mock.MagicMock()
provider.type = "aws"
output_options = mock.MagicMock()
output_options.only_logs = True
error = Exception()
check.execute = Mock(side_effect=error)
with patch("prowler.lib.check.check.execute", return_value=findings):
assert (
execute_checks(
checks,
provider,
custom_checks_metadata=None,
config_file=None,
output_options=output_options,
)
== findings
)
assert caplog.record_tuples == [
("root", 40, f"Check '{checks[0]}' was not found for the AWS provider")
]

View File

@@ -58,7 +58,6 @@ class Test_Parser:
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.check
assert not parsed.checks_file
assert not parsed.checks_folder
@@ -107,7 +106,6 @@ class Test_Parser:
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.check
assert not parsed.checks_file
assert not parsed.checks_folder
@@ -148,7 +146,6 @@ class Test_Parser:
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.check
assert not parsed.checks_file
assert not parsed.checks_folder
@@ -184,7 +181,6 @@ class Test_Parser:
assert not parsed.unix_timestamp
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.check
assert not parsed.checks_file
assert not parsed.checks_folder
@@ -375,12 +371,6 @@ class Test_Parser:
parsed = self.parser.parse(command)
assert parsed.unix_timestamp
def test_logging_parser_only_logs_set(self):
command = [prowler_command, "--only-logs"]
parsed = self.parser.parse(command)
assert parsed.only_logs
assert parsed.no_banner
def test_logging_parser_log_level_default(self):
log_level = "CRITICAL"
command = [prowler_command]

View File

@@ -22,7 +22,6 @@ class Test_Output_Options:
arguments.verbose = True
arguments.security_hub = True
arguments.shodan = None
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.send_sh_only_fails = True
@@ -37,7 +36,6 @@ class Test_Output_Options:
assert output_options.verbose
assert output_options.security_hub_enabled
assert not output_options.shodan_api_key
assert not output_options.only_logs
assert not output_options.unix_timestamp
assert output_options.send_sh_only_fails
assert (
@@ -59,7 +57,6 @@ class Test_Output_Options:
arguments.output_filename = "output_test_filename"
arguments.security_hub = True
arguments.shodan = None
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.send_sh_only_fails = True
@@ -90,7 +87,6 @@ class Test_Output_Options:
output_directory = arguments.output_directory
arguments.status = []
arguments.verbose = True
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = None
@@ -127,7 +123,6 @@ class Test_Output_Options:
arguments.output_formats = ["csv"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = None
@@ -160,7 +155,6 @@ class Test_Output_Options:
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.output_filename = "output_test_filename"
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = None

View File

@@ -164,7 +164,6 @@ def set_default_provider_arguments(
arguments.output_formats = []
arguments.output_directory = ""
arguments.verbose = False
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = None
arguments.security_hub = False

View File

@@ -100,7 +100,6 @@ class TestGCPProvider:
arguments.output_formats = ["csv"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.only_logs = False
arguments.unix_timestamp = False
arguments.shodan = "test-api-key"

View File

@@ -50,7 +50,6 @@ class TestKubernetesProvider:
arguments = Namespace()
arguments.kubeconfig_file = "dummy_path"
arguments.context = None
arguments.only_logs = False
arguments.namespace = None
fixer_config = load_and_validate_config_file(
"kubernetes", default_fixer_config_file_path