feat(only_logs): New logging flag to only show execution logs (#1708)

This commit is contained in:
Pepe Fagoaga
2023-01-17 10:13:09 +01:00
committed by GitHub
parent 0cbe80d2ab
commit e7796268b5
11 changed files with 160 additions and 76 deletions

View File

@@ -24,4 +24,9 @@ ENV PATH="$HOME/.local/bin:$PATH"
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir .
# Remove Prowler directory
USER 0
RUN rm -rf /home/prowler/prowler /home/prowler/pyproject.toml
USER prowler
ENTRYPOINT ["prowler"]

View File

@@ -22,13 +22,10 @@ from prowler.lib.check.checks_loader import load_checks_to_execute
from prowler.lib.check.compliance import update_checks_metadata_with_compliance
from prowler.lib.cli.parser import ProwlerArgumentParser
from prowler.lib.logger import logger, set_logging_config
from prowler.lib.outputs.outputs import (
extract_findings_statistics,
send_to_s3_bucket,
)
from prowler.lib.outputs.compliance import display_compliance_table
from prowler.lib.outputs.html import add_html_footer, fill_html_overview_statistics
from prowler.lib.outputs.json import close_json
from prowler.lib.outputs.outputs import extract_findings_statistics, send_to_s3_bucket
from prowler.lib.outputs.summary_table import display_summary_table
from prowler.providers.aws.lib.allowlist.allowlist import parse_allowlist_file
from prowler.providers.aws.lib.quick_inventory.quick_inventory import quick_inventory
@@ -60,9 +57,9 @@ def prowler():
args.output_modes.extend(compliance_framework)
# Set Logger configuration
set_logging_config(args.log_file, args.log_level)
set_logging_config(args.log_level, args.log_file, args.only_logs)
if args.no_banner:
if not args.no_banner:
print_banner(args)
if args.list_services:
@@ -203,23 +200,24 @@ def prowler():
resolve_security_hub_previous_findings(args.output_directory, audit_info)
# Display summary table
display_summary_table(
findings,
audit_info,
audit_output_options,
provider,
)
if compliance_framework and findings:
# Display compliance table
display_compliance_table(
if not args.only_logs:
display_summary_table(
findings,
bulk_checks_metadata,
compliance_framework,
audit_output_options.output_filename,
audit_output_options.output_directory,
audit_info,
audit_output_options,
provider,
)
if compliance_framework and findings:
# Display compliance table
display_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
audit_output_options.output_filename,
audit_output_options.output_directory,
)
if __name__ == "__main__":
prowler()

View File

@@ -313,44 +313,86 @@ def execute_checks(
audit_output_options: Provider_Output_Options,
) -> list:
all_findings = []
print(
f"{Style.BRIGHT}Executing {len(checks_to_execute)} checks, please wait...{Style.RESET_ALL}\n"
)
with alive_bar(
total=len(checks_to_execute),
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
# Execution with the --only-logs flag
if audit_output_options.only_logs:
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
bar.title = f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service"
try:
# Import check module
check_module_path = f"prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
lib = import_check(check_module_path)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
c = check_to_execute()
# Run check
check_findings = run_check(c, audit_output_options)
check_findings = execute(
service, check_name, provider, audit_output_options, audit_info
)
all_findings.extend(check_findings)
report(check_findings, audit_output_options, audit_info)
bar()
# If the check does not exist in the provider or belongs to another provider
except ModuleNotFoundError:
logger.critical(
f"Check '{check_name}' was not found for the {provider.upper()} provider"
)
bar.title = f"-> {Fore.RED}Scan was aborted!{Style.RESET_ALL}"
sys.exit()
except Exception as error:
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}"
else:
# Default execution
print(
f"{Style.BRIGHT}Executing {len(checks_to_execute)} checks, please wait...{Style.RESET_ALL}\n"
)
with alive_bar(
total=len(checks_to_execute),
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
for check_name in checks_to_execute:
# Recover service from check name
service = check_name.split("_")[0]
bar.title = (
f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service"
)
try:
check_findings = execute(
service, check_name, provider, audit_output_options, audit_info
)
all_findings.extend(check_findings)
bar()
# If the check does not exist in the provider or belongs to another provider
except ModuleNotFoundError:
logger.critical(
f"Check '{check_name}' was not found for the {provider.upper()} provider"
)
bar.title = f"-> {Fore.RED}Scan was aborted!{Style.RESET_ALL}"
sys.exit()
except Exception as error:
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}"
return all_findings
def execute(
    service,
    check_name: str,
    provider: str,
    audit_output_options: Provider_Output_Options,
    audit_info: AWS_Audit_Info,
):
    """Import, instantiate and run a single check, report its findings and return them."""
    # Dynamically load the module that holds this check's class
    module_path = (
        f"prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
    )
    check_module = import_check(module_path)
    # The check class shares its name with the module; instantiate it
    check_class = getattr(check_module, check_name)
    check_instance = check_class()
    # Execute the check, then emit its findings through the configured outputs
    check_findings = run_check(check_instance, audit_output_options)
    report(check_findings, audit_output_options, audit_info)
    return check_findings

View File

@@ -79,6 +79,10 @@ Detailed documentation at https://docs.prowler.cloud
"A provider is required to see its specific help options."
)
# Only Logging Configuration
if args.only_logs:
args.no_banner = True
return args
def __set_default_provider__(self, args: list) -> list:
@@ -127,7 +131,7 @@ Detailed documentation at https://docs.prowler.cloud
help="Display detailed information about findings",
)
common_outputs_parser.add_argument(
"-b", "--no-banner", action="store_false", help="Hide Prowler banner"
"-b", "--no-banner", action="store_true", help="Hide Prowler banner"
)
def __init_logging_parser__(self):
@@ -147,6 +151,11 @@ Detailed documentation at https://docs.prowler.cloud
nargs="?",
help="Set log file name",
)
common_logging_parser.add_argument(
"--only-logs",
action="store_true",
help="Print only Prowler logs by the stdout. This option sets --no-banner.",
)
def __init_exclude_checks_parser__(self):
# Exclude checks options

View File

@@ -1,4 +1,5 @@
import logging
from os import environ
# Logging levels
logging_levels = {
@@ -10,7 +11,7 @@ logging_levels = {
}
def set_logging_config(log_file: str = None, log_level: str = "ERROR"):
def set_logging_config(log_level: str, log_file: str = None, only_logs: bool = False):
# Logs formatter
stream_formatter = logging.Formatter(
"%(asctime)s [File: %(filename)s:%(lineno)d] \t[Module: %(module)s]\t %(levelname)s: %(message)s"
@@ -22,9 +23,12 @@ def set_logging_config(log_file: str = None, log_level: str = "ERROR"):
# Where to put logs
logging_handlers = []
# Include stdout by default
# Include stdout by default; if only_logs is set, the log format is JSON
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(stream_formatter)
if only_logs:
stream_handler.setFormatter(log_file_formatter)
else:
stream_handler.setFormatter(stream_formatter)
logging_handlers.append(stream_handler)
# Log to file configuration
@@ -35,6 +39,12 @@ def set_logging_config(log_file: str = None, log_level: str = "ERROR"):
# Append the log formatter
logging_handlers.append(log_file_handler)
# Set Log Level, environment takes precedence over the --log-level argument
try:
log_level = environ["LOG_LEVEL"]
except KeyError:
log_level = log_level
# Configure Logger
# Initialize your log configuration using the base class
# https://docs.python.org/3/library/logging.html#logrecord-attributes

View File

@@ -1,5 +1,5 @@
import sys
from os import path
from prowler.config.config import (
html_file_suffix,
html_logo_img,
@@ -263,12 +263,14 @@ def fill_html_overview_statistics(stats, output_filename, output_directory):
def add_html_footer(output_filename, output_directory):
try:
filename = f"{output_directory}/{output_filename}{html_file_suffix}"
file_descriptor = open_file(
filename,
"a",
)
file_descriptor.write(
"""
# Close HTML file if exists
if path.isfile(filename):
file_descriptor = open_file(
filename,
"a",
)
file_descriptor.write(
"""
</tbody>
</table>
</div>
@@ -352,8 +354,8 @@ def add_html_footer(output_filename, output_directory):
</html>
"""
)
file_descriptor.close()
)
file_descriptor.close()
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"

View File

@@ -57,21 +57,24 @@ def fill_json_asff(finding_output, audit_info, finding):
def close_json(output_filename, output_directory, mode):
"""close_json closes the output JSON file replacing the last comma with ]"""
try:
suffix = json_file_suffix
if mode == "json-asff":
suffix = json_asff_file_suffix
filename = f"{output_directory}/{output_filename}{suffix}"
file_descriptor = open_file(
filename,
"a",
)
# Replace the last comma with a square bracket if the file is not empty
if file_descriptor.tell() > 0:
file_descriptor.seek(file_descriptor.tell() - 1, os.SEEK_SET)
file_descriptor.truncate()
file_descriptor.write("]")
file_descriptor.close()
# Close JSON file if exists
if os.path.isfile(filename):
file_descriptor = open_file(
filename,
"a",
)
# Replace the last comma with a square bracket if the file is not empty
if file_descriptor.tell() > 0:
file_descriptor.seek(file_descriptor.tell() - 1, os.SEEK_SET)
file_descriptor.truncate()
file_descriptor.write("]")
file_descriptor.close()
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"

View File

@@ -132,8 +132,8 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE
caller_identity = self.validate_credentials(current_audit_info.original_session)
logger.info("Credentials validated")
logger.info(f"Original caller identity UserId : {caller_identity['UserId']}")
logger.info(f"Original caller identity ARN : {caller_identity['Arn']}")
logger.info(f"Original caller identity UserId: {caller_identity['UserId']}")
logger.info(f"Original caller identity ARN: {caller_identity['Arn']}")
current_audit_info.audited_account = caller_identity["Account"]
current_audit_info.audited_identity_arn = caller_identity["Arn"]
@@ -233,7 +233,9 @@ Caller Identity ARN: {Fore.YELLOW}[{audit_info.audited_identity_arn}]{Style.RESE
else:
current_audit_info.profile_region = "us-east-1"
self.print_audit_credentials(current_audit_info)
if not arguments.get("only_logs"):
self.print_audit_credentials(current_audit_info)
return current_audit_info
def set_azure_audit_info(self, arguments) -> Azure_Audit_Info:

View File

@@ -38,6 +38,7 @@ class Provider_Output_Options:
bulk_checks_metadata: dict
verbose: str
output_filename: str
only_logs: bool
def __init__(self, arguments, allowlist_file, bulk_checks_metadata):
self.is_quiet = arguments.quiet
@@ -46,7 +47,7 @@ class Provider_Output_Options:
self.verbose = arguments.verbose
self.bulk_checks_metadata = bulk_checks_metadata
self.allowlist_file = allowlist_file
self.only_logs = arguments.only_logs
# Check output directory, if it is not created -> create it
if arguments.output_directory:
if not isdir(arguments.output_directory):

View File

@@ -7,7 +7,7 @@ from prowler.lib.cli.parser import ProwlerArgumentParser
prowler_command = "prowler"
class Test_Outputs:
class Test_Parser:
# Init parser
def setup_method(self):
self.parser = ProwlerArgumentParser()
@@ -25,9 +25,10 @@ class Test_Outputs:
assert not parsed.output_filename
assert "output" in parsed.output_directory
assert not parsed.verbose
assert parsed.no_banner
assert not parsed.no_banner
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.checks
assert not parsed.checks_file
assert not parsed.services
@@ -67,9 +68,10 @@ class Test_Outputs:
assert not parsed.output_filename
assert "output" in parsed.output_directory
assert not parsed.verbose
assert parsed.no_banner
assert not parsed.no_banner
assert parsed.log_level == "CRITICAL"
assert not parsed.log_file
assert not parsed.only_logs
assert not parsed.checks
assert not parsed.checks_file
assert not parsed.services
@@ -201,12 +203,18 @@ class Test_Outputs:
def test_root_parser_no_banner_short(self):
command = [prowler_command, "-b"]
parsed = self.parser.parse(command)
assert not parsed.no_banner
assert parsed.no_banner
def test_root_parser_no_banner_long(self):
command = [prowler_command, "--no-banner"]
parsed = self.parser.parse(command)
assert not parsed.no_banner
assert parsed.no_banner
def test_logging_parser_only_logs_set(self):
    # --only-logs must also force --no-banner
    argv = [prowler_command, "--only-logs"]
    parsed = self.parser.parse(argv)
    assert parsed.only_logs
    assert parsed.no_banner
def test_logging_parser_log_level_default(self):
log_level = "CRITICAL"

View File

@@ -66,6 +66,7 @@ class Test_Common_Output_Options:
arguments.output_filename = "output_test_filename"
arguments.security_hub = True
arguments.shodan = "test-api-key"
arguments.only_logs = False
audit_info = self.set_mocked_aws_audit_info()
allowlist_file = ""
@@ -94,6 +95,7 @@ class Test_Common_Output_Options:
arguments.verbose = True
arguments.security_hub = True
arguments.shodan = "test-api-key"
arguments.only_logs = False
# Mock AWS Audit Info
audit_info = self.set_mocked_aws_audit_info()
@@ -125,6 +127,7 @@ class Test_Common_Output_Options:
arguments.output_modes = ["html", "csv", "json"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.only_logs = False
# Mock Azure Audit Info
audit_info = self.set_mocked_azure_audit_info()
@@ -159,6 +162,7 @@ class Test_Common_Output_Options:
arguments.output_modes = ["html", "csv", "json"]
arguments.output_directory = "output_test_directory"
arguments.verbose = True
arguments.only_logs = False
# Mock Azure Audit Info
audit_info = self.set_mocked_azure_audit_info()