feat(stackit): add objectstorage checks (#11397)

Co-authored-by: Hugo P.Brito <hugopbrit@gmail.com>
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
This commit is contained in:
Johannes Engler
2026-06-10 18:43:24 +02:00
committed by GitHub
parent 3c8fde25ee
commit 368d3a2661
23 changed files with 1671 additions and 2 deletions
+1 -1
View File
@@ -122,7 +122,7 @@ Every AWS provider scan will enqueue an Attack Paths ingestion job automatically
| Vercel | 26 | 6 | 0 | 8 | Official | UI, API, CLI | | Vercel | 26 | 6 | 0 | 8 | Official | UI, API, CLI |
| Okta | 1 | 1 | 0 | 1 | Official | CLI | | Okta | 1 | 1 | 0 | 1 | Official | CLI |
| Scaleway [Contact us](https://prowler.com/contact) | 1 | 1 | 0 | 1 | Unofficial | CLI | | Scaleway [Contact us](https://prowler.com/contact) | 1 | 1 | 0 | 1 | Unofficial | CLI |
| StackIT [Contact us](https://prowler.com/contact) | 4 | 1 | 0 | 1 | Unofficial | CLI | | StackIT [Contact us](https://prowler.com/contact) | 7 | 2 | 0 | 3 | Unofficial | CLI |
| NHN | 6 | 2 | 1 | 0 | Unofficial | CLI | | NHN | 6 | 2 | 1 | 0 | Unofficial | CLI |
> [!Note] > [!Note]
+1
View File
@@ -21,6 +21,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `kms_key_rotation_max_90_days` check for GCP provider, verifying KMS customer-managed keys are rotated every 90 days or less in line with the CIS Benchmark [(#11516)](https://github.com/prowler-cloud/prowler/pull/11516) - `kms_key_rotation_max_90_days` check for GCP provider, verifying KMS customer-managed keys are rotated every 90 days or less in line with the CIS Benchmark [(#11516)](https://github.com/prowler-cloud/prowler/pull/11516)
- `exchange_mailbox_primary_smtp_uses_custom_domain` check for M365 provider [(#11215)](https://github.com/prowler-cloud/prowler/pull/11215) - `exchange_mailbox_primary_smtp_uses_custom_domain` check for M365 provider [(#11215)](https://github.com/prowler-cloud/prowler/pull/11215)
- `bedrock_agent_role_least_privilege` check for AWS provider, flagging Bedrock Agent execution roles with full-access managed policies, broad `Resource:*` inline statements, or missing permissions boundaries [(#11335)](https://github.com/prowler-cloud/prowler/pull/11335) - `bedrock_agent_role_least_privilege` check for AWS provider, flagging Bedrock Agent execution roles with full-access managed policies, broad `Resource:*` inline statements, or missing permissions boundaries [(#11335)](https://github.com/prowler-cloud/prowler/pull/11335)
- STACKIT ObjectStorage service with Object Lock, default retention policy, and access key expiration checks [(#11397)](https://github.com/prowler-cloud/prowler/pull/11397)
### 🐞 Fixed ### 🐞 Fixed
@@ -0,0 +1,37 @@
{
"Provider": "stackit",
"CheckID": "objectstorage_access_key_expiration",
"CheckTitle": "ObjectStorage access keys should have an expiration date",
"CheckType": [],
"ServiceName": "objectstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "**ObjectStorage access keys** should have an explicit expiration date. Long-lived credentials increase the blast radius of a credential compromise because they cannot expire on their own. Setting an expiration date enforces periodic rotation and limits the exposure window if a key is leaked.",
"Risk": "If an **ObjectStorage access key** is leaked, stolen, or forgotten without an expiration date, it remains usable indefinitely. An attacker can retain persistent access to object storage resources until the key is manually revoked.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.stackit.cloud/products/storage/object-storage/",
"https://docs.stackit.cloud/products/storage/object-storage/how-tos/create-and-delete-object-storage-credentials/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. In the STACKIT Portal navigate to Object Storage > Access Keys. 2. Delete the non-expiring access key. 3. Create a new access key with an expiration date appropriate for your rotation policy (e.g. 90 days). 4. Update all applications and services that use the old key with the new credentials.",
"Terraform": ""
},
"Recommendation": {
"Text": "Create **ObjectStorage access keys** with an explicit expiration date and establish a rotation process. Delete non-expiring keys and replace them with time-limited credentials. A rotation period of **90 days or less** is recommended.",
"Url": "https://hub.prowler.com/check/objectstorage_access_key_expiration"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Access keys are scoped to credentials groups. This check evaluates all access keys across all credentials groups in the project."
}
@@ -0,0 +1,27 @@
from prowler.lib.check.models import Check, CheckReportStackIT
from prowler.providers.stackit.services.objectstorage.objectstorage_client import (
objectstorage_client,
)
class objectstorage_access_key_expiration(Check):
def execute(self):
findings = []
for key in objectstorage_client.access_keys:
report = CheckReportStackIT(
metadata=self.metadata(),
resource=key,
)
report.resource_id = key.key_id
report.resource_name = key.display_name
report.location = key.region
if key.has_expiration():
report.status = "PASS"
report.status_extended = f"Access key {key.display_name} has an expiration date set ({key.expires})."
else:
report.status = "FAIL"
report.status_extended = f"Access key {key.display_name} has no expiration date and never rotates."
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "stackit",
"CheckID": "objectstorage_bucket_object_lock_enabled",
"CheckTitle": "ObjectStorage buckets should have S3 Object Lock enabled",
"CheckType": [],
"ServiceName": "objectstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "storage",
"Description": "**S3 Object Lock** prevents objects from being deleted or overwritten for a fixed period or indefinitely. Enabling it protects against accidental deletion and ransomware by enforcing a **write-once-read-many (WORM)** model. Object Lock can only be enabled when the bucket is created.",
"Risk": "Without **Object Lock**, objects can be deleted or overwritten at any time, increasing the risk of data loss from accidental deletion, malicious actors, or ransomware. Backups and compliance data are particularly vulnerable.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.stackit.cloud/products/storage/object-storage/",
"https://docs.stackit.cloud/products/storage/object-storage/how-tos/object-lock-bucket/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Object Lock must be enabled at bucket creation time and cannot be enabled on an existing bucket. Create a new bucket with Object Lock enabled and migrate your data to it.",
"Terraform": ""
},
"Recommendation": {
"Text": "Create **ObjectStorage buckets** with S3 Object Lock enabled for workloads that require data immutability, compliance archiving, or ransomware protection. Object Lock cannot be retroactively enabled on existing buckets.",
"Url": "https://hub.prowler.com/check/objectstorage_bucket_object_lock_enabled"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Object Lock can only be activated at bucket creation. Buckets without Object Lock are not necessarily misconfigured — evaluate based on the sensitivity and compliance requirements of the stored data."
}
@@ -0,0 +1,31 @@
from prowler.lib.check.models import Check, CheckReportStackIT
from prowler.providers.stackit.services.objectstorage.objectstorage_client import (
objectstorage_client,
)
class objectstorage_bucket_object_lock_enabled(Check):
def execute(self):
findings = []
for bucket in objectstorage_client.buckets:
report = CheckReportStackIT(
metadata=self.metadata(),
resource=bucket,
)
report.resource_id = bucket.name
report.resource_name = bucket.name
report.location = bucket.region
if bucket.object_lock_enabled:
report.status = "PASS"
report.status_extended = (
f"Bucket {bucket.name} has S3 Object Lock enabled."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Bucket {bucket.name} does not have S3 Object Lock enabled."
)
findings.append(report)
return findings
@@ -0,0 +1,39 @@
{
"Provider": "stackit",
"CheckID": "objectstorage_bucket_retention_policy",
"CheckTitle": "ObjectStorage buckets should have a default retention policy configured",
"CheckType": [],
"ServiceName": "objectstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "storage",
"Description": "An **ObjectStorage default retention policy** automatically applies a minimum retention period to every object uploaded to the bucket, preventing deletion or overwriting before the period expires. Without it, objects can be removed immediately after upload, undermining compliance and data durability requirements.",
"Risk": "Buckets without a **default retention policy** offer no automatic protection against premature object deletion. Compliance data, audit logs, and backups may be deleted before their required retention period elapses.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.stackit.cloud/products/storage/object-storage/",
"https://docs.stackit.cloud/products/storage/object-storage/how-tos/object-lock-default-retention/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Use the STACKIT Object Storage API or Portal to set a default retention policy on the bucket. Choose COMPLIANCE mode for strict immutability or GOVERNANCE mode to allow privileged users to override the policy.",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure a **default retention policy** on every bucket that stores compliance-relevant or sensitive data. Choose `COMPLIANCE` mode for regulatory requirements and `GOVERNANCE` mode when administrative overrides are acceptable.",
"Url": "https://hub.prowler.com/check/objectstorage_bucket_retention_policy"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [
"objectstorage_bucket_object_lock_enabled"
],
"Notes": "A default retention policy requires Object Lock to be enabled on the bucket. Buckets without Object Lock cannot have a retention policy."
}
@@ -0,0 +1,30 @@
from prowler.lib.check.models import Check, CheckReportStackIT
from prowler.providers.stackit.services.objectstorage.objectstorage_client import (
objectstorage_client,
)
class objectstorage_bucket_retention_policy(Check):
def execute(self):
findings = []
for bucket in objectstorage_client.buckets:
report = CheckReportStackIT(
metadata=self.metadata(),
resource=bucket,
)
report.resource_id = bucket.name
report.resource_name = bucket.name
report.location = bucket.region
if bucket.retention_days and bucket.retention_days > 0:
report.status = "PASS"
report.status_extended = (
f"Bucket {bucket.name} has a default retention policy of "
f"{bucket.retention_days} day(s) in {bucket.retention_mode} mode."
)
else:
report.status = "FAIL"
report.status_extended = f"Bucket {bucket.name} does not have a default retention policy configured."
findings.append(report)
return findings
@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.stackit.services.objectstorage.objectstorage_service import (
ObjectStorageService,
)
objectstorage_client = ObjectStorageService(Provider.get_global_provider())
@@ -0,0 +1,306 @@
import json
from datetime import datetime, timezone
from typing import Optional
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
from prowler.providers.stackit.stackit_provider import StackitProvider, suppress_stderr
class ObjectStorageService:
def __init__(self, provider: StackitProvider):
self.provider = provider
self.project_id = provider.identity.project_id
self.regional_clients = provider.generate_regional_clients("objectstorage")
self.buckets: list[Bucket] = []
self.access_keys: list[AccessKey] = []
self._fetch_all_regions()
def _fetch_all_regions(self):
for region, client in self.regional_clients.items():
try:
self._list_buckets(client, region)
self._list_access_keys(client, region)
except Exception as error:
if getattr(error, "status", None) == 404:
logger.info(
f"StackIT project {self.project_id} has no ObjectStorage "
f"presence in region {region}; skipping."
)
continue
raise
def _handle_api_call(self, api_function, *args, **kwargs):
try:
with suppress_stderr():
return api_function(*args, **kwargs)
except Exception as e:
self.provider.handle_api_error(e)
raise
def _list_buckets(self, client, region: str):
response = self._handle_api_call(
client.list_buckets, project_id=self.project_id, region=region
)
buckets_list = getattr(response, "buckets", None) or []
if isinstance(response, dict):
buckets_list = response.get("buckets", [])
for bucket_data in buckets_list:
try:
if hasattr(bucket_data, "name"):
name = bucket_data.name
object_lock_enabled = getattr(
bucket_data, "object_lock_enabled", False
)
elif isinstance(bucket_data, dict):
name = bucket_data.get("name", "")
object_lock_enabled = bucket_data.get("objectLockEnabled", False)
else:
continue
except Exception as e:
logger.error(f"Error processing bucket: {e}")
continue
retention_days, retention_mode = self._get_default_retention(
client, region, name
)
self.buckets.append(
Bucket(
name=name,
region=region,
project_id=self.project_id,
object_lock_enabled=object_lock_enabled,
retention_days=retention_days,
retention_mode=retention_mode,
)
)
logger.info(f"Listed {len(buckets_list)} buckets in {region}")
def _get_default_retention(
self, client, region: str, bucket_name: str
) -> tuple[Optional[int], Optional[str]]:
try:
response = self._handle_api_call(
client.get_default_retention,
project_id=self.project_id,
region=region,
bucket_name=bucket_name,
)
days = getattr(response, "days", None)
mode = getattr(response, "mode", None)
if isinstance(response, dict):
days = response.get("days")
mode = response.get("mode")
return days, str(mode) if mode else None
except Exception as e:
if getattr(e, "status", None) == 404:
return None, None
raise
def _list_access_keys(self, client, region: str):
credentials_groups_response = self._handle_api_call(
client.list_credentials_groups, project_id=self.project_id, region=region
)
credentials_groups = (
getattr(credentials_groups_response, "credentials_groups", None) or []
)
if isinstance(credentials_groups_response, dict):
credentials_groups = credentials_groups_response.get(
"credentialsGroups",
credentials_groups_response.get("credentials_groups", []),
)
total_keys = 0
for credentials_group_data in credentials_groups:
try:
if isinstance(credentials_group_data, dict):
credentials_group_id = credentials_group_data.get(
"id",
credentials_group_data.get(
"groupId",
credentials_group_data.get("credentialsGroupId", ""),
),
)
credentials_group_name = credentials_group_data.get(
"displayName",
credentials_group_data.get("name", credentials_group_id),
)
else:
credentials_group_id = (
getattr(credentials_group_data, "id", None)
or getattr(credentials_group_data, "group_id", None)
or getattr(credentials_group_data, "credentials_group_id", "")
)
credentials_group_name = getattr(
credentials_group_data,
"display_name",
getattr(credentials_group_data, "name", credentials_group_id),
)
except Exception as e:
logger.error(f"Error processing credentials group: {e}")
continue
if not credentials_group_id:
continue
response = self._list_access_keys_response(
client, region, credentials_group_id
)
keys_list = self._extract_access_keys(response)
for key_data in keys_list:
try:
if hasattr(key_data, "key_id"):
key_id = key_data.key_id
display_name = getattr(key_data, "display_name", key_id)
expires = getattr(key_data, "expires", None)
elif isinstance(key_data, dict):
key_id = key_data.get("keyId", key_data.get("key_id", ""))
display_name = key_data.get(
"displayName", key_data.get("display_name", key_id)
)
expires = key_data.get("expires")
else:
continue
if not key_id:
continue
self.access_keys.append(
AccessKey(
key_id=key_id,
display_name=display_name,
expires=expires,
region=region,
project_id=self.project_id,
credentials_group_id=credentials_group_id,
credentials_group_name=credentials_group_name,
)
)
except Exception as e:
logger.error(f"Error processing access key: {e}")
continue
total_keys += len(keys_list)
logger.info(f"Listed {total_keys} access keys in {region}")
def _list_access_keys_response(
self, client, region: str, credentials_group_id: str
):
raw_method = None
if callable(
getattr(type(client), "list_access_keys_without_preload_content", None)
):
raw_method = client.list_access_keys_without_preload_content
elif callable(vars(client).get("list_access_keys_without_preload_content")):
raw_method = vars(client)["list_access_keys_without_preload_content"]
if raw_method:
response = self._handle_api_call(
raw_method,
project_id=self.project_id,
region=region,
credentials_group=credentials_group_id,
)
self._raise_for_raw_response_status(response)
return response
return self._handle_api_call(
client.list_access_keys,
project_id=self.project_id,
region=region,
credentials_group=credentials_group_id,
)
def _raise_for_raw_response_status(self, response):
status = getattr(response, "status", None)
if status is None:
status = getattr(response, "status_code", None)
if isinstance(status, int) and status >= 400:
error = Exception(
f"StackIT ObjectStorage list_access_keys failed with status {status}"
)
error.status = status
self.provider.handle_api_error(error)
raise error
@staticmethod
def _extract_access_keys(response) -> list:
payload = response
if not isinstance(payload, (dict, list)):
json_method = getattr(response, "json", None)
if callable(json_method):
payload = json_method()
elif hasattr(response, "data"):
payload = ObjectStorageService._parse_raw_json(response.data)
elif hasattr(response, "text"):
payload = ObjectStorageService._parse_raw_json(response.text)
if isinstance(payload, dict):
return payload.get("accessKeys", payload.get("access_keys", []))
if isinstance(payload, list):
return payload
return getattr(response, "access_keys", None) or []
@staticmethod
def _parse_raw_json(raw):
if raw in (None, b"", ""):
return {}
if isinstance(raw, (bytes, bytearray)):
raw = raw.decode("utf-8")
if isinstance(raw, str):
return json.loads(raw)
return raw
class Bucket(BaseModel):
name: str
region: str
project_id: str
object_lock_enabled: bool = False
retention_days: Optional[int] = None
retention_mode: Optional[str] = None
class AccessKey(BaseModel):
key_id: str
display_name: str
# None or a sentinel year-0001 date string means the key never expires.
expires: Optional[str] = None
region: str
project_id: str
credentials_group_id: Optional[str] = None
credentials_group_name: Optional[str] = None
def has_expiration(self) -> bool:
"""Return True if the key has a real (non-sentinel) expiration date."""
if not self.expires:
return False
try:
expires_str = self.expires.replace("Z", "+00:00")
dt = datetime.fromisoformat(expires_str)
# Year 0001 (or earlier) is the SDK sentinel for "never expires"
return dt.year > 1
except (ValueError, AttributeError):
return False
def expires_within_days(self, days: int) -> bool:
"""Return True if the key expires within the given number of days from now."""
if not self.has_expiration():
return False
expires_str = self.expires.replace("Z", "+00:00")
dt = datetime.fromisoformat(expires_str)
if dt.tzinfo is None:
dt = dt.replace(tzinfo=timezone.utc)
delta = dt - datetime.now(tz=timezone.utc)
return delta.days <= days
@@ -15,6 +15,7 @@ from colorama import Style
# loader and surfacing as a misleading empty report. # loader and surfacing as a misleading empty report.
from stackit.core.configuration import Configuration from stackit.core.configuration import Configuration
from stackit.iaas import DefaultApi as IaasDefaultApi from stackit.iaas import DefaultApi as IaasDefaultApi
from stackit.objectstorage import DefaultApi as ObjectStorageDefaultApi
from stackit.resourcemanager import DefaultApi as ResourceManagerDefaultApi from stackit.resourcemanager import DefaultApi as ResourceManagerDefaultApi
from prowler.config.config import ( from prowler.config.config import (
@@ -224,11 +225,17 @@ class StackitProvider(Provider):
return json_regions.intersection(audited_regions) return json_regions.intersection(audited_regions)
return json_regions return json_regions
_SERVICE_API_CLASS = {
"iaas": IaasDefaultApi,
"objectstorage": ObjectStorageDefaultApi,
}
def generate_regional_clients(self, service: str = "iaas") -> dict: def generate_regional_clients(self, service: str = "iaas") -> dict:
"""Generate regional API clients for the given service. """Generate regional API clients for the given service.
Returns dict: {"eu01": DefaultApi_client, "eu02": DefaultApi_client} Returns dict: {"eu01": DefaultApi_client, "eu02": DefaultApi_client}
""" """
api_class = self._SERVICE_API_CLASS.get(service, IaasDefaultApi)
regional_clients = {} regional_clients = {}
service_regions = self.get_available_service_regions( service_regions = self.get_available_service_regions(
service, self._audited_regions service, self._audited_regions
@@ -240,7 +247,7 @@ class StackitProvider(Provider):
self._service_account_key_path, self._service_account_key_path,
self._service_account_key, self._service_account_key,
) )
client = IaasDefaultApi(config) client = api_class(config)
client.region = region # Attach region attribute client.region = region # Attach region attribute
regional_clients[region] = client regional_clients[region] = client
@@ -5,6 +5,12 @@
"eu01", "eu01",
"eu02" "eu02"
] ]
},
"objectstorage": {
"regions": [
"eu01",
"eu02"
]
} }
} }
} }
+1
View File
@@ -95,6 +95,7 @@ dependencies = [
"slack-sdk==3.39.0", "slack-sdk==3.39.0",
"stackit-core==0.2.0", "stackit-core==0.2.0",
"stackit-iaas==1.4.0", "stackit-iaas==1.4.0",
"stackit-objectstorage==1.4.0",
"stackit-resourcemanager==0.8.0", "stackit-resourcemanager==0.8.0",
"tabulate==0.9.0", "tabulate==0.9.0",
"tzlocal==5.3.1", "tzlocal==5.3.1",
@@ -0,0 +1,150 @@
from unittest import mock
from prowler.providers.stackit.services.objectstorage.objectstorage_service import (
AccessKey,
)
from tests.providers.stackit.stackit_fixtures import (
STACKIT_PROJECT_ID,
set_mocked_stackit_provider,
)
class Test_objectstorage_access_key_expiration:
def test_no_access_keys(self):
objectstorage_client = mock.MagicMock
objectstorage_client.access_keys = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_access_key_expiration.objectstorage_access_key_expiration import (
objectstorage_access_key_expiration,
)
check = objectstorage_access_key_expiration()
result = check.execute()
assert len(result) == 0
def test_access_key_with_expiration(self):
objectstorage_client = mock.MagicMock
objectstorage_client.access_keys = [
AccessKey(
key_id="key-123",
display_name="my-key",
expires="2027-01-01T00:00:00+00:00",
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_access_key_expiration.objectstorage_access_key_expiration import (
objectstorage_access_key_expiration,
)
check = objectstorage_access_key_expiration()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "has an expiration date set" in result[0].status_extended
assert result[0].resource_id == "key-123"
assert result[0].resource_name == "my-key"
assert result[0].location == "eu01"
def test_access_key_no_expiration_none(self):
objectstorage_client = mock.MagicMock
objectstorage_client.access_keys = [
AccessKey(
key_id="key-456",
display_name="never-expiring-key",
expires=None,
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_access_key_expiration.objectstorage_access_key_expiration import (
objectstorage_access_key_expiration,
)
check = objectstorage_access_key_expiration()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "no expiration date" in result[0].status_extended
assert result[0].resource_id == "key-456"
def test_access_key_no_expiration_sentinel(self):
"""Year-0001 date is the SDK sentinel for 'never expires'."""
objectstorage_client = mock.MagicMock
objectstorage_client.access_keys = [
AccessKey(
key_id="key-789",
display_name="sentinel-key",
expires="0001-01-01T00:00:00+00:00",
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_access_key_expiration.objectstorage_access_key_expiration import (
objectstorage_access_key_expiration,
)
check = objectstorage_access_key_expiration()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "no expiration date" in result[0].status_extended
@@ -0,0 +1,111 @@
from unittest import mock
from prowler.providers.stackit.services.objectstorage.objectstorage_service import (
Bucket,
)
from tests.providers.stackit.stackit_fixtures import (
STACKIT_PROJECT_ID,
set_mocked_stackit_provider,
)
class Test_objectstorage_bucket_object_lock_enabled:
def test_no_buckets(self):
objectstorage_client = mock.MagicMock
objectstorage_client.buckets = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_bucket_object_lock_enabled.objectstorage_bucket_object_lock_enabled import (
objectstorage_bucket_object_lock_enabled,
)
check = objectstorage_bucket_object_lock_enabled()
result = check.execute()
assert len(result) == 0
def test_bucket_object_lock_enabled(self):
objectstorage_client = mock.MagicMock
objectstorage_client.buckets = [
Bucket(
name="my-bucket",
region="eu01",
project_id=STACKIT_PROJECT_ID,
object_lock_enabled=True,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_bucket_object_lock_enabled.objectstorage_bucket_object_lock_enabled import (
objectstorage_bucket_object_lock_enabled,
)
check = objectstorage_bucket_object_lock_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "has S3 Object Lock enabled" in result[0].status_extended
assert result[0].resource_id == "my-bucket"
assert result[0].resource_name == "my-bucket"
assert result[0].location == "eu01"
def test_bucket_object_lock_disabled(self):
objectstorage_client = mock.MagicMock
objectstorage_client.buckets = [
Bucket(
name="my-bucket",
region="eu01",
project_id=STACKIT_PROJECT_ID,
object_lock_enabled=False,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_bucket_object_lock_enabled.objectstorage_bucket_object_lock_enabled import (
objectstorage_bucket_object_lock_enabled,
)
check = objectstorage_bucket_object_lock_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "does not have S3 Object Lock enabled" in result[0].status_extended
assert result[0].resource_id == "my-bucket"
@@ -0,0 +1,153 @@
from unittest import mock
from prowler.providers.stackit.services.objectstorage.objectstorage_service import (
Bucket,
)
from tests.providers.stackit.stackit_fixtures import (
STACKIT_PROJECT_ID,
set_mocked_stackit_provider,
)
class Test_objectstorage_bucket_retention_policy:
def test_no_buckets(self):
objectstorage_client = mock.MagicMock
objectstorage_client.buckets = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_bucket_retention_policy.objectstorage_bucket_retention_policy import (
objectstorage_bucket_retention_policy,
)
check = objectstorage_bucket_retention_policy()
result = check.execute()
assert len(result) == 0
def test_bucket_with_retention_policy(self):
objectstorage_client = mock.MagicMock
objectstorage_client.buckets = [
Bucket(
name="my-bucket",
region="eu01",
project_id=STACKIT_PROJECT_ID,
object_lock_enabled=True,
retention_days=30,
retention_mode="COMPLIANCE",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_bucket_retention_policy.objectstorage_bucket_retention_policy import (
objectstorage_bucket_retention_policy,
)
check = objectstorage_bucket_retention_policy()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "30 day(s)" in result[0].status_extended
assert "COMPLIANCE" in result[0].status_extended
assert result[0].resource_id == "my-bucket"
assert result[0].location == "eu01"
def test_bucket_without_retention_policy(self):
objectstorage_client = mock.MagicMock
objectstorage_client.buckets = [
Bucket(
name="my-bucket",
region="eu01",
project_id=STACKIT_PROJECT_ID,
object_lock_enabled=False,
retention_days=None,
retention_mode=None,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_bucket_retention_policy.objectstorage_bucket_retention_policy import (
objectstorage_bucket_retention_policy,
)
check = objectstorage_bucket_retention_policy()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"does not have a default retention policy" in result[0].status_extended
)
assert result[0].resource_id == "my-bucket"
def test_bucket_retention_zero_days(self):
objectstorage_client = mock.MagicMock
objectstorage_client.buckets = [
Bucket(
name="my-bucket",
region="eu01",
project_id=STACKIT_PROJECT_ID,
object_lock_enabled=True,
retention_days=0,
retention_mode="GOVERNANCE",
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_stackit_provider(),
),
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_service.ObjectStorageService",
new=objectstorage_client,
) as service_client,
mock.patch(
"prowler.providers.stackit.services.objectstorage.objectstorage_client.objectstorage_client",
new=service_client,
),
):
from prowler.providers.stackit.services.objectstorage.objectstorage_bucket_retention_policy.objectstorage_bucket_retention_policy import (
objectstorage_bucket_retention_policy,
)
check = objectstorage_bucket_retention_policy()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
@@ -0,0 +1,645 @@
from types import SimpleNamespace
from unittest import mock
import pytest
from prowler.providers.stackit.services.objectstorage.objectstorage_service import (
AccessKey,
ObjectStorageService,
)
from tests.providers.stackit.stackit_fixtures import STACKIT_PROJECT_ID
class TestObjectStorageService:
def test_list_buckets_keeps_bucket_when_retention_not_configured(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
not_found_error = Exception("not found")
not_found_error.status = 404
client = mock.MagicMock()
client.list_buckets.return_value = SimpleNamespace(
buckets=[
SimpleNamespace(
name="my-bucket",
object_lock_enabled=True,
)
]
)
client.get_default_retention.side_effect = not_found_error
service._list_buckets(client, "eu01")
assert len(service.buckets) == 1
assert service.buckets[0].name == "my-bucket"
assert service.buckets[0].object_lock_enabled is True
assert service.buckets[0].retention_days is None
assert service.buckets[0].retention_mode is None
def test_list_buckets_propagates_unexpected_retention_api_errors(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
api_error = Exception("service unavailable")
api_error.status = 503
client = mock.MagicMock()
client.list_buckets.return_value = SimpleNamespace(
buckets=[
SimpleNamespace(
name="my-bucket",
object_lock_enabled=True,
)
]
)
client.get_default_retention.side_effect = api_error
with pytest.raises(Exception, match="service unavailable"):
service._list_buckets(client, "eu01")
assert service.buckets == []
service.provider.handle_api_error.assert_called_once_with(api_error)
def test_init_creates_service_with_no_regions(self):
provider = mock.MagicMock()
provider.identity.project_id = STACKIT_PROJECT_ID
provider.generate_regional_clients.return_value = {}
service = ObjectStorageService(provider)
assert service.project_id == STACKIT_PROJECT_ID
assert service.buckets == []
assert service.access_keys == []
provider.generate_regional_clients.assert_called_once_with("objectstorage")
def test_fetch_all_regions_skips_404_region(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
service.access_keys = []
not_found = Exception("not found")
not_found.status = 404
service.regional_clients = {"eu01": mock.MagicMock()}
with mock.patch.object(service, "_list_buckets", side_effect=not_found):
service._fetch_all_regions()
assert service.buckets == []
def test_fetch_all_regions_reraises_non_404_error(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
service.access_keys = []
server_error = Exception("internal server error")
server_error.status = 500
service.regional_clients = {"eu01": mock.MagicMock()}
with mock.patch.object(service, "_list_buckets", side_effect=server_error):
with pytest.raises(Exception, match="internal server error"):
service._fetch_all_regions()
def test_list_buckets_with_dict_api_response(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
not_found = Exception("not found")
not_found.status = 404
client = mock.MagicMock()
client.list_buckets.return_value = {
"buckets": [
SimpleNamespace(name="dict-response-bucket", object_lock_enabled=True)
]
}
client.get_default_retention.side_effect = not_found
service._list_buckets(client, "eu01")
assert len(service.buckets) == 1
assert service.buckets[0].name == "dict-response-bucket"
def test_list_buckets_with_dict_bucket_data(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
not_found = Exception("not found")
not_found.status = 404
client = mock.MagicMock()
client.list_buckets.return_value = SimpleNamespace(
buckets=[{"name": "dict-bucket", "objectLockEnabled": True}]
)
client.get_default_retention.side_effect = not_found
service._list_buckets(client, "eu01")
assert len(service.buckets) == 1
assert service.buckets[0].name == "dict-bucket"
assert service.buckets[0].object_lock_enabled is True
def test_list_buckets_skips_unknown_bucket_type(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
client = mock.MagicMock()
client.list_buckets.return_value = SimpleNamespace(buckets=[42])
service._list_buckets(client, "eu01")
assert len(service.buckets) == 0
def test_get_default_retention_with_dict_response(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
client = mock.MagicMock()
client.get_default_retention.return_value = {"days": 14, "mode": "GOVERNANCE"}
days, mode = service._get_default_retention(client, "eu01", "my-bucket")
assert days == 14
assert mode == "GOVERNANCE"
def test_list_access_keys_with_object_data(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[SimpleNamespace(id="cg-001", display_name="main-group")]
)
client.list_access_keys.return_value = SimpleNamespace(
access_keys=[
SimpleNamespace(
key_id="key-001",
display_name="my-key",
expires="2027-01-01T00:00:00+00:00",
)
]
)
service._list_access_keys(client, "eu01")
client.list_credentials_groups.assert_called_once_with(
project_id=STACKIT_PROJECT_ID, region="eu01"
)
client.list_access_keys.assert_called_once_with(
project_id=STACKIT_PROJECT_ID, region="eu01", credentials_group="cg-001"
)
assert len(service.access_keys) == 1
assert service.access_keys[0].key_id == "key-001"
assert service.access_keys[0].display_name == "my-key"
assert service.access_keys[0].region == "eu01"
assert service.access_keys[0].expires == "2027-01-01T00:00:00+00:00"
assert service.access_keys[0].credentials_group_id == "cg-001"
assert service.access_keys[0].credentials_group_name == "main-group"
def test_list_access_keys_with_credentials_group_id_object_data(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[
SimpleNamespace(
credentials_group_id="cg-sdk",
display_name="sdk-group",
)
]
)
client.list_access_keys.return_value = SimpleNamespace(access_keys=[])
service._list_access_keys(client, "eu01")
client.list_access_keys.assert_called_once_with(
project_id=STACKIT_PROJECT_ID, region="eu01", credentials_group="cg-sdk"
)
def test_list_access_keys_collects_keys_from_multiple_credentials_groups(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[
SimpleNamespace(id="cg-001", display_name="group-one"),
SimpleNamespace(id="cg-002", display_name="group-two"),
]
)
client.list_access_keys.side_effect = [
SimpleNamespace(
access_keys=[
SimpleNamespace(
key_id="key-001",
display_name="key-one",
expires="2027-01-01T00:00:00+00:00",
)
]
),
SimpleNamespace(
access_keys=[
SimpleNamespace(
key_id="key-002",
display_name="key-two",
expires=None,
)
]
),
]
service._list_access_keys(client, "eu01")
assert client.list_access_keys.call_args_list == [
mock.call(
project_id=STACKIT_PROJECT_ID,
region="eu01",
credentials_group="cg-001",
),
mock.call(
project_id=STACKIT_PROJECT_ID,
region="eu01",
credentials_group="cg-002",
),
]
assert [key.key_id for key in service.access_keys] == ["key-001", "key-002"]
assert service.access_keys[1].expires is None
assert service.access_keys[1].has_expiration() is False
assert [key.credentials_group_id for key in service.access_keys] == [
"cg-001",
"cg-002",
]
def test_list_access_keys_with_dict_api_response(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = {
"credentialsGroups": [{"id": "cg-dict", "displayName": "dict-group"}]
}
client.list_access_keys.return_value = {
"accessKeys": [
{"keyId": "key-dict", "displayName": "dict-key", "expires": None}
]
}
service._list_access_keys(client, "eu01")
assert len(service.access_keys) == 1
assert service.access_keys[0].key_id == "key-dict"
assert service.access_keys[0].display_name == "dict-key"
assert service.access_keys[0].expires is None
assert service.access_keys[0].has_expiration() is False
assert service.access_keys[0].credentials_group_id == "cg-dict"
assert service.access_keys[0].credentials_group_name == "dict-group"
def test_list_access_keys_with_raw_json_response_and_null_expires(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
class RawResponse:
status = 200
def json(self):
return {
"accessKeys": [
{
"keyId": "key-raw",
"displayName": "raw-key",
"expires": None,
}
]
}
class FakeClient:
def __init__(self):
self.list_credentials_groups = mock.MagicMock(
return_value=SimpleNamespace(
credentials_groups=[SimpleNamespace(id="cg-raw")]
)
)
self.list_access_keys = mock.MagicMock()
self.raw_call = None
def list_access_keys_without_preload_content(self, **kwargs):
self.raw_call = kwargs
return RawResponse()
client = FakeClient()
service._list_access_keys(client, "eu01")
assert client.raw_call == {
"project_id": STACKIT_PROJECT_ID,
"region": "eu01",
"credentials_group": "cg-raw",
}
client.list_access_keys.assert_not_called()
assert len(service.access_keys) == 1
assert service.access_keys[0].key_id == "key-raw"
assert service.access_keys[0].expires is None
assert service.access_keys[0].has_expiration() is False
def test_list_access_keys_with_raw_data_response(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
class RawResponse:
status = 200
data = b'{"accessKeys":[{"keyId":"key-data","displayName":"data-key"}]}'
class FakeClient:
def __init__(self):
self.list_credentials_groups = mock.MagicMock(
return_value=SimpleNamespace(
credentials_groups=[SimpleNamespace(id="cg-data")]
)
)
def list_access_keys_without_preload_content(self, **kwargs):
return RawResponse()
service._list_access_keys(FakeClient(), "eu01")
assert len(service.access_keys) == 1
assert service.access_keys[0].key_id == "key-data"
assert service.access_keys[0].display_name == "data-key"
def test_list_access_keys_raw_response_propagates_non_success_status(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
class RawResponse:
status = 503
class FakeClient:
def __init__(self):
self.list_credentials_groups = mock.MagicMock(
return_value=SimpleNamespace(
credentials_groups=[SimpleNamespace(id="cg-error")]
)
)
def list_access_keys_without_preload_content(self, **kwargs):
return RawResponse()
with pytest.raises(Exception, match="status 503") as error:
service._list_access_keys(FakeClient(), "eu01")
assert error.value.status == 503
service.provider.handle_api_error.assert_called_once_with(error.value)
def test_list_access_keys_with_dict_key_data(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[{"id": "cg-456", "displayName": "group-456"}]
)
client.list_access_keys.return_value = SimpleNamespace(
access_keys=[
{
"keyId": "key-456",
"displayName": "my-dict-key",
"expires": "2028-06-01T00:00:00+00:00",
}
]
)
service._list_access_keys(client, "eu01")
assert len(service.access_keys) == 1
assert service.access_keys[0].key_id == "key-456"
assert service.access_keys[0].display_name == "my-dict-key"
assert service.access_keys[0].credentials_group_id == "cg-456"
def test_list_access_keys_skips_unknown_type(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[SimpleNamespace(id="cg-001")]
)
client.list_access_keys.return_value = SimpleNamespace(access_keys=[42])
service._list_access_keys(client, "eu01")
assert len(service.access_keys) == 0
def test_list_access_keys_no_keys(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[SimpleNamespace(id="cg-empty")]
)
client.list_access_keys.return_value = SimpleNamespace(access_keys=[])
service._list_access_keys(client, "eu01")
assert len(service.access_keys) == 0
def test_list_access_keys_no_credentials_groups(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[]
)
service._list_access_keys(client, "eu01")
assert len(service.access_keys) == 0
client.list_access_keys.assert_not_called()
def test_list_access_keys_skips_malformed_credentials_groups(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[
42,
{},
SimpleNamespace(id="cg-valid", display_name="valid-group"),
]
)
client.list_access_keys.return_value = SimpleNamespace(
access_keys=[SimpleNamespace(key_id="key-valid")]
)
service._list_access_keys(client, "eu01")
client.list_access_keys.assert_called_once_with(
project_id=STACKIT_PROJECT_ID, region="eu01", credentials_group="cg-valid"
)
assert len(service.access_keys) == 1
assert service.access_keys[0].key_id == "key-valid"
def test_fetch_all_regions_calls_both_list_methods(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
service.access_keys = []
service.regional_clients = {"eu01": mock.MagicMock()}
with (
mock.patch.object(service, "_list_buckets") as mock_buckets,
mock.patch.object(service, "_list_access_keys") as mock_keys,
):
service._fetch_all_regions()
mock_buckets.assert_called_once()
mock_keys.assert_called_once()
def test_list_buckets_handles_bucket_processing_error(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.buckets = []
class BrokenBucket:
@property
def name(self):
raise RuntimeError("broken bucket attribute")
client = mock.MagicMock()
client.list_buckets.return_value = SimpleNamespace(buckets=[BrokenBucket()])
service._list_buckets(client, "eu01")
assert len(service.buckets) == 0
def test_list_access_keys_handles_key_processing_error(self):
service = ObjectStorageService.__new__(ObjectStorageService)
service.provider = mock.MagicMock()
service.project_id = STACKIT_PROJECT_ID
service.access_keys = []
class BrokenKey:
@property
def key_id(self):
raise RuntimeError("broken key attribute")
client = mock.MagicMock()
client.list_credentials_groups.return_value = SimpleNamespace(
credentials_groups=[SimpleNamespace(id="cg-001")]
)
client.list_access_keys.return_value = SimpleNamespace(
access_keys=[BrokenKey()]
)
service._list_access_keys(client, "eu01")
assert len(service.access_keys) == 0
class TestAccessKeyModel:
def test_has_expiration_with_invalid_date_string(self):
key = AccessKey(
key_id="k",
display_name="k",
expires="not-a-valid-date",
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
assert key.has_expiration() is False
def test_expires_within_days_when_no_expiration(self):
key = AccessKey(
key_id="k",
display_name="k",
expires=None,
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
assert key.expires is None
assert key.has_expiration() is False
assert key.expires_within_days(90) is False
def test_expires_within_days_when_expiring_soon(self):
key = AccessKey(
key_id="k",
display_name="k",
expires="2026-06-15T00:00:00+00:00",
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
assert key.expires_within_days(90) is True
def test_expires_within_days_when_not_expiring_soon(self):
key = AccessKey(
key_id="k",
display_name="k",
expires="2030-01-01T00:00:00+00:00",
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
assert key.expires_within_days(30) is False
def test_expires_within_days_with_naive_datetime(self):
key = AccessKey(
key_id="k",
display_name="k",
expires="2026-06-10T00:00:00",
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
assert key.expires_within_days(90) is True
def test_expires_within_days_with_sentinel_key(self):
key = AccessKey(
key_id="k",
display_name="k",
expires="0001-01-01T00:00:00+00:00",
region="eu01",
project_id=STACKIT_PROJECT_ID,
)
assert key.expires_within_days(90) is False
@@ -411,3 +411,68 @@ class Test_StackitProvider_Handle_API_Error:
with pytest.raises(RuntimeError) as excinfo: with pytest.raises(RuntimeError) as excinfo:
StackitProvider.handle_api_error(original) StackitProvider.handle_api_error(original)
assert excinfo.value is original assert excinfo.value is original
class TestGenerateRegionalClients:
"""Tests for StackitProvider.generate_regional_clients."""
def _make_provider(self):
provider = object.__new__(StackitProvider)
provider._service_account_key_path = "/tmp/sa-key.json"
provider._service_account_key = None
provider._audited_regions = None
return provider
def _fake_classes(self):
class FakeConfig:
pass
class FakeIaasClient:
def __init__(self, config):
pass
class FakeObjStorageClient:
def __init__(self, config):
pass
return FakeConfig, FakeIaasClient, FakeObjStorageClient
def test_objectstorage_service_uses_objectstorage_api_class(self, monkeypatch):
FakeConfig, FakeIaasClient, FakeObjStorageClient = self._fake_classes()
monkeypatch.setattr(
StackitProvider,
"_SERVICE_API_CLASS",
{"iaas": FakeIaasClient, "objectstorage": FakeObjStorageClient},
)
provider = self._make_provider()
monkeypatch.setattr(
provider, "get_available_service_regions", lambda _s, _r: ["eu01"]
)
with patch.object(
StackitProvider, "_build_sdk_configuration", return_value=FakeConfig()
):
clients = provider.generate_regional_clients("objectstorage")
assert "eu01" in clients
assert isinstance(clients["eu01"], FakeObjStorageClient)
def test_iaas_service_uses_iaas_api_class(self, monkeypatch):
FakeConfig, FakeIaasClient, FakeObjStorageClient = self._fake_classes()
monkeypatch.setattr(
StackitProvider,
"_SERVICE_API_CLASS",
{"iaas": FakeIaasClient, "objectstorage": FakeObjStorageClient},
)
provider = self._make_provider()
monkeypatch.setattr(
provider, "get_available_service_regions", lambda _s, _r: ["eu01"]
)
with patch.object(
StackitProvider, "_build_sdk_configuration", return_value=FakeConfig()
):
clients = provider.generate_regional_clients("iaas")
assert "eu01" in clients
assert isinstance(clients["eu01"], FakeIaasClient)
Generated
+17
View File
@@ -3325,6 +3325,7 @@ dependencies = [
{ name = "slack-sdk" }, { name = "slack-sdk" },
{ name = "stackit-core" }, { name = "stackit-core" },
{ name = "stackit-iaas" }, { name = "stackit-iaas" },
{ name = "stackit-objectstorage" },
{ name = "stackit-resourcemanager" }, { name = "stackit-resourcemanager" },
{ name = "tabulate" }, { name = "tabulate" },
{ name = "tzlocal" }, { name = "tzlocal" },
@@ -3433,6 +3434,7 @@ requires-dist = [
{ name = "slack-sdk", specifier = "==3.39.0" }, { name = "slack-sdk", specifier = "==3.39.0" },
{ name = "stackit-core", specifier = "==0.2.0" }, { name = "stackit-core", specifier = "==0.2.0" },
{ name = "stackit-iaas", specifier = "==1.4.0" }, { name = "stackit-iaas", specifier = "==1.4.0" },
{ name = "stackit-objectstorage", specifier = "==1.4.0" },
{ name = "stackit-resourcemanager", specifier = "==0.8.0" }, { name = "stackit-resourcemanager", specifier = "==0.8.0" },
{ name = "tabulate", specifier = "==0.9.0" }, { name = "tabulate", specifier = "==0.9.0" },
{ name = "tzlocal", specifier = "==5.3.1" }, { name = "tzlocal", specifier = "==5.3.1" },
@@ -4298,6 +4300,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/08/51/2201164d7bfacf47539888c735f10f6320c188252384957aa1b23121a210/stackit_iaas-1.4.0-py3-none-any.whl", hash = "sha256:3f4a32321b57ac238f73e5d660c6428186b92cc0425c1f0783ba801e377149d9", size = 316588, upload-time = "2026-05-13T09:43:14.943Z" }, { url = "https://files.pythonhosted.org/packages/08/51/2201164d7bfacf47539888c735f10f6320c188252384957aa1b23121a210/stackit_iaas-1.4.0-py3-none-any.whl", hash = "sha256:3f4a32321b57ac238f73e5d660c6428186b92cc0425c1f0783ba801e377149d9", size = 316588, upload-time = "2026-05-13T09:43:14.943Z" },
] ]
[[package]]
name = "stackit-objectstorage"
version = "1.4.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "pydantic" },
{ name = "python-dateutil" },
{ name = "requests" },
{ name = "stackit-core" },
]
sdist = { url = "https://files.pythonhosted.org/packages/90/80/b790756af40a5c6d979dd688b2557394ac54b594eb4c08edc33157ba890f/stackit_objectstorage-1.4.0.tar.gz", hash = "sha256:4a3812b4de102b199f061706a802909f9e53ae9b0858769d5bd720f814c8bdbe", size = 31814, upload-time = "2026-05-13T09:43:05.027Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/68/f1/ffa8d5e2ec9f818c72a6f045691364eb4e927ee86641993a70882d00205a/stackit_objectstorage-1.4.0-py3-none-any.whl", hash = "sha256:1a3285c6840d95cff591d84fd21803575cb0d010c398e6575ed92987b9c39866", size = 65061, upload-time = "2026-05-13T09:43:04.13Z" },
]
[[package]] [[package]]
name = "stackit-resourcemanager" name = "stackit-resourcemanager"
version = "0.8.0" version = "0.8.0"