mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
2767 lines
106 KiB
Python
2767 lines
106 KiB
Python
"""
|
|
Tests for dynamic provider loading via entry points.
|
|
|
|
Covers: provider discovery, check discovery, check execution,
|
|
CLI argument registration, compliance frameworks, parser integration,
|
|
and all dispatch fallbacks for external providers.
|
|
"""
|
|
|
|
from argparse import Namespace
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from prowler.providers.common.provider import Provider
|
|
|
|
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_entry_point(name, value, group):
|
|
"""Create a mock entry point."""
|
|
ep = MagicMock()
|
|
ep.name = name
|
|
ep.value = value
|
|
ep.group = group
|
|
return ep
|
|
|
|
|
|
class FakeExternalProvider(Provider):
    """Bare-bones Provider subclass used to exercise the dynamic contract.

    Every hook answers with a fixed, recognisable value. Constructing an
    instance registers it as the global provider.
    """

    _type = "fakeexternal"
    _cli_help_text = "Fake External Provider"

    def __init__(self):
        # Registration is a side effect of construction.
        Provider.set_global_provider(self)

    @property
    def type(self):
        """Provider type string, read from ``_type``."""
        return self._type

    @property
    def identity(self):
        """New mock identity on every access, with ``host_id`` preset."""
        fake_identity = MagicMock(host_id="fake-host-1")
        return fake_identity

    @property
    def session(self):
        """New mock session on every access."""
        fake_session = MagicMock()
        return fake_session

    @property
    def audit_config(self):
        """Empty audit configuration."""
        return {}

    def setup_session(self):
        """Return a new mock in place of a real session."""
        fake_session = MagicMock()
        return fake_session

    def print_credentials(self):
        """Do nothing; there are no credentials to show."""

    @classmethod
    def from_cli_args(cls, _arguments, _fixer_config):
        """Instantiate the provider and discard the reference.

        Both arguments are ignored and nothing is returned; the constructor
        has already registered the new instance as the global provider.
        """
        cls()

    def get_output_options(self, _arguments, _bulk_checks_metadata):
        """Return mock output options with a preset directory and filename."""
        options = MagicMock(output_directory="/tmp", output_filename="fake")
        return options

    def get_stdout_detail(self, finding):
        """Return a constant detail string; ``finding`` is not inspected."""
        return "fake-detail"

    def get_finding_sort_key(self):
        """Return the constant sort key."""
        return "region"

    def get_summary_entity(self):
        """Return the fixed (label, identifier) pair."""
        label, identifier = "Fake Host", "fake-host-1"
        return (label, identifier)

    def get_finding_output_data(self, check_output):
        """Return a fixed output-data mapping; ``check_output`` is not inspected."""
        output_data = {
            "auth_method": "fake",
            "account_uid": "fake-account",
            "account_name": "fake",
            "resource_name": "fake-resource",
            "resource_uid": "fake-uid",
            "region": "local",
        }
        return output_data

    def get_mutelist_finding_args(self):
        """Return the mutelist arguments built from the identity's ``host_id``."""
        host_id = self.identity.host_id
        return {"host_id": host_id}

    def display_compliance_table(
        self,
        findings,
        _bulk_checks_metadata,
        _compliance_framework,
        _output_filename,
        output_directory,  # referenced via name elsewhere in tests
        _compliance_overview,
    ):
        """Always report success; none of the arguments are used."""
        return True

    def get_html_assessment_summary(self):
        """Return a fixed HTML fragment."""
        return "<div>Fake Assessment</div>"

    def generate_compliance_output(
        self,
        findings,
        _bulk_compliance_frameworks,
        _input_compliance_frameworks,
        output_options,
        generated_outputs,
    ):
        """Append a marker to ``generated_outputs["compliance"]``.

        The list is mutated in place and nothing is returned.
        """
        compliance_outputs = generated_outputs["compliance"]
        compliance_outputs.append("fake-compliance-output")

    @classmethod
    def init_parser(cls, parser_instance):
        """Do nothing; this provider registers no CLI arguments."""
|
|
|
|
|
|
class FakeToolWrapperProvider(Provider):
    """External provider stub that flags itself as a tool wrapper.

    The flag is the class attribute ``is_external_tool_provider``.
    """

    _type = "faketoolwrapper"
    is_external_tool_provider = True

    @property
    def type(self):
        """Provider type string, read from ``_type``."""
        return self._type

    @property
    def identity(self):
        """New mock identity on every access."""
        fake_identity = MagicMock()
        return fake_identity

    @property
    def session(self):
        """New mock session on every access."""
        fake_session = MagicMock()
        return fake_session

    @property
    def audit_config(self):
        """Empty audit configuration."""
        return {}

    def setup_session(self):
        """Return a new mock in place of a real session."""
        fake_session = MagicMock()
        return fake_session

    def print_credentials(self):
        """Do nothing; there are no credentials to show."""
|
|
|
|
|
|
class FakePureContractProvider(Provider):
    """External provider stub whose ``from_cli_args`` only returns an instance.

    Nothing in this class calls ``Provider.set_global_provider()``, so the
    caller of ``from_cli_args`` has to wire the returned instance itself.
    Used to verify that the call site does so.
    """

    _type = "fakepure"

    @property
    def type(self):
        """Provider type string, read from ``_type``."""
        return self._type

    @property
    def identity(self):
        """New mock identity on every access, with ``host_id`` preset."""
        fake_identity = MagicMock(host_id="fake-pure-1")
        return fake_identity

    @property
    def session(self):
        """New mock session on every access."""
        fake_session = MagicMock()
        return fake_session

    @property
    def audit_config(self):
        """Empty audit configuration."""
        return {}

    def setup_session(self):
        """Return a new mock in place of a real session."""
        fake_session = MagicMock()
        return fake_session

    def print_credentials(self):
        """Do nothing; there are no credentials to show."""

    @classmethod
    def from_cli_args(cls, _arguments, _fixer_config):
        """Return a new instance; both arguments are ignored."""
        # Literal contract: hand the instance back, no registration side effect.
        instance = cls()
        return instance
|
|
|
|
|
|
class FakeProviderNoHelpText(Provider):
    """Provider stub that does not define ``_cli_help_text``."""

    _type = "nohelptext"

    @property
    def type(self):
        """Provider type string, read from ``_type``."""
        return self._type

    @property
    def identity(self):
        """New mock identity on every access."""
        fake_identity = MagicMock()
        return fake_identity

    @property
    def session(self):
        """New mock session on every access."""
        fake_session = MagicMock()
        return fake_session

    @property
    def audit_config(self):
        """Empty audit configuration."""
        return {}

    def setup_session(self):
        """Return a new mock in place of a real session."""
        fake_session = MagicMock()
        return fake_session

    def print_credentials(self):
        """Do nothing; there are no credentials to show."""
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _clear_ep_cache():
    """Reset ``Provider._ep_providers`` to an empty dict around every test."""

    def _reset():
        Provider._ep_providers = {}

    _reset()  # start each test with an empty cache
    yield
    _reset()  # leave nothing behind for the next test
|
|
|
|
|
|
@pytest.fixture
def fake_provider():
    """Yield a new FakeExternalProvider, then clear ``Provider._global``.

    Building the instance registers it as the global provider (see its
    ``__init__``); the global slot is set back to ``None`` on teardown.
    """
    provider_instance = FakeExternalProvider()
    yield provider_instance
    Provider._global = None
|
|
|
|
|
|
# ===========================================================================
# 1. Provider Discovery & Loading
# ===========================================================================
|
|
|
|
|
|
class TestProviderDiscovery:
    """Discovery and loading: get_available_providers, _load_ep_provider,
    get_providers_help_text (tests 1-7 plus the miss-caching case)."""

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_get_available_providers_merges_builtin_and_entrypoint(self, mock_ep):
        """Test 1: built-in and entry-point providers are both listed."""
        external_ep = _make_entry_point(
            "fakeexternal", "pkg.provider:Cls", "prowler.providers"
        )
        mock_ep.return_value = [external_ep]

        available = Provider.get_available_providers()

        # Built-in provider shipped in the real prowler package
        assert "aws" in available
        # External provider announced through the entry point
        assert "fakeexternal" in available
        assert "common" not in available

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_get_available_providers_deduplicates(self, mock_ep):
        """Test 2: a name present as built-in and entry point is listed once."""
        # "aws" is a built-in AND is registered again as an entry point
        clashing_ep = _make_entry_point("aws", "pkg.provider:Cls", "prowler.providers")
        mock_ep.return_value = [clashing_ep]

        available = Provider.get_available_providers()

        assert available.count("aws") == 1

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_load_ep_provider_loads_class(self, mock_ep):
        """Test 3: _load_ep_provider returns the class the entry point loads."""
        external_ep = _make_entry_point(
            "fakeexternal", "pkg:FakeExternalProvider", "prowler.providers"
        )
        external_ep.load.return_value = FakeExternalProvider
        mock_ep.return_value = [external_ep]

        loaded = Provider._load_ep_provider("fakeexternal")

        assert loaded is FakeExternalProvider

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_load_ep_provider_returns_none_for_unknown(self, mock_ep):
        """Test 4: _load_ep_provider returns None for an unregistered name."""
        mock_ep.return_value = []

        loaded = Provider._load_ep_provider("nonexistent")

        assert loaded is None

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_load_ep_provider_caches_result(self, mock_ep):
        """Test 5: a loaded class is cached and ``load()`` runs only once."""
        external_ep = _make_entry_point("fakeexternal", "pkg:Cls", "prowler.providers")
        external_ep.load.return_value = FakeExternalProvider
        mock_ep.return_value = [external_ep]

        first_lookup = Provider._load_ep_provider("fakeexternal")
        second_lookup = Provider._load_ep_provider("fakeexternal")

        assert first_lookup is second_lookup
        # The second lookup must be served from the cache
        external_ep.load.assert_called_once()

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_load_ep_provider_caches_misses(self, mock_ep):
        """A miss (unknown provider) is cached too, so a repeated lookup does
        not query entry_points() again. Aligns with
        tool_wrapper._ep_class_cache, which already caches None on miss."""
        mock_ep.return_value = []

        lookups = [
            Provider._load_ep_provider("nonexistent"),
            Provider._load_ep_provider("nonexistent"),
        ]

        assert lookups[0] is None
        assert lookups[1] is None
        # entry_points() is consulted once across both lookups
        mock_ep.assert_called_once()

    @patch("prowler.providers.common.provider.Provider._load_ep_provider")
    @patch("prowler.providers.common.provider.Provider.get_available_providers")
    def test_get_providers_help_text_reads_cli_help_text(
        self, mock_providers, mock_load
    ):
        """Test 6: help text comes from the entry-point class's _cli_help_text."""
        mock_load.return_value = FakeExternalProvider
        mock_providers.return_value = ["fakeexternal"]

        texts = Provider.get_providers_help_text()

        assert texts["fakeexternal"] == "Fake External Provider"

    @patch("prowler.providers.common.provider.Provider._load_ep_provider")
    @patch("prowler.providers.common.provider.Provider.get_available_providers")
    def test_get_providers_help_text_empty_without_cli_help_text(
        self, mock_providers, mock_load
    ):
        """Test 7: a provider without _cli_help_text maps to an empty string."""
        mock_load.return_value = FakeProviderNoHelpText
        mock_providers.return_value = ["nohelptext"]

        texts = Provider.get_providers_help_text()

        assert texts["nohelptext"] == ""
|
|
|
|
|
|
class TestIsToolWrapperProvider:
    """Tests for Provider.is_tool_wrapper_provider, the helper that combines
    the built-in EXTERNAL_TOOL_PROVIDERS frozenset with the
    is_external_tool_provider class attribute of entry-point providers."""

    def test_returns_true_for_builtin_tool_wrappers(self):
        # iac/llm/image are in the EXTERNAL_TOOL_PROVIDERS frozenset (fast path)
        for builtin_wrapper in ("iac", "llm", "image"):
            assert Provider.is_tool_wrapper_provider(builtin_wrapper) is True

    def test_returns_false_for_regular_builtin_providers(self):
        # Ordinary built-ins must never be classified as tool wrappers
        for regular_builtin in ("aws", "gcp", "github"):
            assert Provider.is_tool_wrapper_provider(regular_builtin) is False

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_returns_true_for_external_provider_declaring_flag(self, mock_ep):
        # The plug-in class explicitly sets is_external_tool_provider = True
        wrapper_ep = _make_entry_point("faketoolwrapper", "pkg:Cls", "prowler.providers")
        wrapper_ep.load.return_value = FakeToolWrapperProvider
        mock_ep.return_value = [wrapper_ep]

        assert Provider.is_tool_wrapper_provider("faketoolwrapper") is True

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_returns_false_for_external_provider_without_flag(self, mock_ep):
        # A plug-in that does not set the flag (default False) counts as regular
        plain_ep = _make_entry_point("fakeexternal", "pkg:Cls", "prowler.providers")
        plain_ep.load.return_value = FakeExternalProvider
        mock_ep.return_value = [plain_ep]

        assert Provider.is_tool_wrapper_provider("fakeexternal") is False

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_returns_false_for_unknown_provider(self, mock_ep):
        # No entry points registered at all
        mock_ep.return_value = []

        assert Provider.is_tool_wrapper_provider("does-not-exist") is False

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_returns_false_for_none_provider(self, mock_ep):
        # Pydantic validators may pass None when values.get("Provider") is missing
        mock_ep.return_value = []

        assert Provider.is_tool_wrapper_provider(None) is False
|
|
|
|
|
|
class TestIsBuiltinProvider:
    """Tests for Provider.is_builtin, the helper that tells built-in providers
    apart from external ones before an import is attempted, so transitive
    dependency failures in built-ins are not silently re-routed to entry
    points."""

    def test_returns_true_for_builtin_provider(self):
        for builtin_name in ("aws", "github"):
            assert Provider.is_builtin(builtin_name) is True

    def test_returns_false_for_unknown_provider(self):
        is_known = Provider.is_builtin("nonexistent_xyz")

        assert is_known is False

    @patch("prowler.providers.common.provider.importlib.util.find_spec")
    def test_returns_false_when_find_spec_raises(self, mock_find_spec):
        # Some namespace-package edge cases raise ValueError/ImportError;
        # the helper is expected to swallow that and answer False.
        mock_find_spec.side_effect = ValueError("namespace package edge case")

        is_known = Provider.is_builtin("some_provider")

        assert is_known is False
|
|
|
|
|
|
class TestInitProvidersParserBuiltinDependencyFailure:
    """Selective fail-loud contract for parser initialisation.

    ``init_providers_parser`` records built-in load failures without
    exiting; ``enforce_invoked_provider_loaded`` exits only when the
    provider invoked on the command line is one of the broken ones.
    """

    @staticmethod
    def _available_providers(names):
        """Return a patcher that makes get_available_providers yield ``names``."""
        return patch(
            "prowler.providers.common.arguments.Provider.get_available_providers",
            return_value=names,
        )

    @staticmethod
    def _stackit_failing_import(error_type, message, fallback):
        """Build an import_module side effect that fails only for stackit.

        Paths containing "stackit" raise ``error_type(message)``; every
        other path returns ``fallback``.
        """

        def fake_import(module_path):
            if "stackit" in module_path:
                raise error_type(message)
            return fallback

        return fake_import

    @patch("sys.argv", ["prowler", "aws"])
    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.import_module")
    def test_builtin_with_missing_transitive_dep_fails_loudly(
        self, mock_import, mock_is_builtin
    ):
        """Invoked aws fails to import: failure is recorded, enforce exits."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        mock_import.side_effect = ImportError("No module named 'boto3'")
        mock_is_builtin.return_value = True

        fake_parser = MagicMock()
        fake_parser._providers = ["aws"]

        with self._available_providers(["aws"]):
            init_providers_parser(fake_parser)
            assert "aws" in fake_parser._builtin_load_failures
            with pytest.raises(SystemExit):
                enforce_invoked_provider_loaded(fake_parser)

    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.Provider._load_ep_provider")
    def test_external_provider_does_not_touch_builtin_path(
        self, mock_load_ep, mock_is_builtin
    ):
        """A non-built-in provider gets its own init_parser called once."""
        from prowler.providers.common.arguments import init_providers_parser

        mock_is_builtin.return_value = False
        plugin_cls = MagicMock()
        plugin_cls.init_parser = MagicMock()
        mock_load_ep.return_value = plugin_cls

        fake_parser = MagicMock()

        with self._available_providers(["fakeexternal"]):
            init_providers_parser(fake_parser)

        plugin_cls.init_parser.assert_called_once_with(fake_parser)

    @patch("sys.argv", ["prowler", "aws"])
    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.import_module")
    def test_unrelated_builtin_failure_does_not_abort_when_other_provider_invoked(
        self, mock_import, mock_is_builtin
    ):
        """Broken stackit while aws is invoked: enforce must not exit."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        mock_is_builtin.return_value = True
        aws_module = MagicMock()
        mock_import.side_effect = self._stackit_failing_import(
            ImportError, "No module named 'stackit.objectstorage'", aws_module
        )

        fake_parser = MagicMock()

        with self._available_providers(["aws", "stackit"]):
            init_providers_parser(fake_parser)
            assert "stackit" in fake_parser._builtin_load_failures
            enforce_invoked_provider_loaded(fake_parser)

        aws_module.init_parser.assert_called_once_with(fake_parser)

    @patch("sys.argv", ["prowler", "-h"])
    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.import_module")
    def test_no_provider_invoked_failure_does_not_abort(
        self, mock_import, mock_is_builtin
    ):
        """`prowler -h` with a broken built-in: neither call may exit."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        mock_import.side_effect = ImportError("No module named 'stackit.objectstorage'")
        mock_is_builtin.return_value = True

        fake_parser = MagicMock()

        with self._available_providers(["stackit"]):
            init_providers_parser(fake_parser)
            enforce_invoked_provider_loaded(fake_parser)

    @patch("sys.argv", ["prowler", "microsoft365"])
    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.import_module")
    def test_invoked_microsoft365_alias_still_triggers_fail_loud(
        self, mock_import, mock_is_builtin
    ):
        """The alias `microsoft365` must be matched against the broken `m365`."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        mock_import.side_effect = ImportError("No module named 'msgraph'")
        mock_is_builtin.return_value = True

        fake_parser = MagicMock()

        with self._available_providers(["m365"]):
            init_providers_parser(fake_parser)
            with pytest.raises(SystemExit):
                enforce_invoked_provider_loaded(fake_parser)

    @patch("sys.argv", ["prowler", "oci"])
    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.import_module")
    def test_invoked_oci_alias_still_triggers_fail_loud(
        self, mock_import, mock_is_builtin
    ):
        """The alias `oci` must be matched against the broken `oraclecloud`."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        mock_import.side_effect = ImportError("No module named 'oci'")
        mock_is_builtin.return_value = True

        fake_parser = MagicMock()

        with self._available_providers(["oraclecloud"]):
            init_providers_parser(fake_parser)
            with pytest.raises(SystemExit):
                enforce_invoked_provider_loaded(fake_parser)

    @patch("sys.argv", ["prowler", "--output-directory", "stackit"])
    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.import_module")
    def test_flag_value_matching_provider_name_not_treated_as_invoked(
        self, mock_import, mock_is_builtin
    ):
        """A flag value equal to a provider name is not the invoked provider.

        With a flag-first command line the invoked provider is the default
        'aws', so the broken stackit must not make enforce exit.
        """
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        mock_is_builtin.return_value = True
        aws_module = MagicMock()
        mock_import.side_effect = self._stackit_failing_import(
            ImportError, "No module named 'stackit.objectstorage'", aws_module
        )

        fake_parser = MagicMock()

        with self._available_providers(["aws", "stackit"]):
            init_providers_parser(fake_parser)
            enforce_invoked_provider_loaded(fake_parser)

        aws_module.init_parser.assert_called_once_with(fake_parser)

    @patch("sys.argv", ["prowler", "aws"])
    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.import_module")
    def test_invoked_builtin_non_import_error_fails_loudly(
        self, mock_import, mock_is_builtin
    ):
        """A non-ImportError in the invoked provider still makes enforce exit."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        mock_import.side_effect = RuntimeError("Unexpected error in aws init_parser")
        mock_is_builtin.return_value = True

        fake_parser = MagicMock()

        with self._available_providers(["aws"]):
            init_providers_parser(fake_parser)
            with pytest.raises(SystemExit):
                enforce_invoked_provider_loaded(fake_parser)

    @patch("sys.argv", ["prowler", "aws"])
    @patch("prowler.providers.common.arguments.Provider.is_builtin")
    @patch("prowler.providers.common.arguments.import_module")
    def test_unrelated_builtin_non_import_error_does_not_abort(
        self, mock_import, mock_is_builtin
    ):
        """A non-ImportError in an unrelated provider must not make enforce exit."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        mock_is_builtin.return_value = True
        aws_module = MagicMock()
        mock_import.side_effect = self._stackit_failing_import(
            RuntimeError, "Unexpected error in stackit init_parser", aws_module
        )

        fake_parser = MagicMock()

        with self._available_providers(["aws", "stackit"]):
            init_providers_parser(fake_parser)
            enforce_invoked_provider_loaded(fake_parser)

        aws_module.init_parser.assert_called_once_with(fake_parser)
|
|
|
|
|
|
class TestParseArgsOverrideAlignment:
    """Regression: `parse(args=...)` overrides sys.argv AFTER __init__ ran,
    so the selective fail-loud has to read argv at enforce time rather than
    at init time."""

    @staticmethod
    def _patchers():
        """Return the three patchers shared by both tests, in entry order.

        They force is_builtin to True, list aws and stackit as available,
        and make import_module fail for stackit paths only.
        """

        def fake_import(path):
            if "stackit" in path:
                raise ImportError("No module named 'stackit.objectstorage'")
            return MagicMock()

        builtin_patch = patch(
            "prowler.providers.common.arguments.Provider.is_builtin",
            return_value=True,
        )
        providers_patch = patch(
            "prowler.providers.common.arguments.Provider.get_available_providers",
            return_value=["aws", "stackit"],
        )
        import_patch = patch(
            "prowler.providers.common.arguments.import_module",
            side_effect=fake_import,
        )
        return builtin_patch, providers_patch, import_patch

    def test_enforce_reads_current_sys_argv_not_init_time_sys_argv(self):
        """Init runs with argv=['prowler','-h'] (no provider) and records the
        stackit failure without exiting. Enforce then runs with
        argv=['prowler','stackit'] and must exit, which shows it follows the
        argv in force under parse(args=...)."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        fake_parser = MagicMock()
        builtin_patch, providers_patch, import_patch = self._patchers()

        with builtin_patch, providers_patch, import_patch:
            # Phase 1: __init__ under the ambient argv ['prowler', '-h']
            with patch("sys.argv", ["prowler", "-h"]):
                init_providers_parser(fake_parser)
            # The failure was only recorded; init raised no SystemExit
            assert "stackit" in fake_parser._builtin_load_failures

            # Phase 2: parse(args=...) replaced sys.argv, stackit is invoked
            with patch("sys.argv", ["prowler", "stackit"]), pytest.raises(SystemExit):
                enforce_invoked_provider_loaded(fake_parser)

    def test_enforce_reads_current_sys_argv_for_no_invocation(self):
        """Inverse case: init's argv invokes stackit, but
        parse(args=['prowler', '-h']) overrides it. Enforce must NOT exit."""
        from prowler.providers.common.arguments import (
            enforce_invoked_provider_loaded,
            init_providers_parser,
        )

        fake_parser = MagicMock()
        builtin_patch, providers_patch, import_patch = self._patchers()

        with builtin_patch, providers_patch, import_patch:
            # Phase 1: __init__ under an ambient argv that invokes stackit
            with patch("sys.argv", ["prowler", "stackit"]):
                init_providers_parser(fake_parser)
            assert "stackit" in fake_parser._builtin_load_failures

            # Phase 2: parse(args=['prowler', '-h']) replaced sys.argv, so no
            # provider is invoked any more and enforce must return normally
            with patch("sys.argv", ["prowler", "-h"]):
                enforce_invoked_provider_loaded(fake_parser)
|
|
|
|
|
|
class TestInitGlobalProviderBuiltinDependencyFailure:
    """Same contract as TestInitProvidersParserBuiltinDependencyFailure, but
    for the provider class import path in init_global_provider. Also holds
    the entry-point load-failure and help-text lookup cases."""

    @patch("prowler.providers.common.provider.Provider.is_builtin")
    @patch("prowler.providers.common.provider.import_module")
    def test_builtin_with_missing_transitive_dep_fails_loudly(
        self, mock_import, mock_is_builtin
    ):
        """A built-in whose import raises ImportError makes init exit."""
        mock_import.side_effect = ImportError("No module named 'boto3'")
        mock_is_builtin.return_value = True

        cli_args = Namespace(
            provider="aws",
            fixer_config="config.yaml",
            config_file="config.yaml",
        )

        # Clear the global slot before and after the call under test
        Provider._global = None
        with pytest.raises(SystemExit):
            Provider.init_global_provider(cli_args)
        Provider._global = None

    @patch("prowler.providers.common.provider.importlib.metadata.entry_points")
    def test_load_ep_provider_handles_load_exception(self, mock_ep):
        """_load_ep_provider returns None when ep.load() raises."""
        broken_ep = _make_entry_point("broken", "pkg:Cls", "prowler.providers")
        broken_ep.load.side_effect = Exception("Import failed")
        mock_ep.return_value = [broken_ep]

        loaded = Provider._load_ep_provider("broken")

        assert loaded is None

    @patch("prowler.providers.common.provider.import_module")
    @patch("prowler.providers.common.provider.Provider.is_builtin")
    @patch("prowler.providers.common.provider.Provider.get_available_providers")
    def test_get_providers_help_text_builtin_path(
        self, mock_providers, mock_is_builtin, mock_import
    ):
        """Help text is read from _cli_help_text of a built-in provider module."""
        import types

        mock_is_builtin.return_value = True
        mock_providers.return_value = ["fakebuiltin"]

        # A stand-in module exposing a class named FakebuiltinProvider
        builtin_cls = type(
            "FakebuiltinProvider", (Provider,), {"_cli_help_text": "Built-in Help"}
        )
        fake_module = types.ModuleType("fake_module")
        fake_module.FakebuiltinProvider = builtin_cls
        mock_import.return_value = fake_module

        texts = Provider.get_providers_help_text()

        assert texts["fakebuiltin"] == "Built-in Help"

    @patch("prowler.providers.common.provider.import_module")
    @patch("prowler.providers.common.provider.Provider.get_available_providers")
    def test_get_providers_help_text_generic_exception(
        self, mock_providers, mock_import
    ):
        """A provider whose lookup fails maps to an empty help string."""
        # NOTE(review): is_builtin is not patched here, so whether the mocked
        # import_module is reached for "broken" depends on the real
        # is_builtin; confirm the generic-exception branch is exercised.
        mock_import.side_effect = RuntimeError("Unexpected error")
        mock_providers.return_value = ["broken"]

        texts = Provider.get_providers_help_text()

        assert texts["broken"] == ""
|
|
|
|
|
|
# ===========================================================================
# 2. Provider Initialization
# ===========================================================================
|
|
|
|
|
|
class TestProviderInitialization:
|
|
"""Tests 8-9: init_global_provider fallback to entry point."""
|
|
|
|
@patch("prowler.providers.common.provider.load_and_validate_config_file")
|
|
@patch("prowler.providers.common.provider.Provider._load_ep_provider")
|
|
@patch("prowler.providers.common.provider.import_module")
|
|
def test_init_global_provider_fallback_to_entry_point(
|
|
self, mock_import, mock_load_ep, mock_config
|
|
):
|
|
"""Test 8: init_global_provider falls back to entry point when built-in fails."""
|
|
mock_import.side_effect = ImportError("No built-in")
|
|
mock_load_ep.return_value = FakeExternalProvider
|
|
mock_config.return_value = {}
|
|
|
|
args = Namespace(
|
|
provider="fakeexternal",
|
|
fixer_config="config.yaml",
|
|
config_file="config.yaml",
|
|
)
|
|
|
|
Provider._global = None
|
|
Provider.init_global_provider(args)
|
|
|
|
assert isinstance(Provider._global, FakeExternalProvider)
|
|
Provider._global = None
|
|
|
|
@patch("prowler.providers.common.provider.load_and_validate_config_file")
|
|
@patch("prowler.providers.common.provider.Provider._load_ep_provider")
|
|
@patch("prowler.providers.common.provider.import_module")
|
|
def test_init_global_provider_exits_for_unknown_provider(
|
|
self, mock_import, mock_load_ep, mock_config
|
|
):
|
|
"""Test 9: init_global_provider exits when provider not found anywhere."""
|
|
mock_import.side_effect = ImportError("No built-in")
|
|
mock_load_ep.return_value = None
|
|
mock_config.return_value = {}
|
|
|
|
args = Namespace(
|
|
provider="nonexistent",
|
|
fixer_config="config.yaml",
|
|
config_file="config.yaml",
|
|
)
|
|
|
|
with pytest.raises(SystemExit):
|
|
Provider.init_global_provider(args)
|
|
|
|
@patch("prowler.providers.common.provider.load_and_validate_config_file")
|
|
@patch("prowler.providers.common.provider.Provider._load_ep_provider")
|
|
@patch("prowler.providers.common.provider.import_module")
|
|
def test_init_global_provider_wires_instance_returned_by_from_cli_args(
|
|
self, mock_import, mock_load_ep, mock_config
|
|
):
|
|
"""A provider that implements from_cli_args as a pure function (returns
|
|
the instance without calling set_global_provider from __init__) is
|
|
correctly wired as the global provider by init_global_provider."""
|
|
mock_import.side_effect = ImportError("No built-in")
|
|
mock_load_ep.return_value = FakePureContractProvider
|
|
mock_config.return_value = {}
|
|
|
|
args = Namespace(
|
|
provider="fakepure",
|
|
fixer_config="config.yaml",
|
|
config_file="config.yaml",
|
|
)
|
|
|
|
Provider._global = None
|
|
Provider.init_global_provider(args)
|
|
|
|
assert isinstance(Provider._global, FakePureContractProvider)
|
|
Provider._global = None
|
|
|
|
@pytest.mark.parametrize(
|
|
"plugin_name",
|
|
[
|
|
"awsx",
|
|
"aws_lite",
|
|
"azure_gov",
|
|
"gcp_org",
|
|
"github_enterprise",
|
|
"iac_v2",
|
|
],
|
|
)
|
|
@patch("prowler.providers.common.provider.load_and_validate_config_file")
|
|
@patch("prowler.providers.common.provider.Provider._load_ep_provider")
|
|
@patch("prowler.providers.common.provider.import_module")
|
|
def test_init_global_provider_external_with_builtin_substring_uses_from_cli_args(
|
|
self, mock_import, mock_load_ep, mock_config, plugin_name
|
|
):
|
|
"""Regression guard for the substring footgun in the dispatch chain.
|
|
|
|
An external plug-in whose name contains a built-in substring
|
|
(e.g. `awsx`, `aws_lite`, `azure_gov`, `gcp_org`, `github_enterprise`,
|
|
`iac_v2`) MUST be routed to the dynamic else and instantiated via
|
|
`from_cli_args` — not silently captured by the built-in branch whose
|
|
name happens to be a substring of the plug-in name. See PR #10700
|
|
review.
|
|
"""
|
|
mock_import.side_effect = ImportError("No built-in")
|
|
mock_load_ep.return_value = FakeExternalProvider
|
|
mock_config.return_value = {}
|
|
|
|
# Namespace deliberately omits the kwargs of any built-in branch
|
|
# (no `aws_retries_max_attempts`, `az_cli_auth`, `personal_access_token`,
|
|
# etc.). If equality dispatch is broken and the plug-in is misrouted to
|
|
# a built-in branch, attribute access will raise and the global never
|
|
# gets wired.
|
|
args = Namespace(
|
|
provider=plugin_name,
|
|
fixer_config="config.yaml",
|
|
config_file="config.yaml",
|
|
)
|
|
|
|
Provider._global = None
|
|
Provider.init_global_provider(args)
|
|
|
|
assert isinstance(Provider._global, FakeExternalProvider)
|
|
Provider._global = None
|
|
|
|
@patch("prowler.providers.common.provider.logger")
@patch("prowler.providers.common.provider.load_and_validate_config_file")
@patch("prowler.providers.common.provider.importlib.metadata.entry_points")
@patch("prowler.providers.common.provider.import_module")
@patch("prowler.providers.common.provider.Provider.is_builtin")
def test_init_global_provider_warns_when_plugin_shadowed_by_builtin(
    self, mock_is_builtin, mock_import, mock_entry_points, mock_config, mock_logger
):
    """Regression guard: when a plug-in registers a provider name that
    collides with a built-in, the BUILT-IN wins and a warning is emitted
    naming the shadowed plug-in. Shadow detection matches by entry-point
    name only — the plug-in is never `ep.load()`-ed just to warn, so its
    module code cannot run during a built-in run. See PR #10700 review
    (HugoPBrito, Alan-TheGentleman).

    Args:
        mock_is_builtin: patched `Provider.is_builtin`; returns True.
        mock_import: patched `import_module`; returns a module mock
            exposing a fake `AwsProvider`.
        mock_entry_points: patched `importlib.metadata.entry_points`;
            returns a single entry point named `aws`.
        mock_config: patched `load_and_validate_config_file`; returns `{}`.
        mock_logger: patched module logger whose `warning` calls are
            inspected.
    """
    # Simulate a built-in `aws` that exists, AND a plug-in registered
    # under the same `aws` name via entry points.
    mock_is_builtin.return_value = True
    shadow_ep = MagicMock()
    shadow_ep.name = "aws"  # plug-in shadowing the built-in name
    mock_entry_points.return_value = [shadow_ep]
    mock_import.return_value = MagicMock(
        AwsProvider=MagicMock(side_effect=lambda **_kw: None)
    )
    mock_config.return_value = {}

    args = Namespace(
        provider="aws",
        fixer_config="config.yaml",
        config_file="config.yaml",
        aws_retries_max_attempts=3,
        role=None,
        session_duration=None,
        external_id=None,
        role_session_name=None,
        mfa=None,
        profile=None,
        region=None,
        excluded_region=None,
        organizations_role=None,
        scan_unused_services=False,
        resource_tag=None,
        resource_arn=None,
        mutelist_file=None,
    )

    Provider._global = None
    try:
        Provider.init_global_provider(args)
    except (Exception, SystemExit):
        # The AwsProvider mock is fake and the dispatch may sys.exit on
        # the simulated failure; we only care about the warning emitted
        # before the dispatch happens. KeyboardInterrupt and other
        # BaseException subclasses are deliberately NOT swallowed so an
        # interrupted test run still stops.
        pass
    finally:
        Provider._global = None

    # Warning was emitted naming the shadowed plug-in
    warning_msgs = [
        call.args[0]
        for call in mock_logger.warning.call_args_list
        if call.args and "Plug-in provider 'aws'" in call.args[0]
    ]
    assert warning_msgs, "expected a warning about the shadowed plug-in 'aws'"
    assert "IGNORED" in warning_msgs[0]
    # Shadow detected by name only — plug-in code never executed to warn
    shadow_ep.load.assert_not_called()
|
|
|
|
|
|
# ===========================================================================
|
|
# 3. Check Discovery
|
|
# ===========================================================================
|
|
|
|
|
|
class TestCheckDiscovery:
    """Check discovery via `_recover_ep_checks` and `recover_checks_from_provider`."""

    @staticmethod
    def _dockerdesktop_entry_points():
        """Build the two `prowler.checks.dockerdesktop` entry points shared by
        the service-filter tests: one under the `container` service and one
        under the `image` service."""
        group = "prowler.checks.dockerdesktop"
        return [
            _make_entry_point(
                "container_has_no_root_user",
                "prowler_artifacts_dockerdesktop.services.container.container_has_no_root_user.container_has_no_root_user",
                group,
            ),
            _make_entry_point(
                "image_is_signed",
                "prowler_artifacts_dockerdesktop.services.image.image_is_signed.image_is_signed",
                group,
            ),
        ]

    @patch("prowler.lib.check.utils.importlib.metadata.entry_points")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_ep_checks_discovers_checks(self, mock_spec, mock_ep):
        """Test 10: a registered entry point is returned as a (name, directory) pair."""
        from prowler.lib.check.utils import _recover_ep_checks

        mock_spec.return_value = MagicMock(origin="/path/to/pkg/checks/my_check.py")
        mock_ep.return_value = [
            _make_entry_point("my_check", "pkg.checks.my_check", "prowler.checks.fake"),
        ]

        discovered = _recover_ep_checks("fake")

        assert len(discovered) == 1
        first = discovered[0]
        assert first[0] == "my_check"
        assert first[1] == "/path/to/pkg/checks"

    @patch("prowler.lib.check.utils.importlib.metadata.entry_points")
    def test_recover_ep_checks_empty_without_entry_points(self, mock_ep):
        """Test 11: with no entry points registered the result is an empty list."""
        from prowler.lib.check.utils import _recover_ep_checks

        mock_ep.return_value = []

        assert _recover_ep_checks("fake") == []

    @patch("prowler.lib.check.utils.importlib.metadata.entry_points")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_ep_checks_handles_broken_entry_point(self, mock_spec, mock_ep):
        """Test 12: an entry point whose spec lookup raises is dropped, not propagated."""
        from prowler.lib.check.utils import _recover_ep_checks

        mock_spec.side_effect = Exception("Module not found")
        mock_ep.return_value = [
            _make_entry_point("broken_check", "pkg.broken", "prowler.checks.fake"),
        ]

        assert _recover_ep_checks("fake") == []

    @patch("prowler.lib.check.utils._recover_ep_checks")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_checks_handles_external_provider_without_services(
        self, mock_find_spec, mock_ep_checks
    ):
        """Test 13: recover_checks_from_provider works for an external provider.

        `find_spec` returns None (no built-in package), so the only checks
        available are the ones the patched `_recover_ep_checks` supplies.
        """
        from prowler.lib.check.utils import recover_checks_from_provider

        # No built-in package for this provider.
        mock_find_spec.return_value = None
        mock_ep_checks.return_value = [("ext_check", "/path/to/check")]

        found = recover_checks_from_provider("fakeexternal")

        assert len(found) == 1
        assert found[0][0] == "ext_check"

    @patch("prowler.lib.check.utils._recover_ep_checks")
    @patch("prowler.lib.check.utils.list_modules")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_checks_combines_builtin_and_entry_points(
        self, mock_find_spec, mock_list_modules, mock_ep_checks
    ):
        """Test 14: built-in and entry-point checks both appear in the result."""
        from prowler.lib.check.utils import recover_checks_from_provider

        # The built-in package exists ...
        mock_find_spec.return_value = MagicMock()

        # ... and exposes a single check module.
        fake_builtin = MagicMock()
        fake_builtin.name = "prowler.providers.aws.services.ec2.check_a.check_a"
        fake_builtin.module_finder.path = "/builtin/path"
        mock_list_modules.return_value = [fake_builtin]

        mock_ep_checks.return_value = [("check_b", "/external/path")]

        found = recover_checks_from_provider("aws")

        names = [entry[0] for entry in found]
        for expected in ("check_a", "check_b"):
            assert expected in names

    @patch("prowler.lib.check.utils.importlib.metadata.entry_points")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_ep_checks_filters_by_service(self, mock_spec, mock_ep):
        """With `service="container"` only the entry point whose dotted path
        contains `.services.container.` is returned."""
        from prowler.lib.check.utils import _recover_ep_checks

        mock_ep.return_value = self._dockerdesktop_entry_points()
        mock_spec.return_value = MagicMock(origin="/some/path/check.py")

        discovered = _recover_ep_checks("dockerdesktop", service="container")

        assert len(discovered) == 1
        assert discovered[0][0] == "container_has_no_root_user"

    @patch("prowler.lib.check.utils.importlib.metadata.entry_points")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_ep_checks_without_service_returns_all(self, mock_spec, mock_ep):
        """Without a service filter every entry point of the provider is returned."""
        from prowler.lib.check.utils import _recover_ep_checks

        mock_ep.return_value = self._dockerdesktop_entry_points()
        mock_spec.return_value = MagicMock(origin="/some/path/check.py")

        discovered = _recover_ep_checks("dockerdesktop")

        assert len(discovered) == 2

    @patch("prowler.lib.check.utils._recover_ep_checks")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_checks_external_provider_with_service(
        self, mock_find_spec, mock_ep_checks
    ):
        """External provider plus --service: there is no built-in package,
        yet entry points are still consulted with the requested service and
        their checks are returned instead of exiting early."""
        from prowler.lib.check.utils import recover_checks_from_provider

        # No built-in package for this provider.
        mock_find_spec.return_value = None
        mock_ep_checks.return_value = [("container_check", "/ext/path")]

        found = recover_checks_from_provider("dockerdesktop", service="container")

        assert len(found) == 1
        assert found[0][0] == "container_check"
        mock_ep_checks.assert_called_once_with("dockerdesktop", "container")

    @patch("prowler.lib.check.utils._recover_ep_checks")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_checks_unknown_service_fails_cleanly(
        self, mock_find_spec, mock_ep_checks
    ):
        """A service that exists neither as a built-in nor in entry points
        ends in SystemExit rather than a silent empty result."""
        from prowler.lib.check.utils import recover_checks_from_provider

        mock_ep_checks.return_value = []
        mock_find_spec.return_value = None

        with pytest.raises(SystemExit):
            recover_checks_from_provider("aws", service="typo_service")

    @patch("prowler.lib.check.utils._recover_ep_checks")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_checks_builtin_with_new_external_service(
        self, mock_find_spec, mock_ep_checks
    ):
        """Built-in provider, service contributed only through entry points:
        `find_spec` returns None for that service, but the entry-point lookup
        is still performed with the service name and supplies the check."""
        from prowler.lib.check.utils import recover_checks_from_provider

        # No built-in package exists for `new_aws_service`.
        mock_find_spec.return_value = None
        mock_ep_checks.return_value = [("new_check", "/ext/path")]

        found = recover_checks_from_provider("aws", service="new_aws_service")

        assert len(found) == 1
        assert found[0][0] == "new_check"
        mock_ep_checks.assert_called_once_with("aws", "new_aws_service")

    @patch("prowler.lib.check.utils._recover_ep_checks")
    @patch("prowler.lib.check.utils.list_modules")
    @patch("prowler.lib.check.utils.importlib.util.find_spec")
    def test_recover_checks_surfaces_error_when_builtin_service_import_fails(
        self, mock_find_spec, mock_list_modules, mock_ep_checks
    ):
        """Regression guard: the built-in service package exists but listing
        its modules raises ImportError (e.g. a broken transitive dependency).
        The call must end in SystemExit instead of quietly returning the
        checks of a same-named entry-point plug-in. See PR #10700 review
        (HugoPBrito)."""
        from prowler.lib.check.utils import recover_checks_from_provider

        # The built-in service exists, but importing its modules blows up.
        mock_find_spec.return_value = MagicMock()
        mock_list_modules.side_effect = ImportError("missing transitive dep: foo")

        # A plug-in offering checks for the same service must NOT take over;
        # the import error wins.
        mock_ep_checks.return_value = [("evil_check", "/evil/path")]

        with pytest.raises(SystemExit):
            recover_checks_from_provider("aws", service="ec2")
|
|
|
|
|
|
# ===========================================================================
|
|
# 4. Check Execution
|
|
# ===========================================================================
|
|
|
|
|
|
class TestCheckExecution:
    """Resolution order and failure modes of `_resolve_check_module`."""

    @patch("prowler.lib.check.check.importlib.util.find_spec")
    @patch("prowler.lib.check.check.import_check")
    def test_resolve_check_module_builtin_first(self, mock_import, mock_find_spec):
        """Test 15: when the built-in package exists it is imported and returned."""
        from prowler.lib.check.check import _resolve_check_module

        expected_module = MagicMock()
        mock_find_spec.return_value = MagicMock()  # built-in package exists
        mock_import.return_value = expected_module

        resolved = _resolve_check_module("aws", "ec2", "my_check")

        assert resolved is expected_module
        mock_import.assert_called_once_with(
            "prowler.providers.aws.services.ec2.my_check.my_check"
        )

    @patch("prowler.lib.check.check.importlib.util.find_spec")
    @patch("prowler.lib.check.check.import_check")
    def test_resolve_check_module_fallback_to_entry_point(
        self, mock_import_check, mock_find_spec
    ):
        """Test 16: without a built-in, the matching entry point's module is imported."""
        from prowler.lib.check.check import _resolve_check_module

        # No built-in module for this check.
        mock_find_spec.return_value = None

        external_module = MagicMock()
        entry_point = _make_entry_point(
            "my_check", "ext_pkg.checks.my_check", "prowler.checks.fake"
        )

        with patch("importlib.metadata.entry_points", return_value=[entry_point]):
            with patch(
                "importlib.import_module", return_value=external_module
            ) as mock_imp:
                resolved = _resolve_check_module("fake", "svc", "my_check")

        assert resolved is external_module
        mock_imp.assert_called_with("ext_pkg.checks.my_check")
        mock_import_check.assert_not_called()

    @patch("prowler.lib.check.check.importlib.util.find_spec")
    @patch("prowler.lib.check.check.import_check")
    def test_resolve_check_module_builtin_wins_over_entry_point(
        self, mock_import_check, mock_find_spec
    ):
        """Regression guard: a built-in and an entry-point check share the
        same CheckID, and the BUILT-IN must be the one returned. Plug-ins may
        add checks but may not silently replace existing built-ins. Per the
        PR #10700 review (HugoPBrito), CheckMetadata.get_bulk is meant to
        apply the same first-write-wins precedence and warn."""
        from prowler.lib.check.check import _resolve_check_module

        expected_module = MagicMock()
        mock_find_spec.return_value = MagicMock()  # built-in exists
        mock_import_check.return_value = expected_module

        # A plug-in registering the very same CheckID; it must lose.
        entry_point = _make_entry_point(
            "ec2_instance_public_ip",
            "plug_pkg.checks.ec2_instance_public_ip",
            "prowler.checks.aws",
        )

        with patch("importlib.metadata.entry_points", return_value=[entry_point]):
            with patch("importlib.import_module") as mock_imp:
                resolved = _resolve_check_module(
                    "aws", "ec2", "ec2_instance_public_ip"
                )

        assert resolved is expected_module
        mock_import_check.assert_called_once_with(
            "prowler.providers.aws.services.ec2.ec2_instance_public_ip.ec2_instance_public_ip"
        )
        # The plug-in module is never imported while a built-in with the
        # same CheckID exists.
        mock_imp.assert_not_called()

    @patch("prowler.lib.check.check.importlib.metadata.entry_points")
    @patch("prowler.lib.check.check.importlib.util.find_spec")
    def test_resolve_check_module_raises_when_not_found(self, mock_find_spec, mock_ep):
        """Test 17: neither built-in nor entry point exists -> ModuleNotFoundError."""
        from prowler.lib.check.check import _resolve_check_module

        mock_ep.return_value = []
        mock_find_spec.return_value = None

        with pytest.raises(ModuleNotFoundError, match="not found"):
            _resolve_check_module("fake", "svc", "nonexistent_check")

    @patch("prowler.lib.check.check.importlib.util.find_spec")
    @patch("prowler.lib.check.check.import_check")
    def test_resolve_check_module_surfaces_error_when_builtin_import_fails(
        self, mock_import_check, mock_find_spec
    ):
        """Regression guard: the built-in module exists but importing it
        raises (e.g. broken transitive dependency) and no plug-in entry point
        is registered. The original ImportError must reach the caller rather
        than being reported as 'not found'. See PR #10700 review
        (HugoPBrito)."""
        from prowler.lib.check.check import _resolve_check_module

        mock_find_spec.return_value = MagicMock()  # built-in module exists
        mock_import_check.side_effect = ImportError("missing transitive dep: foo")

        # No plug-in is registered, so the built-in's failure must propagate.
        with patch("importlib.metadata.entry_points", return_value=[]), pytest.raises(
            ImportError, match="missing transitive dep"
        ):
            _resolve_check_module("aws", "ec2", "ec2_instance_public_ip")
|
|
|
|
|
|
# ===========================================================================
|
|
# 5. CLI Arguments
|
|
# ===========================================================================
|
|
|
|
|
|
class TestCLIArguments:
    """Behaviour of `init_providers_parser` when no built-in arguments module exists."""

    @patch("prowler.providers.common.arguments.Provider._load_ep_provider")
    @patch("prowler.providers.common.arguments.Provider.get_available_providers")
    @patch("prowler.providers.common.arguments.import_module")
    def test_init_providers_parser_fallback_to_init_parser(
        self, mock_import, mock_providers, mock_load_ep
    ):
        """Test 18: with the built-in import failing and the entry point
        resolving to `FakeExternalProvider`, the call completes without raising."""
        from prowler.providers.common.arguments import init_providers_parser

        mock_import.side_effect = ImportError("No built-in arguments module")
        mock_providers.return_value = ["fakeexternal"]
        mock_load_ep.return_value = FakeExternalProvider

        # Passing means no exception escaped.
        init_providers_parser(MagicMock())

    @patch("prowler.providers.common.arguments.Provider._load_ep_provider")
    @patch("prowler.providers.common.arguments.Provider.get_available_providers")
    @patch("prowler.providers.common.arguments.import_module")
    def test_init_providers_parser_no_crash_without_init_parser(
        self, mock_import, mock_providers, mock_load_ep
    ):
        """Test 19: a provider class lacking `init_parser` does not crash the call."""
        from prowler.providers.common.arguments import init_providers_parser

        mock_import.side_effect = ImportError("No built-in")
        mock_providers.return_value = ["nohelptext"]
        # Per the original note, FakeProviderNoHelpText defines no init_parser.
        mock_load_ep.return_value = FakeProviderNoHelpText

        # Passing means no exception escaped.
        init_providers_parser(MagicMock())

    @patch("prowler.providers.common.arguments.Provider._load_ep_provider")
    @patch("prowler.providers.common.arguments.Provider.get_available_providers")
    @patch("prowler.providers.common.arguments.import_module")
    def test_init_providers_parser_handles_init_parser_exception(
        self, mock_import, mock_providers, mock_load_ep
    ):
        """A provider whose `init_parser` raises RuntimeError does not crash the call."""
        from prowler.providers.common.arguments import init_providers_parser

        mock_import.side_effect = ImportError("No built-in")
        mock_providers.return_value = ["fakeexternal"]

        # Provider class stand-in whose init_parser always fails.
        failing_provider = MagicMock(
            **{"init_parser.side_effect": RuntimeError("Parser init failed")}
        )
        mock_load_ep.return_value = failing_provider

        # Passing means no exception escaped.
        init_providers_parser(MagicMock())
|
|
|
|
|
|
# ===========================================================================
|
|
# 6. Compliance
|
|
# ===========================================================================
|
|
|
|
|
|
class TestCompliance:
    """Discovery and loading of compliance frameworks shipped by plug-ins."""

    @patch("prowler.config.config.importlib.metadata.entry_points")
    def test_get_ep_compliance_dirs_discovers_dirs(self, mock_ep):
        """Test 20: the loaded module's `__path__` entry is reported per entry-point name."""
        from prowler.config.config import _get_ep_compliance_dirs

        package = MagicMock()
        package.__path__ = ["/path/to/compliance"]
        entry_point = _make_entry_point(
            "fakeexternal", "pkg.compliance", "prowler.compliance"
        )
        entry_point.load.return_value = package
        mock_ep.return_value = [entry_point]

        discovered = _get_ep_compliance_dirs()

        assert discovered["fakeexternal"] == "/path/to/compliance"

    @patch("prowler.config.config.importlib.metadata.entry_points")
    def test_get_ep_compliance_dirs_file_fallback(self, mock_ep):
        """A module without `__path__` is located through the directory of `__file__`."""
        from prowler.config.config import _get_ep_compliance_dirs

        # spec=[] keeps the mock from auto-creating attributes; __path__ is
        # additionally deleted so only __file__ is available.
        plain_module = MagicMock(spec=[])
        plain_module.__file__ = "/path/to/compliance/__init__.py"
        del plain_module.__path__
        entry_point = _make_entry_point("ext", "pkg.compliance", "prowler.compliance")
        entry_point.load.return_value = plain_module
        mock_ep.return_value = [entry_point]

        discovered = _get_ep_compliance_dirs()

        assert discovered["ext"] == "/path/to/compliance"

    @patch("prowler.config.config.importlib.metadata.entry_points")
    def test_get_ep_compliance_dirs_handles_load_exception(self, mock_ep):
        """An entry point whose `load()` raises contributes nothing to the result."""
        from prowler.config.config import _get_ep_compliance_dirs

        entry_point = _make_entry_point(
            "broken", "pkg.compliance", "prowler.compliance"
        )
        entry_point.load.side_effect = Exception("Load failed")
        mock_ep.return_value = [entry_point]

        assert _get_ep_compliance_dirs() == {}

    @patch("prowler.config.config._get_ep_compliance_dirs")
    def test_get_available_compliance_includes_external(self, mock_dirs):
        """Test 21: a JSON file in an external compliance dir is listed by its stem."""
        import json
        import os
        import tempfile

        from prowler.config.config import get_available_compliance_frameworks

        # A throwaway directory holding one compliance JSON file.
        with tempfile.TemporaryDirectory() as tmpdir:
            target = os.path.join(tmpdir, "custom_1.0_ext.json")
            with open(target, "w") as handle:
                json.dump({"Framework": "Custom", "Provider": "ext"}, handle)

            mock_dirs.return_value = {"ext": tmpdir}

            listed = get_available_compliance_frameworks("ext")

            assert "custom_1.0_ext" in listed

    @patch("prowler.config.config.importlib.metadata.entry_points")
    def test_get_available_compliance_includes_external_universal(self, mock_ep):
        """A framework registered under `prowler.compliance.universal` is listed
        both for a concrete provider and for `provider=None` (which, per the
        original note, feeds the --compliance choices)."""
        import json
        import os
        import tempfile

        from prowler.config.config import get_available_compliance_frameworks

        with tempfile.TemporaryDirectory() as tmpdir:
            requirement = {
                "id": "1",
                "name": "r",
                "description": "d",
                "checks": {"aws": ["c"]},
            }
            universal = {
                "framework": "CustomUniversal",
                "name": "Custom Universal",
                "version": "1.0",
                "description": "Multi-provider",
                "requirements": [requirement],
            }
            with open(os.path.join(tmpdir, "customuniversal_1.0.json"), "w") as handle:
                json.dump(universal, handle)

            package = MagicMock()
            package.__path__ = [tmpdir]
            entry_point = _make_entry_point(
                "anyname", "pkg.compliance", "prowler.compliance.universal"
            )
            entry_point.load.return_value = package

            def _entry_points_for(group):
                # Only the universal group has anything registered.
                if group == "prowler.compliance.universal":
                    return [entry_point]
                return []

            mock_ep.side_effect = _entry_points_for

            for provider in ("aws", None):
                assert "customuniversal_1.0" in get_available_compliance_frameworks(
                    provider
                )

    @patch("prowler.lib.check.compliance_models.importlib.metadata.entry_points")
    @patch("prowler.lib.check.compliance_models.list_compliance_modules")
    def test_compliance_get_bulk_loads_external(self, mock_list_modules, mock_ep):
        """Test 22: a legacy-schema JSON from an entry-point package is loaded."""
        import json
        import os
        import tempfile

        from prowler.lib.check.compliance_models import Compliance

        mock_list_modules.return_value = []

        with tempfile.TemporaryDirectory() as tmpdir:
            # Minimal valid compliance definition.
            definition = {
                "Framework": "Custom",
                "Name": "Custom Framework",
                "Version": "1.0",
                "Provider": "fakeexternal",
                "Description": "Test framework",
                "Requirements": [],
            }
            target = os.path.join(tmpdir, "custom_1.0_fakeexternal.json")
            with open(target, "w") as handle:
                json.dump(definition, handle)

            package = MagicMock()
            package.__path__ = [tmpdir]
            entry_point = _make_entry_point(
                "fakeexternal", "pkg.compliance", "prowler.compliance"
            )
            entry_point.load.return_value = package
            mock_ep.return_value = [entry_point]

            loaded = Compliance.get_bulk("fakeexternal")

            assert "custom_1.0_fakeexternal" in loaded
            assert loaded["custom_1.0_fakeexternal"].Framework == "Custom"

    @patch("prowler.lib.check.compliance_models.importlib.metadata.entry_points")
    @patch("prowler.lib.check.compliance_models.list_compliance_modules")
    def test_compliance_get_bulk_skips_non_legacy_external_json(
        self, mock_list_modules, mock_ep
    ):
        """A universal-schema JSON registered under `prowler.compliance` is left
        out of the result; the call returns normally instead of exiting."""
        import json
        import os
        import tempfile

        from prowler.lib.check.compliance_models import Compliance

        mock_list_modules.return_value = []

        with tempfile.TemporaryDirectory() as tmpdir:
            requirement = {
                "id": "1",
                "name": "r",
                "description": "d",
                "checks": {"aws": ["c"]},
            }
            definition = {
                "framework": "Universal",
                "name": "Universal Framework",
                "version": "1.0",
                "description": "Multi-provider",
                "requirements": [requirement],
            }
            with open(os.path.join(tmpdir, "universal_1.0.json"), "w") as handle:
                json.dump(definition, handle)

            package = MagicMock()
            package.__path__ = [tmpdir]
            entry_point = _make_entry_point(
                "aws", "pkg.compliance", "prowler.compliance"
            )
            entry_point.load.return_value = package
            mock_ep.return_value = [entry_point]

            loaded = Compliance.get_bulk("aws")

            assert "universal_1.0" not in loaded

    @patch("prowler.lib.check.compliance_models.importlib.metadata.entry_points")
    @patch("prowler.lib.check.compliance_models.list_compliance_modules")
    def test_compliance_get_bulk_file_fallback(self, mock_list_modules, mock_ep):
        """An external module without `__path__` is located via its `__file__`."""
        import json
        import os
        import tempfile

        from prowler.lib.check.compliance_models import Compliance

        mock_list_modules.return_value = []

        with tempfile.TemporaryDirectory() as tmpdir:
            definition = {
                "Framework": "Custom",
                "Name": "Custom File Fallback",
                "Version": "1.0",
                "Provider": "fakeexternal",
                "Description": "Test",
                "Requirements": [],
            }
            target = os.path.join(tmpdir, "custom_file_fakeexternal.json")
            with open(target, "w") as handle:
                json.dump(definition, handle)

            # Only __file__ is available on this module stand-in.
            plain_module = MagicMock(spec=[])
            plain_module.__file__ = os.path.join(tmpdir, "__init__.py")
            del plain_module.__path__
            entry_point = _make_entry_point(
                "fakeexternal", "pkg.compliance", "prowler.compliance"
            )
            entry_point.load.return_value = plain_module
            mock_ep.return_value = [entry_point]

            loaded = Compliance.get_bulk("fakeexternal")

            assert "custom_file_fakeexternal" in loaded

    @patch("prowler.lib.check.compliance_models.importlib.metadata.entry_points")
    @patch("prowler.lib.check.compliance_models.list_compliance_modules")
    def test_compliance_get_bulk_handles_external_exception(
        self, mock_list_modules, mock_ep
    ):
        """An entry point whose `load()` raises yields an empty result, not an error."""
        from prowler.lib.check.compliance_models import Compliance

        mock_list_modules.return_value = []

        entry_point = _make_entry_point(
            "fakeexternal", "pkg.compliance", "prowler.compliance"
        )
        entry_point.load.side_effect = Exception("Load failed")
        mock_ep.return_value = [entry_point]

        assert Compliance.get_bulk("fakeexternal") == {}

    @patch("prowler.lib.check.compliance_models.importlib.metadata.entry_points")
    @patch("prowler.lib.check.compliance_models.list_compliance_modules")
    def test_compliance_get_bulk_builtin_wins_on_duplicate(
        self, mock_list_modules, mock_ep
    ):
        """Test 23: named for the built-in-wins-on-duplicate rule.

        NOTE(review): no built-in module is mocked here, so this test only
        proves that the external framework is loaded; the duplicate-name
        precedence itself is not exercised.
        """
        import json
        import os
        import tempfile

        from prowler.lib.check.compliance_models import Compliance

        mock_list_modules.return_value = []
        mock_ep.return_value = []

        # Per the original note, the dedup lives in get_bulk as
        # `if name not in bulk_compliance_frameworks`, so built-ins (loaded
        # first) would win on a clash.
        with tempfile.TemporaryDirectory() as tmpdir:
            definition = {
                "Framework": "CIS",
                "Name": "CIS Test",
                "Version": "1.0",
                "Provider": "fakeexternal",
                "Description": "Test",
                "Requirements": [],
            }
            with open(os.path.join(tmpdir, "dup_framework.json"), "w") as handle:
                json.dump(definition, handle)

            package = MagicMock()
            package.__path__ = [tmpdir]
            entry_point = _make_entry_point(
                "fakeexternal", "pkg.compliance", "prowler.compliance"
            )
            entry_point.load.return_value = package
            mock_ep.return_value = [entry_point]

            loaded = Compliance.get_bulk("fakeexternal")

            assert "dup_framework" in loaded

    @pytest.mark.parametrize(
        "provider, framework_segments",
        [
            # `cloud` is a substring of THREE built-in modules at once.
            ("cloud", ["alibabacloud", "cloudflare", "oraclecloud"]),
            ("git", ["github"]),
            ("work", ["googleworkspace"]),
            ("open", ["openstack"]),
        ],
    )
    @patch("prowler.lib.check.compliance_models.importlib.metadata.entry_points")
    @patch("prowler.lib.check.compliance_models.list_compliance_modules")
    def test_compliance_get_bulk_matches_provider_segment_exactly(
        self, mock_list_modules, mock_ep, provider, framework_segments
    ):
        """Regression: a provider whose name is merely a substring of one or
        more framework module names must NOT load anything from them. The old
        `provider in name` test matched overlapping built-ins (e.g. `cloud`
        matched alibabacloud, cloudflare and oraclecloud). See PR #10700
        review (Alan-TheGentleman).
        """
        import json
        import os
        import tempfile

        from prowler.lib.check.compliance_models import Compliance

        mock_ep.return_value = []

        def _framework_module(segment, base_dir):
            # Stand-in for one entry of list_compliance_modules().
            fake = MagicMock()
            fake.name = f"prowler.compliance.{segment}"
            fake.module_finder.path = base_dir
            return fake

        with tempfile.TemporaryDirectory() as tmpdir:
            # Directory the old substring match would have read from.
            provider_dir = os.path.join(tmpdir, provider)
            os.mkdir(provider_dir)
            definition = {
                "Framework": "Custom",
                "Name": f"Should not load for '{provider}'",
                "Version": "1.0",
                "Provider": provider,
                "Description": "Test",
                "Requirements": [],
            }
            with open(os.path.join(provider_dir, "wrong.json"), "w") as handle:
                json.dump(definition, handle)

            mock_list_modules.return_value = [
                _framework_module(segment, tmpdir) for segment in framework_segments
            ]

            loaded = Compliance.get_bulk(provider)

            # Exact-segment match: the provider equals none of these modules.
            assert "wrong" not in loaded
            assert loaded == {}
|
|
|
|
|
|
# ===========================================================================
|
|
# 7. Parser
|
|
# ===========================================================================
|
|
|
|
|
|
class TestParser:
    """Tests 24-27: parser dynamic discovery."""

    # Provider names that every test in this class feeds to the mocked
    # ``Provider.get_available_providers``; individual tests append their own
    # extra name. Stored as a tuple and copied into a fresh list per test so
    # one test can never observe a list mutated by another.
    _BASE_PROVIDERS = (
        "aws",
        "azure",
        "gcp",
        "kubernetes",
        "m365",
        "github",
        "googleworkspace",
        "cloudflare",
        "oraclecloud",
        "openstack",
        "alibabacloud",
        "iac",
        "llm",
        "image",
        "nhn",
        "mongodbatlas",
    )

    @patch("prowler.lib.cli.parser.Provider.get_providers_help_text")
    @patch("prowler.lib.cli.parser.Provider.get_available_providers")
    def test_parser_discovers_new_providers(self, mock_providers, mock_help):
        """Test 24: Parser discovers providers not in known_providers."""
        from prowler.lib.cli.parser import ProwlerArgumentParser

        mock_providers.return_value = [*self._BASE_PROVIDERS, "fakeexternal"]
        mock_help.return_value = {"fakeexternal": "Fake External Provider"}

        parser = ProwlerArgumentParser()

        assert "fakeexternal" in parser.parser.format_usage()

    @patch("prowler.lib.cli.parser.Provider.get_providers_help_text")
    @patch("prowler.lib.cli.parser.Provider.get_available_providers")
    def test_parser_appends_to_epilog_with_help_text(self, mock_providers, mock_help):
        """Test 25: Parser appends new providers to epilog with _cli_help_text."""
        from prowler.lib.cli.parser import ProwlerArgumentParser

        mock_providers.return_value = [*self._BASE_PROVIDERS, "fakeexternal"]
        mock_help.return_value = {"fakeexternal": "Fake External Provider"}

        parser = ProwlerArgumentParser()
        epilog = parser.parser.epilog

        assert "fakeexternal" in epilog
        assert "Fake External Provider" in epilog

    @patch("prowler.lib.cli.parser.Provider.get_providers_help_text")
    @patch("prowler.lib.cli.parser.Provider.get_available_providers")
    def test_parser_skips_epilog_entry_without_help_text(
        self, mock_providers, mock_help
    ):
        """Test 26: Parser doesn't add epilog entry if _cli_help_text is empty."""
        from prowler.lib.cli.parser import ProwlerArgumentParser

        mock_providers.return_value = [*self._BASE_PROVIDERS, "nohelptext"]
        mock_help.return_value = {"nohelptext": ""}

        parser = ProwlerArgumentParser()
        epilog = parser.parser.epilog

        # Should appear in usage/csv but NOT in the descriptive epilog listing
        assert "nohelptext" in parser.parser.format_usage()

        # Every epilog line mentioning the provider must be either the bare
        # name or a line containing "{"; a "nohelptext <description>" line
        # is the failure case. all() over an empty list is True, so an epilog
        # with no mention at all passes too.
        epilog_lines = [
            line.strip() for line in epilog.splitlines() if "nohelptext" in line
        ]
        assert all(line == "nohelptext" or "{" in line for line in epilog_lines)

    @patch("prowler.lib.cli.parser.Provider.get_providers_help_text")
    @patch("prowler.lib.cli.parser.Provider.get_available_providers")
    def test_parser_does_not_duplicate_known_providers(self, mock_providers, mock_help):
        """Test 27: Parser doesn't duplicate providers already in the known list."""
        from prowler.lib.cli.parser import ProwlerArgumentParser

        # No new providers
        mock_providers.return_value = list(self._BASE_PROVIDERS)
        mock_help.return_value = {}

        parser = ProwlerArgumentParser()
        usage = parser.parser.format_usage()

        # aws should appear exactly once in usage
        assert usage.count("aws") == 1
|
|
|
|
|
|
# ===========================================================================
|
|
# 8. Dispatch Fallbacks
|
|
# ===========================================================================
|
|
|
|
|
|
class TestDispatchFallbacks:
    """Tests 28-34: all else clause fallbacks for external providers."""

    def test_stdout_report_calls_get_stdout_detail(self, fake_provider):
        """Test 28: stdout_report else clause calls provider.get_stdout_detail."""
        from prowler.lib.outputs.outputs import stdout_report

        failing = MagicMock(status="FAIL", muted=False, status_extended="test")
        failing.check_metadata.Provider = "fakeexternal"

        with patch("builtins.print") as print_spy:
            stdout_report(
                failing,
                "\033[31m",
                True,
                ["FAIL"],
                False,
                provider=fake_provider,
            )

        print_spy.assert_called_once()
        # First positional argument of the single print() call.
        assert "fake-detail" in print_spy.call_args.args[0]

    def test_stdout_report_resolves_provider_when_none(self, fake_provider):
        """stdout_report resolves provider via get_global_provider when not passed."""
        from prowler.lib.outputs.outputs import stdout_report

        failing = MagicMock(status="FAIL", muted=False, status_extended="test")
        failing.check_metadata.Provider = "fakeexternal"

        # No `provider=` keyword here, unlike test 28.
        with patch("builtins.print") as print_spy:
            stdout_report(failing, "\033[31m", True, ["FAIL"], False)

        print_spy.assert_called_once()
        assert "fake-detail" in print_spy.call_args.args[0]

    def test_report_propagates_provider_to_stdout_report(self):
        """Regression guard: report() has to hand its own `provider` argument
        down to stdout_report rather than letting the dynamic else branch fall
        back to the global singleton. Provider._global is cleared for the
        duration of the call, so the output can only come from the provider
        passed in, not from `Provider.get_global_provider()`. See PR #10700
        review (HugoPBrito)."""
        from prowler.lib.outputs.outputs import report
        from prowler.providers.common.provider import Provider

        # Allocate without running __init__, which would register the
        # instance as the global provider.
        local_provider = FakeExternalProvider.__new__(FakeExternalProvider)

        passing = MagicMock(
            status="PASS", muted=False, region="x", status_extended="test"
        )
        passing.check_metadata.Provider = "fakeexternal"

        options = MagicMock(verbose=True, status=[], fixer=False)

        # Clear the global singleton so any unintended fallback would crash.
        saved_global = Provider._global
        Provider._global = None
        try:
            with patch("builtins.print") as print_spy:
                report([passing], local_provider, options)

            # More than one print() call may be recorded; calls without
            # positional arguments are skipped. All that matters is that the
            # local provider's `get_stdout_detail` text ("fake-detail") is in
            # the output.
            rendered = "".join(
                recorded.args[0]
                for recorded in print_spy.call_args_list
                if recorded.args
            )
            assert "fake-detail" in rendered
        finally:
            Provider._global = saved_global

    def test_report_sort_calls_get_finding_sort_key(self, fake_provider):
        """Test 29: report else clause calls provider.get_finding_sort_key."""
        from prowler.lib.outputs.outputs import report

        def _passing(region, extended):
            item = MagicMock(
                status="PASS", muted=False, region=region, status_extended=extended
            )
            item.check_metadata.Provider = "fakeexternal"
            return item

        # Deliberately out of order: "b-region" first.
        findings = [_passing("b-region", "test1"), _passing("a-region", "test2")]
        options = MagicMock(verbose=False, status=[])

        report(findings, fake_provider, options)

        # Should be sorted by region (get_finding_sort_key returns "region")
        first, second = findings[0], findings[1]
        assert first.region == "a-region"
        assert second.region == "b-region"

    def test_display_summary_table_calls_get_summary_entity(self, fake_provider):
        """Test 30: display_summary_table else clause calls provider.get_summary_entity."""
        from prowler.lib.outputs.summary_table import display_summary_table

        failing = MagicMock(status="FAIL", muted=False)
        metadata = failing.check_metadata
        metadata.ServiceName = "test_service"
        metadata.Provider = "fakeexternal"
        metadata.Severity = "high"

        options = MagicMock(
            output_directory="/tmp", output_filename="test", output_modes=[]
        )

        with patch("builtins.print") as print_spy:
            display_summary_table([failing], fake_provider, options)

        # Stringify every recorded call so either half of the summary entity
        # can be searched for.
        rendered = " ".join(map(str, print_spy.call_args_list))
        assert any(marker in rendered for marker in ("Fake Host", "fake-host-1"))

    def test_generate_output_calls_get_finding_output_data(self, fake_provider):
        """Test 31: finding.generate_output else clause calls provider.get_finding_output_data."""
        from prowler.lib.check.models import (
            CheckMetadata,
            Code,
            Recommendation,
            Remediation,
        )
        from prowler.lib.outputs.finding import Finding

        remediation = Remediation(
            Code=Code(CLI="", NativeIaC="", Other="", Terraform=""),
            Recommendation=Recommendation(
                Text="Fix it", Url="https://hub.prowler.com/check/test_check"
            ),
        )
        metadata = CheckMetadata(
            Provider="fakeexternal",
            CheckID="test_check",
            CheckTitle="Test check title",
            CheckType=[],
            ServiceName="test",
            SubServiceName="",
            ResourceIdTemplate="",
            Severity="high",
            ResourceType="Test",
            ResourceGroup="",
            Description="Test description",
            Risk="Test risk",
            RelatedUrl="",
            Remediation=remediation,
            Categories=[],
            DependsOn=[],
            RelatedTo=[],
            Notes="",
        )

        check_output = MagicMock(
            check_metadata=metadata,
            status="FAIL",
            status_extended="test failed",
            muted=False,
            resource={},
            resource_details="",
            resource_tags={},
            compliance={},
        )
        options = MagicMock(unix_timestamp=False, bulk_checks_metadata={})

        result = Finding.generate_output(fake_provider, check_output, options)

        # Values expected from the fake provider's finding output data.
        assert result.auth_method == "fake"
        assert result.account_uid == "fake-account"
        assert result.resource_name == "fake-resource"
        assert result.region == "local"

    def test_output_options_calls_get_output_options(self, fake_provider):
        """Test 32: __main__.py else clause calls provider.get_output_options."""
        options = fake_provider.get_output_options(MagicMock(), {})

        assert options is not None
        assert hasattr(options, "output_directory")

    def test_html_assessment_calls_get_html_assessment_summary(self, fake_provider):
        """Test 33: html.py fallback calls provider.get_html_assessment_summary."""
        from prowler.lib.outputs.html.html import HTML

        summary = HTML.get_assessment_summary(fake_provider)

        assert "<div>Fake Assessment</div>" in summary

    def test_compliance_output_calls_generate_compliance_output(self, fake_provider):
        """Test 34: __main__.py else clause calls provider.generate_compliance_output."""
        generated_outputs = {"compliance": []}

        fake_provider.generate_compliance_output(
            [], {}, set(), MagicMock(), generated_outputs
        )

        # The provider is expected to record its marker in the dict passed in.
        assert "fake-compliance-output" in generated_outputs["compliance"]
|
|
|
|
|
|
# ===========================================================================
|
|
# 9. Base Contract Defaults
|
|
# ===========================================================================
|
|
|
|
|
|
class TestBaseContractDefaults:
    """Tests for Provider base class default implementations."""

    # Attribute names set on the argparse Namespace handed to
    # default_output_options in the two tests that call it; all default to
    # None unless a test overrides one.
    _OUTPUT_ARGUMENT_NAMES = (
        "status",
        "output_formats",
        "output_directory",
        "output_filename",
        "verbose",
        "only_logs",
        "unix_timestamp",
        "shodan",
        "fixer",
    )

    def test_from_cli_args_raises_not_implemented(self):
        """Base Provider.from_cli_args raises NotImplementedError."""
        with pytest.raises(NotImplementedError):
            FakeProviderNoHelpText.from_cli_args(MagicMock(), {})

    def test_get_output_options_raises_not_implemented(self):
        """Base Provider.get_output_options raises NotImplementedError; the
        generic default is applied at the call site via default_output_options."""
        bare = FakeProviderNoHelpText()
        with pytest.raises(NotImplementedError):
            bare.get_output_options(MagicMock(), {})

    def test_default_output_options_builds_generic_default(self):
        """default_output_options yields a generic ProviderOutputOptions, so an
        external provider lacking get_output_options still produces output
        rather than aborting the run."""
        from prowler.config.config import output_file_timestamp
        from prowler.providers.common.models import (
            ProviderOutputOptions,
            default_output_options,
        )

        bare = FakeProviderNoHelpText()
        # Every argument left at None.
        arguments = Namespace(**dict.fromkeys(self._OUTPUT_ARGUMENT_NAMES))

        options = default_output_options(bare, arguments, {})

        assert isinstance(options, ProviderOutputOptions)
        expected_filename = f"prowler-output-{bare.type}-{output_file_timestamp}"
        assert options.output_filename == expected_filename

    def test_default_output_options_honors_explicit_filename(self):
        """A user-supplied output_filename is preserved by default_output_options."""
        from prowler.providers.common.models import default_output_options

        bare = FakeProviderNoHelpText()
        values = dict.fromkeys(self._OUTPUT_ARGUMENT_NAMES)
        values["output_filename"] = "custom-name"
        arguments = Namespace(**values)

        options = default_output_options(bare, arguments, {})

        assert options.output_filename == "custom-name"

    def test_get_stdout_detail_raises_not_implemented(self):
        """Base Provider.get_stdout_detail raises NotImplementedError."""
        bare = FakeProviderNoHelpText()
        with pytest.raises(NotImplementedError):
            bare.get_stdout_detail(MagicMock())

    def test_get_finding_sort_key_returns_none(self):
        """Base Provider.get_finding_sort_key returns None."""
        assert FakeProviderNoHelpText().get_finding_sort_key() is None

    def test_get_summary_entity_returns_type_and_account_default(self):
        """Base Provider.get_summary_entity returns (type, account_id), so the
        summary table is not silently dropped for providers that do not
        override it."""
        from types import SimpleNamespace
        from unittest.mock import PropertyMock

        bare = FakeProviderNoHelpText()
        identity_stub = SimpleNamespace(account_id="acc-123")
        # `identity` is patched on the class with a PropertyMock so that
        # attribute access returns the stub.
        with patch.object(
            type(bare),
            "identity",
            new_callable=PropertyMock,
            return_value=identity_stub,
        ):
            summary = bare.get_summary_entity()

        entity_type, audited_entities = summary
        assert entity_type == bare.type
        assert audited_entities == "acc-123"

    def test_get_summary_entity_defaults_account_to_empty_string(self):
        """When the identity has no account_id, audited_entities falls back to ''."""
        from types import SimpleNamespace
        from unittest.mock import PropertyMock

        bare = FakeProviderNoHelpText()
        # An identity object with no attributes at all.
        identity_stub = SimpleNamespace()
        with patch.object(
            type(bare),
            "identity",
            new_callable=PropertyMock,
            return_value=identity_stub,
        ):
            summary = bare.get_summary_entity()

        entity_type, audited_entities = summary
        assert entity_type == bare.type
        assert audited_entities == ""

    def test_get_finding_output_data_raises_not_implemented(self):
        """Base Provider.get_finding_output_data raises NotImplementedError."""
        bare = FakeProviderNoHelpText()
        with pytest.raises(NotImplementedError):
            bare.get_finding_output_data(MagicMock())

    def test_get_html_assessment_summary_raises_not_implemented(self):
        """Base Provider.get_html_assessment_summary raises NotImplementedError."""
        bare = FakeProviderNoHelpText()
        with pytest.raises(NotImplementedError):
            bare.get_html_assessment_summary()

    def test_generate_compliance_output_raises_not_implemented(self):
        """Base Provider.generate_compliance_output raises NotImplementedError."""
        bare = FakeProviderNoHelpText()
        with pytest.raises(NotImplementedError):
            bare.generate_compliance_output([], {}, set(), MagicMock(), {})

    def test_get_mutelist_finding_args_raises_not_implemented(self):
        """Base Provider.get_mutelist_finding_args raises NotImplementedError."""
        bare = FakeProviderNoHelpText()
        with pytest.raises(NotImplementedError):
            bare.get_mutelist_finding_args()

    def test_display_compliance_table_raises_not_implemented(self):
        """Base Provider.display_compliance_table raises NotImplementedError."""
        bare = FakeProviderNoHelpText()
        with pytest.raises(NotImplementedError):
            bare.display_compliance_table([], {}, "fw", "out", "/tmp", False)

    def test_is_external_tool_provider_defaults_to_false(self):
        """Base Provider.is_external_tool_provider returns False."""
        assert FakeProviderNoHelpText().is_external_tool_provider is False
|
|
|
|
|
|
# ===========================================================================
|
|
# 10. Mutelist Dispatch for External Providers
|
|
# ===========================================================================
|
|
|
|
|
|
class TestMutelistDispatch:
    """Tests for mutelist integration with external providers."""

    def test_get_mutelist_finding_args_returns_identity(self, fake_provider):
        """External provider returns identity kwargs for mutelist."""
        assert fake_provider.get_mutelist_finding_args() == {
            "host_id": "fake-host-1"
        }

    def test_mutelist_dispatch_calls_external_provider(self, fake_provider):
        """execute() uses get_mutelist_finding_args for unknown provider types."""
        from prowler.lib.check.check import execute

        # A mock check whose execute() yields exactly one failing finding.
        failing = MagicMock(status="FAIL", muted=False)
        failing.check_metadata.Provider = "fakeexternal"

        fake_check = MagicMock(CheckID="fake_check", ServiceName="fake_service")
        fake_check.execute.return_value = [failing]
        fake_check.Severity.value = "high"

        # Attach the mutelist first, then configure it through the provider
        # attribute so whatever the provider exposes is what gets set up.
        fake_provider.mutelist = MagicMock()
        fake_provider.mutelist.mutelist = {"Accounts": {}}
        fake_provider.mutelist.is_finding_muted.return_value = True

        options = MagicMock(status=[], unix_timestamp=False)

        execute(fake_check, fake_provider, None, options)

        # is_finding_muted should have been called with host_id + finding
        muted_probe = fake_provider.mutelist.is_finding_muted
        muted_probe.assert_called_once_with(host_id="fake-host-1", finding=failing)
|
|
|
|
|
|
# ===========================================================================
|
|
# 11. Compliance Table Dispatch for External Providers
|
|
# ===========================================================================
|
|
|
|
|
|
class TestComplianceTableDispatch:
    """Tests for compliance table display with external providers."""

    def test_display_compliance_table_delegates_to_provider(self, fake_provider):
        """display_compliance_table uses provider method for unknown frameworks."""
        from prowler.lib.outputs.compliance.compliance import (
            display_compliance_table,
        )

        # Stub the provider hook so the delegation can be observed; True is
        # the value the generic-fallback test below contrasts with False.
        fake_provider.display_compliance_table = MagicMock(return_value=True)

        display_compliance_table(
            [], {}, "custom_1.0_fakeexternal", "out", "/tmp", False
        )

        # The provider hook receives the dispatcher's arguments unchanged.
        fake_provider.display_compliance_table.assert_called_once_with(
            [],
            {},
            "custom_1.0_fakeexternal",
            "out",
            "/tmp",
            False,
        )

    def test_display_compliance_table_falls_back_to_generic(self, fake_provider):
        """display_compliance_table falls back to generic when provider returns False."""
        from prowler.lib.outputs.compliance.compliance import (
            display_compliance_table,
        )

        fake_provider.display_compliance_table = MagicMock(return_value=False)

        with patch(
            "prowler.lib.outputs.compliance.compliance.get_generic_compliance_table"
        ) as mock_generic:
            display_compliance_table(
                [], {}, "custom_1.0_fakeexternal", "out", "/tmp", False
            )

        mock_generic.assert_called_once()

    def test_display_compliance_table_falls_back_on_not_implemented(self):
        """display_compliance_table falls back to generic when NotImplementedError."""
        from prowler.lib.outputs.compliance.compliance import (
            display_compliance_table,
        )

        # Use a provider that doesn't implement display_compliance_table
        provider = FakeProviderNoHelpText()

        try:
            Provider.set_global_provider(provider)

            with patch(
                "prowler.lib.outputs.compliance.compliance.get_generic_compliance_table"
            ) as mock_generic:
                display_compliance_table(
                    [], {}, "unknown_1.0_nohelptext", "out", "/tmp", False
                )

            mock_generic.assert_called_once()
        finally:
            # Reset the singleton even when the call or the assertion fails,
            # so this provider never leaks into later tests.
            Provider._global = None
|
|
|
|
|
|
# ===========================================================================
|
|
# 12. Provider.get_class — Public side-effect-free class resolver
|
|
# ===========================================================================
|
|
|
|
|
|
class TestGetClass:
|
|
"""Tests for Provider.get_class(provider) — the public, side-effect-free
|
|
class resolver that unblocks the Django API and other callers that need
|
|
a provider class without triggering CLI side-effects (sys.exit, global
|
|
provider mutation)."""
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T1: Built-in provider resolves to correct class
|
|
# -----------------------------------------------------------------------
|
|
|
|
def test_get_class_builtin_returns_correct_class(self):
    """get_class('aws') returns AwsProvider — identity check."""
    from prowler.providers.aws.aws_provider import AwsProvider

    # Identity, not equality: the very same class object must come back.
    assert Provider.get_class("aws") is AwsProvider
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T2: External entry-point provider resolves
|
|
# -----------------------------------------------------------------------
|
|
|
|
@patch("prowler.providers.common.provider.importlib.metadata.entry_points")
@patch("prowler.providers.common.provider.Provider.is_builtin")
def test_get_class_external_ep_returns_class(self, mock_is_builtin, mock_ep):
    """get_class resolves an external entry-point provider and returns that class."""
    mock_is_builtin.return_value = False

    # A single entry point whose load() yields the fake provider class.
    entry_point = _make_entry_point(
        "fakeexternal", "pkg:FakeExternalProvider", "prowler.providers"
    )
    entry_point.load.return_value = FakeExternalProvider
    mock_ep.return_value = [entry_point]

    resolved = Provider.get_class("fakeexternal")

    assert resolved is FakeExternalProvider
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T3: Unknown provider raises, does NOT call sys.exit
|
|
# -----------------------------------------------------------------------
|
|
|
|
@patch("prowler.providers.common.provider.importlib.metadata.entry_points")
@patch("prowler.providers.common.provider.Provider.is_builtin")
def test_get_class_unknown_raises_and_does_not_sys_exit(
    self, mock_is_builtin, mock_ep
):
    """get_class raises for an unknown provider and never calls sys.exit."""
    # Neither a built-in nor backed by any entry point.
    mock_ep.return_value = []
    mock_is_builtin.return_value = False

    # ImportError is asserted specifically (not a broad Exception) to pin the
    # public API contract. SystemExit belongs in init_global_provider's
    # wrapper, not in the pure resolver.
    unknown_name = "totally_unknown_xyz_provider"
    with pytest.raises(ImportError):
        Provider.get_class(unknown_name)
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T4: get_class is PURE for built-ins — no collision warning, no EP call
|
|
# -----------------------------------------------------------------------
|
|
|
|
@patch("prowler.providers.common.provider.logger")
@patch("prowler.providers.common.provider.Provider._load_ep_provider")
@patch("prowler.providers.common.provider.import_module")
@patch("prowler.providers.common.provider.Provider.is_builtin")
def test_get_class_builtin_with_ep_shadow_is_pure(
    self, mock_is_builtin, mock_import, mock_load_ep, mock_logger
):
    """get_class is PURE for a built-in that has a same-named entry point:

    - the built-in class is what comes back
    - no collision warning is logged
    - _load_ep_provider is never called (so the _ep_providers cache stays
      empty for this key, proving no side-effect)
    """
    import types

    mock_is_builtin.return_value = True
    mock_load_ep.return_value = FakeExternalProvider  # plug-in shadow present

    # Stand-in module exposing the class name get_class is expected to look
    # up for "aws".
    builtin_cls = type("AwsProvider", (Provider,), {"_type": "aws"})
    stub_module = types.ModuleType("fake_builtin_module")
    stub_module.AwsProvider = builtin_cls
    mock_import.return_value = stub_module

    resolved = Provider.get_class("aws")

    # Built-in class returned
    assert resolved is builtin_cls

    # No collision warning emitted — that is now init_global_provider's job
    collision_warnings = []
    for recorded in mock_logger.warning.call_args_list:
        if recorded.args and "IGNORED" in recorded.args[0]:
            collision_warnings.append(recorded.args[0])
    assert not collision_warnings, (
        "get_class must NOT emit a collision warning; "
        "init_global_provider owns that responsibility"
    )

    # _load_ep_provider must NOT have been called for the built-in path
    mock_load_ep.assert_not_called()
    # _ep_providers cache must not contain 'aws' (no side-effect)
    assert "aws" not in Provider._ep_providers
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T4b: Built-in module missing its expected class raises ImportError
|
|
# and does NOT fall back to a same-named entry point
|
|
# -----------------------------------------------------------------------
|
|
|
|
@patch("prowler.providers.common.provider.Provider._load_ep_provider")
@patch("prowler.providers.common.provider.import_module")
@patch("prowler.providers.common.provider.Provider.is_builtin")
def test_get_class_builtin_missing_class_raises_importerror(
    self, mock_is_builtin, mock_import, mock_load_ep
):
    """If is_builtin is True but the imported module lacks the expected
    provider class, get_class raises ImportError. It must NOT fall back to a
    same-named entry point: that would contradict is_builtin and silently
    return a foreign class."""
    import types

    mock_is_builtin.return_value = True
    # Module imports fine but lacks the expected `<Name>Provider` attribute.
    mock_import.return_value = types.ModuleType("empty_builtin_module")

    with pytest.raises(ImportError):
        Provider.get_class("aws")

    # Must NOT fall back to entry points for a (broken) built-in.
    mock_load_ep.assert_not_called()
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T4c: Entry point resolving to a non-Provider class raises ImportError
|
|
# -----------------------------------------------------------------------
|
|
|
|
@patch("prowler.providers.common.provider.importlib.metadata.entry_points")
@patch("prowler.providers.common.provider.Provider.is_builtin")
def test_get_class_external_ep_not_provider_subclass_raises_importerror(
    self, mock_is_builtin, mock_ep
):
    """An entry point that loads something other than a Provider subclass
    makes get_class raise ImportError instead of returning the object, so the
    public contract (a Provider subclass) is enforced rather than trusted."""

    class NotAProvider:
        pass

    mock_is_builtin.return_value = False

    # Entry point whose load() hands back a class unrelated to Provider.
    rogue_entry_point = _make_entry_point(
        "rogue", "pkg:NotAProvider", "prowler.providers"
    )
    rogue_entry_point.load.return_value = NotAProvider
    mock_ep.return_value = [rogue_entry_point]

    with pytest.raises(ImportError):
        Provider.get_class("rogue")
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T4d: Contract — every built-in provider stays resolvable via get_class
|
|
# -----------------------------------------------------------------------
|
|
|
|
@pytest.mark.parametrize(
    "provider",
    # Evaluated at collection time: every available provider that
    # is_builtin() accepts, in discovery order.
    list(filter(Provider.is_builtin, Provider.get_available_providers())),
)
def test_get_class_resolves_every_builtin_provider(self, provider):
    """Contract test across all built-in providers: each must stay resolvable
    through get_class and yield a Provider subclass named with the
    `{Capitalized}Provider` convention. Pinning the naming convention as the
    built-in resolution contract means a future provider that breaks it fails
    here rather than silently at runtime in a caller (e.g. the API)."""
    resolved = Provider.get_class(provider)

    # Checked in this order so issubclass() only ever sees a real class.
    assert isinstance(resolved, type)
    assert issubclass(resolved, Provider)

    expected_name = f"{provider.capitalize()}Provider"
    assert resolved.__name__ == expected_name
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T5: Regression — init_global_provider still resolves external correctly
|
|
# -----------------------------------------------------------------------
|
|
|
|
@patch("prowler.providers.common.provider.load_and_validate_config_file")
@patch("prowler.providers.common.provider.Provider._load_ep_provider")
def test_init_global_provider_still_resolves_external_via_get_class(
    self, mock_load_ep, mock_config
):
    """Regression: init_global_provider continues to work for external providers
    after the class-resolution block is delegated to get_class.

    'fakepure' is not a built-in, so is_builtin() returns False and get_class
    takes the entry-point path. This verifies the FakePureContractProvider path
    (pure from_cli_args returning an instance) still works — i.e.,
    init_global_provider correctly wires the returned instance as global provider.
    """
    mock_load_ep.return_value = FakePureContractProvider
    mock_config.return_value = {}

    args = Namespace(
        provider="fakepure",
        fixer_config="config.yaml",
        config_file="config.yaml",
    )

    # Start from a clean singleton so the assertion can only be satisfied by
    # what init_global_provider installs.
    Provider._global = None
    try:
        Provider.init_global_provider(args)

        assert isinstance(Provider._global, FakePureContractProvider)
    finally:
        # Reset even when init_global_provider raises or the assertion fails,
        # so the fake provider never leaks into later tests.
        Provider._global = None
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T6: Regression — get_providers_help_text returns same text after refactor
|
|
# -----------------------------------------------------------------------
|
|
|
|
@patch("prowler.providers.common.provider.Provider._load_ep_provider")
@patch("prowler.providers.common.provider.Provider.get_available_providers")
def test_get_providers_help_text_identical_after_refactor_external(
    self, mock_providers, mock_load_ep
):
    """Help text for an external provider is its class-level _cli_help_text.

    The only available provider is reported as 'fakeexternal' and the
    entry-point loader is patched to hand back FakeExternalProvider, so the
    mapping returned by get_providers_help_text must carry that class's
    help string under the provider name.
    """
    provider_name = "fakeexternal"
    mock_load_ep.return_value = FakeExternalProvider
    mock_providers.return_value = [provider_name]

    texts_by_provider = Provider.get_providers_help_text()

    # Expected value is the literal _cli_help_text of FakeExternalProvider.
    assert texts_by_provider[provider_name] == "Fake External Provider"
|
|
|
|
@patch("prowler.providers.common.provider.import_module")
@patch("prowler.providers.common.provider.Provider.is_builtin")
@patch("prowler.providers.common.provider.Provider.get_available_providers")
def test_get_providers_help_text_identical_after_refactor_builtin(
    self, mock_providers, mock_is_builtin, mock_import
):
    """Help text for a built-in provider is its class-level _cli_help_text.

    is_builtin is forced to True and import_module is patched to return a
    synthetic module exposing a `FakebuiltinProvider` class, so the mapping
    returned by get_providers_help_text must carry that class's help string
    under 'fakebuiltin'.
    """
    import types

    provider_name = "fakebuiltin"
    class_name = "FakebuiltinProvider"

    # Synthetic provider class plus the module that import_module "finds".
    builtin_cls = type(class_name, (Provider,), {"_cli_help_text": "Built-in Help"})
    stub_module = types.ModuleType("fake_module")
    setattr(stub_module, class_name, builtin_cls)

    mock_import.return_value = stub_module
    mock_is_builtin.return_value = True
    mock_providers.return_value = [provider_name]

    texts_by_provider = Provider.get_providers_help_text()

    assert texts_by_provider[provider_name] == "Built-in Help"
|
|
|
|
# -----------------------------------------------------------------------
|
|
# T7: init_global_provider emits collision warning (not get_class)
|
|
# -----------------------------------------------------------------------
|
|
|
|
@patch("prowler.providers.common.provider.load_and_validate_config_file")
@patch("prowler.providers.common.provider.importlib.metadata.entry_points")
@patch("prowler.providers.common.provider.import_module")
@patch("prowler.providers.common.provider.Provider.is_builtin")
def test_init_global_provider_emits_collision_warning_for_builtin_ep_shadow(
    self, mock_is_builtin, mock_import, mock_entry_points, mock_config, caplog
):
    """init_global_provider (not get_class) emits the collision warning
    when a built-in provider has a same-named entry-point plug-in registered.

    This is the counterpart to test_get_class_builtin_with_ep_shadow_is_pure:
    the warning responsibility moved OUT of get_class and INTO
    init_global_provider, so users still see the message on CLI invocation
    but prowler --help and API calls (which never hit init_global_provider)
    do not spuriously emit it. The shadow is detected by entry-point name
    only — the plug-in is never loaded to warn.

    Args:
        mock_is_builtin: patched ``Provider.is_builtin``; forced to True.
        mock_import: patched ``import_module``; returns a synthetic module
            exposing a fake ``AwsProvider``.
        mock_entry_points: patched ``importlib.metadata.entry_points``;
            returns one entry point named "aws".
        mock_config: patched ``load_and_validate_config_file``; returns {}.
        caplog: pytest fixture capturing log records.
    """
    import logging
    import types

    mock_is_builtin.return_value = True
    shadow_ep = MagicMock()
    shadow_ep.name = "aws"  # plug-in shadowing the built-in name
    mock_entry_points.return_value = [shadow_ep]

    fake_module = types.ModuleType("fake_builtin_module")
    fake_module.AwsProvider = MagicMock(side_effect=lambda **_kw: None)
    mock_import.return_value = fake_module
    mock_config.return_value = {}

    args = Namespace(
        provider="aws",
        fixer_config="config.yaml",
        config_file="config.yaml",
        aws_retries_max_attempts=3,
        role=None,
        session_duration=None,
        external_id=None,
        role_session_name=None,
        mfa=None,
        profile=None,
        region=None,
        excluded_region=None,
        organizations_role=None,
        scan_unused_services=False,
        resource_tag=None,
        resource_arn=None,
        mutelist_file=None,
    )

    Provider._global = None
    with caplog.at_level(logging.WARNING, logger="prowler"):
        try:
            Provider.init_global_provider(args)
        except (Exception, SystemExit):
            # AwsProvider mock is fake; dispatch may fail — only the
            # warning emitted BEFORE dispatch matters here.
            # SystemExit is tolerated too (presumably the failure path
            # exits the process — verify against init_global_provider),
            # but KeyboardInterrupt is deliberately NOT swallowed.
            pass
        finally:
            # Never leave a global provider behind for later tests.
            Provider._global = None

    collision_warnings = [
        r.message
        for r in caplog.records
        if "Plug-in provider 'aws'" in r.message and "IGNORED" in r.message
    ]
    assert collision_warnings, (
        "init_global_provider must emit the collision warning when a "
        "same-named EP plug-in exists for a built-in provider"
    )
    # Shadow detected by name only — the plug-in is never loaded to warn.
    shadow_ep.load.assert_not_called()
|