diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 1b499c34e9..d187019a5b 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -18,6 +18,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - `cosmosdb_account_public_network_access_disabled` check for Azure provider, verifying Cosmos DB accounts have public network access disabled so connectivity is restricted to private endpoints or VNet service endpoints [(#11034)](https://github.com/prowler-cloud/prowler/pull/11034) - `databricks_workspace_public_network_access_disabled` check for Azure provider, verifying Databricks workspaces have public network access disabled so connectivity is restricted to Azure Private Link private endpoints [(#11035)](https://github.com/prowler-cloud/prowler/pull/11035) - `aks_cluster_auto_upgrade_enabled` check for Azure provider [(#11027)](https://github.com/prowler-cloud/prowler/pull/11027) +- Public `Provider.get_class()` method that resolves a provider class by name for both built-in and external (entry-point) providers [(#11398)](https://github.com/prowler-cloud/prowler/pull/11398) - Jira timeout preventing the calls from hanging indefinitely when the Jira endpoint is unreachable or slow [(#11602)](https://github.com/prowler-cloud/prowler/pull/11602) - TLS certificate verification in the `codepipeline_project_repo_private` check, which previously used an unverified SSL context, leaving the repository-visibility probe open to MITM tampering [(#11603)](https://github.com/prowler-cloud/prowler/pull/11603) - `entra_directory_sync_object_takeover_blocked` check for the M365 provider, verifying that hybrid Entra tenants block cloud object takeover through both soft-match and hard-match directory synchronization [(#11098)](https://github.com/prowler-cloud/prowler/pull/11098) diff --git a/prowler/providers/common/provider.py b/prowler/providers/common/provider.py index 8bc7567795..77ae9180a3 100644 --- a/prowler/providers/common/provider.py +++ b/prowler/providers/common/provider.py @@ -261,36 +261,16 @@ class Provider(ABC): @staticmethod def init_global_provider(arguments: Namespace) -> None: try: - # Discriminate built-in vs external upfront via find_spec, so an - # ImportError from a transitive dependency missing inside a - # built-in's own import chain surfaces clearly instead of being - # silently re-routed to the entry-point path. - provider_class = None - if Provider.is_builtin(arguments.provider): - # Built-in wins on provider-name collision. Plug-ins are - # first-class extenders (they can register new provider - # names) but cannot override existing built-ins — a security - # tool prefers fail-loud predictability over silent - # overrides. Surface the override so the user knows their - # plug-in is being ignored and can rename it. - # Match by name only — never ep.load() a shadowing plug-in. - if any( - ep.name == arguments.provider - for ep in importlib.metadata.entry_points(group="prowler.providers") - ): - logger.warning( - f"Plug-in provider '{arguments.provider}' registered " - f"via entry points is being IGNORED — a built-in with " - f"the same name exists. To use your plug-in, register " - f"it under a different name." - ) - provider_class_path = f"{providers_path}.{arguments.provider}.{arguments.provider}_provider" - provider_class_name = f"{arguments.provider.capitalize()}Provider" - try: - provider_class = getattr( - import_module(provider_class_path), provider_class_name - ) - except ImportError as e: + # Delegate class resolution to the public, side-effect-free + # resolver. init_global_provider owns the CLI-specific error + # handling: a missing transitive dep in a built-in becomes a + # logger.critical + sys.exit(1); a completely unknown provider + # re-raises so the outer try/except can sys.exit too. + try: + provider_class = Provider.get_class(arguments.provider) + except ImportError as e: + if Provider.is_builtin(arguments.provider): + # Built-in's transitive dependency is missing — loud CLI error. logger.critical( f"Failed to load built-in provider '{arguments.provider}'. " f"Missing dependency: {e}. " @@ -298,25 +278,28 @@ class Provider(ABC): ) logger.debug("Full traceback:", exc_info=True) sys.exit(1) - except AttributeError: - # Module exists but doesn't define the expected class — - # treat as external and try entry points. - provider_class = Provider._load_ep_provider(arguments.provider) - else: - provider_class = Provider._load_ep_provider(arguments.provider) + # Unknown or missing external provider — propagate so the + # outer try/except can handle it (sys.exit(1) via generic + # exception handler). + raise - if provider_class is None: - raise ImportError( - f"Provider '{arguments.provider}' not found as built-in or entry point" + # Built-in wins on name collision — warn that a same-named + # plug-in is ignored. This lives here (not in get_class) so + # that `prowler --help` and API callers that resolve a class + # without initialising a global provider do not see spurious + # warnings. Match by name only — never ep.load() a shadowing + # plug-in, or its module code would run during a built-in run. + if Provider.is_builtin(arguments.provider) and any( + ep.name == arguments.provider + for ep in importlib.metadata.entry_points(group="prowler.providers") + ): + logger.warning( + f"Plug-in provider '{arguments.provider}' registered " + f"via entry points is being IGNORED — a built-in with " + f"the same name exists. To use your plug-in, register " + f"it under a different name." ) - # Kept for downstream forks that may extend the dispatch below - # with their own custom built-in branches and reference this name. - # The upstream chain dispatches by `arguments.provider` directly. - provider_class_name = ( - f"{arguments.provider.capitalize()}Provider" # noqa: F841 - ) - fixer_config = load_and_validate_config_file( arguments.provider, arguments.fixer_config ) @@ -693,30 +676,69 @@ class Provider(ABC): Provider._ep_providers[name] = None return None + @staticmethod + def get_class(provider: str) -> type: + """Resolve the provider class for a name (built-in or entry-point). + + Does not call ``sys.exit`` and does not initialize the global + provider (it may populate the ``_ep_providers`` memoization cache). + Collision warnings are emitted by ``init_global_provider``, not here. + The caller handles errors (CLI exits; the API can return HTTP 400). + + Args: + provider: Provider name, e.g. ``"aws"`` or an external plug-in. + + Returns: + The provider class (a subclass of :class:`Provider`). + + Raises: + ImportError: If not found as built-in or entry point, a built-in's + transitive dependency is missing, or an entry point resolves to + an object that is not a subclass of :class:`Provider`. + """ + if Provider.is_builtin(provider): + provider_class_path = f"{providers_path}.{provider}.{provider}_provider" + provider_class_name = f"{provider.capitalize()}Provider" + # Let ImportError propagate — the caller decides whether to + # sys.exit (CLI) or return HTTP 400 (API). + module = import_module(provider_class_path) + try: + return getattr(module, provider_class_name) + except AttributeError as error: + # is_builtin already confirmed this is a built-in, so the + # module MUST define the expected class. A missing class is a + # broken built-in contract — raise rather than fall back to a + # same-named external plug-in, which would contradict + # is_builtin and silently return a foreign class. + raise ImportError( + f"Built-in provider '{provider}' module " + f"'{provider_class_path}' does not define expected class " + f"'{provider_class_name}'" + ) from error + + cls = Provider._load_ep_provider(provider) + if cls is None: + raise ImportError( + f"Provider '{provider}' not found as built-in or entry point" + ) + # ep.load() can return any object; enforce the public contract that + # get_class returns a Provider subclass. isinstance(cls, type) guards + # issubclass against a TypeError when cls is not a class at all. + if not (isinstance(cls, type) and issubclass(cls, Provider)): + raise ImportError( + f"Entry-point provider '{provider}' resolved to {cls!r}, " + f"which is not a subclass of Provider" + ) + return cls + @staticmethod def get_providers_help_text() -> dict: """Returns a dict of {provider_name: cli_help_text} for all available providers.""" help_text = {} for name in Provider.get_available_providers(): try: - # Try built-in first - module_path = f"{providers_path}.{name}.{name}_provider" - module = import_module(module_path) - cls = None - for attr_name in dir(module): - attr = getattr(module, attr_name) - if ( - isinstance(attr, type) - and issubclass(attr, Provider) - and attr is not Provider - ): - cls = attr - break - help_text[name] = getattr(cls, "_cli_help_text", "") if cls else "" - except ImportError: - # External provider — load via entry point - cls = Provider._load_ep_provider(name) - help_text[name] = getattr(cls, "_cli_help_text", "") if cls else "" + cls = Provider.get_class(name) + help_text[name] = getattr(cls, "_cli_help_text", "") except Exception as error: logger.warning( f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" diff --git a/tests/providers/external/test_dynamic_provider_loading.py b/tests/providers/external/test_dynamic_provider_loading.py index 6bd1e6a2ce..78e308597d 100644 --- a/tests/providers/external/test_dynamic_provider_loading.py +++ b/tests/providers/external/test_dynamic_provider_loading.py @@ -788,18 +788,22 @@ class TestInitGlobalProviderBuiltinDependencyFailure: assert cls is None @patch("prowler.providers.common.provider.import_module") + @patch("prowler.providers.common.provider.Provider.is_builtin") @patch("prowler.providers.common.provider.Provider.get_available_providers") - def test_get_providers_help_text_builtin_path(self, mock_providers, mock_import): + def test_get_providers_help_text_builtin_path( + self, mock_providers, mock_is_builtin, mock_import + ): """get_providers_help_text reads _cli_help_text from a built-in provider module.""" import types mock_providers.return_value = ["fakebuiltin"] + mock_is_builtin.return_value = True mock_cls = type( - "FakeBuiltinProvider", (Provider,), {"_cli_help_text": "Built-in Help"} + "FakebuiltinProvider", (Provider,), {"_cli_help_text": "Built-in Help"} ) mock_module = types.ModuleType("fake_module") - mock_module.FakeBuiltinProvider = mock_cls + mock_module.FakebuiltinProvider = mock_cls mock_import.return_value = mock_module help_text = Provider.get_providers_help_text() @@ -2423,3 +2427,340 @@ class TestComplianceTableDispatch: mock_generic.assert_called_once() Provider._global = None + + +# =========================================================================== +# 12. Provider.get_class — Public side-effect-free class resolver +# =========================================================================== + + +class TestGetClass: + """Tests for Provider.get_class(provider) — the public, side-effect-free + class resolver that unblocks the Django API and other callers that need + a provider class without triggering CLI side-effects (sys.exit, global + provider mutation).""" + + # ----------------------------------------------------------------------- + # T1: Built-in provider resolves to correct class + # ----------------------------------------------------------------------- + + def test_get_class_builtin_returns_correct_class(self): + """get_class('aws') returns AwsProvider — identity check.""" + from prowler.providers.aws.aws_provider import AwsProvider + + cls = Provider.get_class("aws") + + assert cls is AwsProvider + + # ----------------------------------------------------------------------- + # T2: External entry-point provider resolves + # ----------------------------------------------------------------------- + + @patch("prowler.providers.common.provider.importlib.metadata.entry_points") + @patch("prowler.providers.common.provider.Provider.is_builtin") + def test_get_class_external_ep_returns_class(self, mock_is_builtin, mock_ep): + """get_class resolves an external entry-point provider and returns that class.""" + mock_is_builtin.return_value = False + mock_ep.return_value = [ + _make_entry_point( + "fakeexternal", "pkg:FakeExternalProvider", "prowler.providers" + ), + ] + mock_ep.return_value[0].load.return_value = FakeExternalProvider + + cls = Provider.get_class("fakeexternal") + + assert cls is FakeExternalProvider + + # ----------------------------------------------------------------------- + # T3: Unknown provider raises, does NOT call sys.exit + # ----------------------------------------------------------------------- + + @patch("prowler.providers.common.provider.importlib.metadata.entry_points") + @patch("prowler.providers.common.provider.Provider.is_builtin") + def test_get_class_unknown_raises_and_does_not_sys_exit( + self, mock_is_builtin, mock_ep + ): + """get_class raises for an unknown provider and never calls sys.exit.""" + mock_is_builtin.return_value = False + mock_ep.return_value = [] + + # Assert ImportError specifically to enforce the public API contract + # (not a broad Exception). SystemExit belongs in init_global_provider's + # wrapper, not in the pure resolver. + with pytest.raises(ImportError): + Provider.get_class("totally_unknown_xyz_provider") + + # ----------------------------------------------------------------------- + # T4: get_class is PURE for built-ins — no collision warning, no EP call + # ----------------------------------------------------------------------- + + @patch("prowler.providers.common.provider.logger") + @patch("prowler.providers.common.provider.Provider._load_ep_provider") + @patch("prowler.providers.common.provider.import_module") + @patch("prowler.providers.common.provider.Provider.is_builtin") + def test_get_class_builtin_with_ep_shadow_is_pure( + self, mock_is_builtin, mock_import, mock_load_ep, mock_logger + ): + """get_class for a built-in with a same-named EP is PURE: + - returns the built-in class + - does NOT emit a collision warning + - does NOT call _load_ep_provider (so _ep_providers cache stays empty for + this key, proving no side-effect) + """ + import types + + mock_is_builtin.return_value = True + mock_load_ep.return_value = FakeExternalProvider # plug-in shadow present + + fake_module = types.ModuleType("fake_builtin_module") + fake_builtin_cls = type("AwsProvider", (Provider,), {"_type": "aws"}) + fake_module.AwsProvider = fake_builtin_cls + mock_import.return_value = fake_module + + cls = Provider.get_class("aws") + + # Built-in class returned + assert cls is fake_builtin_cls + # No collision warning emitted — that is now init_global_provider's job + warning_msgs = [ + call.args[0] + for call in mock_logger.warning.call_args_list + if call.args and "IGNORED" in call.args[0] + ] + assert not warning_msgs, ( + "get_class must NOT emit a collision warning; " + "init_global_provider owns that responsibility" + ) + # _load_ep_provider must NOT have been called for the built-in path + mock_load_ep.assert_not_called() + # _ep_providers cache must not contain 'aws' (no side-effect) + assert "aws" not in Provider._ep_providers + + # ----------------------------------------------------------------------- + # T4b: Built-in module missing its expected class raises ImportError + # and does NOT fall back to a same-named entry point + # ----------------------------------------------------------------------- + + @patch("prowler.providers.common.provider.Provider._load_ep_provider") + @patch("prowler.providers.common.provider.import_module") + @patch("prowler.providers.common.provider.Provider.is_builtin") + def test_get_class_builtin_missing_class_raises_importerror( + self, mock_is_builtin, mock_import, mock_load_ep + ): + """When is_builtin is True but the module does not define the expected + provider class, get_class raises ImportError and does NOT fall back to a + same-named entry point — falling back would contradict is_builtin and + silently return a foreign class.""" + import types + + mock_is_builtin.return_value = True + # Module imports fine but lacks the expected `Provider` attribute. + empty_module = types.ModuleType("empty_builtin_module") + mock_import.return_value = empty_module + + with pytest.raises(ImportError): + Provider.get_class("aws") + + # Must NOT fall back to entry points for a (broken) built-in. + mock_load_ep.assert_not_called() + + # ----------------------------------------------------------------------- + # T4c: Entry point resolving to a non-Provider class raises ImportError + # ----------------------------------------------------------------------- + + @patch("prowler.providers.common.provider.importlib.metadata.entry_points") + @patch("prowler.providers.common.provider.Provider.is_builtin") + def test_get_class_external_ep_not_provider_subclass_raises_importerror( + self, mock_is_builtin, mock_ep + ): + """When an entry point resolves to an object that is not a Provider + subclass, get_class raises ImportError instead of returning it, so the + public contract (a Provider subclass) is enforced rather than trusted.""" + + class NotAProvider: + pass + + mock_is_builtin.return_value = False + mock_ep.return_value = [ + _make_entry_point("rogue", "pkg:NotAProvider", "prowler.providers"), + ] + mock_ep.return_value[0].load.return_value = NotAProvider + + with pytest.raises(ImportError): + Provider.get_class("rogue") + + # ----------------------------------------------------------------------- + # T4d: Contract — every built-in provider stays resolvable via get_class + # ----------------------------------------------------------------------- + + @pytest.mark.parametrize( + "provider", + [ + name + for name in Provider.get_available_providers() + if Provider.is_builtin(name) + ], + ) + def test_get_class_resolves_every_builtin_provider(self, provider): + """Contract test over all built-in providers: each one must remain + resolvable through get_class and return a Provider subclass whose name + follows the `{Capitalized}Provider` convention. This pins the naming + convention as the built-in resolution contract, so a future provider + that breaks it fails here instead of silently at runtime in a caller + (e.g. the API).""" + cls = Provider.get_class(provider) + + assert isinstance(cls, type) and issubclass(cls, Provider) + assert cls.__name__ == f"{provider.capitalize()}Provider" + + # ----------------------------------------------------------------------- + # T5: Regression — init_global_provider still resolves external correctly + # ----------------------------------------------------------------------- + + @patch("prowler.providers.common.provider.load_and_validate_config_file") + @patch("prowler.providers.common.provider.Provider._load_ep_provider") + def test_init_global_provider_still_resolves_external_via_get_class( + self, mock_load_ep, mock_config + ): + """Regression: init_global_provider continues to work for external providers + after the class-resolution block is delegated to get_class. + + 'fakepure' is not a built-in, so is_builtin() returns False and get_class + takes the entry-point path. This verifies the FakePureContractProvider path + (pure from_cli_args returning an instance) still works — i.e., + init_global_provider correctly wires the returned instance as global provider. + """ + mock_load_ep.return_value = FakePureContractProvider + mock_config.return_value = {} + + args = Namespace( + provider="fakepure", + fixer_config="config.yaml", + config_file="config.yaml", + ) + + Provider._global = None + Provider.init_global_provider(args) + + assert isinstance(Provider._global, FakePureContractProvider) + Provider._global = None + + # ----------------------------------------------------------------------- + # T6: Regression — get_providers_help_text returns same text after refactor + # ----------------------------------------------------------------------- + + @patch("prowler.providers.common.provider.Provider._load_ep_provider") + @patch("prowler.providers.common.provider.Provider.get_available_providers") + def test_get_providers_help_text_identical_after_refactor_external( + self, mock_providers, mock_load_ep + ): + """get_providers_help_text returns identical _cli_help_text for an external + provider both before and after the refactor to use get_class internally.""" + mock_providers.return_value = ["fakeexternal"] + mock_load_ep.return_value = FakeExternalProvider + + help_text = Provider.get_providers_help_text() + + # Must match the known _cli_help_text on FakeExternalProvider + assert help_text["fakeexternal"] == "Fake External Provider" + + @patch("prowler.providers.common.provider.import_module") + @patch("prowler.providers.common.provider.Provider.is_builtin") + @patch("prowler.providers.common.provider.Provider.get_available_providers") + def test_get_providers_help_text_identical_after_refactor_builtin( + self, mock_providers, mock_is_builtin, mock_import + ): + """get_providers_help_text returns identical _cli_help_text for a built-in + provider both before and after the refactor to use get_class internally. + is_builtin is mocked to True so get_class takes the built-in import path.""" + import types + + mock_providers.return_value = ["fakebuiltin"] + mock_is_builtin.return_value = True + mock_cls = type( + "FakebuiltinProvider", (Provider,), {"_cli_help_text": "Built-in Help"} + ) + mock_module = types.ModuleType("fake_module") + mock_module.FakebuiltinProvider = mock_cls + mock_import.return_value = mock_module + + help_text = Provider.get_providers_help_text() + + assert help_text["fakebuiltin"] == "Built-in Help" + + # ----------------------------------------------------------------------- + # T7: init_global_provider emits collision warning (not get_class) + # ----------------------------------------------------------------------- + + @patch("prowler.providers.common.provider.load_and_validate_config_file") + @patch("prowler.providers.common.provider.importlib.metadata.entry_points") + @patch("prowler.providers.common.provider.import_module") + @patch("prowler.providers.common.provider.Provider.is_builtin") + def test_init_global_provider_emits_collision_warning_for_builtin_ep_shadow( + self, mock_is_builtin, mock_import, mock_entry_points, mock_config, caplog + ): + """init_global_provider (not get_class) emits the collision warning + when a built-in provider has a same-named entry-point plug-in registered. + + This is the counterpart to test_get_class_builtin_with_ep_shadow_is_pure: + the warning responsibility moved OUT of get_class and INTO + init_global_provider, so users still see the message on CLI invocation + but prowler --help and API calls (which never hit init_global_provider) + do not spuriously emit it. The shadow is detected by entry-point name + only — the plug-in is never loaded to warn. + """ + import logging + import types + + mock_is_builtin.return_value = True + shadow_ep = MagicMock() + shadow_ep.name = "aws" # plug-in shadowing the built-in name + mock_entry_points.return_value = [shadow_ep] + + fake_module = types.ModuleType("fake_builtin_module") + fake_module.AwsProvider = MagicMock(side_effect=lambda **_kw: None) + mock_import.return_value = fake_module + mock_config.return_value = {} + + args = Namespace( + provider="aws", + fixer_config="config.yaml", + config_file="config.yaml", + aws_retries_max_attempts=3, + role=None, + session_duration=None, + external_id=None, + role_session_name=None, + mfa=None, + profile=None, + region=None, + excluded_region=None, + organizations_role=None, + scan_unused_services=False, + resource_tag=None, + resource_arn=None, + mutelist_file=None, + ) + + Provider._global = None + with caplog.at_level(logging.WARNING, logger="prowler"): + try: + Provider.init_global_provider(args) + except BaseException: + # AwsProvider mock is fake; dispatch may fail — only the + # warning emitted BEFORE dispatch matters here. + pass + Provider._global = None + + collision_warnings = [ + r.message + for r in caplog.records + if "Plug-in provider 'aws'" in r.message and "IGNORED" in r.message + ] + assert collision_warnings, ( + "init_global_provider must emit the collision warning when a " + "same-named EP plug-in exists for a built-in provider" + ) + # Shadow detected by name only — the plug-in is never loaded to warn. + shadow_ep.load.assert_not_called()