mirror of
https://github.com/prowler-cloud/prowler.git
synced 2025-12-19 05:17:47 +00:00
Compare commits
2 Commits
ed3fd72e70
...
load-once-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6f39fb47c3 | ||
|
|
b75226433c |
@@ -160,7 +160,11 @@ def prowler():
|
||||
findings = []
|
||||
if len(checks_to_execute):
|
||||
findings = execute_checks(
|
||||
checks_to_execute, provider, audit_info, audit_output_options
|
||||
checks_to_execute,
|
||||
provider,
|
||||
audit_info,
|
||||
audit_output_options,
|
||||
bulk_checks_metadata,
|
||||
)
|
||||
else:
|
||||
logger.error(
|
||||
|
||||
@@ -14,7 +14,7 @@ from colorama import Fore, Style
|
||||
|
||||
from prowler.config.config import orange_color
|
||||
from prowler.lib.check.compliance_models import load_compliance_framework
|
||||
from prowler.lib.check.models import Check, load_check_metadata
|
||||
from prowler.lib.check.models import Check, Check_Metadata_Model, load_check_metadata
|
||||
from prowler.lib.logger import logger
|
||||
|
||||
try:
|
||||
@@ -385,20 +385,21 @@ def import_check(check_path: str) -> ModuleType:
|
||||
|
||||
def run_check(check: Check, output_options: Provider_Output_Options) -> list:
|
||||
findings = []
|
||||
if output_options.verbose:
|
||||
print(
|
||||
f"\nCheck ID: {check.CheckID} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW} [{check.Severity}]{Style.RESET_ALL}"
|
||||
)
|
||||
logger.debug(f"Executing check: {check.CheckID}")
|
||||
|
||||
try:
|
||||
if output_options.verbose:
|
||||
print(
|
||||
f"\nCheck ID: {check.check_metadata.CheckID} - {Fore.MAGENTA}{check.check_metadata.ServiceName}{Fore.YELLOW} [{check.check_metadata.Severity}]{Style.RESET_ALL}"
|
||||
)
|
||||
logger.debug(f"Executing check: {check.check_metadata.CheckID}")
|
||||
findings = check.execute()
|
||||
except Exception as error:
|
||||
if not output_options.only_logs:
|
||||
print(
|
||||
f"Something went wrong in {check.CheckID}, please use --log-level ERROR"
|
||||
f"Something went wrong in {check.check_metadata.CheckID}, please use --log-level ERROR"
|
||||
)
|
||||
logger.error(
|
||||
f"{check.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}"
|
||||
f"{check.check_metadata.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}"
|
||||
)
|
||||
finally:
|
||||
return findings
|
||||
@@ -409,6 +410,7 @@ def execute_checks(
|
||||
provider: str,
|
||||
audit_info: Any,
|
||||
audit_output_options: Provider_Output_Options,
|
||||
bulk_checks_metadata: dict,
|
||||
) -> list:
|
||||
# List to store all the check's findings
|
||||
all_findings = []
|
||||
@@ -454,6 +456,7 @@ def execute_checks(
|
||||
audit_info,
|
||||
services_executed,
|
||||
checks_executed,
|
||||
bulk_checks_metadata,
|
||||
)
|
||||
all_findings.extend(check_findings)
|
||||
|
||||
@@ -500,6 +503,7 @@ def execute_checks(
|
||||
audit_info,
|
||||
services_executed,
|
||||
checks_executed,
|
||||
bulk_checks_metadata,
|
||||
)
|
||||
all_findings.extend(check_findings)
|
||||
bar()
|
||||
@@ -527,25 +531,32 @@ def execute(
|
||||
audit_info: Any,
|
||||
services_executed: set,
|
||||
checks_executed: set,
|
||||
bulk_checks_metadata: dict[str, Check_Metadata_Model],
|
||||
):
|
||||
# Import check module
|
||||
check_module_path = (
|
||||
f"prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
|
||||
)
|
||||
lib = import_check(check_module_path)
|
||||
# Recover functions from check
|
||||
check_to_execute = getattr(lib, check_name)
|
||||
c = check_to_execute()
|
||||
try:
|
||||
# Import check module
|
||||
check_module_path = (
|
||||
f"prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
|
||||
)
|
||||
lib = import_check(check_module_path)
|
||||
# Recover functions from check
|
||||
metadata = bulk_checks_metadata[check_name]
|
||||
check_to_execute = getattr(lib, check_name)
|
||||
c = check_to_execute(metadata)
|
||||
|
||||
# Run check
|
||||
check_findings = run_check(c, audit_output_options)
|
||||
# Run check
|
||||
check_findings = run_check(c, audit_output_options)
|
||||
|
||||
# Update Audit Status
|
||||
services_executed.add(service)
|
||||
checks_executed.add(check_name)
|
||||
audit_info.audit_metadata = update_audit_metadata(
|
||||
audit_info.audit_metadata, services_executed, checks_executed
|
||||
)
|
||||
# Update Audit Status
|
||||
services_executed.add(service)
|
||||
checks_executed.add(check_name)
|
||||
audit_info.audit_metadata = update_audit_metadata(
|
||||
audit_info.audit_metadata, services_executed, checks_executed
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
# Report the check's findings
|
||||
report(check_findings, audit_output_options, audit_info)
|
||||
|
||||
@@ -1,4 +1,3 @@
|
||||
import os
|
||||
import sys
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass
|
||||
@@ -56,24 +55,18 @@ class Check_Metadata_Model(BaseModel):
|
||||
Compliance: list = None
|
||||
|
||||
|
||||
class Check(ABC, Check_Metadata_Model):
|
||||
class Check(ABC):
|
||||
"""Prowler Check"""
|
||||
|
||||
def __init__(self, **data):
|
||||
check_metadata: Check_Metadata_Model
|
||||
|
||||
def __init__(self, metadata):
|
||||
"""Check's init function. Calls the CheckMetadataModel init."""
|
||||
# Parse the Check's metadata file
|
||||
metadata_file = (
|
||||
os.path.abspath(sys.modules[self.__module__].__file__)[:-3]
|
||||
+ ".metadata.json"
|
||||
)
|
||||
# Store it to validate them with Pydantic
|
||||
data = Check_Metadata_Model.parse_file(metadata_file).dict()
|
||||
# Calls parents init function
|
||||
super().__init__(**data)
|
||||
self.check_metadata = metadata
|
||||
|
||||
def metadata(self) -> dict:
|
||||
"""Return the JSON representation of the check's metadata"""
|
||||
return self.json()
|
||||
return self.check_metadata.json()
|
||||
|
||||
@abstractmethod
|
||||
def execute(self):
|
||||
|
||||
@@ -435,7 +435,7 @@ class Check_Output_JSON(BaseModel):
|
||||
Risk: str
|
||||
RelatedUrl: str
|
||||
Remediation: Remediation
|
||||
Compliance: Optional[dict]
|
||||
Compliance: Optional[list]
|
||||
Categories: List[str]
|
||||
DependsOn: List[str]
|
||||
RelatedTo: List[str]
|
||||
|
||||
Reference in New Issue
Block a user