mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
3b0124d3fd
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
384 lines
14 KiB
Python
384 lines
14 KiB
Python
import importlib.metadata
|
|
import os
|
|
import pathlib
|
|
from datetime import datetime, timezone
|
|
from enum import Enum
|
|
from os import getcwd
|
|
from typing import Tuple
|
|
|
|
import requests
|
|
import yaml
|
|
from packaging import version
|
|
|
|
from prowler.lib.check.compliance_models import load_compliance_framework_universal
|
|
|
|
# Re-exported from a leaf module so prowler.lib.check.utils can import the
|
|
# constant without participating in the config <-> compliance_models <-> utils
|
|
# import cycle. Existing consumers continue to import from this module.
|
|
# The `as EXTERNAL_TOOL_PROVIDERS` rename is the PEP 484 explicit re-export
|
|
# form so static analyzers (CodeQL, mypy, ruff) treat the name as public.
|
|
from prowler.lib.check.external_tool_providers import ( # noqa: F401
|
|
EXTERNAL_TOOL_PROVIDERS as EXTERNAL_TOOL_PROVIDERS,
|
|
)
|
|
from prowler.lib.logger import logger
|
|
|
|
|
|
class _MutableTimestamp:
    """Lightweight proxy to keep timestamp references in sync across modules.

    The wrapped ``datetime`` is stored in ``value`` and can be swapped in place
    with ``set``. Any attribute that is not defined on the proxy itself is
    looked up on the wrapped value, so calls such as ``strftime`` or
    ``isoformat`` are forwarded to it.
    """

    # Defining __eq__ already makes instances unhashable; spelled out here
    # because the wrapped value can change after construction.
    __hash__ = None

    def __init__(self, value: datetime) -> None:
        self.value = value

    def set(self, value: datetime) -> None:
        """Replace the wrapped datetime in place."""
        self.value = value

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If ``value`` itself
        # is missing (an instance created without __init__, e.g. by copy or
        # pickle), forwarding would look up ``self.value`` again and recurse
        # until RecursionError. Report the attribute as missing instead.
        if name == "value":
            raise AttributeError(name)
        return getattr(self.value, name)

    def __str__(self) -> str:  # pragma: no cover - trivial forwarder
        return str(self.value)

    def __repr__(self) -> str:  # pragma: no cover - trivial forwarder
        return repr(self.value)

    def __eq__(self, other) -> bool:
        # Two proxies compare by their wrapped values; anything else is
        # compared directly against the wrapped value.
        if isinstance(other, _MutableTimestamp):
            return self.value == other.value
        return self.value == other


# Shared module-level timestamps: a naive local one and a UTC-aware one.
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
|
|
# Version string of this Prowler build.
prowler_version = "5.32.0"

# Project link and logo images.
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = (
    "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
)
aws_logo = (
    "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
)
azure_logo = (
    "https://user-images.githubusercontent.com/38561120/235927375-b23e2e0f-8932-49ec-b59c-d89f61c8041d.png"
)
gcp_logo = (
    "https://user-images.githubusercontent.com/38561120/235928332-eb4accdc-c226-4391-8e97-6ca86a91cf50.png"
)

# ANSI escape sequences used for coloured terminal output.
orange_color = "\033[38;5;208m"
banner_color = "\033[1;92m"
|
|
|
|
|
|
class Provider(str, Enum):
    """Provider identifiers.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase string value.
    """

    AWS = "aws"
    GCP = "gcp"
    AZURE = "azure"
    CLOUDFLARE = "cloudflare"
    KUBERNETES = "kubernetes"
    M365 = "m365"
    GITHUB = "github"
    GOOGLEWORKSPACE = "googleworkspace"
    IAC = "iac"
    NHN = "nhn"
    MONGODBATLAS = "mongodbatlas"
    ORACLECLOUD = "oraclecloud"
    ALIBABACLOUD = "alibabacloud"
    OPENSTACK = "openstack"
    IMAGE = "image"
    SCALEWAY = "scaleway"
    VERCEL = "vercel"
    OKTA = "okta"
    STACKIT = "stackit"
    LINODE = "linode"
|
|
|
|
|
|
# Compliance
# Directory that contains this module, with symlinks resolved.
actual_directory = pathlib.Path(os.path.realpath(__file__)).parent
|
|
|
|
|
|
def _get_ep_compliance_dirs() -> dict:
    """Discover compliance directories from entry points. Returns {provider: path}.

    Each entry point in the ``prowler.compliance`` group is loaded. The
    directory recorded for it is the first ``__path__`` item when the loaded
    object has one, otherwise the directory of its ``__file__``. Objects with
    neither attribute are skipped. A failure on one entry point is logged as a
    warning and does not stop the others.
    """
    discovered = {}
    compliance_entry_points = importlib.metadata.entry_points(
        group="prowler.compliance"
    )
    for entry_point in compliance_entry_points:
        try:
            loaded = entry_point.load()
            if hasattr(loaded, "__path__"):
                location = loaded.__path__[0]
            elif hasattr(loaded, "__file__"):
                location = os.path.dirname(loaded.__file__)
            else:
                # Nothing on the loaded object points at a directory.
                continue
            discovered[entry_point.name] = location
        except Exception as error:
            logger.warning(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
    return discovered
|
|
|
|
|
|
def _list_compliance_json(directory):
    """Return ``(framework_name, file_path)`` for each ``*.json`` file directly in *directory*.

    The framework name is the file name without its ``.json`` suffix. The
    directory handle is closed before the list is returned.
    """
    with os.scandir(directory) as entries:
        return [
            (entry.name.removesuffix(".json"), entry.path)
            for entry in entries
            if entry.is_file() and entry.name.endswith(".json")
        ]


def _universal_framework_supports(framework_path, provider):
    """Return True when the universal framework file loads and supports *provider*."""
    framework = load_compliance_framework_universal(framework_path)
    return framework is not None and bool(framework.supports_provider(provider))


def get_available_compliance_frameworks(provider=None):
    """Return the names of the compliance frameworks that can be found.

    Sources are consulted in this order:

    1. Built-in per-provider JSON files under ``compliance/<provider>/``.
    2. Built-in multi-provider JSON files directly under ``compliance/``.
    3. Per-provider directories from the ``prowler.compliance`` entry points.
    4. Multi-provider directories from the ``prowler.compliance.universal``
       entry points.

    Args:
        provider: Optional provider name. When given, only that provider's
            directories are scanned, and multi-provider frameworks are kept
            only if they load and declare support for it. When omitted, every
            provider subdirectory and every multi-provider framework is
            included.

    Returns:
        list: Framework names (file names without ``.json``). Names from
        sources 2-4 are appended only if not already present.
    """
    available_compliance_frameworks = []
    # Built-in compliance
    compliance_base = f"{actual_directory}/../compliance"

    if provider:
        providers = [provider]
    elif os.path.isdir(compliance_base):
        # Scan compliance directory for all provider subdirectories
        with os.scandir(compliance_base) as entries:
            providers = [entry.name for entry in entries if entry.is_dir()]
    else:
        providers = []

    for prov in providers:
        compliance_dir = f"{compliance_base}/{prov}"
        if os.path.isdir(compliance_dir):
            available_compliance_frameworks.extend(
                name for name, _ in _list_compliance_json(compliance_dir)
            )

    # Mirror of the result list for O(1) duplicate checks in the later sources.
    seen = set(available_compliance_frameworks)

    def add_unique(name):
        if name not in seen:
            seen.add(name)
            available_compliance_frameworks.append(name)

    # Built-in multi-provider frameworks at top-level compliance/ directory.
    # Placed before external entry points so built-ins win on name collisions.
    # When a specific provider was requested, only include the framework if it
    # declares support for that provider; otherwise include all universal frameworks.
    if os.path.isdir(compliance_base):
        for name, file_path in _list_compliance_json(compliance_base):
            if provider and not _universal_framework_supports(file_path, provider):
                continue
            add_unique(name)

    # External per-provider compliance via entry points.
    for prov, path in _get_ep_compliance_dirs().items():
        if provider and prov != provider:
            continue
        if os.path.isdir(path):
            for name, _ in _list_compliance_json(path):
                add_unique(name)

    # External multi-provider frameworks via the dedicated universal group;
    # filtered by supports_provider when a provider is given.
    for ep in importlib.metadata.entry_points(group="prowler.compliance.universal"):
        try:
            module = ep.load()
            path = (
                module.__path__[0]
                if hasattr(module, "__path__")
                else os.path.dirname(module.__file__)
            )
        except Exception as error:
            logger.warning(
                f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
            )
            continue
        if not os.path.isdir(path):
            continue
        for name, file_path in _list_compliance_json(path):
            if provider and not _universal_framework_supports(file_path, provider):
                continue
            add_unique(name)

    return available_compliance_frameworks
|
|
|
|
|
|
# Framework names computed once, at import time, with no provider filter.
available_compliance_frameworks = get_available_compliance_frameworks()

# AWS services-regions matrix json
aws_services_json_file = "aws_regions_by_service.json"

# gcp_zones_json_file = "gcp_zones.json"

# Output directory and the timestamp-derived strings used for outputs.
default_output_directory = f"{getcwd()}/output"
output_file_timestamp = timestamp.strftime("%Y%m%d%H%M%S")
timestamp_iso = timestamp.isoformat(sep=" ", timespec="seconds")

# File suffixes per output format.
csv_file_suffix = ".csv"
json_file_suffix = ".json"
json_asff_file_suffix = ".asff.json"
json_ocsf_file_suffix = ".ocsf.json"
html_file_suffix = ".html"
sarif_file_suffix = ".sarif"

# Default configuration files, located next to this module.
default_config_file_path = f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/config.yaml"
default_fixer_config_file_path = f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/fixer_config.yaml"
default_redteam_config_file_path = f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/llm_config.yaml"

encoding_format_utf_8 = "utf-8"
available_output_formats = ["csv", "json-asff", "json-ocsf", "html", "sarif"]

# Prowler Cloud API settings
cloud_api_base_url = os.environ.get(
    "PROWLER_CLOUD_API_BASE_URL", "https://api.prowler.com"
)
cloud_api_key = os.environ.get("PROWLER_CLOUD_API_KEY", "")
cloud_api_ingestion_path = "/api/v1/ingestions"
|
|
|
|
|
|
def set_output_timestamp(
    new_timestamp: datetime,
) -> Tuple[datetime, datetime, str, str]:
    """
    Override the global output timestamps so generated artifacts reflect a specific scan.

    Updates the shared ``timestamp`` and ``timestamp_utc`` proxies in place and
    rebinds the module-level ``output_file_timestamp`` and ``timestamp_iso``
    strings derived from the new value.

    Args:
        new_timestamp (datetime): The timestamp to use from now on. A value
            with ``tzinfo`` is converted to UTC for ``timestamp_utc``; a naive
            value is labelled as UTC without shifting its clock time.

    Returns:
        Tuple[datetime, datetime, str, str]: The previous ``timestamp`` value,
        ``timestamp_utc`` value, ``output_file_timestamp`` and
        ``timestamp_iso``, so callers can restore them afterwards.
    """
    global output_file_timestamp, timestamp_iso

    # Snapshot the current state before anything is overwritten.
    snapshot = (
        timestamp.value,
        timestamp_utc.value,
        output_file_timestamp,
        timestamp_iso,
    )

    timestamp.set(new_timestamp)

    if new_timestamp.tzinfo:
        utc_equivalent = new_timestamp.astimezone(timezone.utc)
    else:
        utc_equivalent = new_timestamp.replace(tzinfo=timezone.utc)
    timestamp_utc.set(utc_equivalent)

    # Re-derive the formatted strings from the freshly updated proxy.
    output_file_timestamp = timestamp.strftime("%Y%m%d%H%M%S")
    timestamp_iso = timestamp.isoformat(sep=" ", timespec="seconds")

    return snapshot
|
|
|
|
|
|
def get_default_mute_file_path(provider: str):
    """
    Return the default mute file path for the provider.

    The path is ``<provider>_mutelist.yaml`` in the directory of this module.

    Args:
        provider (str): The provider name used as the file name prefix.

    Returns:
        The path as a string when that file exists, otherwise None.
    """
    # TODO: create default mutelist file for kubernetes, azure and gcp
    module_directory = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
    candidate = f"{module_directory}/{provider}_mutelist.yaml"
    return candidate if os.path.isfile(candidate) else None
|
|
|
|
|
|
def check_current_version():
    """
    Build the version banner, comparing against the newest tag on GitHub.

    The first entry returned by the repository tags endpoint is treated as the
    latest version. The request uses a one-second timeout.

    Returns:
        str: ``Prowler <version>`` followed by an upgrade hint or an
        up-to-date note. If the lookup or the comparison fails for any reason,
        only ``Prowler <version>`` is returned.
    """
    prowler_version_string = f"Prowler {prowler_version}"
    try:
        release_response = requests.get(
            "https://api.github.com/repos/prowler-cloud/prowler/tags", timeout=1
        )
        latest_version = release_response.json()[0]["name"]
        upgrade_available = version.parse(latest_version) > version.parse(
            prowler_version
        )
    except Exception:
        # Network errors and unexpected payloads both fall back to the plain banner.
        return f"{prowler_version_string}"

    if upgrade_available:
        return f"{prowler_version_string} (latest is {latest_version}, upgrade for the latest features)"
    return f"{prowler_version_string} (You are running the latest version, yay!)"
|
|
|
|
|
|
def load_and_validate_config_file(provider: str, config_file_path: str) -> dict:
    """
    Reads the Prowler config file in YAML format from the default location or the file passed with the --config-file flag.

    Args:
        provider (str): The provider name (e.g., 'aws', 'gcp', 'azure', 'kubernetes').
        config_file_path (str): The path to the configuration file.

    Returns:
        dict: The configuration dictionary for the specified provider. An empty
        dict is returned when reading, parsing or validating raises; the error
        is logged.
    """
    # Imported lazily to avoid an import cycle: schemas may eventually want to
    # import from prowler.config.config (e.g. for shared constants).
    from prowler.config.schema.registry import SCHEMAS
    from prowler.config.schema.validator import validate_provider_config

    try:
        with open(config_file_path, "r", encoding=encoding_format_utf_8) as handle:
            parsed = yaml.safe_load(handle)

        is_mapping = isinstance(parsed, dict)

        # Namespaced format: each provider has its own top-level key.
        # Works for every built-in and every external plugin without a hardcoded list.
        # Flat legacy format is AWS-only (historical, pre-multicloud). We identify it
        # by the absence of nested-dict top-level values (namespaced files always
        # have dict values; the legacy AWS format only has primitives/lists).
        if is_mapping and isinstance(parsed.get(provider), dict):
            provider_section = parsed[provider] or {}
        elif (
            is_mapping
            and parsed
            and provider == "aws"
            and all(not isinstance(value, dict) for value in parsed.values())
        ):
            provider_section = parsed
        else:
            provider_section = {}

        return validate_provider_config(
            provider=provider,
            raw=provider_section,
            schema_cls=SCHEMAS.get(provider),
        )
    except Exception as error:
        # Missing file, invalid YAML, undecodable bytes and any other failure
        # are all reported the same way.
        logger.error(
            f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
        )

    return {}
|
|
|
|
|
|
def load_and_validate_fixer_config_file(
    provider: str, fixer_config_file_path: str
) -> dict:
    """
    Reads the Prowler fixer config file in YAML format from the default location or the file passed with the --fixer-config flag.

    Args:
        provider (str): The provider name (e.g., 'aws', 'gcp', 'azure', 'kubernetes').
        fixer_config_file_path (str): The path to the fixer configuration file.

    Returns:
        dict: The fixer configuration dictionary for the specified provider. An
        empty dict is returned when the file is empty, when the provider key is
        missing or has no content, or when reading or parsing raises (the error
        is logged).
    """
    try:
        with open(fixer_config_file_path, "r", encoding=encoding_format_utf_8) as f:
            fixer_config_file = yaml.safe_load(f)

        # An empty YAML document parses to None; that is a valid "no
        # configuration" file, not an error worth logging.
        if fixer_config_file is None:
            return {}

        # A provider key with a null or empty value also means "no configuration".
        return fixer_config_file.get(provider) or {}
    except Exception as error:
        # Missing file, invalid YAML, undecodable bytes and any other failure
        # (e.g. a non-mapping top level) are all reported the same way.
        logger.error(
            f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
        )

    return {}
|