From e28bde797fbfcbe1fc9a1322a9c5d6afb46e6d45 Mon Sep 17 00:00:00 2001 From: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com> Date: Wed, 11 Mar 2026 12:00:43 +0100 Subject: [PATCH] feat(openstack): object storage service with 7 new checks (#10258) --- prowler/CHANGELOG.md | 1 + .../services/objectstorage/__init__.py | 0 .../objectstorage/objectstorage_client.py | 6 + .../__init__.py | 0 ...iner_acl_not_globally_shared.metadata.json | 37 ++ ...orage_container_acl_not_globally_shared.py | 29 ++ .../__init__.py | 0 ...e_container_listing_disabled.metadata.json | 37 ++ ...bjectstorage_container_listing_disabled.py | 32 ++ .../__init__.py | 0 ...iner_metadata_sensitive_data.metadata.json | 39 ++ ...orage_container_metadata_sensitive_data.py | 64 +++ .../__init__.py | 0 ...ner_public_read_acl_disabled.metadata.json | 37 ++ ...rage_container_public_read_acl_disabled.py | 29 ++ .../__init__.py | 0 ...e_container_sync_not_enabled.metadata.json | 37 ++ ...bjectstorage_container_sync_not_enabled.py | 28 ++ .../__init__.py | 0 ...container_versioning_enabled.metadata.json | 37 ++ ...ectstorage_container_versioning_enabled.py | 30 ++ .../__init__.py | 0 ...ntainer_write_acl_restricted.metadata.json | 38 ++ ...tstorage_container_write_acl_restricted.py | 29 ++ .../objectstorage/objectstorage_service.py | 92 ++++ ..._container_acl_not_globally_shared_test.py | 283 ++++++++++++ ...storage_container_listing_disabled_test.py | 334 +++++++++++++++ ..._container_metadata_sensitive_data_test.py | 243 +++++++++++ ...container_public_read_acl_disabled_test.py | 245 +++++++++++ ...storage_container_sync_not_enabled_test.py | 199 +++++++++ ...orage_container_versioning_enabled_test.py | 249 +++++++++++ ...age_container_write_acl_restricted_test.py | 329 ++++++++++++++ .../openstack_objectstorage_service_test.py | 403 ++++++++++++++++++ 33 files changed, 2887 insertions(+) create mode 100644 prowler/providers/openstack/services/objectstorage/__init__.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_client.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/__init__.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared.metadata.json create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/__init__.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled.metadata.json create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/__init__.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data.metadata.json create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/__init__.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled.metadata.json create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/__init__.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled.metadata.json create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/__init__.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled.metadata.json create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/__init__.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted.metadata.json create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted.py create mode 100644 prowler/providers/openstack/services/objectstorage/objectstorage_service.py create mode 100644 tests/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared_test.py create mode 100644 tests/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled_test.py create mode 100644 tests/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data_test.py create mode 100644 tests/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled_test.py create mode 100644 tests/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled_test.py create mode 100644 tests/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled_test.py create mode 100644 tests/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted_test.py create mode 100644 tests/providers/openstack/services/objectstorage/openstack_objectstorage_service_test.py diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index f64f8f8fde..6ce203d0b2 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -10,6 +10,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - `entra_conditional_access_policy_compliant_device_hybrid_joined_device_mfa_required` check for M365 provider [(#10197)](https://github.com/prowler-cloud/prowler/pull/10197) - Add `trusted_ips` configurable option to `opensearch_service_domains_not_publicly_accessible` check to reduce false positives on IP-restricted policies [(#8631)](https://github.com/prowler-cloud/prowler/pull/8631) - `guardduty_delegated_admin_enabled_all_regions` check for AWS provider [(#9867)](https://github.com/prowler-cloud/prowler/pull/9867) +- OpenStack object storage service with 7 checks [(#10258)](https://github.com/prowler-cloud/prowler/pull/10258) ### 🔄 Changed diff --git a/prowler/providers/openstack/services/objectstorage/__init__.py b/prowler/providers/openstack/services/objectstorage/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_client.py b/prowler/providers/openstack/services/objectstorage/objectstorage_client.py new file mode 100644 index 0000000000..b858f90176 --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_client.py @@ -0,0 +1,6 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorage, +) + +objectstorage_client = ObjectStorage(Provider.get_global_provider()) diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/__init__.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared.metadata.json b/prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared.metadata.json new file mode 100644 index 0000000000..02d66edfa5 --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "openstack", + "CheckID": "objectstorage_container_acl_not_globally_shared", + "CheckTitle": "Object storage container read ACL is not globally shared", + "CheckType": [], + "ServiceName": "objectstorage", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "OS::Swift::Container", + "ResourceGroup": "storage", + "Description": "**OpenStack Swift containers** are evaluated to verify that the **read ACL** does not use `*:*` (global sharing). The `*:*` ACL grants read access to all authenticated users from any project in the OpenStack deployment, violating project isolation and the principle of least privilege.", + "Risk": "Containers with `*:*` read ACL are accessible to **every authenticated user** in the OpenStack deployment, regardless of their project. This breaks **multi-tenant isolation**, exposing data to users in other projects. **Compromised credentials** from any project can access the container's contents.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/swift/latest/overview_acl.html", + "https://docs.openstack.org/security-guide/object-storage.html" + ], + "Remediation": { + "Code": { + "CLI": "swift post --read-acl ':*'", + "NativeIaC": "", + "Other": "1. Navigate to **Object Store > Containers**\n2. Select the container with global read ACL\n3. Edit the container metadata\n4. Replace `*:*` with specific project-scoped ACLs\n5. Save changes", + "Terraform": "" + }, + "Recommendation": { + "Text": "Replace `*:*` read ACLs with **project-scoped ACLs** (e.g., `project-id:*` or `project-id:user-id`). Use the **most restrictive ACL** that meets access requirements. Implement regular ACL audits to detect overly permissive configurations. Consider using **Keystone role-based access control** for fine-grained permissions.", + "Url": "https://hub.prowler.com/check/objectstorage_container_acl_not_globally_shared" + } + }, + "Categories": [ + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "The `*:*` ACL means 'any user in any project'. This is different from `.r:*` which grants anonymous (unauthenticated) access. Both are overly permissive but target different audiences: `*:*` targets all authenticated users while `.r:*` targets everyone including unauthenticated users." +} diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared.py new file mode 100644 index 0000000000..e5c21616d8 --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared.py @@ -0,0 +1,29 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.objectstorage.objectstorage_client import ( + objectstorage_client, +) + + +class objectstorage_container_acl_not_globally_shared(Check): + """Ensure object storage container read ACL does not use global sharing.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for container in objectstorage_client.containers: + report = CheckReportOpenStack(metadata=self.metadata(), resource=container) + acl_entries = [entry.strip() for entry in container.read_ACL.split(",")] + if "*:*" in acl_entries or "*" in acl_entries: + report.status = "FAIL" + report.status_extended = f"Container {container.name} has globally shared read ACL (*:*) allowing all authenticated users from any project." + else: + report.status = "PASS" + report.status_extended = ( + f"Container {container.name} read ACL is not globally shared." + ) + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/__init__.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled.metadata.json b/prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled.metadata.json new file mode 100644 index 0000000000..446d01a74b --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "openstack", + "CheckID": "objectstorage_container_listing_disabled", + "CheckTitle": "Object storage container listings are not publicly accessible", + "CheckType": [], + "ServiceName": "objectstorage", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "OS::Swift::Container", + "ResourceGroup": "storage", + "Description": "**OpenStack Swift containers** are evaluated to verify that **public container listings** are disabled. The `.rlistings` ACL allows users with read access to list all objects within the container. When combined with `.r:*`, this enables unauthenticated users to enumerate all objects, facilitating targeted data exfiltration.", + "Risk": "Containers with `.rlistings` enabled allow attackers to enumerate all object names, sizes, and modification dates. This **metadata exposure** aids **targeted attacks**, reveals application structure, and can expose sensitive file names. Combined with public read access, it enables complete **data exfiltration**.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/swift/latest/overview_acl.html", + "https://docs.openstack.org/security-guide/object-storage.html" + ], + "Remediation": { + "Code": { + "CLI": "swift post --read-acl ''", + "NativeIaC": "", + "Other": "1. Navigate to **Object Store > Containers**\n2. Select the container with public listing\n3. Edit the container metadata\n4. Remove `.rlistings` from the Read ACL\n5. Save changes", + "Terraform": "" + }, + "Recommendation": { + "Text": "Remove `.rlistings` from container read ACLs to prevent **public object enumeration**. Listings should only be available to **authenticated project members**. If external access is needed, implement application-level access control with authenticated APIs.", + "Url": "https://hub.prowler.com/check/objectstorage_container_listing_disabled" + } + }, + "Categories": [ + "internet-exposed" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "The `.rlistings` ACL controls whether container listings (GET on container) are allowed. Even without `.rlistings`, individual objects may still be readable if `.r:*` is set. Both ACLs should be removed for full protection." +} diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled.py new file mode 100644 index 0000000000..a541b9866e --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled.py @@ -0,0 +1,32 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.objectstorage.objectstorage_client import ( + objectstorage_client, +) + + +class objectstorage_container_listing_disabled(Check): + """Ensure object storage container object listings are not publicly accessible.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for container in objectstorage_client.containers: + report = CheckReportOpenStack(metadata=self.metadata(), resource=container) + acl_entries = [entry.strip() for entry in container.read_ACL.split(",")] + if ".rlistings" in acl_entries: + report.status = "FAIL" + report.status_extended = f"Container {container.name} has public listing enabled (.rlistings) allowing anonymous object enumeration." + elif "*:*" in acl_entries or "*" in acl_entries: + report.status = "FAIL" + report.status_extended = f"Container {container.name} has listing enabled via global read ACL (*:*) allowing all authenticated users to list objects." + else: + report.status = "PASS" + report.status_extended = ( + f"Container {container.name} does not have public listing enabled." + ) + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/__init__.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data.metadata.json b/prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data.metadata.json new file mode 100644 index 0000000000..b3a39bd3f8 --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data.metadata.json @@ -0,0 +1,39 @@ +{ + "Provider": "openstack", + "CheckID": "objectstorage_container_metadata_sensitive_data", + "CheckTitle": "Object storage container metadata does not contain sensitive data", + "CheckType": [], + "ServiceName": "objectstorage", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "critical", + "ResourceType": "OS::Swift::Container", + "ResourceGroup": "storage", + "Description": "**OpenStack Swift container metadata** is evaluated to detect **sensitive data** such as passwords, API keys, secrets, and private keys. Container metadata is accessible to any user with read access to the container. Storing secrets in metadata exposes them to unauthorized access and credential theft.", + "Risk": "Container metadata containing **sensitive data** exposes credentials to any user with container read access. Attackers with read permissions can extract **passwords**, **API keys**, and **private keys**. Stolen credentials enable unauthorized access to other systems, **data exfiltration**, and **privilege escalation**.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/swift/latest/overview_acl.html", + "https://cheatsheetseries.owasp.org/cheatsheets/Secrets_Management_Cheat_Sheet.html" + ], + "Remediation": { + "Code": { + "CLI": "swift post --meta ':'", + "NativeIaC": "", + "Other": "1. Identify containers with sensitive metadata\n2. Remove sensitive metadata keys using CLI\n3. Rotate exposed credentials immediately\n4. Store secrets in Barbican or external secrets manager instead", + "Terraform": "" + }, + "Recommendation": { + "Text": "Never store secrets in container metadata. Use **Barbican** (OpenStack Key Manager), **Vault**, or external secrets management instead. Remove existing sensitive metadata and **rotate any exposed credentials**. Implement metadata policies to prevent future secret storage.", + "Url": "https://hub.prowler.com/check/objectstorage_container_metadata_sensitive_data" + } + }, + "Categories": [ + "threat-detection", + "secrets", + "encryption" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "This check uses the detect-secrets library to scan for credentials. May produce false positives on metadata keys containing secret-like keywords. Findings should be reviewed manually. The audit_config allows configuring secrets_ignore_patterns to exclude specific patterns and detect_secrets_plugins to customize detection." +} diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data.py new file mode 100644 index 0000000000..94d281bf32 --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data.py @@ -0,0 +1,64 @@ +import json +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.lib.utils.utils import detect_secrets_scan +from prowler.providers.openstack.services.objectstorage.objectstorage_client import ( + objectstorage_client, +) + + +class objectstorage_container_metadata_sensitive_data(Check): + """Ensure object storage container metadata does not contain sensitive data like passwords or API keys.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + secrets_ignore_patterns = objectstorage_client.audit_config.get( + "secrets_ignore_patterns", [] + ) + + for container in objectstorage_client.containers: + report = CheckReportOpenStack(metadata=self.metadata(), resource=container) + report.status = "PASS" + report.status_extended = ( + f"Container {container.name} metadata does not contain sensitive data." + ) + + if container.metadata: + # Build metadata dict and parallel list of keys + dump_metadata = {} + original_metadata_keys = [] + for key, value in container.metadata.items(): + dump_metadata[key] = value + original_metadata_keys.append(key) + + # Convert metadata dict to JSON string for detect-secrets scanning + metadata_json = json.dumps(dump_metadata, indent=2) + detect_secrets_output = detect_secrets_scan( + data=metadata_json, + excluded_secrets=secrets_ignore_patterns, + detect_secrets_plugins=objectstorage_client.audit_config.get( + "detect_secrets_plugins" + ), + ) + + if detect_secrets_output: + # Map line numbers back to metadata keys using the parallel list + # Line numbering: line 1 = "{", line 2 = first key-value, etc. + secrets_string = ", ".join( + [ + f"{secret['type']} in metadata key '{original_metadata_keys[secret['line_number'] - 2]}'" + for secret in detect_secrets_output + if 0 + <= secret["line_number"] - 2 + < len(original_metadata_keys) + ] + ) + report.status = "FAIL" + report.status_extended = f"Container {container.name} metadata contains potential secrets -> {secrets_string}." + else: + report.status_extended = f"Container {container.name} has no metadata (no sensitive data exposure risk)." + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/__init__.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled.metadata.json b/prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled.metadata.json new file mode 100644 index 0000000000..f15becbcf5 --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "openstack", + "CheckID": "objectstorage_container_public_read_acl_disabled", + "CheckTitle": "Object storage containers do not grant anonymous read access", + "CheckType": [], + "ServiceName": "objectstorage", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "OS::Swift::Container", + "ResourceGroup": "storage", + "Description": "**OpenStack Swift containers** are evaluated to verify that **anonymous read access** is not granted. The `.r:*` ACL allows any unauthenticated user on the internet to read objects in the container. This is a critical exposure that can lead to data leaks, especially when containers hold sensitive information such as backups, logs, or application data.", + "Risk": "Containers with `.r:*` in the read ACL are **publicly accessible** without authentication. Attackers can enumerate and download all objects, leading to **data breaches**, **credential exposure**, and **compliance violations**. Public containers are frequently targeted by automated scanners.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/swift/latest/overview_acl.html", + "https://docs.openstack.org/security-guide/object-storage.html" + ], + "Remediation": { + "Code": { + "CLI": "swift post --read-acl ''", + "NativeIaC": "", + "Other": "1. Navigate to **Object Store > Containers**\n2. Select the container with public read ACL\n3. Edit the container metadata\n4. Remove the `.r:*` entry from the Read ACL\n5. Save changes", + "Terraform": "" + }, + "Recommendation": { + "Text": "Remove `.r:*` from container read ACLs to prevent **anonymous access**. Use **project-scoped ACLs** (e.g., `project-id:*`) or specific user ACLs instead. If public access is required, consider using a **CDN** or **signed URLs** with time-limited tokens.", + "Url": "https://hub.prowler.com/check/objectstorage_container_public_read_acl_disabled" + } + }, + "Categories": [ + "internet-exposed" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "Swift ACLs use a specific syntax where `.r:*` grants read access to all referrers (anonymous access). The `.r:` prefix indicates referrer-based ACL. This check specifically looks for the wildcard referrer pattern." +} diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled.py new file mode 100644 index 0000000000..6a3b5c03ec --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled.py @@ -0,0 +1,29 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.objectstorage.objectstorage_client import ( + objectstorage_client, +) + + +class objectstorage_container_public_read_acl_disabled(Check): + """Ensure object storage containers do not grant anonymous read access.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for container in objectstorage_client.containers: + report = CheckReportOpenStack(metadata=self.metadata(), resource=container) + acl_entries = [entry.strip() for entry in container.read_ACL.split(",")] + if ".r:*" in acl_entries: + report.status = "FAIL" + report.status_extended = f"Container {container.name} has public read ACL (.r:*) allowing anonymous access." + else: + report.status = "PASS" + report.status_extended = ( + f"Container {container.name} does not have public read ACL." + ) + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/__init__.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled.metadata.json b/prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled.metadata.json new file mode 100644 index 0000000000..35925433c5 --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "openstack", + "CheckID": "objectstorage_container_sync_not_enabled", + "CheckTitle": "Object storage containers do not have container sync enabled", + "CheckType": [], + "ServiceName": "objectstorage", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "OS::Swift::Container", + "ResourceGroup": "storage", + "Description": "**OpenStack Swift containers** are evaluated to verify that **container sync** is not configured. Container sync replicates objects to another Swift cluster, potentially outside the organization's control. Unauthorized sync configurations can be used for data exfiltration to external clusters.", + "Risk": "Container sync replicates all objects to a **remote Swift cluster**. If misconfigured or set up by an attacker, sensitive data is continuously **exfiltrated** to an external location. The sync operates automatically and silently, making it difficult to detect ongoing **data theft**.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/swift/latest/overview_container_sync.html", + "https://docs.openstack.org/security-guide/object-storage.html" + ], + "Remediation": { + "Code": { + "CLI": "swift post -H 'X-Container-Sync-To:' -H 'X-Container-Sync-Key:'", + "NativeIaC": "", + "Other": "1. Identify containers with sync enabled\n2. Remove sync configuration using CLI\n3. Verify sync is disabled: `swift stat `\n4. Review sync logs for unauthorized data transfers\n5. Investigate who configured the sync and when", + "Terraform": "" + }, + "Recommendation": { + "Text": "Disable container sync unless explicitly required for **disaster recovery**. If sync is needed, ensure the target cluster is within your organization's **trust boundary**. Monitor sync configurations for unauthorized changes. Implement **RBAC policies** to restrict who can configure container sync.", + "Url": "https://hub.prowler.com/check/objectstorage_container_sync_not_enabled" + } + }, + "Categories": [ + "trust-boundaries" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "Container sync is configured via X-Container-Sync-To (target URL) and X-Container-Sync-Key (shared secret) headers. The sync middleware must be enabled in the Swift pipeline for sync to function. This check flags any container with a non-empty sync_to value." +} diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled.py new file mode 100644 index 0000000000..ac7545e69e --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled.py @@ -0,0 +1,28 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.objectstorage.objectstorage_client import ( + objectstorage_client, +) + + +class objectstorage_container_sync_not_enabled(Check): + """Ensure object storage containers do not have container sync configured.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for container in objectstorage_client.containers: + report = CheckReportOpenStack(metadata=self.metadata(), resource=container) + if container.sync_to: + report.status = "FAIL" + report.status_extended = f"Container {container.name} has container sync enabled (sync target: {container.sync_to})." + else: + report.status = "PASS" + report.status_extended = ( + f"Container {container.name} does not have container sync enabled." + ) + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/__init__.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled.metadata.json b/prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled.metadata.json new file mode 100644 index 0000000000..568e7c252f --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "openstack", + "CheckID": "objectstorage_container_versioning_enabled", + "CheckTitle": "Object storage containers have versioning enabled", + "CheckType": [], + "ServiceName": "objectstorage", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "OS::Swift::Container", + "ResourceGroup": "storage", + "Description": "**OpenStack Swift containers** are evaluated to verify that **object versioning** is enabled. Versioning preserves previous versions of objects when they are overwritten or deleted, enabling recovery from accidental modifications, ransomware attacks, and data corruption.", + "Risk": "Without versioning, overwritten or deleted objects **cannot be recovered**. Accidental deletions, **ransomware** encrypting objects, or malicious modifications result in **permanent data loss**. Versioning provides a safety net for forensic investigation and disaster recovery.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/swift/latest/overview_object_versioning.html", + "https://docs.openstack.org/security-guide/object-storage.html" + ], + "Remediation": { + "Code": { + "CLI": "swift post -H 'X-Versions-Location: '", + "NativeIaC": "", + "Other": "1. Create a versions container: `swift post _versions`\n2. Enable versioning (stack mode): `swift post -H 'X-Versions-Location: _versions'`\n Or enable versioning (history mode): `swift post -H 'X-History-Location: _versions'`\n3. Verify: `swift stat ` shows Versions header", + "Terraform": "" + }, + "Recommendation": { + "Text": "Enable **object versioning** on all containers storing important data. Create a dedicated versions container and set the `X-Versions-Location` (stack mode) or `X-History-Location` (history mode) header. Implement **lifecycle policies** to manage version retention and storage costs. Combine with container backups for comprehensive data protection.", + "Url": "https://hub.prowler.com/check/objectstorage_container_versioning_enabled" + } + }, + "Categories": [ + "forensics-ready" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "Swift supports two versioning modes: X-Versions-Location (stack mode, stores versions in a separate container) and X-History-Location (history mode). This check verifies that either versions_location or history_location is set, indicating versioning is configured." +} diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled.py new file mode 100644 index 0000000000..f2e811093f --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled.py @@ -0,0 +1,30 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.objectstorage.objectstorage_client import ( + objectstorage_client, +) + + +class objectstorage_container_versioning_enabled(Check): + """Ensure object storage containers have versioning enabled for data protection.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for container in objectstorage_client.containers: + report = CheckReportOpenStack(metadata=self.metadata(), resource=container) + if container.versioning_enabled: + report.status = "PASS" + location = container.versions_location or container.history_location + mode = "versions" if container.versions_location else "history" + report.status_extended = f"Container {container.name} has versioning enabled ({mode} location: {location})." + else: + report.status = "FAIL" + report.status_extended = ( + f"Container {container.name} does not have versioning enabled." + ) + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/__init__.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted.metadata.json b/prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted.metadata.json new file mode 100644 index 0000000000..fa2af1334a --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted.metadata.json @@ -0,0 +1,38 @@ +{ + "Provider": "openstack", + "CheckID": "objectstorage_container_write_acl_restricted", + "CheckTitle": "Object storage container write ACL is restricted", + "CheckType": [], + "ServiceName": "objectstorage", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "critical", + "ResourceType": "OS::Swift::Container", + "ResourceGroup": "storage", + "Description": "**OpenStack Swift containers** are evaluated to verify that **write access** is restricted. The `*:*` write ACL allows any authenticated user from any project to create, modify, or delete objects. The `*` write ACL similarly grants unrestricted write access. Both configurations violate the principle of least privilege.", + "Risk": "Containers with **unrestricted write ACLs** allow any authenticated OpenStack user to upload, overwrite, or delete objects. This enables **data tampering**, **malware injection**, **ransomware attacks** (encrypting objects), and **resource abuse** (cryptomining payloads). Compromised credentials from any project can be used to modify data.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/swift/latest/overview_acl.html", + "https://docs.openstack.org/security-guide/object-storage.html" + ], + "Remediation": { + "Code": { + "CLI": "swift post --write-acl ':'", + "NativeIaC": "", + "Other": "1. Navigate to **Object Store > Containers**\n2. Select the container with unrestricted write ACL\n3. Edit the container metadata\n4. Replace `*:*` or `*` with specific `project-id:user-id` entries\n5. Save changes", + "Terraform": "" + }, + "Recommendation": { + "Text": "Restrict write ACLs to specific project and user combinations (e.g., `project-id:user-id`). Never use `*:*` or `*` as write ACLs. Implement **RBAC policies** to control write access. Regularly audit container ACLs to detect **overly permissive configurations**.", + "Url": "https://hub.prowler.com/check/objectstorage_container_write_acl_restricted" + } + }, + "Categories": [ + "internet-exposed", + "identity-access" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "The `*:*` ACL means 'any user in any project' while `*` alone also grants unrestricted access. Both are dangerous for write operations as they allow data modification by any authenticated user in the OpenStack deployment." +} diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted.py b/prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted.py new file mode 100644 index 0000000000..0693ebedce --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted.py @@ -0,0 +1,29 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.objectstorage.objectstorage_client import ( + objectstorage_client, +) + + +class objectstorage_container_write_acl_restricted(Check): + """Ensure object storage container write ACL does not allow all authenticated users.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for container in objectstorage_client.containers: + report = CheckReportOpenStack(metadata=self.metadata(), resource=container) + acl_entries = [entry.strip() for entry in container.write_ACL.split(",")] + if "*:*" in acl_entries or "*" in acl_entries: + report.status = "FAIL" + report.status_extended = f"Container {container.name} has unrestricted write ACL allowing all authenticated users to write." + else: + report.status = "PASS" + report.status_extended = ( + f"Container {container.name} has restricted write ACL." + ) + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/objectstorage/objectstorage_service.py b/prowler/providers/openstack/services/objectstorage/objectstorage_service.py new file mode 100644 index 0000000000..6a2a73d5a0 --- /dev/null +++ b/prowler/providers/openstack/services/objectstorage/objectstorage_service.py @@ -0,0 +1,92 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, List + +from openstack import exceptions as openstack_exceptions + +from prowler.lib.logger import logger +from prowler.providers.openstack.lib.service.service import OpenStackService + + +class ObjectStorage(OpenStackService): + """Service wrapper using openstacksdk object-store APIs.""" + + def __init__(self, provider) -> None: + super().__init__(__class__.__name__, provider) + self.containers: List[ObjectStorageContainer] = [] + self._list_containers() + + def _list_containers(self) -> None: + """List all object storage containers across all audited regions.""" + logger.info("ObjectStorage - Listing containers...") + for region, conn in self.regional_connections.items(): + try: + for container in conn.object_store.containers(): + # The list API only returns name/count/bytes; HEAD each + # container to retrieve ACLs, metadata, and versioning info. + try: + detail = conn.object_store.get_container_metadata( + getattr(container, "name", "") + ) + except Exception as head_error: + logger.warning( + f"Could not HEAD container {getattr(container, 'name', '')}: {head_error}" + ) + detail = container + + metadata = getattr(detail, "metadata", None) or {} + + # Extract versioning info (Swift supports two modes) + versions_location = getattr(detail, "versions_location", "") or "" + history_location = getattr(detail, "history_location", "") or "" + versioning_enabled = bool(versions_location or history_location) + + self.containers.append( + ObjectStorageContainer( + id=getattr(container, "name", ""), + name=getattr(container, "name", ""), + region=region, + project_id=self.project_id, + object_count=getattr(detail, "count", 0), + bytes_used=getattr(detail, "bytes", 0), + read_ACL=getattr(detail, "read_ACL", "") or "", + write_ACL=getattr(detail, "write_ACL", "") or "", + versioning_enabled=versioning_enabled, + versions_location=versions_location, + history_location=history_location, + sync_to=getattr(detail, "sync_to", "") or "", + sync_key=getattr(detail, "sync_key", "") or "", + metadata=metadata if isinstance(metadata, dict) else {}, + ) + ) + except openstack_exceptions.SDKException as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Failed to list object storage containers in region {region}: {error}" + ) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Unexpected error listing object storage containers in region {region}: {error}" + ) + + +@dataclass +class ObjectStorageContainer: + """Represents an OpenStack Swift container.""" + + id: str + name: str + region: str + project_id: str + object_count: int + bytes_used: int + read_ACL: str + write_ACL: str + versioning_enabled: bool + versions_location: str + history_location: str + sync_to: str + sync_key: str + metadata: Dict[str, str] diff --git a/tests/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared_test.py b/tests/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared_test.py new file mode 100644 index 0000000000..3e2cdd6f9b --- /dev/null +++ b/tests/providers/openstack/services/objectstorage/objectstorage_container_acl_not_globally_shared/objectstorage_container_acl_not_globally_shared_test.py @@ -0,0 +1,283 @@ +"""Tests for objectstorage_container_acl_not_globally_shared check.""" + +from unittest import mock + +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorageContainer, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_objectstorage_container_acl_not_globally_shared: + """Test suite for objectstorage_container_acl_not_globally_shared check.""" + + def test_no_containers(self): + """Test when no containers exist.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import ( + objectstorage_container_acl_not_globally_shared, + ) + + check = objectstorage_container_acl_not_globally_shared() + result = check.execute() + + assert len(result) == 0 + + def test_container_not_globally_shared(self): + """Test container without global sharing (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-1", + name="project-scoped", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=10, + bytes_used=1024, + read_ACL="project-123:*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import ( + objectstorage_container_acl_not_globally_shared, + ) + + check = objectstorage_container_acl_not_globally_shared() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container project-scoped read ACL is not globally shared." + ) + assert result[0].resource_id == "container-1" + assert result[0].resource_name == "project-scoped" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_globally_shared(self): + """Test container with global sharing (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-2", + name="global-shared", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL="*:*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import ( + objectstorage_container_acl_not_globally_shared, + ) + + check = objectstorage_container_acl_not_globally_shared() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Container global-shared has globally shared read ACL (*:*) allowing all authenticated users from any project." + ) + assert result[0].resource_id == "container-2" + assert result[0].resource_name == "global-shared" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_globally_shared_bare_wildcard(self): + """Test container with * (bare wildcard) read ACL (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-3", + name="bare-wildcard", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import ( + objectstorage_container_acl_not_globally_shared, + ) + + check = objectstorage_container_acl_not_globally_shared() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + + def test_container_star_colon_star_in_multi_entry_acl(self): + """Test container with *:* in multi-entry read ACL (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-4", + name="multi-entry-global", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="project-123:user-456,*:*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import ( + objectstorage_container_acl_not_globally_shared, + ) + + check = objectstorage_container_acl_not_globally_shared() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + + def test_multiple_containers_mixed(self): + """Test multiple containers with mixed ACLs.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-pass", + name="Pass", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ObjectStorageContainer( + id="container-fail", + name="Fail", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="*:*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import ( + objectstorage_container_acl_not_globally_shared, + ) + + check = objectstorage_container_acl_not_globally_shared() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled_test.py b/tests/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled_test.py new file mode 100644 index 0000000000..73bd1bdaba --- /dev/null +++ b/tests/providers/openstack/services/objectstorage/objectstorage_container_listing_disabled/objectstorage_container_listing_disabled_test.py @@ -0,0 +1,334 @@ +"""Tests for objectstorage_container_listing_disabled check.""" + +from unittest import mock + +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorageContainer, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_objectstorage_container_listing_disabled: + """Test suite for objectstorage_container_listing_disabled check.""" + + def test_no_containers(self): + """Test when no containers exist.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import ( + objectstorage_container_listing_disabled, + ) + + check = objectstorage_container_listing_disabled() + result = check.execute() + + assert len(result) == 0 + + def test_container_no_listing(self): + """Test container without public listing (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-1", + name="no-listing", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=10, + bytes_used=1024, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import ( + objectstorage_container_listing_disabled, + ) + + check = objectstorage_container_listing_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container no-listing does not have public listing enabled." + ) + assert result[0].resource_id == "container-1" + assert result[0].resource_name == "no-listing" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_with_listing(self): + """Test container with public listing (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-2", + name="public-listing", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL=".r:*,.rlistings", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import ( + objectstorage_container_listing_disabled, + ) + + check = objectstorage_container_listing_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Container public-listing has public listing enabled (.rlistings) allowing anonymous object enumeration." + ) + assert result[0].resource_id == "container-2" + assert result[0].resource_name == "public-listing" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_listing_via_global_acl_star_colon_star(self): + """Test container with *:* read ACL enabling listing (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-3", + name="global-acl-listing", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL="*:*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import ( + objectstorage_container_listing_disabled, + ) + + check = objectstorage_container_listing_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Container global-acl-listing has listing enabled via global read ACL (*:*) allowing all authenticated users to list objects." + ) + assert result[0].resource_id == "container-3" + assert result[0].resource_name == "global-acl-listing" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_listing_via_bare_wildcard(self): + """Test container with * read ACL enabling listing (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-4", + name="bare-wildcard-listing", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import ( + objectstorage_container_listing_disabled, + ) + + check = objectstorage_container_listing_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + + def test_container_rlistings_takes_priority_over_global(self): + """Test that .rlistings is reported when both .rlistings and *:* are present.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-5", + name="both-patterns", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL=".rlistings,*:*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import ( + objectstorage_container_listing_disabled, + ) + + check = objectstorage_container_listing_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ".rlistings" in result[0].status_extended + + def test_multiple_containers_mixed(self): + """Test multiple containers with mixed listing status.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-pass", + name="Pass", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL=".r:*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ObjectStorageContainer( + id="container-fail", + name="Fail", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL=".r:*,.rlistings", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import ( + objectstorage_container_listing_disabled, + ) + + check = objectstorage_container_listing_disabled() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data_test.py b/tests/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data_test.py new file mode 100644 index 0000000000..6cae6432c7 --- /dev/null +++ b/tests/providers/openstack/services/objectstorage/objectstorage_container_metadata_sensitive_data/objectstorage_container_metadata_sensitive_data_test.py @@ -0,0 +1,243 @@ +"""Tests for objectstorage_container_metadata_sensitive_data check.""" + +from unittest import mock + +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorageContainer, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_objectstorage_container_metadata_sensitive_data: + """Test suite for objectstorage_container_metadata_sensitive_data check.""" + + def test_no_containers(self): + """Test when no containers exist.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [] + objectstorage_client.audit_config = {} + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import ( + objectstorage_container_metadata_sensitive_data, + ) + + check = objectstorage_container_metadata_sensitive_data() + result = check.execute() + + assert len(result) == 0 + + def test_container_no_metadata(self): + """Test container with no metadata (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.audit_config = {} + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-1", + name="no-metadata", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=10, + bytes_used=1024, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import ( + objectstorage_container_metadata_sensitive_data, + ) + + check = objectstorage_container_metadata_sensitive_data() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container no-metadata has no metadata (no sensitive data exposure risk)." + ) + assert result[0].resource_id == "container-1" + assert result[0].resource_name == "no-metadata" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_safe_metadata(self): + """Test container with safe metadata (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.audit_config = {} + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-2", + name="safe-metadata", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={"environment": "production", "application": "web-app"}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import ( + objectstorage_container_metadata_sensitive_data, + ) + + check = objectstorage_container_metadata_sensitive_data() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container safe-metadata metadata does not contain sensitive data." + ) + + def test_container_password_in_metadata(self): + """Test container with password in metadata (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.audit_config = {} + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-3", + name="password-metadata", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={"db_password": "supersecret123"}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import ( + objectstorage_container_metadata_sensitive_data, + ) + + check = objectstorage_container_metadata_sensitive_data() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert "contains potential secrets" in result[0].status_extended + + def test_multiple_containers_mixed(self): + """Test multiple containers with mixed metadata.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.audit_config = {} + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-pass", + name="Safe", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={"tier": "web"}, + ), + ObjectStorageContainer( + id="container-fail", + name="Unsafe", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={"admin_password": "secret123"}, + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import ( + objectstorage_container_metadata_sensitive_data, + ) + + check = objectstorage_container_metadata_sensitive_data() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled_test.py b/tests/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled_test.py new file mode 100644 index 0000000000..a8b484b8ec --- /dev/null +++ b/tests/providers/openstack/services/objectstorage/objectstorage_container_public_read_acl_disabled/objectstorage_container_public_read_acl_disabled_test.py @@ -0,0 +1,245 @@ +"""Tests for objectstorage_container_public_read_acl_disabled check.""" + +from unittest import mock + +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorageContainer, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_objectstorage_container_public_read_acl_disabled: + """Test suite for objectstorage_container_public_read_acl_disabled check.""" + + def test_no_containers(self): + """Test when no containers exist.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import ( + objectstorage_container_public_read_acl_disabled, + ) + + check = objectstorage_container_public_read_acl_disabled() + result = check.execute() + + assert len(result) == 0 + + def test_container_no_public_read(self): + """Test container without public read ACL (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-1", + name="private-container", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=10, + bytes_used=1024, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import ( + objectstorage_container_public_read_acl_disabled, + ) + + check = objectstorage_container_public_read_acl_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container private-container does not have public read ACL." + ) + assert result[0].resource_id == "container-1" + assert result[0].resource_name == "private-container" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_public_read(self): + """Test container with public read ACL (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-2", + name="public-container", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL=".r:*,.rlistings", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import ( + objectstorage_container_public_read_acl_disabled, + ) + + check = objectstorage_container_public_read_acl_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Container public-container has public read ACL (.r:*) allowing anonymous access." + ) + assert result[0].resource_id == "container-2" + assert result[0].resource_name == "public-container" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_domain_restricted_referrer_not_flagged(self): + """Test container with domain-restricted referrer ACL is not flagged (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-3", + name="domain-restricted", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL=".r:*.example.com,.rlistings", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import ( + objectstorage_container_public_read_acl_disabled, + ) + + check = objectstorage_container_public_read_acl_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container domain-restricted does not have public read ACL." + ) + + def test_multiple_containers_mixed(self): + """Test multiple containers with mixed ACLs.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-pass", + name="Pass", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ObjectStorageContainer( + id="container-fail", + name="Fail", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL=".r:*", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import ( + objectstorage_container_public_read_acl_disabled, + ) + + check = objectstorage_container_public_read_acl_disabled() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled_test.py b/tests/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled_test.py new file mode 100644 index 0000000000..24e4cdfbec --- /dev/null +++ b/tests/providers/openstack/services/objectstorage/objectstorage_container_sync_not_enabled/objectstorage_container_sync_not_enabled_test.py @@ -0,0 +1,199 @@ +"""Tests for objectstorage_container_sync_not_enabled check.""" + +from unittest import mock + +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorageContainer, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_objectstorage_container_sync_not_enabled: + """Test suite for objectstorage_container_sync_not_enabled check.""" + + def test_no_containers(self): + """Test when no containers exist.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled import ( + objectstorage_container_sync_not_enabled, + ) + + check = objectstorage_container_sync_not_enabled() + result = check.execute() + + assert len(result) == 0 + + def test_container_no_sync(self): + """Test container without sync (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-1", + name="no-sync", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=10, + bytes_used=1024, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled import ( + objectstorage_container_sync_not_enabled, + ) + + check = objectstorage_container_sync_not_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container no-sync does not have container sync enabled." + ) + assert result[0].resource_id == "container-1" + assert result[0].resource_name == "no-sync" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_with_sync(self): + """Test container with sync enabled (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-2", + name="synced-container", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="https://other-cluster/v1/AUTH_test/container-2", + sync_key="shared-secret", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled import ( + objectstorage_container_sync_not_enabled, + ) + + check = objectstorage_container_sync_not_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Container synced-container has container sync enabled (sync target: https://other-cluster/v1/AUTH_test/container-2)." + ) + assert result[0].resource_id == "container-2" + assert result[0].resource_name == "synced-container" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_multiple_containers_mixed(self): + """Test multiple containers with mixed sync status.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-pass", + name="Pass", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ObjectStorageContainer( + id="container-fail", + name="Fail", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="https://external/v1/AUTH_test/container", + sync_key="key", + metadata={}, + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled import ( + objectstorage_container_sync_not_enabled, + ) + + check = objectstorage_container_sync_not_enabled() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled_test.py b/tests/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled_test.py new file mode 100644 index 0000000000..5e66f27c6d --- /dev/null +++ b/tests/providers/openstack/services/objectstorage/objectstorage_container_versioning_enabled/objectstorage_container_versioning_enabled_test.py @@ -0,0 +1,249 @@ +"""Tests for objectstorage_container_versioning_enabled check.""" + +from unittest import mock + +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorageContainer, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_objectstorage_container_versioning_enabled: + """Test suite for objectstorage_container_versioning_enabled check.""" + + def test_no_containers(self): + """Test when no containers exist.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import ( + objectstorage_container_versioning_enabled, + ) + + check = objectstorage_container_versioning_enabled() + result = check.execute() + + assert len(result) == 0 + + def test_container_versioning_enabled_versions_location(self): + """Test container with versioning enabled via X-Versions-Location (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-1", + name="versioned-container", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=10, + bytes_used=1024, + read_ACL="", + write_ACL="", + versioning_enabled=True, + versions_location="versioned-container_versions", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import ( + objectstorage_container_versioning_enabled, + ) + + check = objectstorage_container_versioning_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container versioned-container has versioning enabled (versions location: versioned-container_versions)." + ) + assert result[0].resource_id == "container-1" + assert result[0].resource_name == "versioned-container" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_versioning_enabled_history_location(self): + """Test container with versioning enabled via X-History-Location (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-1", + name="history-container", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=10, + bytes_used=1024, + read_ACL="", + write_ACL="", + versioning_enabled=True, + versions_location="", + history_location="history-container_versions", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import ( + objectstorage_container_versioning_enabled, + ) + + check = objectstorage_container_versioning_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container history-container has versioning enabled (history location: history-container_versions)." + ) + assert result[0].resource_id == "container-1" + assert result[0].resource_name == "history-container" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_versioning_disabled(self): + """Test container without versioning (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-2", + name="no-versioning", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import ( + objectstorage_container_versioning_enabled, + ) + + check = objectstorage_container_versioning_enabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Container no-versioning does not have versioning enabled." + ) + assert result[0].resource_id == "container-2" + assert result[0].resource_name == "no-versioning" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_multiple_containers_mixed(self): + """Test multiple containers with mixed versioning status.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-pass", + name="Pass", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=True, + versions_location="Pass_versions", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ObjectStorageContainer( + id="container-fail", + name="Fail", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import ( + objectstorage_container_versioning_enabled, + ) + + check = objectstorage_container_versioning_enabled() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted_test.py b/tests/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted_test.py new file mode 100644 index 0000000000..dc55902b78 --- /dev/null +++ b/tests/providers/openstack/services/objectstorage/objectstorage_container_write_acl_restricted/objectstorage_container_write_acl_restricted_test.py @@ -0,0 +1,329 @@ +"""Tests for objectstorage_container_write_acl_restricted check.""" + +from unittest import mock + +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorageContainer, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_objectstorage_container_write_acl_restricted: + """Test suite for objectstorage_container_write_acl_restricted check.""" + + def test_no_containers(self): + """Test when no containers exist.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import ( + objectstorage_container_write_acl_restricted, + ) + + check = objectstorage_container_write_acl_restricted() + result = check.execute() + + assert len(result) == 0 + + def test_container_restricted_write(self): + """Test container with restricted write ACL (PASS).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-1", + name="restricted-write", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=10, + bytes_used=1024, + read_ACL="", + write_ACL="project-123:user-456", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import ( + objectstorage_container_write_acl_restricted, + ) + + check = objectstorage_container_write_acl_restricted() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Container restricted-write has restricted write ACL." + ) + assert result[0].resource_id == "container-1" + assert result[0].resource_name == "restricted-write" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_unrestricted_write_star_colon_star(self): + """Test container with *:* write ACL (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-2", + name="unrestricted-write", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=5, + bytes_used=512, + read_ACL="", + write_ACL="*:*", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import ( + objectstorage_container_write_acl_restricted, + ) + + check = objectstorage_container_write_acl_restricted() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Container unrestricted-write has unrestricted write ACL allowing all authenticated users to write." + ) + assert result[0].resource_id == "container-2" + assert result[0].resource_name == "unrestricted-write" + assert result[0].region == OPENSTACK_REGION + assert result[0].project_id == OPENSTACK_PROJECT_ID + + def test_container_unrestricted_write_star_only(self): + """Test container with * write ACL (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-3", + name="star-write", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="*", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import ( + objectstorage_container_write_acl_restricted, + ) + + check = objectstorage_container_write_acl_restricted() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + + def test_container_star_in_multi_entry_acl(self): + """Test container with * in multi-entry write ACL (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-4", + name="star-multi-entry", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="*,project-123:user-456", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import ( + objectstorage_container_write_acl_restricted, + ) + + check = objectstorage_container_write_acl_restricted() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Container star-multi-entry has unrestricted write ACL allowing all authenticated users to write." + ) + + def test_container_star_colon_star_in_multi_entry_acl(self): + """Test container with *:* in multi-entry write ACL (FAIL).""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-5", + name="star-colon-multi", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="project-123:user-456,*:*", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import ( + objectstorage_container_write_acl_restricted, + ) + + check = objectstorage_container_write_acl_restricted() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + + def test_multiple_containers_mixed(self): + """Test multiple containers with mixed write ACLs.""" + objectstorage_client = mock.MagicMock() + objectstorage_client.containers = [ + ObjectStorageContainer( + id="container-pass", + name="Pass", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ObjectStorageContainer( + id="container-fail", + name="Fail", + region=OPENSTACK_REGION, + project_id=OPENSTACK_PROJECT_ID, + object_count=0, + bytes_used=0, + read_ACL="", + write_ACL="*:*", + versioning_enabled=False, + versions_location="", + history_location="", + sync_to="", + sync_key="", + metadata={}, + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client", + new=objectstorage_client, + ), + ): + from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import ( + objectstorage_container_write_acl_restricted, + ) + + check = objectstorage_container_write_acl_restricted() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/objectstorage/openstack_objectstorage_service_test.py b/tests/providers/openstack/services/objectstorage/openstack_objectstorage_service_test.py new file mode 100644 index 0000000000..a7e07c97f9 --- /dev/null +++ b/tests/providers/openstack/services/objectstorage/openstack_objectstorage_service_test.py @@ -0,0 +1,403 @@ +"""Tests for OpenStack ObjectStorage service.""" + +from unittest.mock import MagicMock, patch + +from openstack import exceptions as openstack_exceptions + +from prowler.providers.openstack.services.objectstorage.objectstorage_service import ( + ObjectStorage, + ObjectStorageContainer, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class TestObjectStorageService: + """Test suite for ObjectStorage service.""" + + def test_objectstorage_service_initialization(self): + """Test ObjectStorage service initializes correctly.""" + provider = set_mocked_openstack_provider() + + with patch.object( + ObjectStorage, "_list_containers", return_value=[] + ) as mock_list: + service = ObjectStorage(provider) + + assert service.service_name == "ObjectStorage" + assert service.provider == provider + assert service.connection == provider.connection + assert service.regional_connections == provider.regional_connections + assert service.audited_regions == [OPENSTACK_REGION] + assert service.region == OPENSTACK_REGION + assert service.project_id == OPENSTACK_PROJECT_ID + assert service.containers == [] + mock_list.assert_called_once() + + def test_objectstorage_list_containers_success(self): + """Test listing containers successfully.""" + provider = set_mocked_openstack_provider() + + mock_container1 = MagicMock() + mock_container1.name = "container-1" + mock_container1.count = 10 + mock_container1.bytes = 1024 + mock_container1.read_ACL = ".r:*,.rlistings" + mock_container1.write_ACL = "*:*" + mock_container1.versions_location = "container-1_versions" + mock_container1.history_location = "" + mock_container1.sync_to = "https://other-cluster/v1/AUTH_test/container-1" + mock_container1.sync_key = "shared-secret" + mock_container1.metadata = {"environment": "production"} + + mock_container2 = MagicMock() + mock_container2.name = "container-2" + mock_container2.count = 0 + mock_container2.bytes = 0 + mock_container2.read_ACL = "" + mock_container2.write_ACL = "" + mock_container2.versions_location = "" + mock_container2.history_location = "" + mock_container2.sync_to = "" + mock_container2.sync_key = "" + mock_container2.metadata = {} + + provider.connection.object_store.containers.return_value = [ + mock_container1, + mock_container2, + ] + + # get_container_metadata returns the detailed mock for each container + def mock_get_metadata(name): + return {"container-1": mock_container1, "container-2": mock_container2}[ + name + ] + + provider.connection.object_store.get_container_metadata.side_effect = ( + mock_get_metadata + ) + + service = ObjectStorage(provider) + + assert len(service.containers) == 2 + assert isinstance(service.containers[0], ObjectStorageContainer) + assert service.containers[0].id == "container-1" + assert service.containers[0].name == "container-1" + assert service.containers[0].region == OPENSTACK_REGION + assert service.containers[0].project_id == OPENSTACK_PROJECT_ID + assert service.containers[0].object_count == 10 + assert service.containers[0].bytes_used == 1024 + assert service.containers[0].read_ACL == ".r:*,.rlistings" + assert service.containers[0].write_ACL == "*:*" + assert service.containers[0].versioning_enabled is True + assert service.containers[0].versions_location == "container-1_versions" + assert service.containers[0].history_location == "" + assert ( + service.containers[0].sync_to + == "https://other-cluster/v1/AUTH_test/container-1" + ) + assert service.containers[0].sync_key == "shared-secret" + assert service.containers[0].metadata == {"environment": "production"} + + assert service.containers[1].id == "container-2" + assert service.containers[1].versioning_enabled is False + assert service.containers[1].sync_to == "" + assert service.containers[1].metadata == {} + + def test_objectstorage_list_containers_empty(self): + """Test listing containers when none exist.""" + provider = set_mocked_openstack_provider() + provider.connection.object_store.containers.return_value = [] + + service = ObjectStorage(provider) + + assert service.containers == [] + + def test_objectstorage_list_containers_missing_attributes(self): + """Test listing containers with missing attributes uses fallback to list data.""" + provider = set_mocked_openstack_provider() + + mock_container = MagicMock() + mock_container.name = "container-1" + del mock_container.count + del mock_container.bytes + del mock_container.read_ACL + del mock_container.write_ACL + del mock_container.versions_location + del mock_container.history_location + del mock_container.sync_to + del mock_container.sync_key + del mock_container.metadata + + provider.connection.object_store.containers.return_value = [mock_container] + + # HEAD also returns missing attributes (same mock) + provider.connection.object_store.get_container_metadata.return_value = ( + mock_container + ) + + service = ObjectStorage(provider) + + assert len(service.containers) == 1 + assert service.containers[0].id == "container-1" + assert service.containers[0].name == "container-1" + assert service.containers[0].object_count == 0 + assert service.containers[0].bytes_used == 0 + assert service.containers[0].read_ACL == "" + assert service.containers[0].write_ACL == "" + assert service.containers[0].versioning_enabled is False + assert service.containers[0].versions_location == "" + assert service.containers[0].history_location == "" + assert service.containers[0].sync_to == "" + assert service.containers[0].sync_key == "" + assert service.containers[0].metadata == {} + + def test_objectstorage_list_containers_head_failure_falls_back(self): + """Test that HEAD failure falls back to list data gracefully.""" + provider = set_mocked_openstack_provider() + + mock_container = MagicMock() + mock_container.name = "container-1" + mock_container.count = 5 + mock_container.bytes = 256 + mock_container.read_ACL = None + mock_container.write_ACL = None + mock_container.versions_location = None + mock_container.history_location = None + mock_container.sync_to = None + mock_container.sync_key = None + mock_container.metadata = {} + + provider.connection.object_store.containers.return_value = [mock_container] + provider.connection.object_store.get_container_metadata.side_effect = Exception( + "HEAD failed" + ) + + service = ObjectStorage(provider) + + # Should still create the container using list data as fallback + assert len(service.containers) == 1 + assert service.containers[0].name == "container-1" + assert service.containers[0].object_count == 5 + assert service.containers[0].bytes_used == 256 + + def test_objectstorage_list_containers_sdk_exception(self): + """Test handling SDKException when listing containers.""" + provider = set_mocked_openstack_provider() + provider.connection.object_store.containers.side_effect = ( + openstack_exceptions.SDKException("API error") + ) + + service = ObjectStorage(provider) + + assert service.containers == [] + + def test_objectstorage_list_containers_generic_exception(self): + """Test handling generic exception when listing containers.""" + provider = set_mocked_openstack_provider() + provider.connection.object_store.containers.side_effect = Exception( + "Unexpected error" + ) + + service = ObjectStorage(provider) + + assert service.containers == [] + + def test_objectstorage_container_dataclass_attributes(self): + """Test ObjectStorageContainer dataclass has all required attributes.""" + container = ObjectStorageContainer( + id="container-1", + name="container-1", + region="RegionOne", + project_id="project-1", + object_count=10, + bytes_used=1024, + read_ACL=".r:*", + write_ACL="*:*", + versioning_enabled=True, + versions_location="container-1_versions", + history_location="", + sync_to="https://other-cluster/v1/AUTH_test/container-1", + sync_key="shared-secret", + metadata={"environment": "production"}, + ) + + assert container.id == "container-1" + assert container.name == "container-1" + assert container.region == "RegionOne" + assert container.project_id == "project-1" + assert container.object_count == 10 + assert container.bytes_used == 1024 + assert container.read_ACL == ".r:*" + assert container.write_ACL == "*:*" + assert container.versioning_enabled is True + assert container.versions_location == "container-1_versions" + assert container.history_location == "" + assert container.sync_to == "https://other-cluster/v1/AUTH_test/container-1" + assert container.sync_key == "shared-secret" + assert container.metadata == {"environment": "production"} + + def test_objectstorage_service_inherits_from_base(self): + """Test ObjectStorage service inherits from OpenStackService.""" + provider = set_mocked_openstack_provider() + + with patch.object(ObjectStorage, "_list_containers", return_value=[]): + service = ObjectStorage(provider) + + assert hasattr(service, "service_name") + assert hasattr(service, "provider") + assert hasattr(service, "connection") + assert hasattr(service, "regional_connections") + assert hasattr(service, "audited_regions") + assert hasattr(service, "session") + assert hasattr(service, "region") + assert hasattr(service, "project_id") + assert hasattr(service, "identity") + assert hasattr(service, "audit_config") + assert hasattr(service, "fixer_config") + + def test_objectstorage_list_containers_multi_region(self): + """Test listing containers across multiple regions.""" + provider = set_mocked_openstack_provider() + + # Create two mock connections for two regions + mock_conn_uk1 = MagicMock() + mock_conn_de1 = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1} + + mock_container_uk = MagicMock() + mock_container_uk.name = "container-uk" + mock_container_uk.count = 5 + mock_container_uk.bytes = 512 + mock_container_uk.read_ACL = "" + mock_container_uk.write_ACL = "" + mock_container_uk.versions_location = "" + mock_container_uk.history_location = "" + mock_container_uk.sync_to = "" + mock_container_uk.sync_key = "" + mock_container_uk.metadata = {} + + mock_container_de = MagicMock() + mock_container_de.name = "container-de" + mock_container_de.count = 10 + mock_container_de.bytes = 1024 + mock_container_de.read_ACL = ".r:*" + mock_container_de.write_ACL = "" + mock_container_de.versions_location = "" + mock_container_de.history_location = "" + mock_container_de.sync_to = "" + mock_container_de.sync_key = "" + mock_container_de.metadata = {} + + mock_conn_uk1.object_store.containers.return_value = [mock_container_uk] + mock_conn_uk1.object_store.get_container_metadata.return_value = ( + mock_container_uk + ) + mock_conn_de1.object_store.containers.return_value = [mock_container_de] + mock_conn_de1.object_store.get_container_metadata.return_value = ( + mock_container_de + ) + + service = ObjectStorage(provider) + + assert len(service.containers) == 2 + uk_container = next(c for c in service.containers if c.id == "container-uk") + de_container = next(c for c in service.containers if c.id == "container-de") + assert uk_container.region == "UK1" + assert de_container.region == "DE1" + + def test_objectstorage_list_containers_multi_region_partial_failure(self): + """Test that a failing region doesn't prevent other regions from being listed.""" + provider = set_mocked_openstack_provider() + + mock_conn_ok = MagicMock() + mock_conn_fail = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_ok, "DE1": mock_conn_fail} + + mock_container = MagicMock() + mock_container.name = "container-uk" + mock_container.count = 5 + mock_container.bytes = 512 + mock_container.read_ACL = "" + mock_container.write_ACL = "" + mock_container.versions_location = "" + mock_container.history_location = "" + mock_container.sync_to = "" + mock_container.sync_key = "" + mock_container.metadata = {} + + mock_conn_ok.object_store.containers.return_value = [mock_container] + mock_conn_ok.object_store.get_container_metadata.return_value = mock_container + mock_conn_fail.object_store.containers.side_effect = ( + openstack_exceptions.SDKException("API error in DE1") + ) + + service = ObjectStorage(provider) + + assert len(service.containers) == 1 + assert service.containers[0].id == "container-uk" + assert service.containers[0].region == "UK1" + + def test_objectstorage_list_containers_multi_region_one_empty(self): + """Test multi-region where one region has containers and the other is empty.""" + provider = set_mocked_openstack_provider() + + mock_conn_uk1 = MagicMock() + mock_conn_de1 = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1} + + mock_container = MagicMock() + mock_container.name = "container-uk" + mock_container.count = 5 + mock_container.bytes = 512 + mock_container.read_ACL = "" + mock_container.write_ACL = "" + mock_container.versions_location = "" + mock_container.history_location = "" + mock_container.sync_to = "" + mock_container.sync_key = "" + mock_container.metadata = {} + + mock_conn_uk1.object_store.containers.return_value = [mock_container] + mock_conn_uk1.object_store.get_container_metadata.return_value = mock_container + mock_conn_de1.object_store.containers.return_value = [] + + service = ObjectStorage(provider) + + assert len(service.containers) == 1 + assert service.containers[0].id == "container-uk" + assert service.containers[0].region == "UK1" + + def test_objectstorage_list_containers_history_location_versioning(self): + """Test that history_location (X-History-Location) enables versioning.""" + provider = set_mocked_openstack_provider() + + mock_container = MagicMock() + mock_container.name = "history-container" + mock_container.count = 3 + mock_container.bytes = 256 + mock_container.read_ACL = "" + mock_container.write_ACL = "" + mock_container.versions_location = "" + mock_container.history_location = "history-container_versions" + mock_container.sync_to = "" + mock_container.sync_key = "" + mock_container.metadata = {} + + provider.connection.object_store.containers.return_value = [mock_container] + provider.connection.object_store.get_container_metadata.return_value = ( + mock_container + ) + + service = ObjectStorage(provider) + + assert len(service.containers) == 1 + assert service.containers[0].versioning_enabled is True + assert service.containers[0].versions_location == "" + assert service.containers[0].history_location == "history-container_versions"