Files
prowler/tests/providers/stackit/stackit_provider_test.py
Johannes Engler 368d3a2661 feat(stackit): add objectstorage checks (#11397)
Co-authored-by: Hugo P.Brito <hugopbrit@gmail.com>
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-06-10 18:43:24 +02:00

479 lines
18 KiB
Python

"""Tests for StackIT Provider input validation."""
import types
from argparse import Namespace
from unittest.mock import patch
import pytest
import prowler.providers.common.provider as common_provider
import prowler.providers.stackit.stackit_provider as stackit_provider_module
from prowler.providers.common.models import Connection
from prowler.providers.stackit.exceptions.exceptions import (
StackITAPIError,
StackITInvalidProjectIdError,
StackITInvalidTokenError,
StackITNonExistentTokenError,
)
from prowler.providers.stackit.stackit_provider import StackitProvider
class TestStackITProviderValidation:
"""Test suite for StackIT Provider input validation."""
KEY_PATH = "/tmp/sa-key.json"
VALID_PROJECT_ID = "12345678-1234-1234-1234-123456789abc"
def test_validate_arguments_valid(self):
"""Test validation passes with a key path and a valid UUID."""
StackitProvider.validate_arguments(self.VALID_PROJECT_ID, self.KEY_PATH)
def test_validate_arguments_empty_key_path(self):
with pytest.raises(StackITNonExistentTokenError) as exc_info:
StackitProvider.validate_arguments(self.VALID_PROJECT_ID, "")
assert "service account credentials are required" in str(exc_info.value)
def test_validate_arguments_none_key_path(self):
with pytest.raises(StackITNonExistentTokenError) as exc_info:
StackitProvider.validate_arguments(self.VALID_PROJECT_ID, None)
assert "service account credentials are required" in str(exc_info.value)
def test_validate_arguments_whitespace_only_key_path(self):
with pytest.raises(StackITNonExistentTokenError) as exc_info:
StackitProvider.validate_arguments(self.VALID_PROJECT_ID, " ")
assert "service account credentials are required" in str(exc_info.value)
def test_validate_arguments_empty_project_id(self):
with pytest.raises(StackITInvalidProjectIdError) as exc_info:
StackitProvider.validate_arguments("", self.KEY_PATH)
assert "project ID is required" in str(exc_info.value)
def test_validate_arguments_none_project_id(self):
with pytest.raises(StackITInvalidProjectIdError) as exc_info:
StackitProvider.validate_arguments(None, self.KEY_PATH)
assert "project ID is required" in str(exc_info.value)
def test_validate_arguments_whitespace_only_project_id(self):
with pytest.raises(StackITInvalidProjectIdError) as exc_info:
StackitProvider.validate_arguments(" ", self.KEY_PATH)
assert "project ID is required" in str(exc_info.value)
def test_validate_arguments_invalid_uuid_format(self):
with pytest.raises(StackITInvalidProjectIdError) as exc_info:
StackitProvider.validate_arguments("not-a-valid-uuid", self.KEY_PATH)
assert "must be a valid UUID format" in str(exc_info.value)
assert "not-a-valid-uuid" in str(exc_info.value)
def test_validate_arguments_invalid_uuid_too_short(self):
with pytest.raises(StackITInvalidProjectIdError) as exc_info:
StackitProvider.validate_arguments("1234-5678", self.KEY_PATH)
assert "must be a valid UUID format" in str(exc_info.value)
def test_validate_arguments_uuid_without_hyphens(self):
"""Python's UUID() accepts compact form."""
StackitProvider.validate_arguments(
"12345678123412341234123456789abc", self.KEY_PATH
)
def test_validate_arguments_numeric_only_project_id(self):
with pytest.raises(StackITInvalidProjectIdError) as exc_info:
StackitProvider.validate_arguments("123456789012", self.KEY_PATH)
assert "must be a valid UUID format" in str(exc_info.value)
def test_validate_arguments_uuid_with_uppercase(self):
StackitProvider.validate_arguments(
"12345678-1234-1234-1234-123456789ABC", self.KEY_PATH
)
def test_validate_arguments_uuid_with_braces(self):
StackitProvider.validate_arguments(
"{12345678-1234-1234-1234-123456789abc}", self.KEY_PATH
)
def test_validate_arguments_uuid_v4_format(self):
StackitProvider.validate_arguments(
"550e8400-e29b-41d4-a716-446655440000", self.KEY_PATH
)
def test_validate_arguments_both_invalid(self):
"""Key path is checked first."""
with pytest.raises(StackITNonExistentTokenError):
StackitProvider.validate_arguments("not-a-uuid", "")
def test_validate_arguments_both_none(self):
"""Key path is checked first."""
with pytest.raises(StackITNonExistentTokenError):
StackitProvider.validate_arguments(None, None)
def test_validate_arguments_accepts_inline_key_without_path(self):
"""Inline key content alone is enough; the path can be omitted."""
StackitProvider.validate_arguments(
self.VALID_PROJECT_ID, None, '{"keyId": "abc"}'
)
def test_validate_arguments_accepts_inline_key_when_path_is_empty(self):
StackitProvider.validate_arguments(
self.VALID_PROJECT_ID, "", '{"keyId": "abc"}'
)
def test_validate_arguments_rejects_when_inline_key_is_whitespace(self):
with pytest.raises(StackITNonExistentTokenError):
StackitProvider.validate_arguments(self.VALID_PROJECT_ID, None, " ")
class TestStackITProviderInitialization:
def test_init_global_provider_passes_key_path_from_cli(self, monkeypatch):
"""The service account key path and inline key content from CLI args
are forwarded to the provider constructor; tokens are not part of
the API anymore."""
captured_kwargs = {}
class FakeStackitProvider:
def __init__(self, **kwargs):
captured_kwargs.update(kwargs)
fake_module = types.SimpleNamespace(StackitProvider=FakeStackitProvider)
arguments = Namespace(
provider="stackit",
stackit_project_id="12345678-1234-1234-1234-123456789abc",
stackit_service_account_key_path="/tmp/sa-key.json",
stackit_service_account_key='{"keyId": "abc"}',
stackit_region=None,
scan_unused_services=False,
config_file="config.yaml",
mutelist_file=None,
fixer_config="fixer_config.yaml",
)
monkeypatch.setattr(common_provider.Provider, "_global", None)
with (
patch.object(common_provider, "import_module", return_value=fake_module),
patch.object(
common_provider, "load_and_validate_config_file", return_value={}
),
):
common_provider.Provider.init_global_provider(arguments)
assert "api_token" not in captured_kwargs
assert captured_kwargs["project_id"] == arguments.stackit_project_id
assert (
captured_kwargs["service_account_key_path"]
== arguments.stackit_service_account_key_path
)
assert (
captured_kwargs["service_account_key"]
== arguments.stackit_service_account_key
)
assert captured_kwargs["scan_unused_services"] is False
def test_init_global_provider_passes_scan_unused_services_true(self, monkeypatch):
"""scan_unused_services=True is forwarded to the provider constructor."""
captured_kwargs = {}
class FakeStackitProvider:
def __init__(self, **kwargs):
captured_kwargs.update(kwargs)
fake_module = types.SimpleNamespace(StackitProvider=FakeStackitProvider)
arguments = Namespace(
provider="stackit",
stackit_project_id="12345678-1234-1234-1234-123456789abc",
stackit_service_account_key_path="/tmp/sa-key.json",
stackit_region=None,
scan_unused_services=True,
config_file="config.yaml",
mutelist_file=None,
fixer_config="fixer_config.yaml",
)
monkeypatch.setattr(common_provider.Provider, "_global", None)
with (
patch.object(common_provider, "import_module", return_value=fake_module),
patch.object(
common_provider, "load_and_validate_config_file", return_value={}
),
):
common_provider.Provider.init_global_provider(arguments)
assert captured_kwargs["scan_unused_services"] is True
class TestStackITProviderTestConnection:
KEY_PATH = "/tmp/sa-key.json"
KEY_CONTENT = '{"keyId": "abc", "publicKey": "..."}'
PROJECT_ID = "12345678-1234-1234-1234-123456789abc"
@pytest.fixture
def fake_stackit_resourcemanager(self, monkeypatch):
class FakeConfiguration:
def __init__(self, service_account_key_path=None, service_account_key=None):
self.service_account_key_path = service_account_key_path
self.service_account_key = service_account_key
class FakeDefaultApi:
error = None
calls = []
def __init__(self, config):
self.config = config
def get_project(self, id):
self.__class__.calls.append(
(
self.config.service_account_key_path,
self.config.service_account_key,
id,
)
)
if self.__class__.error:
raise self.__class__.error
return {"name": "Test Project"}
# The SDK is imported at module level, so patch the names bound in the
# provider module rather than sys.modules.
monkeypatch.setattr(stackit_provider_module, "Configuration", FakeConfiguration)
monkeypatch.setattr(
stackit_provider_module, "ResourceManagerDefaultApi", FakeDefaultApi
)
return FakeDefaultApi
def test_connection_success_with_key_path(self, fake_stackit_resourcemanager):
connection = StackitProvider.test_connection(
project_id=self.PROJECT_ID,
service_account_key_path=self.KEY_PATH,
)
assert connection == Connection(is_connected=True)
assert fake_stackit_resourcemanager.calls == [
(self.KEY_PATH, None, self.PROJECT_ID)
]
def test_connection_success_with_inline_key(self, fake_stackit_resourcemanager):
"""The inline key path takes precedence over the file path."""
connection = StackitProvider.test_connection(
project_id=self.PROJECT_ID,
service_account_key=self.KEY_CONTENT,
)
assert connection == Connection(is_connected=True)
assert fake_stackit_resourcemanager.calls == [
(None, self.KEY_CONTENT, self.PROJECT_ID)
]
def test_connection_inline_key_overrides_path(self, fake_stackit_resourcemanager):
connection = StackitProvider.test_connection(
project_id=self.PROJECT_ID,
service_account_key_path=self.KEY_PATH,
service_account_key=self.KEY_CONTENT,
)
assert connection == Connection(is_connected=True)
# When the inline key is present the SDK is configured with it and
# the key path is not passed on at all.
assert fake_stackit_resourcemanager.calls == [
(None, self.KEY_CONTENT, self.PROJECT_ID)
]
def test_connection_returns_error_when_raise_on_exception_is_false(
self, fake_stackit_resourcemanager
):
fake_stackit_resourcemanager.error = RuntimeError("denied")
connection = StackitProvider.test_connection(
project_id=self.PROJECT_ID,
service_account_key_path=self.KEY_PATH,
raise_on_exception=False,
)
assert isinstance(connection.error, StackITAPIError)
assert "Failed to connect to StackIT using Resource Manager" in str(
connection.error
)
def test_connection_raises_when_raise_on_exception_is_true(
self, fake_stackit_resourcemanager
):
fake_stackit_resourcemanager.error = RuntimeError("denied")
with pytest.raises(StackITAPIError):
StackitProvider.test_connection(
project_id=self.PROJECT_ID,
service_account_key_path=self.KEY_PATH,
raise_on_exception=True,
)
class TestStackITProviderGetProjectName:
@pytest.fixture
def fake_resourcemanager(self, monkeypatch):
class FakeConfiguration:
def __init__(self, service_account_key_path=None, service_account_key=None):
self.service_account_key_path = service_account_key_path
self.service_account_key = service_account_key
class FakeDefaultApi:
error = None
def __init__(self, config):
pass
def get_project(self, id):
if self.__class__.error:
raise self.__class__.error
return {"name": "My Project"}
# The SDK is imported at module level, so patch the names bound in the
# provider module rather than sys.modules.
monkeypatch.setattr(stackit_provider_module, "Configuration", FakeConfiguration)
monkeypatch.setattr(
stackit_provider_module, "ResourceManagerDefaultApi", FakeDefaultApi
)
return FakeDefaultApi
def _make_provider(self):
provider = object.__new__(StackitProvider)
provider._service_account_key_path = "/tmp/sa-key.json"
provider._service_account_key = None
provider._project_id = "12345678-1234-1234-1234-123456789abc"
return provider
def test_get_project_name_403_returns_empty_string_and_does_not_abort(
self, fake_resourcemanager
):
"""The Resource Manager lookup is cosmetic; a 403 there must NOT
abort the scan because the service account can legitimately hold
IaaS roles on the project without holding Resource Manager roles.
"""
class Http403Error(Exception):
status = 403
fake_resourcemanager.error = Http403Error()
provider = self._make_provider()
# Must not raise; returns an empty project name so the scan can
# continue and the IaaS service can decide whether *its* endpoints
# are forbidden too.
assert provider._get_project_name() == ""
def test_get_project_name_401_raises_invalid_token_error(
self, fake_resourcemanager
):
class Http401Error(Exception):
status = 401
fake_resourcemanager.error = Http401Error()
provider = self._make_provider()
with pytest.raises(StackITInvalidTokenError):
provider._get_project_name()
def test_get_project_name_other_error_returns_empty_string(
self, fake_resourcemanager
):
fake_resourcemanager.error = RuntimeError("network error")
provider = self._make_provider()
result = provider._get_project_name()
assert result == ""
class Test_StackitProvider_Handle_API_Error:
"""Direct coverage for ``StackitProvider.handle_api_error`` so 401 and 403
are treated as credential failures by every service call site, not just
``_get_project_name``.
"""
def _http(self, status_code: int) -> Exception:
err = Exception(f"http {status_code}")
err.status = status_code
return err
def test_401_raises_invalid_token_error(self):
with pytest.raises(StackITInvalidTokenError):
StackitProvider.handle_api_error(self._http(401))
def test_403_raises_invalid_token_error(self):
with pytest.raises(StackITInvalidTokenError):
StackitProvider.handle_api_error(self._http(403))
def test_other_http_status_is_reraised_unchanged(self):
original = self._http(500)
with pytest.raises(Exception) as excinfo:
StackitProvider.handle_api_error(original)
assert excinfo.value is original
def test_exception_without_status_is_reraised_unchanged(self):
original = RuntimeError("network error")
with pytest.raises(RuntimeError) as excinfo:
StackitProvider.handle_api_error(original)
assert excinfo.value is original
class TestGenerateRegionalClients:
"""Tests for StackitProvider.generate_regional_clients."""
def _make_provider(self):
provider = object.__new__(StackitProvider)
provider._service_account_key_path = "/tmp/sa-key.json"
provider._service_account_key = None
provider._audited_regions = None
return provider
def _fake_classes(self):
class FakeConfig:
pass
class FakeIaasClient:
def __init__(self, config):
pass
class FakeObjStorageClient:
def __init__(self, config):
pass
return FakeConfig, FakeIaasClient, FakeObjStorageClient
def test_objectstorage_service_uses_objectstorage_api_class(self, monkeypatch):
FakeConfig, FakeIaasClient, FakeObjStorageClient = self._fake_classes()
monkeypatch.setattr(
StackitProvider,
"_SERVICE_API_CLASS",
{"iaas": FakeIaasClient, "objectstorage": FakeObjStorageClient},
)
provider = self._make_provider()
monkeypatch.setattr(
provider, "get_available_service_regions", lambda _s, _r: ["eu01"]
)
with patch.object(
StackitProvider, "_build_sdk_configuration", return_value=FakeConfig()
):
clients = provider.generate_regional_clients("objectstorage")
assert "eu01" in clients
assert isinstance(clients["eu01"], FakeObjStorageClient)
def test_iaas_service_uses_iaas_api_class(self, monkeypatch):
FakeConfig, FakeIaasClient, FakeObjStorageClient = self._fake_classes()
monkeypatch.setattr(
StackitProvider,
"_SERVICE_API_CLASS",
{"iaas": FakeIaasClient, "objectstorage": FakeObjStorageClient},
)
provider = self._make_provider()
monkeypatch.setattr(
provider, "get_available_service_regions", lambda _s, _r: ["eu01"]
)
with patch.object(
StackitProvider, "_build_sdk_configuration", return_value=FakeConfig()
):
clients = provider.generate_regional_clients("iaas")
assert "eu01" in clients
assert isinstance(clients["eu01"], FakeIaasClient)