Compare commits

...

37 Commits

Author SHA1 Message Date
StylusFrost b13baa9076 refactor(sdk): scope ImageBaseException catch to image provider in __main__
After this PR, the tool-wrapper else branch in __main__.py covers iac,
image, and any external tool-wrapper provider registered via entry
points (anything Provider.is_tool_wrapper_provider returns True for and
that is not llm). The existing `except ImageBaseException` was specific
to the Image provider but lived in a branch that was no longer
Image-only — its name no longer matched its scope, misleading plug-in
authors reading the file.

Move the catch inside an explicit `if provider == "image":` branch so
the name matches what it catches. iac and external tool-wrapper
providers fall through to the outer `except Exception` backstop for
unexpected failures, which was already the effective behavior in
practice (they do not raise ImageBaseException).

No behavior change — pure refactor for legibility, flagged by reviewer
on the PR #10700 review of 2026-05-06.
2026-05-07 10:11:37 +02:00
StylusFrost 4fb14bbb21 perf(sdk): cache misses in Provider._load_ep_provider
Repeated lookups for unknown provider names (built-ins, typos, names
with no registered entry point) re-iterated entry_points() on every
call because only hits were cached. importlib.metadata.entry_points()
walks the metadata of every installed Python package, so the cost is
proportional to the size of the venv, not just to Prowler.

Caches None on miss so subsequent lookups hit the existing
`if name in Provider._ep_providers` short-circuit and return
immediately. Aligns Provider._ep_providers with the symmetric cache
in tool_wrapper._ep_class_cache, which already had this behavior.

Includes a regression test that mocks importlib.metadata.entry_points,
calls _load_ep_provider("nonexistent") twice, and asserts entry_points
is invoked exactly once.

Also underscore-prefix the remaining unused parameters on the abstract
Provider stubs (get_output_options, get_stdout_detail,
generate_compliance_output, display_compliance_table) so vulture stops
flagging them now that the file is in the diff. Same pattern applied
in bbe3a7dbf for the previous batch.
2026-05-07 10:00:07 +02:00
StylusFrost e5b9fee942 fix(sdk): dedupe entry-point compliance frameworks against built-ins
The entry-point loop in get_available_compliance_frameworks appended
framework names without checking whether the same name was already
provided by a built-in. The loader (Compliance.get_bulk) already
respects "built-in wins" via an explicit guard, but the listing
function did not, causing --list-compliance and
--list-compliance-requirements to print the same name twice on
collision with a built-in.

Adds the same dedup guard already used by the universal frameworks
loop a few lines above, restoring symmetry across the three
compliance discovery layers.

Includes a regression test that registers a fake entry point with
a name colliding with a known built-in (cis_2.0_aws) and asserts
the framework appears exactly once in the resulting list.
2026-05-07 09:32:48 +02:00
StylusFrost 020388824e fix(sdk): silence CodeQL py/not-named-self on CheckMetadata validators
Stack @classmethod under @validator for the five validators that take
cls as the first parameter. Pydantic v1 was already treating them as
classmethods via signature introspection, so runtime behavior is
unchanged. The explicit decorator resolves CodeQL alerts #6240-#6244.

Adds the existing # noqa: F841 marker (already used for cls in the
older validators of this file) so vulture does not flag cls as unused.

Affected validators in prowler/lib/check/models.py:
- validate_check_title
- validate_related_url
- validate_recommendation_url
- validate_description
- validate_risk
2026-05-06 18:26:22 +02:00
StylusFrost cf99e02ceb Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-05-05 08:57:11 +02:00
StylusFrost 9681901174 style(sdk): satisfy black and vulture in test_dynamic_provider_loading
Two unrelated lint blockers that CI surfaced once the file was in scope
of `black --check .` and `vulture` on the changed-file path:

- Reformat one if/and conditional that black wanted on a single line.
- Underscore-prefix unused parameter names on FakeProvider stub methods
  (from_cli_args, get_output_options, display_compliance_table,
  generate_compliance_output, FakePureContractProvider.from_cli_args)
  and the throwaway **kw in a MagicMock side_effect lambda. These are
  test fixtures whose method bodies don't reference those args; the
  signatures must match the abstract contract on Provider, so we keep
  positions/types and just rename to indicate intentional non-use.

No logic change.
2026-05-03 22:55:24 +02:00
StylusFrost bbe3a7dbf8 refactor(sdk): extract is_builtin_provider to leaf module to break import cycle
CodeQL flagged a cyclic import after `prowler/lib/check/utils.py` and
`prowler/lib/check/check.py` started importing `Provider` from
`prowler.providers.common.provider`. That module transitively imports
`prowler.config.config`, which imports back into `prowler.lib.check.*`
(`compliance_models`, `external_tool_providers`) — closing the cycle.

Apply the same pattern already used for `is_tool_wrapper_provider`:
extract the predicate to a leaf module, `prowler.providers.common.builtin`,
that depends only on `importlib.util`. `Provider.is_builtin` delegates to
the leaf, and call sites in `prowler.lib.check.*` now import directly
from the leaf — no more cycle.

Also underscore-prefix unused parameters on the abstract stubs in
`Provider` (get_finding_output_data, generate_compliance_output,
display_compliance_table) so vulture stops flagging them now that the
file is in the diff.
2026-05-03 22:46:00 +02:00
StylusFrost 0672c80563 fix(sdk): guard find_spec with is_builtin for external provider discovery
Calling importlib.util.find_spec on prowler.providers.{provider}.services
for an external provider propagates ModuleNotFoundError when the parent
package prowler.providers.{provider} does not exist, instead of returning
None. This caused recover_checks_from_provider, _resolve_check_module and
Scan.scan to fail with "No module named 'prowler.providers.{external}'"
even though the plug-in registered its checks via entry points correctly.

Gate the built-in branch on Provider.is_builtin (which already wraps the
find_spec in try/except) and reuse _resolve_check_module from Scan.scan
so external providers fall through to the entry-point lookup.
2026-05-03 22:31:31 +02:00
StylusFrost 92d7ea2170 Merge remote-tracking branch 'origin/master' into PROWLER-1391-provider-contract-dynamic-discovery
# Conflicts:
#	prowler/config/config.py
2026-05-03 19:41:25 +02:00
StylusFrost c7aa536896 fix(sdk): built-in wins on plug-in collision for providers and checks 2026-04-30 19:50:19 +02:00
StylusFrost e7f23bb13f fix(sdk): propagate provider argument from report to stdout_report 2026-04-30 14:06:17 +02:00
StylusFrost 82132a9341 fix(sdk): use find_spec to distinguish missing vs broken built-ins 2026-04-30 13:53:06 +02:00
StylusFrost 5e876579f8 fix(sdk): use is_tool_wrapper_provider for compliance framework gate 2026-04-30 10:12:24 +02:00
StylusFrost be49fd8c4e Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-28 14:48:21 +02:00
StylusFrost 15d8f1642e test(sdk): unit tests for tool_wrapper leaf module 2026-04-28 14:45:07 +02:00
StylusFrost 79f12f3617 refactor(sdk): extract is_tool_wrapper_provider to leaf module to break import cycle 2026-04-28 13:57:27 +02:00
StylusFrost 6715361246 fix(sdk): restore dynamic external providers help in CLI epilog 2026-04-28 12:47:50 +02:00
StylusFrost 45e946cd87 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-28 12:42:18 +02:00
StylusFrost 7836905b82 fix(sdk): consult Provider.is_tool_wrapper_provider in check discovery 2026-04-28 12:20:42 +02:00
StylusFrost 52f6653ccf fix(sdk): use equality not substring in provider dispatch chain 2026-04-28 11:54:23 +02:00
StylusFrost a5de6608ae fix(sdk): restore llm in parser usage line to match epilog 2026-04-28 10:50:43 +02:00
StylusFrost 1cdce02397 fix(sdk): use startswith("-") to detect CLI flags so external provider names with hyphens are not misparsed 2026-04-24 20:59:07 +02:00
StylusFrost a31fe9b618 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery
Conflict in prowler/config/config.py resolved by combining both branches:
- HEAD: external compliance discovery via entry points (PROWLER-1391)
- master: multi-provider framework JSONs scanned at top-level compliance/ (#10300)

Order: built-in per-provider -> built-in multi-provider -> external entry points.
Built-ins first so they win on name collisions against external registrations.

Supporting external plug-ins to register multi-provider frameworks is tracked
in PROWLER-1444.
2026-04-24 20:54:22 +02:00
StylusFrost 907166d88a fix(sdk): discriminate builtin vs external providers via find_spec for clearer import errors 2026-04-24 20:33:38 +02:00
StylusFrost 0883baad78 fix(sdk): external providers with --service and external checks for new services 2026-04-24 20:18:20 +02:00
StylusFrost cf70d1f9f8 fix(sdk): honor from_cli_args return value in init_global_provider fallback 2026-04-24 18:51:57 +02:00
StylusFrost 60e7657081 feat(sdk): wire is_external_tool_provider property to execution and metadata validators 2026-04-24 18:23:42 +02:00
StylusFrost e8487d0686 fix(sdk): unwrap namespaced config for all built-in and external providers
load_and_validate_config_file only detected the namespaced format for 5
hardcoded providers (aws, gcp, azure, kubernetes, m365). For every other
built-in (github, nhn, vercel, cloudflare, iac, llm, image, mongodbatlas,
oraclecloud, openstack, alibabacloud, googleworkspace) and for any
external plug-in, the full YAML was returned wrapped instead of the
provider's own block.

Replace the hardcoded list with a dynamic check: if the file has a
top-level key matching the provider and its value is a dict, unwrap it.
Keep the legacy flat format for AWS only (historical, pre-multicloud)
and identify it by the absence of nested-dict top-level values, which
prevents cross-provider config leakage when a namespaced file has no
section for the requested provider.
2026-04-24 18:01:47 +02:00
StylusFrost 9c056beed1 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-22 09:39:10 +02:00
StylusFrost f60f7c61c7 feat(provider): add display_compliance_table method for provider-specific compliance rendering 2026-04-21 19:40:38 +02:00
StylusFrost 3deb1359a5 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-21 18:52:13 +02:00
StylusFrost e2295bd086 feat(provider): implement get_mutelist_finding_args for external providers and add tests 2026-04-21 18:50:18 +02:00
StylusFrost e27317437d feat(external-provider): add dynamic loading tests and coverage for external provider 2026-04-21 14:37:50 +02:00
StylusFrost 6f6016d822 chore: update CHANGELOG for Prowler v5.25.0 with new features 2026-04-21 14:22:24 +02:00
StylusFrost 5f10e1c1b6 Merge branch 'master' into PROWLER-1391-provider-contract-dynamic-discovery 2026-04-21 14:19:52 +02:00
StylusFrost 484211b465 fix(sdk): align exception handlers to SDK convention and improve test coverage 2026-04-21 14:14:17 +02:00
StylusFrost f8333baf24 feat: Enhance dynamic provider loading and compliance framework discovery
- Implemented dynamic loading of external providers via entry points, allowing for greater flexibility in provider integration.
  - Added functionality to discover compliance directories from entry points, enabling external compliance frameworks to be loaded seamlessly.
  - Refactored check module resolution to prioritize built-in checks while falling back to entry points if necessary.
  - Improved compliance framework loading to include both built-in and external sources, ensuring comprehensive compliance coverage.
  - Enhanced CLI argument parsing to support external providers, improving user experience and configurability.
  - Introduced extensive unit tests to validate dynamic loading, compliance discovery, and overall integration of external providers.
2026-04-15 13:22:57 +02:00
27 changed files with 3002 additions and 163 deletions
+26
View File
@@ -516,6 +516,32 @@ jobs:
flags: prowler-py${{ matrix.python-version }}-vercel
files: ./vercel_coverage.xml
# External Provider (dynamic loading)
- name: Check if External Provider files changed
if: steps.check-changes.outputs.any_changed == 'true'
id: changed-external
uses: tj-actions/changed-files@22103cc46bda19c2b464ffe86db46df6922fd323 # v47.0.5
with:
files: |
./prowler/providers/common/**
./prowler/config/**
./prowler/lib/**
./tests/providers/external/**
./poetry.lock
- name: Run External Provider tests
if: steps.changed-external.outputs.any_changed == 'true'
run: poetry run pytest -n auto --cov=./prowler/providers/common --cov=./prowler/config --cov=./prowler/lib --cov-report=xml:external_coverage.xml tests/providers/external
- name: Upload External Provider coverage to Codecov
if: steps.changed-external.outputs.any_changed == 'true'
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
with:
flags: prowler-py${{ matrix.python-version }}-external
files: ./external_coverage.xml
# Lib
- name: Check if Lib files changed
if: steps.check-changes.outputs.any_changed == 'true'
+5
View File
@@ -6,6 +6,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
### 🚀 Added
- Support for external/custom providers, checks, and compliance frameworks without modifying core code [(#10700)](https://github.com/prowler-cloud/prowler/pull/10700)
- `bedrock_guardrails_configured` check for AWS provider [(#10844)](https://github.com/prowler-cloud/prowler/pull/10844)
- Universal compliance pipeline integrated into the CLI: `--list-compliance` and `--list-compliance-requirements` show universal frameworks, and CSV plus OCSF outputs are generated for any framework declaring a `TableConfig` [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
- ASD Essential Eight Maturity Model compliance framework for AWS (Maturity Level One, Nov 2023) [(#10808)](https://github.com/prowler-cloud/prowler/pull/10808)
@@ -70,6 +71,10 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Google Workspace check reports now store the actual domain or account resource subject instead of `provider.identity` [(#10901)](https://github.com/prowler-cloud/prowler/pull/10901)
- `entra_users_mfa_capable` evaluating disabled guest accounts; CIS 5.2.3.4 only targets enabled member users [(#10785)](https://github.com/prowler-cloud/prowler/pull/10785)
### 🐞 Fixed
- `load_and_validate_config_file` now unwraps namespaced config for every built-in and external provider, and no longer leaks the full file as the provider's config when the file is namespaced [(#10700)](https://github.com/prowler-cloud/prowler/pull/10700)
---
## [5.24.3] (Prowler v5.24.3)
+46 -12
View File
@@ -10,7 +10,6 @@ from colorama import Fore, Style
from colorama import init as colorama_init
from prowler.config.config import (
EXTERNAL_TOOL_PROVIDERS,
cloud_api_base_url,
csv_file_suffix,
get_available_compliance_frameworks,
@@ -207,9 +206,10 @@ def prowler():
# We treat the compliance framework as another output format
if compliance_framework:
args.output_formats.extend(compliance_framework)
# If no input compliance framework, set all, unless a specific service or check is input
# Skip for IAC and LLM providers that don't use compliance frameworks
elif default_execution and provider not in ["iac", "llm"]:
# If no input compliance framework, set all, unless a specific service or check is input.
# Skip for tool-wrapper providers (iac, llm, image, and any external plug-in
# declaring `is_external_tool_provider = True`) — they don't use compliance frameworks.
elif default_execution and not Provider.is_tool_wrapper_provider(provider):
args.output_formats.extend(get_available_compliance_frameworks(provider))
# Set Logger configuration
@@ -247,7 +247,7 @@ def prowler():
universal_frameworks = {}
# Skip compliance frameworks for external-tool providers
if provider not in EXTERNAL_TOOL_PROVIDERS:
if not Provider.is_tool_wrapper_provider(provider):
bulk_compliance_frameworks = Compliance.get_bulk(provider)
# Complete checks metadata with the compliance framework specification
bulk_checks_metadata = update_checks_metadata_with_compliance(
@@ -315,7 +315,7 @@ def prowler():
sys.exit()
# Skip service and check loading for external-tool providers
if provider not in EXTERNAL_TOOL_PROVIDERS:
if not Provider.is_tool_wrapper_provider(provider):
# Import custom checks from folder
if checks_folder:
custom_checks = parse_checks_from_folder(global_provider, checks_folder)
@@ -426,6 +426,9 @@ def prowler():
output_options = VercelOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
else:
# Dynamic fallback: any external/custom provider
output_options = global_provider.get_output_options(args, bulk_checks_metadata)
# Run the quick inventory for the provider if available
if hasattr(args, "quick_inventory") and args.quick_inventory:
@@ -435,7 +438,7 @@ def prowler():
# Execute checks
findings = []
if provider in EXTERNAL_TOOL_PROVIDERS:
if Provider.is_tool_wrapper_provider(provider):
# For external-tool providers, run the scan directly
if provider == "llm":
@@ -445,12 +448,19 @@ def prowler():
findings = global_provider.run_scan(streaming_callback=streaming_callback)
else:
# Original behavior for IAC and Image
try:
if provider == "image":
try:
findings = global_provider.run()
except ImageBaseException as error:
logger.critical(f"{error}")
sys.exit(1)
else:
# IAC and external tool-wrapper providers registered via entry
# points. Unexpected failures propagate to the outer except
# Exception backstop further down in this file — keeping the
# branch free of an Image-specific catch that would otherwise
# mislead plug-in authors reading this code.
findings = global_provider.run()
except ImageBaseException as error:
logger.critical(f"{error}")
sys.exit(1)
# Note: External tool providers don't support granular progress tracking since
# they run external tools as a black box and return all findings at once.
# Progress tracking would just be 0% → 100%.
@@ -1343,6 +1353,30 @@ def prowler():
)
generated_outputs["compliance"].append(generic_compliance)
generic_compliance.batch_write_data_to_file()
else:
# Dynamic fallback: any external/custom provider
try:
global_provider.generate_compliance_output(
finding_outputs,
bulk_compliance_frameworks,
input_compliance_frameworks,
output_options,
generated_outputs,
)
except NotImplementedError:
# Last resort: generic compliance
for compliance_name in input_compliance_frameworks:
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
generic_compliance = GenericCompliance(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
file_path=filename,
)
generated_outputs["compliance"].append(generic_compliance)
generic_compliance.batch_write_data_to_file()
# AWS Security Hub Integration
if provider == "aws":
+61 -14
View File
@@ -1,3 +1,4 @@
import importlib.metadata
import os
import pathlib
from datetime import datetime, timezone
@@ -82,13 +83,38 @@ class Provider(str, Enum):
actual_directory = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
def _get_ep_compliance_dirs() -> dict:
"""Discover compliance directories from entry points. Returns {provider: path}."""
dirs = {}
for ep in importlib.metadata.entry_points(group="prowler.compliance"):
try:
module = ep.load()
if hasattr(module, "__path__"):
dirs[ep.name] = module.__path__[0]
elif hasattr(module, "__file__"):
dirs[ep.name] = os.path.dirname(module.__file__)
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return dirs
def get_available_compliance_frameworks(provider=None):
available_compliance_frameworks = []
providers = [p.value for p in Provider]
# Built-in compliance
compliance_base = f"{actual_directory}/../compliance"
if provider:
providers = [provider]
for current_provider in providers:
compliance_dir = f"{actual_directory}/../compliance/{current_provider}"
else:
# Scan compliance directory for all provider subdirectories
providers = []
if os.path.isdir(compliance_base):
for entry in os.scandir(compliance_base):
if entry.is_dir():
providers.append(entry.name)
for prov in providers:
compliance_dir = f"{compliance_base}/{prov}"
if not os.path.isdir(compliance_dir):
continue
with os.scandir(compliance_dir) as files:
@@ -97,7 +123,8 @@ def get_available_compliance_frameworks(provider=None):
available_compliance_frameworks.append(
file.name.removesuffix(".json")
)
# Also scan top-level compliance/ for multi-provider (universal) JSONs.
# Built-in multi-provider frameworks at top-level compliance/ directory.
# Placed before external entry points so built-ins win on name collisions.
# When a specific provider was requested, only include the framework if it
# declares support for that provider; otherwise include all universal frameworks.
compliance_root = f"{actual_directory}/../compliance"
@@ -114,6 +141,18 @@ def get_available_compliance_frameworks(provider=None):
continue
if name not in available_compliance_frameworks:
available_compliance_frameworks.append(name)
# External compliance via entry points.
# Multi-provider support for external plug-ins is tracked in PROWLER-1444.
ep_dirs = _get_ep_compliance_dirs()
for prov, path in ep_dirs.items():
if provider and prov != provider:
continue
if os.path.isdir(path):
for file in os.scandir(path):
if file.is_file() and file.name.endswith(".json"):
name = file.name.removesuffix(".json")
if name not in available_compliance_frameworks:
available_compliance_frameworks.append(name)
return available_compliance_frameworks
@@ -225,18 +264,26 @@ def load_and_validate_config_file(provider: str, config_file_path: str) -> dict:
with open(config_file_path, "r", encoding=encoding_format_utf_8) as f:
config_file = yaml.safe_load(f)
# Not to introduce a breaking change, allow the old format config file without any provider keys
# and a new format with a key for each provider to include their configuration values within.
if any(
key in config_file
for key in ["aws", "gcp", "azure", "kubernetes", "m365"]
# Namespaced format: each provider has its own top-level key.
# Works for every built-in and every external plugin without a hardcoded list.
# Flat legacy format is AWS-only (historical, pre-multicloud). We identify it
# by the absence of nested-dict top-level values (namespaced files always
# have dict values; the legacy AWS format only has primitives/lists).
if (
isinstance(config_file, dict)
and provider in config_file
and isinstance(config_file[provider], dict)
):
config = config_file.get(provider, {})
config = config_file.get(provider, {}) or {}
elif (
isinstance(config_file, dict)
and config_file
and provider == "aws"
and not any(isinstance(v, dict) for v in config_file.values())
):
config = config_file
else:
config = config_file if config_file else {}
# Not to break Azure, K8s and GCP does not support or use the old config format
if provider in ["azure", "gcp", "kubernetes", "m365"]:
config = {}
config = {}
return config
+53 -6
View File
@@ -1,4 +1,6 @@
import importlib
import importlib.metadata
import importlib.util
import json
import os
import re
@@ -19,6 +21,7 @@ from prowler.lib.check.utils import recover_checks_from_provider
from prowler.lib.logger import logger
from prowler.lib.outputs.outputs import report
from prowler.lib.utils.utils import open_file, parse_json_file, print_boxes
from prowler.providers.common.builtin import is_builtin_provider
from prowler.providers.common.models import Audit_Metadata
@@ -385,6 +388,45 @@ def import_check(check_path: str) -> ModuleType:
return lib
def _resolve_check_module(
provider_type: str, service: str, check_name: str
) -> ModuleType:
"""Resolve and import a check module.
Built-in wins on CheckID collision. Plug-ins are first-class extenders
(they can add new checks under new CheckIDs) but cannot override
existing built-ins — a security tool prefers fail-loud predictability
over silent overrides. CheckMetadata.get_bulk() applies the same
precedence on the metadata side (first-write-wins) and emits a warning
when a plug-in tries to override, so the user knows their plug-in
duplicate is being ignored and can rename it.
Gates the built-in branch on `is_builtin_provider(provider_type)` —
calling `find_spec` on `prowler.providers.{provider_type}.services...`
directly would propagate `ModuleNotFoundError` for external providers
(their parent package `prowler.providers.{provider_type}` does not
exist) instead of returning None. The leaf helper encapsulates the
safe lookup, so external providers go straight to entry points. For
built-ins we still use `find_spec` to distinguish "check doesn't
exist" from "check exists but failed to import" (broken transitive
dep, etc.).
"""
# Built-in first — built-in wins on CheckID collision
if is_builtin_provider(provider_type):
builtin_path = f"prowler.providers.{provider_type}.services.{service}.{check_name}.{check_name}"
if importlib.util.find_spec(builtin_path) is not None:
return import_check(builtin_path)
# Entry point lookup — only consulted when the built-in truly doesn't exist
for ep in importlib.metadata.entry_points(group=f"prowler.checks.{provider_type}"):
if ep.name == check_name:
return importlib.import_module(ep.value)
raise ModuleNotFoundError(
f"Check '{check_name}' not found for provider '{provider_type}'"
)
def run_fixer(check_findings: list) -> int:
"""
Run the fixer for the check if it exists and there are any FAIL findings
@@ -525,9 +567,10 @@ def execute_checks(
service = check_name.split("_")[0]
try:
try:
# Import check module
check_module_path = f"prowler.providers.{global_provider.type}.services.{service}.{check_name}.{check_name}"
lib = import_check(check_module_path)
# Import check module (built-in or entry point)
lib = _resolve_check_module(
global_provider.type, service, check_name
)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
check = check_to_execute()
@@ -605,9 +648,10 @@ def execute_checks(
)
try:
try:
# Import check module
check_module_path = f"prowler.providers.{global_provider.type}.services.{service}.{check_name}.{check_name}"
lib = import_check(check_module_path)
# Import check module (built-in or entry point)
lib = _resolve_check_module(
global_provider.type, service, check_name
)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
check = check_to_execute()
@@ -745,6 +789,9 @@ def execute(
is_finding_muted_args["tenancy_id"] = (
global_provider.identity.tenancy_id
)
else:
# External/custom provider — delegate identity args
is_finding_muted_args = global_provider.get_mutelist_finding_args()
for finding in check_findings:
if global_provider.type == "cloudflare":
is_finding_muted_args["account_id"] = finding.account_id
+8 -3
View File
@@ -2,10 +2,10 @@ import sys
from colorama import Fore, Style
from prowler.config.config import EXTERNAL_TOOL_PROVIDERS
from prowler.lib.check.check import parse_checks_from_file
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.check.models import CheckMetadata, Severity
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
from prowler.lib.logger import logger
@@ -26,8 +26,13 @@ def load_checks_to_execute(
) -> set:
"""Generate the list of checks to execute based on the cloud provider and the input arguments given"""
try:
# Bypass check loading for providers that use external tools directly
if provider in EXTERNAL_TOOL_PROVIDERS:
# Bypass check loading for tool-wrapper providers — they delegate
# scanning to an external tool and have no checks to recover.
# Single source of truth across __main__, the CheckMetadata validators,
# check discovery and this loader, covering both built-in tool wrappers
# (iac/llm/image) and external plug-ins that declare
# `is_external_tool_provider = True` via the contract.
if is_tool_wrapper_provider(provider):
return set()
# Local subsets
+35 -5
View File
@@ -1,3 +1,4 @@
import importlib.metadata
import json
import os
import sys
@@ -434,26 +435,55 @@ class Compliance(BaseModel):
"""Bulk load all compliance frameworks specification into a dict"""
try:
bulk_compliance_frameworks = {}
# Built-in compliance from prowler/compliance/{provider}/
available_compliance_framework_modules = list_compliance_modules()
for compliance_framework in available_compliance_framework_modules:
if provider in compliance_framework.name:
compliance_specification_dir_path = (
f"{compliance_framework.module_finder.path}/{provider}"
)
# for compliance_framework in available_compliance_framework_modules:
for filename in os.listdir(compliance_specification_dir_path):
file_path = os.path.join(
compliance_specification_dir_path, filename
)
# Check if it is a file and ti size is greater than 0
if os.path.isfile(file_path) and os.stat(file_path).st_size > 0:
# Open Compliance file in JSON
# cis_v1.4_aws.json --> cis_v1.4_aws
compliance_framework_name = filename.split(".json")[0]
# Store the compliance info
bulk_compliance_frameworks[compliance_framework_name] = (
load_compliance_framework(file_path)
)
# External compliance via entry points
for ep in importlib.metadata.entry_points(group="prowler.compliance"):
if ep.name == provider:
try:
module = ep.load()
compliance_dir = (
module.__path__[0]
if hasattr(module, "__path__")
else os.path.dirname(module.__file__)
)
for filename in os.listdir(compliance_dir):
if filename.endswith(".json"):
file_path = os.path.join(compliance_dir, filename)
if (
os.path.isfile(file_path)
and os.stat(file_path).st_size > 0
):
compliance_framework_name = filename.split(".json")[
0
]
if (
compliance_framework_name
not in bulk_compliance_frameworks
):
bulk_compliance_frameworks[
compliance_framework_name
] = load_compliance_framework(file_path)
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as e:
logger.error(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}] -- {e}")
+37 -12
View File
@@ -11,10 +11,10 @@ from typing import Any, Dict, Optional, Set
from pydantic.v1 import BaseModel, Field, ValidationError, validator
from pydantic.v1.error_wrappers import ErrorWrapper
from prowler.config.config import EXTERNAL_TOOL_PROVIDERS, Provider
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.check.utils import recover_checks_from_provider
from prowler.lib.logger import logger
from prowler.providers.common.provider import Provider as ProviderABC
# Valid ResourceGroup values as defined in the RFC
VALID_RESOURCE_GROUPS = frozenset(
@@ -259,7 +259,7 @@ class CheckMetadata(BaseModel):
)
if (
value_lower not in VALID_CATEGORIES
and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS
and not ProviderABC.is_tool_wrapper_provider(values.get("Provider"))
):
raise ValueError(
f"Invalid category: '{value_lower}'. Must be one of: {', '.join(sorted(VALID_CATEGORIES))}."
@@ -288,7 +288,9 @@ class CheckMetadata(BaseModel):
raise ValueError("ServiceName must be a non-empty string")
check_id = values.get("CheckID")
if check_id and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if check_id and not ProviderABC.is_tool_wrapper_provider(
values.get("Provider")
):
service_from_check_id = check_id.split("_")[0]
if service_name != service_from_check_id:
raise ValueError(
@@ -304,7 +306,9 @@ class CheckMetadata(BaseModel):
if not check_id:
raise ValueError("CheckID must be a non-empty string")
if check_id and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if check_id and not ProviderABC.is_tool_wrapper_provider(
values.get("Provider")
):
if "-" in check_id:
raise ValueError(
f"CheckID {check_id} contains a hyphen, which is not allowed"
@@ -313,8 +317,9 @@ class CheckMetadata(BaseModel):
return check_id
@validator("CheckTitle", pre=True, always=True)
@classmethod
def validate_check_title(cls, check_title, values): # noqa: F841
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if not ProviderABC.is_tool_wrapper_provider(values.get("Provider")):
if len(check_title) > 150:
raise ValueError(
f"CheckTitle must not exceed 150 characters, got {len(check_title)} characters"
@@ -326,14 +331,18 @@ class CheckMetadata(BaseModel):
return check_title
@validator("RelatedUrl", pre=True, always=True)
@classmethod
def validate_related_url(cls, related_url, values): # noqa: F841
if related_url and values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if related_url and not ProviderABC.is_tool_wrapper_provider(
values.get("Provider")
):
raise ValueError("RelatedUrl must be empty. This field is deprecated.")
return related_url
@validator("Remediation")
@classmethod
def validate_recommendation_url(cls, remediation, values): # noqa: F841
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if not ProviderABC.is_tool_wrapper_provider(values.get("Provider")):
url = remediation.Recommendation.Url
if url and not url.startswith("https://hub.prowler.com/"):
raise ValueError(
@@ -346,7 +355,7 @@ class CheckMetadata(BaseModel):
provider = values.get("Provider", "").lower()
# Non-AWS providers must have an empty CheckType list
if provider != "aws" and provider not in EXTERNAL_TOOL_PROVIDERS:
if provider != "aws" and not ProviderABC.is_tool_wrapper_provider(provider):
if check_type:
raise ValueError(
f"CheckType must be empty for non-AWS providers. Got {check_type} for provider '{provider}'."
@@ -371,8 +380,9 @@ class CheckMetadata(BaseModel):
return check_type
@validator("Description", pre=True, always=True)
@classmethod
def validate_description(cls, description, values): # noqa: F841
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if not ProviderABC.is_tool_wrapper_provider(values.get("Provider")):
if len(description) > 400:
raise ValueError(
f"Description must not exceed 400 characters, got {len(description)} characters"
@@ -380,8 +390,9 @@ class CheckMetadata(BaseModel):
return description
@validator("Risk", pre=True, always=True)
@classmethod
def validate_risk(cls, risk, values): # noqa: F841
if values.get("Provider") not in EXTERNAL_TOOL_PROVIDERS:
if not ProviderABC.is_tool_wrapper_provider(values.get("Provider")):
if len(risk) > 400:
raise ValueError(
f"Risk must not exceed 400 characters, got {len(risk)} characters"
@@ -433,6 +444,20 @@ class CheckMetadata(BaseModel):
metadata_file = f"{check_path}/{check_name}.metadata.json"
# Load metadata
check_metadata = load_check_metadata(metadata_file)
# Built-in wins on CheckID collision. Plug-in entry points are
# appended after built-ins by `recover_checks_from_provider`, so
# a duplicate CheckID here means an entry-point check is trying
# to override a built-in. Ignore the override (the built-in
# metadata stays) and surface it via a warning — matching the
# precedence enforced by `_resolve_check_module`.
if check_metadata.CheckID in bulk_check_metadata:
logger.warning(
f"Plug-in check metadata '{check_metadata.CheckID}' "
f"(loaded from '{metadata_file}') is being IGNORED — "
f"a built-in with the same CheckID exists. To use your "
f"plug-in, register it under a different CheckID."
)
continue
bulk_check_metadata[check_metadata.CheckID] = check_metadata
return bulk_check_metadata
@@ -470,7 +495,7 @@ class CheckMetadata(BaseModel):
# If the bulk checks metadata is not provided, get it
if not bulk_checks_metadata:
bulk_checks_metadata = {}
available_providers = [p.value for p in Provider]
available_providers = ProviderABC.get_available_providers()
for provider_name in available_providers:
bulk_checks_metadata.update(CheckMetadata.get_bulk(provider_name))
if provider:
@@ -495,7 +520,7 @@ class CheckMetadata(BaseModel):
# Loaded here, as it is not always needed
if not bulk_compliance_frameworks:
bulk_compliance_frameworks = {}
available_providers = [p.value for p in Provider]
available_providers = ProviderABC.get_available_providers()
for provider in available_providers:
bulk_compliance_frameworks = Compliance.get_bulk(provider=provider)
checks_from_compliance_framework = (
+57
View File
@@ -0,0 +1,57 @@
"""Standalone helper for tool-wrapper provider detection.
A provider is a "tool wrapper" if it delegates scanning to an external tool
(Trivy, promptfoo, etc.) instead of running checks/services through the
standard Prowler engine. This module is the single source of truth for that
classification across the codebase.
Kept as a leaf module with no Prowler imports beyond the leaf
`external_tool_providers` so it can be referenced from `prowler.lib.check.*`
and `prowler.providers.common.provider` without forming an import cycle.
"""
import importlib.metadata
from prowler.lib.check.external_tool_providers import EXTERNAL_TOOL_PROVIDERS
# Module-level cache for entry-point classes consulted by this helper.
# Independent of `Provider._ep_providers` to keep this module leaf — the cost
# of a duplicate cache entry is negligible (one class object per external
# provider, loaded lazily on first lookup).
_ep_class_cache: dict = {}
def _load_ep_class(provider: str):
"""Return the entry-point provider class for `provider`, or None.
Caches the result in `_ep_class_cache`. Errors during entry-point loading
are swallowed (returning None) so a broken plug-in never crashes the
is-tool-wrapper check; it just falls through to "not a tool wrapper".
"""
if provider in _ep_class_cache:
return _ep_class_cache[provider]
for ep in importlib.metadata.entry_points(group="prowler.providers"):
if ep.name == provider:
try:
cls = ep.load()
except Exception:
cls = None
_ep_class_cache[provider] = cls
return cls
_ep_class_cache[provider] = None
return None
def is_tool_wrapper_provider(provider: str) -> bool:
"""Return True if the provider delegates scanning to an external tool.
Combines the built-in `EXTERNAL_TOOL_PROVIDERS` frozenset (fast path for
iac/llm/image) with the `is_external_tool_provider` class attribute of
external plug-ins registered via entry points. This is the single source
of truth consulted by `__main__`, the `CheckMetadata` validators, the
check-loading utilities, and the checks loader.
"""
if provider in EXTERNAL_TOOL_PROVIDERS:
return True
cls = _load_ep_class(provider)
return bool(cls and getattr(cls, "is_external_tool_provider", False))
+84 -23
View File
@@ -1,9 +1,43 @@
import importlib
import importlib.metadata
import importlib.util
import os
import sys
from pkgutil import walk_packages
from prowler.lib.check.external_tool_providers import EXTERNAL_TOOL_PROVIDERS
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
from prowler.lib.logger import logger
from prowler.providers.common.builtin import is_builtin_provider
def _recover_ep_checks(provider: str, service: str = None) -> list[tuple]:
"""Discover external checks registered via entry points for a provider.
External plugins follow the same layout as built-ins:
`{plugin_root}.services.{service}.{check}.{check}`
When `service` is provided, only entry points whose dotted path contains
`.services.{service}.` are included — mirroring how built-in discovery
filters by the `prowler.providers.{provider}.services.{service}` package.
Uses find_spec to locate the check module without importing it,
avoiding service client initialization at discovery time.
"""
checks = []
for ep in importlib.metadata.entry_points(group=f"prowler.checks.{provider}"):
try:
if service and f".services.{service}." not in ep.value:
continue
spec = importlib.util.find_spec(ep.value)
if spec and spec.origin:
check_path = os.path.dirname(spec.origin)
checks.append((ep.name, check_path))
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return checks
def recover_checks_from_provider(
@@ -15,29 +49,55 @@ def recover_checks_from_provider(
Returns a list of tuples with the following format (check_name, check_path)
"""
try:
# Bypass check loading for providers that use external tools directly
if provider in EXTERNAL_TOOL_PROVIDERS:
# Bypass check loading for tool-wrapper providers — they delegate
# scanning to an external tool and have no checks to recover.
# Single source of truth: combines the EXTERNAL_TOOL_PROVIDERS
# frozenset (built-ins) with the per-provider `is_external_tool_provider`
# class attribute (so external plug-ins opt in via the contract).
if is_tool_wrapper_provider(provider):
return []
checks = []
modules = list_modules(provider, service)
for module_name in modules:
# Format: "prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
check_module_name = module_name.name
# We need to exclude common shared libraries in services
if (
check_module_name.count(".") == 6
and ".lib." not in check_module_name
and (not check_module_name.endswith("_fixer") or include_fixers)
):
check_path = module_name.module_finder.path
# Check name is the last part of the check_module_name
check_name = check_module_name.split(".")[-1]
check_info = (check_name, check_path)
checks.append(check_info)
except ModuleNotFoundError:
logger.critical(f"Service {service} was not found for the {provider} provider.")
sys.exit(1)
# Built-in checks from prowler.providers.{provider}.services. Gate
# the built-in branch on `is_builtin_provider(provider)` — calling
# `find_spec` directly on `prowler.providers.{provider}.services`
# would propagate `ModuleNotFoundError` when the parent package
# `prowler.providers.{provider}` does not exist (i.e. the provider
# is external), instead of returning None. The leaf helper
# encapsulates the safe lookup, so we only run the built-in
# discovery when the provider actually ships with the SDK; for
# external providers we go straight to entry points.
if is_builtin_provider(provider):
modules = list_modules(provider, service)
for module_name in modules:
# Format: "prowler.providers.{provider}.services.{service}.{check_name}.{check_name}"
check_module_name = module_name.name
# We need to exclude common shared libraries in services
if (
check_module_name.count(".") == 6
and ".lib." not in check_module_name
and (not check_module_name.endswith("_fixer") or include_fixers)
):
check_path = module_name.module_finder.path
check_name = check_module_name.split(".")[-1]
check_info = (check_name, check_path)
checks.append(check_info)
# External checks registered via entry points — always consulted, with
# optional service filter. Previously gated by `if not service:`, which
# prevented external providers from being usable with --service.
checks.extend(_recover_ep_checks(provider, service))
# A service was requested but nothing matched in either built-ins or
# entry points — surface this as a clear error instead of silently
# returning an empty list.
if service and not checks:
logger.critical(
f"Service '{service}' was not found for the '{provider}' provider "
f"(neither as a built-in nor via external entry points)."
)
sys.exit(1)
except Exception as e:
logger.critical(f"{e.__class__.__name__}[{e.__traceback__.tb_lineno}]: {e}")
sys.exit(1)
@@ -64,8 +124,9 @@ def recover_checks_from_service(service_list: list, provider: str) -> set:
Returns a set of checks from the given services
"""
try:
# Bypass check loading for providers that use external tools directly
if provider in EXTERNAL_TOOL_PROVIDERS:
# Bypass check loading for tool-wrapper providers — symmetric with
# `recover_checks_from_provider` above, using the same source of truth.
if is_tool_wrapper_provider(provider):
return set()
checks = set()
+48 -7
View File
@@ -20,19 +20,58 @@ from prowler.providers.common.arguments import (
validate_provider_arguments,
validate_sarif_usage,
)
from prowler.providers.common.provider import Provider
class ProwlerArgumentParser:
# Set the default parser
def __init__(self):
# Discover any providers not in the hardcoded list below
# TODO - First step to support current providers and the new external provider implementation
known_providers = {
"aws",
"azure",
"gcp",
"kubernetes",
"m365",
"github",
"googleworkspace",
"cloudflare",
"oraclecloud",
"openstack",
"alibabacloud",
"iac",
"llm",
"image",
"nhn",
"mongodbatlas",
"vercel",
}
all_providers = set(Provider.get_available_providers())
new_providers = sorted(all_providers - known_providers)
# Build extra strings for dynamically discovered providers
extra_providers_csv = ""
extra_providers_text = ""
if new_providers:
providers_help = Provider.get_providers_help_text()
extra_providers_csv = "," + ",".join(new_providers)
extra_lines = []
for name in new_providers:
help_text = providers_help.get(name, "")
if help_text:
extra_lines.append(f" {name:<20}{help_text}")
if extra_lines:
extra_providers_text = "\n" + "\n".join(extra_lines)
# CLI Arguments
self.parser = argparse.ArgumentParser(
prog="prowler",
formatter_class=RawTextHelpFormatter,
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,dashboard,iac,image,llm} ...",
epilog="""
usage=f"prowler [-h] [--version] {{aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,dashboard,iac,image,llm{extra_providers_csv}}} ...",
epilog=f"""
Available Cloud Providers:
{aws,azure,gcp,kubernetes,m365,github,googleworkspace,iac,llm,image,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel}
{{aws,azure,gcp,kubernetes,m365,github,googleworkspace,nhn,mongodbatlas,oraclecloud,alibabacloud,cloudflare,openstack,vercel,dashboard,iac,image,llm{extra_providers_csv}}}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
@@ -49,13 +88,13 @@ Available Cloud Providers:
image Container Image Provider
nhn NHN Provider (Unofficial)
mongodbatlas MongoDB Atlas Provider
vercel Vercel Provider
vercel Vercel Provider{extra_providers_text}
Available components:
dashboard Local dashboard
To see the different available options on a specific component, run:
prowler {provider|dashboard} -h|--help
prowler {{provider|dashboard}} -h|--help
Detailed documentation at https://docs.prowler.com
""",
@@ -114,8 +153,10 @@ Detailed documentation at https://docs.prowler.com
and (sys.argv[1] not in ("-v", "--version"))
):
# Since the provider is always the second argument, we are checking if
# a flag, starting by "-", is supplied
if "-" in sys.argv[1]:
# a flag is supplied. Use startswith("-") instead of "in" to avoid
# matching external provider names that contain hyphens
# (e.g. "local-acme-snowflake").
if sys.argv[1].startswith("-"):
sys.argv = self.__set_default_provider__(sys.argv)
# Provider aliases mapping
+26 -8
View File
@@ -243,14 +243,32 @@ def display_compliance_table(
compliance_overview,
)
else:
get_generic_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
# Try provider-specific table first, fall back to generic
from prowler.providers.common.provider import Provider
provider = Provider.get_global_provider()
handled = False
if provider is not None:
try:
handled = provider.display_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
except NotImplementedError:
handled = False
if not handled:
get_generic_compliance_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
+5
View File
@@ -474,6 +474,11 @@ class Finding(BaseModel):
check_output, "fixed_version", ""
)
else:
# Dynamic fallback: any external/custom provider
provider_data = provider.get_finding_output_data(check_output)
output_data.update(provider_data)
# check_output Unique ID
# TODO: move this to a function
# TODO: in Azure, GCP and K8s there are findings without resource_name
+7 -5
View File
@@ -1417,11 +1417,13 @@ class HTML(Output):
# Azure_provider --> azure
# Kubernetes_provider --> kubernetes
# Dynamically get the Provider quick inventory handler
provider_html_assessment_summary_function = (
f"get_{provider.type}_assessment_summary"
)
return getattr(HTML, provider_html_assessment_summary_function)(provider)
# Try static method first, fall back to provider method
method_name = f"get_{provider.type}_assessment_summary"
if hasattr(HTML, method_name):
return getattr(HTML, method_name)(provider)
else:
# Dynamic fallback: any external/custom provider
return provider.get_html_assessment_summary()
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
+33 -19
View File
@@ -7,39 +7,46 @@ from prowler.lib.outputs.common import Status
from prowler.lib.outputs.finding import Finding
def stdout_report(finding, color, verbose, status, fix):
def stdout_report(finding, color, verbose, status, fix, provider=None):
if finding.check_metadata.Provider == "aws":
details = finding.region
if finding.check_metadata.Provider == "azure":
elif finding.check_metadata.Provider == "azure":
details = finding.location
if finding.check_metadata.Provider == "gcp":
elif finding.check_metadata.Provider == "gcp":
details = finding.location.lower()
if finding.check_metadata.Provider == "kubernetes":
elif finding.check_metadata.Provider == "kubernetes":
details = finding.namespace.lower()
if finding.check_metadata.Provider == "github":
elif finding.check_metadata.Provider == "github":
details = finding.owner
if finding.check_metadata.Provider == "m365":
elif finding.check_metadata.Provider == "m365":
details = finding.location
if finding.check_metadata.Provider == "mongodbatlas":
elif finding.check_metadata.Provider == "mongodbatlas":
details = finding.location
if finding.check_metadata.Provider == "nhn":
elif finding.check_metadata.Provider == "nhn":
details = finding.location
if finding.check_metadata.Provider == "llm":
elif finding.check_metadata.Provider == "llm":
details = finding.check_metadata.CheckID
if finding.check_metadata.Provider == "iac":
elif finding.check_metadata.Provider == "iac":
details = finding.check_metadata.CheckID
if finding.check_metadata.Provider == "oraclecloud":
elif finding.check_metadata.Provider == "oraclecloud":
details = finding.region
if finding.check_metadata.Provider == "alibabacloud":
elif finding.check_metadata.Provider == "alibabacloud":
details = finding.region
if finding.check_metadata.Provider == "openstack":
elif finding.check_metadata.Provider == "openstack":
details = finding.region
if finding.check_metadata.Provider == "cloudflare":
elif finding.check_metadata.Provider == "cloudflare":
details = finding.zone_name
if finding.check_metadata.Provider == "googleworkspace":
elif finding.check_metadata.Provider == "googleworkspace":
details = finding.location
if finding.check_metadata.Provider == "vercel":
elif finding.check_metadata.Provider == "vercel":
details = finding.region
else:
# Dynamic fallback: any external/custom provider
if provider is None:
from prowler.providers.common.provider import Provider
provider = Provider.get_global_provider()
details = provider.get_stdout_detail(finding)
if (verbose or fix) and (not status or finding.status in status):
if finding.muted:
@@ -59,12 +66,15 @@ def report(check_findings, provider, output_options):
if hasattr(output_options, "verbose"):
verbose = output_options.verbose
if check_findings:
# TO-DO Generic Function
if provider.type == "aws":
check_findings.sort(key=lambda x: x.region)
if provider.type == "azure":
elif provider.type == "azure":
check_findings.sort(key=lambda x: x.subscription)
else:
# Dynamic fallback: any external/custom provider
sort_key = provider.get_finding_sort_key()
if sort_key and isinstance(sort_key, str):
check_findings.sort(key=lambda x: getattr(x, sort_key, ""))
for finding in check_findings:
# Print findings by stdout
@@ -75,12 +85,16 @@ def report(check_findings, provider, output_options):
if hasattr(output_options, "fixer"):
fixer = output_options.fixer
color = set_report_color(finding.status, finding.muted)
# Pass the local `provider` through so the dynamic else inside
# `stdout_report` does not have to consult the global singleton
# — defeating the whole purpose of the new parameter.
stdout_report(
finding,
color,
verbose,
status,
fixer,
provider=provider,
)
else: # No service resources in the whole account
+3
View File
@@ -108,6 +108,9 @@ def display_summary_table(
)
else:
audited_entities = provider.identity.username or "Personal Account"
else:
# Dynamic fallback: any external/custom provider
entity_type, audited_entities = provider.get_summary_entity()
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):
+9 -4
View File
@@ -4,8 +4,8 @@ from types import SimpleNamespace
from typing import Generator
from prowler.lib.check.check import (
_resolve_check_module,
execute,
import_check,
list_services,
update_audit_metadata,
)
@@ -426,9 +426,14 @@ class Scan:
# Recover service from check name
service = get_service_name_from_check_name(check_name)
try:
# Import check module
check_module_path = f"prowler.providers.{self._provider.type}.services.{service}.{check_name}.{check_name}"
lib = import_check(check_module_path)
# Import check module (built-in or entry point) —
# delegates to `_resolve_check_module` so external
# providers registered via entry points are resolved
# correctly (their checks do not live under
# `prowler.providers.{type}.services...`).
lib = _resolve_check_module(
self._provider.type, service, check_name
)
# Recover functions from check
check_to_execute = getattr(lib, check_name)
check = check_to_execute()
+35 -12
View File
@@ -16,18 +16,41 @@ def init_providers_parser(self):
# We need to call the arguments parser for each provider
providers = Provider.get_available_providers()
for provider in providers:
try:
getattr(
import_module(
f"{providers_path}.{provider}.{provider_arguments_lib_path}"
),
init_provider_arguments_function,
)(self)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
# Discriminate built-in vs external upfront via find_spec, so an
# ImportError from a transitive dependency missing inside a built-in
# arguments module surfaces clearly instead of being silently
# re-routed to the entry-point path (which only has external providers).
if Provider.is_builtin(provider):
try:
getattr(
import_module(
f"{providers_path}.{provider}.{provider_arguments_lib_path}"
),
init_provider_arguments_function,
)(self)
except ImportError as e:
logger.critical(
f"Failed to load arguments for built-in provider '{provider}'. "
f"Missing dependency: {e}. "
f"Ensure all required dependencies are installed."
)
logger.debug("Full traceback:", exc_info=True)
sys.exit(1)
except Exception as error:
logger.critical(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
sys.exit(1)
else:
# External provider — init_parser classmethod via entry point
cls = Provider._load_ep_provider(provider)
if cls and hasattr(cls, "init_parser"):
try:
cls.init_parser(self)
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def validate_provider_arguments(arguments: Namespace) -> tuple[bool, str]:
+29
View File
@@ -0,0 +1,29 @@
"""Leaf helper for built-in provider detection.
Lives in its own module with no imports back into `prowler.lib.check` so
that callers in `prowler.lib.check.*` can ask "is this provider built-in?"
without creating an import cycle through `prowler.providers.common.provider`
(which transitively imports `prowler.config.config` and from there
`prowler.lib.check.compliance_models` / `prowler.lib.check.external_tool_providers`).
Same rationale as `prowler.lib.check.tool_wrapper`: extracting the predicate
to a leaf module is the canonical way to break the cycle in this codebase.
"""
import importlib.util
def is_builtin_provider(provider: str) -> bool:
"""Return True if the provider's own package ships with the SDK.
Wraps `importlib.util.find_spec` in `try/except (ImportError, ValueError)`
because `find_spec` propagates `ModuleNotFoundError` when a parent package
in the dotted path does not exist (instead of returning `None`). The
try/except is what makes the call safe for external providers, whose
package does not live under `prowler.providers.{provider}`.
"""
try:
spec = importlib.util.find_spec(f"prowler.providers.{provider}")
return spec is not None
except (ImportError, ValueError):
return False
+279 -28
View File
@@ -1,4 +1,6 @@
import importlib
import importlib.metadata
import importlib.util
import os
import pkgutil
import sys
@@ -136,6 +138,108 @@ class Provider(ABC):
"""
return set()
# --- Dynamic provider contract methods (not @abstractmethod for incremental migration) ---
_cli_help_text: str = ""
@classmethod
def from_cli_args(cls, arguments: Namespace, fixer_config: dict) -> "Provider":
"""Instantiate the provider from CLI arguments and return the instance.
The caller wires the returned instance into the global provider slot
via Provider.set_global_provider(). Implementations that already call
set_global_provider(self) from __init__ are also supported the call
site tolerates a None return in that case.
"""
raise NotImplementedError(f"{cls.__name__} has not implemented from_cli_args()")
def get_output_options(self, arguments, _bulk_checks_metadata):
"""Create the provider-specific OutputOptions."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_output_options()"
)
def get_stdout_detail(self, _finding) -> str:
"""Return the detail string for stdout reporting (region, location, etc.)."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_stdout_detail()"
)
def get_finding_sort_key(self) -> Optional[str]:
"""Return the attribute name to sort findings by, or None for no sorting."""
return None
def get_summary_entity(self) -> tuple:
"""Return (entity_type, audited_entities) for the summary table."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_summary_entity()"
)
def get_finding_output_data(self, _check_output) -> dict:
"""Return provider-specific fields for Finding.generate_output()."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_finding_output_data()"
)
def get_html_assessment_summary(self) -> str:
"""Return the HTML assessment summary card for this provider."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_html_assessment_summary()"
)
def generate_compliance_output(
self,
_findings,
_bulk_compliance_frameworks,
_input_compliance_frameworks,
_output_options,
_generated_outputs,
) -> None:
"""Generate compliance CSV output for this provider's frameworks."""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented generate_compliance_output()"
)
def get_mutelist_finding_args(self) -> dict:
"""Return extra kwargs for mutelist.is_finding_muted() besides 'finding'.
External providers must return a dict with the identity key their
Mutelist subclass expects, e.g. ``{"account_id": self.identity.account_id}``.
The ``finding`` kwarg is added automatically by the caller.
"""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented get_mutelist_finding_args()"
)
def display_compliance_table(
self,
_findings: list,
_bulk_checks_metadata: dict,
_compliance_framework: str,
_output_filename: str,
_output_directory: str,
_compliance_overview: bool,
) -> bool:
"""Render a custom compliance table in the terminal.
External providers can override this to display a detailed
compliance table (e.g., per-section breakdown). Return True
if the table was rendered, False to fall back to the generic table.
"""
raise NotImplementedError(
f"{self.__class__.__name__} has not implemented display_compliance_table()"
)
# Class-level flag: True for providers that delegate scanning to an external
# tool (e.g. Trivy, promptfoo) and bypass standard check/service loading and
# metadata validation. Subclasses override as `is_external_tool_provider = True`.
# Kept as a class attribute (not a property) so it can be read from the class
# without instantiation — the metadata validators in lib.check.models need to
# decide whether to relax validation before any provider instance exists.
is_external_tool_provider: bool = False
# --- End dynamic provider contract methods ---
@staticmethod
def get_excluded_regions_from_env() -> set:
"""Parse the PROWLER_AWS_DISALLOWED_REGIONS environment variable.
@@ -159,20 +263,70 @@ class Provider(ABC):
@staticmethod
def init_global_provider(arguments: Namespace) -> None:
try:
provider_class_path = (
f"{providers_path}.{arguments.provider}.{arguments.provider}_provider"
)
provider_class_name = f"{arguments.provider.capitalize()}Provider"
provider_class = getattr(
import_module(provider_class_path), provider_class_name
# Discriminate built-in vs external upfront via find_spec, so an
# ImportError from a transitive dependency missing inside a
# built-in's own import chain surfaces clearly instead of being
# silently re-routed to the entry-point path.
provider_class = None
if Provider.is_builtin(arguments.provider):
# Built-in wins on provider-name collision. Plug-ins are
# first-class extenders (they can register new provider
# names) but cannot override existing built-ins — a security
# tool prefers fail-loud predictability over silent
# overrides. Surface the override so the user knows their
# plug-in is being ignored and can rename it.
if Provider._load_ep_provider(arguments.provider) is not None:
logger.warning(
f"Plug-in provider '{arguments.provider}' registered "
f"via entry points is being IGNORED — a built-in with "
f"the same name exists. To use your plug-in, register "
f"it under a different name."
)
provider_class_path = f"{providers_path}.{arguments.provider}.{arguments.provider}_provider"
provider_class_name = f"{arguments.provider.capitalize()}Provider"
try:
provider_class = getattr(
import_module(provider_class_path), provider_class_name
)
except ImportError as e:
logger.critical(
f"Failed to load built-in provider '{arguments.provider}'. "
f"Missing dependency: {e}. "
f"Ensure all required dependencies are installed."
)
logger.debug("Full traceback:", exc_info=True)
sys.exit(1)
except AttributeError:
# Module exists but doesn't define the expected class —
# treat as external and try entry points.
provider_class = Provider._load_ep_provider(arguments.provider)
else:
provider_class = Provider._load_ep_provider(arguments.provider)
if provider_class is None:
raise ImportError(
f"Provider '{arguments.provider}' not found as built-in or entry point"
)
# Kept for downstream forks that may extend the dispatch below
# with their own custom built-in branches and reference this name.
# The upstream chain dispatches by `arguments.provider` directly.
provider_class_name = (
f"{arguments.provider.capitalize()}Provider" # noqa: F841
)
fixer_config = load_and_validate_config_file(
arguments.provider, arguments.fixer_config
)
# Dispatch by exact provider name (equality, not substring) so
# external plug-ins whose names contain a built-in substring
# (e.g. `awsx`, `azure_gov`, `iac_v2`) cannot be silently routed
# to the wrong built-in branch. Anything that doesn't match a
# built-in falls through to the dynamic else and uses the
# contract's `from_cli_args`.
if not isinstance(Provider._global, provider_class):
if "aws" in provider_class_name.lower():
if arguments.provider == "aws":
excluded_regions = (
set(arguments.excluded_region)
if getattr(arguments, "excluded_region", None)
@@ -196,7 +350,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "azure" in provider_class_name.lower():
elif arguments.provider == "azure":
provider_class(
az_cli_auth=arguments.az_cli_auth,
sp_env_auth=arguments.sp_env_auth,
@@ -209,7 +363,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "gcp" in provider_class_name.lower():
elif arguments.provider == "gcp":
provider_class(
retries_max_attempts=arguments.gcp_retries_max_attempts,
organization_id=arguments.organization_id,
@@ -223,7 +377,7 @@ class Provider(ABC):
fixer_config=fixer_config,
skip_api_check=arguments.skip_api_check,
)
elif "kubernetes" in provider_class_name.lower():
elif arguments.provider == "kubernetes":
provider_class(
kubeconfig_file=arguments.kubeconfig_file,
context=arguments.context,
@@ -233,7 +387,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "m365" in provider_class_name.lower():
elif arguments.provider == "m365":
provider_class(
region=arguments.region,
config_path=arguments.config_file,
@@ -247,7 +401,7 @@ class Provider(ABC):
init_modules=arguments.init_modules,
fixer_config=fixer_config,
)
elif "nhn" in provider_class_name.lower():
elif arguments.provider == "nhn":
provider_class(
username=arguments.nhn_username,
password=arguments.nhn_password,
@@ -256,7 +410,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "github" in provider_class_name.lower():
elif arguments.provider == "github":
orgs = []
repos = []
@@ -288,13 +442,13 @@ class Provider(ABC):
exclude_workflows=getattr(arguments, "exclude_workflows", []),
fixer_config=fixer_config,
)
elif "googleworkspace" in provider_class_name.lower():
elif arguments.provider == "googleworkspace":
provider_class(
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "cloudflare" in provider_class_name.lower():
elif arguments.provider == "cloudflare":
provider_class(
filter_zones=arguments.region,
filter_accounts=arguments.account_id,
@@ -302,7 +456,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "iac" in provider_class_name.lower():
elif arguments.provider == "iac":
provider_class(
scan_path=arguments.scan_path,
scan_repository_url=arguments.scan_repository_url,
@@ -315,13 +469,13 @@ class Provider(ABC):
oauth_app_token=arguments.oauth_app_token,
provider_uid=arguments.provider_uid,
)
elif "llm" in provider_class_name.lower():
elif arguments.provider == "llm":
provider_class(
max_concurrency=arguments.max_concurrency,
config_path=arguments.config_file,
fixer_config=fixer_config,
)
elif "image" in provider_class_name.lower():
elif arguments.provider == "image":
provider_class(
images=arguments.images,
image_list_file=arguments.image_list_file,
@@ -339,7 +493,7 @@ class Provider(ABC):
registry_insecure=arguments.registry_insecure,
registry_list_images=arguments.registry_list_images,
)
elif "mongodbatlas" in provider_class_name.lower():
elif arguments.provider == "mongodbatlas":
provider_class(
atlas_public_key=arguments.atlas_public_key,
atlas_private_key=arguments.atlas_private_key,
@@ -348,7 +502,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "oraclecloud" in provider_class_name.lower():
elif arguments.provider == "oraclecloud":
provider_class(
oci_config_file=arguments.oci_config_file,
profile=arguments.profile,
@@ -359,7 +513,7 @@ class Provider(ABC):
fixer_config=fixer_config,
use_instance_principal=arguments.use_instance_principal,
)
elif "openstack" in provider_class_name.lower():
elif arguments.provider == "openstack":
provider_class(
clouds_yaml_file=getattr(arguments, "clouds_yaml_file", None),
clouds_yaml_content=getattr(
@@ -384,7 +538,7 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "alibabacloud" in provider_class_name.lower():
elif arguments.provider == "alibabacloud":
provider_class(
role_arn=arguments.role_arn,
role_session_name=arguments.role_session_name,
@@ -396,13 +550,25 @@ class Provider(ABC):
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
elif "vercel" in provider_class_name.lower():
elif arguments.provider == "vercel":
provider_class(
projects=getattr(arguments, "project", None),
config_path=arguments.config_file,
mutelist_path=arguments.mutelist_file,
fixer_config=fixer_config,
)
else:
# Dynamic fallback: any external/custom provider.
# Honor the from_cli_args type hint (-> Provider): if the
# implementation returns an instance, wire it as the global
# provider here. Implementations that call
# set_global_provider(self) from __init__ return None and
# remain supported (the condition below is a no-op for them).
provider_instance = provider_class.from_cli_args(
arguments, fixer_config
)
if provider_instance is not None:
Provider.set_global_provider(provider_instance)
except TypeError as error:
logger.critical(
@@ -415,17 +581,102 @@ class Provider(ABC):
)
sys.exit(1)
# Cache for entry-point provider classes {name: class}
_ep_providers: dict = {}
@staticmethod
def get_available_providers() -> list[str]:
"""get_available_providers returns a list of the available providers"""
providers = []
# Dynamically import the package based on its string path
providers = set()
# Built-in providers from local package
prowler_providers = importlib.import_module(providers_path)
# Iterate over all modules found in the prowler_providers package
for _, provider, ispkg in pkgutil.iter_modules(prowler_providers.__path__):
if provider != "common" and ispkg:
providers.append(provider)
return providers
providers.add(provider)
# External providers registered via entry points
for ep in importlib.metadata.entry_points(group="prowler.providers"):
providers.add(ep.name)
return sorted(providers)
@staticmethod
def is_tool_wrapper_provider(provider: str) -> bool:
"""Return True if the provider delegates scanning to an external tool.
Delegates to `prowler.lib.check.tool_wrapper.is_tool_wrapper_provider`,
the leaf module that holds the actual logic. Kept on `Provider` as a
convenience entry point for callers that already import `Provider`.
"""
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider as _impl
return _impl(provider)
@staticmethod
def is_builtin(provider: str) -> bool:
"""Return True if the provider's own package is importable as a built-in.
Delegates to `prowler.providers.common.builtin.is_builtin_provider`,
the leaf module that holds the actual check. Kept on `Provider` as a
convenience entry point for callers that already import `Provider`.
Call sites in `prowler.lib.check.*` should import from the leaf
directly to avoid the import cycle through this module.
"""
from prowler.providers.common.builtin import is_builtin_provider as _impl
return _impl(provider)
@staticmethod
def _load_ep_provider(name: str):
"""Load an external provider class from entry points, with cache.
Caches both hits and misses so repeated lookups for unknown names do
not re-iterate entry_points(). Symmetric with
tool_wrapper._ep_class_cache.
"""
if name in Provider._ep_providers:
return Provider._ep_providers[name]
for ep in importlib.metadata.entry_points(group="prowler.providers"):
if ep.name == name:
try:
cls = ep.load()
Provider._ep_providers[name] = cls
return cls
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
Provider._ep_providers[name] = None
return None
@staticmethod
def get_providers_help_text() -> dict:
"""Returns a dict of {provider_name: cli_help_text} for all available providers."""
help_text = {}
for name in Provider.get_available_providers():
try:
# Try built-in first
module_path = f"{providers_path}.{name}.{name}_provider"
module = import_module(module_path)
cls = None
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and issubclass(attr, Provider)
and attr is not Provider
):
cls = attr
break
help_text[name] = getattr(cls, "_cli_help_text", "") if cls else ""
except ImportError:
# External provider — load via entry point
cls = Provider._load_ep_provider(name)
help_text[name] = getattr(cls, "_cli_help_text", "") if cls else ""
except Exception as error:
logger.warning(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
help_text[name] = ""
return help_text
@staticmethod
def update_provider_config(audit_config: dict, variable: str, value: str):
+53 -1
View File
@@ -17,7 +17,7 @@ MOCK_OLD_PROWLER_VERSION = "0.0.0"
MOCK_PROWLER_MASTER_VERSION = "3.4.0"
def mock_prowler_get_latest_release(_, **kwargs):
def mock_prowler_get_latest_release(_, **_kwargs):
"""Mock requests.get() to get the Prowler latest release"""
response = Response()
response._content = b'[{"name":"3.3.0"}]'
@@ -463,6 +463,32 @@ class Test_Config:
all_frameworks = get_available_compliance_frameworks()
assert "csa_ccm_4.0" in all_frameworks
@mock.patch("prowler.config.config._get_ep_compliance_dirs")
def test_get_available_compliance_frameworks_dedupes_ep_collisions_with_builtins(
self, mock_dirs
):
"""Entry-point compliance frameworks that collide with a built-in
name must appear only once in the available frameworks list.
Built-in wins silently same policy as the universal frameworks
loop and as Compliance.get_bulk."""
import json
import tempfile
with tempfile.TemporaryDirectory() as tmpdir:
# cis_2.0_aws ships as a built-in under prowler/compliance/aws/
json_path = os.path.join(tmpdir, "cis_2.0_aws.json")
with open(json_path, "w") as f:
json.dump({"Framework": "CIS", "Provider": "aws"}, f)
mock_dirs.return_value = {"aws": tmpdir}
frameworks = get_available_compliance_frameworks("aws")
assert frameworks.count("cis_2.0_aws") == 1, (
f"Expected cis_2.0_aws to appear exactly once, got "
f"{frameworks.count('cis_2.0_aws')} occurrences in: {frameworks}"
)
def test_load_and_validate_config_file_aws(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config.yaml"
@@ -500,6 +526,32 @@ class Test_Config:
assert load_and_validate_config_file("azure", config_test_file) == {}
assert load_and_validate_config_file("kubernetes", config_test_file) == {}
def test_load_and_validate_config_file_namespaced_non_listed_provider(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config_namespaced_external.yaml"
# github is a built-in not in the legacy hardcoded list; namespaced format must unwrap it.
assert load_and_validate_config_file("github", config_test_file) == {
"token": "abc",
"org": "prowler-cloud",
}
def test_load_and_validate_config_file_namespaced_external_provider(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config_namespaced_external.yaml"
# External plug-in provider: namespaced format must unwrap its block.
assert load_and_validate_config_file("custom_plugin", config_test_file) == {
"setting": "value",
"nested": {"key": 42},
}
def test_load_and_validate_config_file_namespaced_missing_provider(self):
path = pathlib.Path(os.path.dirname(os.path.realpath(__file__)))
config_test_file = f"{path}/fixtures/config_namespaced_external.yaml"
# Provider with no section in a namespaced file must return empty config,
# not the full file (prevents cross-provider config leakage).
assert load_and_validate_config_file("aws", config_test_file) == {}
assert load_and_validate_config_file("gcp", config_test_file) == {}
def test_load_and_validate_config_file_invalid_config_file_path(self, caplog):
provider = "aws"
config_file_path = "invalid/path/to/fixer_config.yaml"
@@ -0,0 +1,8 @@
# Namespaced config covering a non-listed built-in (github) and an external plugin.
github:
token: abc
org: prowler-cloud
custom_plugin:
setting: value
nested:
key: 42
+32
View File
@@ -95,6 +95,38 @@ class TestCheckMetada:
"/path/to/accessanalyzer_enabled/accessanalyzer_enabled.metadata.json"
)
@mock.patch("prowler.lib.check.models.logger")
@mock.patch("prowler.lib.check.models.load_check_metadata")
@mock.patch("prowler.lib.check.models.recover_checks_from_provider")
def test_get_bulk_builtin_wins_on_check_id_collision(
self, mock_recover_checks, mock_load_metadata, mock_logger
):
"""Regression guard: when an entry-point plug-in re-registers a
built-in CheckID, the BUILT-IN metadata wins (first-write-wins) and
the plug-in is IGNORED. The override is surfaced via a warning so
the user knows their plug-in duplicate is being skipped and can
rename it. Matches the precedence in `_resolve_check_module`. See
PR #10700 review (HugoPBrito)."""
# Built-in first, plug-in last (matches recover_checks_from_provider order)
mock_recover_checks.return_value = [
("accessanalyzer_enabled", "/builtin/accessanalyzer_enabled"),
("accessanalyzer_enabled", "/plugin/accessanalyzer_enabled"),
]
builtin_metadata = mock.MagicMock(CheckID="accessanalyzer_enabled")
plugin_metadata = mock.MagicMock(CheckID="accessanalyzer_enabled")
mock_load_metadata.side_effect = [builtin_metadata, plugin_metadata]
result = CheckMetadata.get_bulk(provider="aws")
# Built-in wins (first-write-wins on CheckID), plug-in is ignored
assert result["accessanalyzer_enabled"] is builtin_metadata
# Override is surfaced via warning naming the plug-in metadata file
mock_logger.warning.assert_called_once()
warning_msg = mock_logger.warning.call_args.args[0]
assert "accessanalyzer_enabled" in warning_msg
assert "/plugin/accessanalyzer_enabled" in warning_msg
@mock.patch("prowler.lib.check.models.load_check_metadata")
@mock.patch("prowler.lib.check.models.recover_checks_from_provider")
def test_list(self, mock_recover_checks, mock_load_metadata):
+110
View File
@@ -0,0 +1,110 @@
"""Unit tests for prowler.lib.check.tool_wrapper.
Covers the leaf helper directly (Provider.is_tool_wrapper_provider delegates
to it). Tests the frozenset fast path, the entry-point fallback for external
plug-ins, the broken-plug-in path, the no-match path, and the module-level
cache.
"""
from unittest.mock import MagicMock, patch
import pytest
@pytest.fixture(autouse=True)
def _clear_ep_class_cache():
"""Reset the leaf module's cache between tests so they stay independent."""
from prowler.lib.check import tool_wrapper
tool_wrapper._ep_class_cache.clear()
yield
tool_wrapper._ep_class_cache.clear()
def _make_entry_point(name, cls):
"""Create a mock entry point whose `load()` returns `cls`."""
ep = MagicMock()
ep.name = name
ep.load.return_value = cls
return ep
class TestIsToolWrapperProvider:
"""is_tool_wrapper_provider: frozenset + entry-point fallback."""
@pytest.mark.parametrize("name", ["iac", "llm", "image"])
def test_returns_true_for_builtin_tool_wrappers(self, name):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
assert is_tool_wrapper_provider(name) is True
@pytest.mark.parametrize("name", ["aws", "azure", "gcp", "github", "kubernetes"])
def test_returns_false_for_regular_builtins(self, name):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
assert is_tool_wrapper_provider(name) is False
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_true_for_external_plugin_with_flag(self, mock_eps):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
cls = MagicMock(is_external_tool_provider=True)
mock_eps.return_value = [_make_entry_point("custom_wrapper", cls)]
assert is_tool_wrapper_provider("custom_wrapper") is True
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_false_for_external_plugin_without_flag(self, mock_eps):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
cls = MagicMock(is_external_tool_provider=False)
mock_eps.return_value = [_make_entry_point("vanilla_external", cls)]
assert is_tool_wrapper_provider("vanilla_external") is False
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_false_for_unknown_provider(self, mock_eps):
from prowler.lib.check.tool_wrapper import is_tool_wrapper_provider
mock_eps.return_value = []
assert is_tool_wrapper_provider("does-not-exist") is False
class TestLoadEpClass:
"""_load_ep_class: cache, broken plug-ins, no-match."""
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_caches_result_across_calls(self, mock_eps):
from prowler.lib.check.tool_wrapper import _load_ep_class
cls = MagicMock(is_external_tool_provider=True)
mock_eps.return_value = [_make_entry_point("cached_one", cls)]
first = _load_ep_class("cached_one")
second = _load_ep_class("cached_one")
assert first is cls
assert second is cls
# entry_points consulted only on the first call
assert mock_eps.call_count == 1
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_none_for_broken_plugin(self, mock_eps):
from prowler.lib.check.tool_wrapper import _load_ep_class
broken_ep = MagicMock()
broken_ep.name = "broken"
broken_ep.load.side_effect = ImportError("plug-in is broken")
mock_eps.return_value = [broken_ep]
assert _load_ep_class("broken") is None
@patch("prowler.lib.check.tool_wrapper.importlib.metadata.entry_points")
def test_returns_none_when_no_entry_point_matches(self, mock_eps):
from prowler.lib.check.tool_wrapper import _load_ep_class
cls = MagicMock()
mock_eps.return_value = [_make_entry_point("other_provider", cls)]
assert _load_ep_class("missing_provider") is None
+4 -4
View File
@@ -51,7 +51,7 @@ def mock_provider():
def mock_execute():
with mock.patch("prowler.lib.scan.scan.execute", autospec=True) as mock_exec:
findings = [finding]
mock_exec.side_effect = lambda *args, **kwargs: findings
mock_exec.side_effect = lambda *_args, **_kwargs: findings
yield mock_exec
@@ -264,10 +264,10 @@ class TestScan:
@patch("prowler.lib.scan.scan.update_checks_metadata_with_compliance")
@patch("prowler.lib.scan.scan.Compliance.get_bulk")
@patch("prowler.lib.scan.scan.CheckMetadata.get_bulk")
@patch("prowler.lib.scan.scan.import_check")
@patch("prowler.lib.scan.scan._resolve_check_module")
def test_scan(
self,
mock_import_check,
mock_resolve_check_module,
mock_get_bulk,
mock_compliance_get_bulk,
mock_update_checks_metadata,
@@ -285,7 +285,7 @@ class TestScan:
mock_check_instance.CheckTitle = "Check if IAM Access Analyzer is enabled"
mock_check_instance.Categories = []
mock_import_check.return_value = MagicMock(
mock_resolve_check_module.return_value = MagicMock(
accessanalyzer_enabled=mock_check_class
)
View File
File diff suppressed because it is too large Load Diff