Compare commits

...

2 Commits

Author SHA1 Message Date
n4ch04
6f39fb47c3 fix(check lib): delete comments and debug 2023-08-18 12:07:06 +02:00
n4ch04
b75226433c feat(checks metdata): only load checks metadata once 2023-08-18 11:55:08 +02:00
4 changed files with 47 additions and 39 deletions

View File

@@ -160,7 +160,11 @@ def prowler():
findings = []
if len(checks_to_execute):
findings = execute_checks(
checks_to_execute, provider, audit_info, audit_output_options
checks_to_execute,
provider,
audit_info,
audit_output_options,
bulk_checks_metadata,
)
else:
logger.error(

View File

@@ -14,7 +14,7 @@ from colorama import Fore, Style
from prowler.config.config import orange_color
from prowler.lib.check.compliance_models import load_compliance_framework
from prowler.lib.check.models import Check, load_check_metadata
from prowler.lib.check.models import Check, Check_Metadata_Model, load_check_metadata
from prowler.lib.logger import logger
try:
@@ -385,20 +385,21 @@ def import_check(check_path: str) -> ModuleType:
def run_check(check: Check, output_options: Provider_Output_Options) -> list:
findings = []
if output_options.verbose:
print(
f"\nCheck ID: {check.CheckID} - {Fore.MAGENTA}{check.ServiceName}{Fore.YELLOW} [{check.Severity}]{Style.RESET_ALL}"
)
logger.debug(f"Executing check: {check.CheckID}")
try:
if output_options.verbose:
print(
f"\nCheck ID: {check.check_metadata.CheckID} - {Fore.MAGENTA}{check.check_metadata.ServiceName}{Fore.YELLOW} [{check.check_metadata.Severity}]{Style.RESET_ALL}"
)
logger.debug(f"Executing check: {check.check_metadata.CheckID}")
findings = check.execute()
except Exception as error:
if not output_options.only_logs:
print(
f"Something went wrong in {check.CheckID}, please use --log-level ERROR"
f"Something went wrong in {check.check_metadata.CheckID}, please use --log-level ERROR"
)
logger.error(
f"{check.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}"
f"{check.check_metadata.CheckID} -- {error.__class__.__name__}[{traceback.extract_tb(error.__traceback__)[-1].lineno}]: {error}"
)
finally:
return findings
@@ -409,6 +410,7 @@ def execute_checks(
provider: str,
audit_info: Any,
audit_output_options: Provider_Output_Options,
bulk_checks_metadata: dict,
) -> list:
# List to store all the check's findings
all_findings = []
@@ -454,6 +456,7 @@ def execute_checks(
audit_info,
services_executed,
checks_executed,
bulk_checks_metadata,
)
all_findings.extend(check_findings)
@@ -500,6 +503,7 @@ def execute_checks(
audit_info,
services_executed,
checks_executed,
bulk_checks_metadata,
)
all_findings.extend(check_findings)
bar()
@@ -527,25 +531,32 @@ def execute(
audit_info: Any,
services_executed: set,
checks_executed: set,
bulk_checks_metadata: dict[str, Check_Metadata_Model],
):
# Import check module
check_module_path = (
f"prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
)
lib = import_check(check_module_path)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
c = check_to_execute()
try:
# Import check module
check_module_path = (
f"prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
)
lib = import_check(check_module_path)
# Recover functions from check
metadata = bulk_checks_metadata[check_name]
check_to_execute = getattr(lib, check_name)
c = check_to_execute(metadata)
# Run check
check_findings = run_check(c, audit_output_options)
# Run check
check_findings = run_check(c, audit_output_options)
# Update Audit Status
services_executed.add(service)
checks_executed.add(check_name)
audit_info.audit_metadata = update_audit_metadata(
audit_info.audit_metadata, services_executed, checks_executed
)
# Update Audit Status
services_executed.add(service)
checks_executed.add(check_name)
audit_info.audit_metadata = update_audit_metadata(
audit_info.audit_metadata, services_executed, checks_executed
)
except Exception as error:
logger.error(
f"{check_name} - {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
# Report the check's findings
report(check_findings, audit_output_options, audit_info)

View File

@@ -1,4 +1,3 @@
import os
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass
@@ -56,24 +55,18 @@ class Check_Metadata_Model(BaseModel):
Compliance: list = None
class Check(ABC, Check_Metadata_Model):
class Check(ABC):
"""Prowler Check"""
def __init__(self, **data):
check_metadata: Check_Metadata_Model
def __init__(self, metadata):
"""Check's init function. Calls the CheckMetadataModel init."""
# Parse the Check's metadata file
metadata_file = (
os.path.abspath(sys.modules[self.__module__].__file__)[:-3]
+ ".metadata.json"
)
# Store it to validate them with Pydantic
data = Check_Metadata_Model.parse_file(metadata_file).dict()
# Calls parents init function
super().__init__(**data)
self.check_metadata = metadata
def metadata(self) -> dict:
"""Return the JSON representation of the check's metadata"""
return self.json()
return self.check_metadata.json()
@abstractmethod
def execute(self):

View File

@@ -435,7 +435,7 @@ class Check_Output_JSON(BaseModel):
Risk: str
RelatedUrl: str
Remediation: Remediation
Compliance: Optional[dict]
Compliance: Optional[list]
Categories: List[str]
DependsOn: List[str]
RelatedTo: List[str]