mirror of
https://github.com/prowler-cloud/prowler.git
synced 2025-12-19 05:17:47 +00:00
Compare commits
2 Commits
0d0dabe166
...
PRWLR-4820
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
184d28ca7b | ||
|
|
0d208515fd |
@@ -470,18 +470,37 @@ def execute_checks(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
verbose = (
|
||||
global_provider.output_options.verbose or global_provider.output_options.fixer
|
||||
)
|
||||
# Execution with the --only-logs flag
|
||||
if global_provider.output_options.only_logs:
|
||||
for check_name in checks_to_execute:
|
||||
# Recover service from check name
|
||||
service = check_name.split("_")[0]
|
||||
try:
|
||||
try:
|
||||
# Import check module
|
||||
check_module_path = f"prowler.providers.{global_provider.type}.services.{service}.{check_name}.{check_name}"
|
||||
lib = import_check(check_module_path)
|
||||
# Recover functions from check
|
||||
check_to_execute = getattr(lib, check_name)
|
||||
check = check_to_execute()
|
||||
except ModuleNotFoundError:
|
||||
logger.error(
|
||||
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
|
||||
)
|
||||
continue
|
||||
if verbose:
|
||||
print(
|
||||
f"\nCheck ID: {check.CheckID} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW} [{check.Severity}]{Style.RESET_ALL}"
|
||||
)
|
||||
check_findings = execute(
|
||||
service,
|
||||
check_name,
|
||||
check,
|
||||
global_provider,
|
||||
custom_checks_metadata,
|
||||
)
|
||||
report(check_findings, global_provider)
|
||||
all_findings.extend(check_findings)
|
||||
|
||||
# Update Audit Status
|
||||
@@ -539,12 +558,28 @@ def execute_checks(
|
||||
f"-> Scanning {orange_color}{service}{Style.RESET_ALL} service"
|
||||
)
|
||||
try:
|
||||
try:
|
||||
# Import check module
|
||||
check_module_path = f"prowler.providers.{global_provider.type}.services.{service}.{check_name}.{check_name}"
|
||||
lib = import_check(check_module_path)
|
||||
# Recover functions from check
|
||||
check_to_execute = getattr(lib, check_name)
|
||||
check = check_to_execute()
|
||||
except ModuleNotFoundError:
|
||||
logger.error(
|
||||
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
|
||||
)
|
||||
continue
|
||||
if verbose:
|
||||
print(
|
||||
f"\nCheck ID: {check.CheckID} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW} [{check.Severity}]{Style.RESET_ALL}"
|
||||
)
|
||||
check_findings = execute(
|
||||
service,
|
||||
check_name,
|
||||
check,
|
||||
global_provider,
|
||||
custom_checks_metadata,
|
||||
)
|
||||
report(check_findings, global_provider)
|
||||
all_findings.extend(check_findings)
|
||||
services_executed.add(service)
|
||||
checks_executed.add(check_name)
|
||||
@@ -567,39 +602,52 @@ def execute_checks(
|
||||
)
|
||||
bar()
|
||||
bar.title = f"-> {Fore.GREEN}Scan completed!{Style.RESET_ALL}"
|
||||
|
||||
# Custom report interface
|
||||
if os.environ.get("PROWLER_REPORT_LIB_PATH"):
|
||||
try:
|
||||
logger.info("Using custom report interface ...")
|
||||
lib = os.environ["PROWLER_REPORT_LIB_PATH"]
|
||||
outputs_module = importlib.import_module(lib)
|
||||
custom_report_interface = getattr(outputs_module, "report")
|
||||
|
||||
# TODO: review this call and see if we can remove the global_provider.output_options since it is contained in the global_provider
|
||||
custom_report_interface(
|
||||
check_findings, global_provider.output_options, global_provider
|
||||
)
|
||||
except Exception:
|
||||
sys.exit(1)
|
||||
|
||||
return all_findings
|
||||
|
||||
|
||||
def execute(
|
||||
service: str,
|
||||
check_name: str,
|
||||
check: Check,
|
||||
global_provider: Any,
|
||||
custom_checks_metadata: Any,
|
||||
):
|
||||
try:
|
||||
# Import check module
|
||||
check_module_path = f"prowler.providers.{global_provider.type}.services.{service}.{check_name}.{check_name}"
|
||||
lib = import_check(check_module_path)
|
||||
# Recover functions from check
|
||||
check_to_execute = getattr(lib, check_name)
|
||||
check_class = check_to_execute()
|
||||
|
||||
# Update check metadata to reflect that in the outputs
|
||||
if custom_checks_metadata and custom_checks_metadata["Checks"].get(
|
||||
check_class.CheckID
|
||||
check.CheckID
|
||||
):
|
||||
check_class = update_check_metadata(
|
||||
check_class, custom_checks_metadata["Checks"][check_class.CheckID]
|
||||
check = update_check_metadata(
|
||||
check, custom_checks_metadata["Checks"][check.CheckID]
|
||||
)
|
||||
|
||||
# Run check
|
||||
verbose = (
|
||||
global_provider.output_options.verbose
|
||||
or global_provider.output_options.fixer
|
||||
)
|
||||
check_findings = run_check(
|
||||
check_class, verbose, global_provider.output_options.only_logs
|
||||
)
|
||||
# Execute the check
|
||||
check_findings = []
|
||||
logger.debug(f"Executing check: {check.CheckID}")
|
||||
try:
|
||||
check_findings = check.execute()
|
||||
except Exception as error:
|
||||
if not global_provider.output_options.only_logs:
|
||||
print(
|
||||
f"Something went wrong in {check.CheckID}, please use --log-level ERROR"
|
||||
)
|
||||
logger.error(
|
||||
f"{check.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}"
|
||||
)
|
||||
|
||||
# Exclude findings per status
|
||||
if global_provider.output_options.status:
|
||||
@@ -609,9 +657,8 @@ def execute(
|
||||
if finding.status in global_provider.output_options.status
|
||||
]
|
||||
|
||||
# Mutelist findings
|
||||
# Before returning the findings, we need to apply the mute list logic
|
||||
if hasattr(global_provider, "mutelist") and global_provider.mutelist.mutelist:
|
||||
# TODO: make this prettier
|
||||
is_finding_muted_args = {}
|
||||
if global_provider.type == "aws":
|
||||
is_finding_muted_args["aws_account_id"] = (
|
||||
@@ -625,28 +672,9 @@ def execute(
|
||||
finding.muted = global_provider.mutelist.is_finding_muted(
|
||||
**is_finding_muted_args
|
||||
)
|
||||
|
||||
# Refactor(Outputs)
|
||||
# Report the check's findings
|
||||
report(check_findings, global_provider)
|
||||
|
||||
# Refactor(Outputs)
|
||||
if os.environ.get("PROWLER_REPORT_LIB_PATH"):
|
||||
try:
|
||||
logger.info("Using custom report interface ...")
|
||||
lib = os.environ["PROWLER_REPORT_LIB_PATH"]
|
||||
outputs_module = importlib.import_module(lib)
|
||||
custom_report_interface = getattr(outputs_module, "report")
|
||||
|
||||
# TODO: review this call and see if we can remove the global_provider.output_options since it is contained in the global_provider
|
||||
custom_report_interface(
|
||||
check_findings, global_provider.output_options, global_provider
|
||||
)
|
||||
except Exception:
|
||||
sys.exit(1)
|
||||
except ModuleNotFoundError:
|
||||
logger.error(
|
||||
f"Check '{check_name}' was not found for the {global_provider.type.upper()} provider"
|
||||
f"Check '{check.CheckID}' was not found for the {global_provider.type.upper()} provider"
|
||||
)
|
||||
check_findings = []
|
||||
except Exception as error:
|
||||
|
||||
@@ -29,7 +29,7 @@ class IAM(GCPService):
|
||||
while request is not None:
|
||||
response = request.execute()
|
||||
|
||||
for account in response["accounts"]:
|
||||
for account in response.get("accounts", []):
|
||||
self.service_accounts.append(
|
||||
ServiceAccount(
|
||||
name=account["name"],
|
||||
@@ -65,7 +65,7 @@ class IAM(GCPService):
|
||||
)
|
||||
response = request.execute()
|
||||
|
||||
for key in response["keys"]:
|
||||
for key in response.get("keys", []):
|
||||
sa.keys.append(
|
||||
Key(
|
||||
name=key["name"].split("/")[-1],
|
||||
|
||||
Reference in New Issue
Block a user