feat(openstack): object storage service with 7 new checks (#10258)

This commit is contained in:
Daniel Barranquero
2026-03-11 12:00:43 +01:00
committed by GitHub
parent cc0d83de91
commit e28bde797f
33 changed files with 2887 additions and 0 deletions

View File

@@ -0,0 +1,283 @@
"""Tests for objectstorage_container_acl_not_globally_shared check."""
from unittest import mock
from prowler.providers.openstack.services.objectstorage.objectstorage_service import (
ObjectStorageContainer,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_objectstorage_container_acl_not_globally_shared:
"""Test suite for objectstorage_container_acl_not_globally_shared check."""
def test_no_containers(self):
"""Test when no containers exist."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import (
objectstorage_container_acl_not_globally_shared,
)
check = objectstorage_container_acl_not_globally_shared()
result = check.execute()
assert len(result) == 0
def test_container_not_globally_shared(self):
"""Test container without global sharing (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-1",
name="project-scoped",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=10,
bytes_used=1024,
read_ACL="project-123:*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import (
objectstorage_container_acl_not_globally_shared,
)
check = objectstorage_container_acl_not_globally_shared()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container project-scoped read ACL is not globally shared."
)
assert result[0].resource_id == "container-1"
assert result[0].resource_name == "project-scoped"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_globally_shared(self):
"""Test container with global sharing (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-2",
name="global-shared",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL="*:*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import (
objectstorage_container_acl_not_globally_shared,
)
check = objectstorage_container_acl_not_globally_shared()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Container global-shared has globally shared read ACL (*:*) allowing all authenticated users from any project."
)
assert result[0].resource_id == "container-2"
assert result[0].resource_name == "global-shared"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_globally_shared_bare_wildcard(self):
"""Test container with * (bare wildcard) read ACL (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-3",
name="bare-wildcard",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import (
objectstorage_container_acl_not_globally_shared,
)
check = objectstorage_container_acl_not_globally_shared()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
def test_container_star_colon_star_in_multi_entry_acl(self):
"""Test container with *:* in multi-entry read ACL (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-4",
name="multi-entry-global",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="project-123:user-456,*:*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import (
objectstorage_container_acl_not_globally_shared,
)
check = objectstorage_container_acl_not_globally_shared()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
def test_multiple_containers_mixed(self):
"""Test multiple containers with mixed ACLs."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-pass",
name="Pass",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
ObjectStorageContainer(
id="container-fail",
name="Fail",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="*:*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_acl_not_globally_shared.objectstorage_container_acl_not_globally_shared import (
objectstorage_container_acl_not_globally_shared,
)
check = objectstorage_container_acl_not_globally_shared()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,334 @@
"""Tests for objectstorage_container_listing_disabled check."""
from unittest import mock
from prowler.providers.openstack.services.objectstorage.objectstorage_service import (
ObjectStorageContainer,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_objectstorage_container_listing_disabled:
"""Test suite for objectstorage_container_listing_disabled check."""
def test_no_containers(self):
"""Test when no containers exist."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import (
objectstorage_container_listing_disabled,
)
check = objectstorage_container_listing_disabled()
result = check.execute()
assert len(result) == 0
def test_container_no_listing(self):
"""Test container without public listing (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-1",
name="no-listing",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=10,
bytes_used=1024,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import (
objectstorage_container_listing_disabled,
)
check = objectstorage_container_listing_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container no-listing does not have public listing enabled."
)
assert result[0].resource_id == "container-1"
assert result[0].resource_name == "no-listing"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_with_listing(self):
"""Test container with public listing (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-2",
name="public-listing",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL=".r:*,.rlistings",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import (
objectstorage_container_listing_disabled,
)
check = objectstorage_container_listing_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Container public-listing has public listing enabled (.rlistings) allowing anonymous object enumeration."
)
assert result[0].resource_id == "container-2"
assert result[0].resource_name == "public-listing"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_listing_via_global_acl_star_colon_star(self):
"""Test container with *:* read ACL enabling listing (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-3",
name="global-acl-listing",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL="*:*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import (
objectstorage_container_listing_disabled,
)
check = objectstorage_container_listing_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Container global-acl-listing has listing enabled via global read ACL (*:*) allowing all authenticated users to list objects."
)
assert result[0].resource_id == "container-3"
assert result[0].resource_name == "global-acl-listing"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_listing_via_bare_wildcard(self):
"""Test container with * read ACL enabling listing (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-4",
name="bare-wildcard-listing",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import (
objectstorage_container_listing_disabled,
)
check = objectstorage_container_listing_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
def test_container_rlistings_takes_priority_over_global(self):
"""Test that .rlistings is reported when both .rlistings and *:* are present."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-5",
name="both-patterns",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL=".rlistings,*:*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import (
objectstorage_container_listing_disabled,
)
check = objectstorage_container_listing_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert ".rlistings" in result[0].status_extended
def test_multiple_containers_mixed(self):
"""Test multiple containers with mixed listing status."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-pass",
name="Pass",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL=".r:*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
ObjectStorageContainer(
id="container-fail",
name="Fail",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL=".r:*,.rlistings",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_listing_disabled.objectstorage_container_listing_disabled import (
objectstorage_container_listing_disabled,
)
check = objectstorage_container_listing_disabled()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,243 @@
"""Tests for objectstorage_container_metadata_sensitive_data check."""
from unittest import mock
from prowler.providers.openstack.services.objectstorage.objectstorage_service import (
ObjectStorageContainer,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_objectstorage_container_metadata_sensitive_data:
"""Test suite for objectstorage_container_metadata_sensitive_data check."""
def test_no_containers(self):
"""Test when no containers exist."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = []
objectstorage_client.audit_config = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import (
objectstorage_container_metadata_sensitive_data,
)
check = objectstorage_container_metadata_sensitive_data()
result = check.execute()
assert len(result) == 0
def test_container_no_metadata(self):
"""Test container with no metadata (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.audit_config = {}
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-1",
name="no-metadata",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=10,
bytes_used=1024,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import (
objectstorage_container_metadata_sensitive_data,
)
check = objectstorage_container_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container no-metadata has no metadata (no sensitive data exposure risk)."
)
assert result[0].resource_id == "container-1"
assert result[0].resource_name == "no-metadata"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_safe_metadata(self):
"""Test container with safe metadata (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.audit_config = {}
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-2",
name="safe-metadata",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={"environment": "production", "application": "web-app"},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import (
objectstorage_container_metadata_sensitive_data,
)
check = objectstorage_container_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container safe-metadata metadata does not contain sensitive data."
)
def test_container_password_in_metadata(self):
"""Test container with password in metadata (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.audit_config = {}
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-3",
name="password-metadata",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={"db_password": "supersecret123"},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import (
objectstorage_container_metadata_sensitive_data,
)
check = objectstorage_container_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "contains potential secrets" in result[0].status_extended
def test_multiple_containers_mixed(self):
"""Test multiple containers with mixed metadata."""
objectstorage_client = mock.MagicMock()
objectstorage_client.audit_config = {}
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-pass",
name="Safe",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={"tier": "web"},
),
ObjectStorageContainer(
id="container-fail",
name="Unsafe",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={"admin_password": "secret123"},
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_metadata_sensitive_data.objectstorage_container_metadata_sensitive_data import (
objectstorage_container_metadata_sensitive_data,
)
check = objectstorage_container_metadata_sensitive_data()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,245 @@
"""Tests for objectstorage_container_public_read_acl_disabled check."""
from unittest import mock
from prowler.providers.openstack.services.objectstorage.objectstorage_service import (
ObjectStorageContainer,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_objectstorage_container_public_read_acl_disabled:
"""Test suite for objectstorage_container_public_read_acl_disabled check."""
def test_no_containers(self):
"""Test when no containers exist."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import (
objectstorage_container_public_read_acl_disabled,
)
check = objectstorage_container_public_read_acl_disabled()
result = check.execute()
assert len(result) == 0
def test_container_no_public_read(self):
"""Test container without public read ACL (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-1",
name="private-container",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=10,
bytes_used=1024,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import (
objectstorage_container_public_read_acl_disabled,
)
check = objectstorage_container_public_read_acl_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container private-container does not have public read ACL."
)
assert result[0].resource_id == "container-1"
assert result[0].resource_name == "private-container"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_public_read(self):
"""Test container with public read ACL (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-2",
name="public-container",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL=".r:*,.rlistings",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import (
objectstorage_container_public_read_acl_disabled,
)
check = objectstorage_container_public_read_acl_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Container public-container has public read ACL (.r:*) allowing anonymous access."
)
assert result[0].resource_id == "container-2"
assert result[0].resource_name == "public-container"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_domain_restricted_referrer_not_flagged(self):
"""Test container with domain-restricted referrer ACL is not flagged (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-3",
name="domain-restricted",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL=".r:*.example.com,.rlistings",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import (
objectstorage_container_public_read_acl_disabled,
)
check = objectstorage_container_public_read_acl_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container domain-restricted does not have public read ACL."
)
def test_multiple_containers_mixed(self):
"""Test multiple containers with mixed ACLs."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-pass",
name="Pass",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
ObjectStorageContainer(
id="container-fail",
name="Fail",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL=".r:*",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_public_read_acl_disabled.objectstorage_container_public_read_acl_disabled import (
objectstorage_container_public_read_acl_disabled,
)
check = objectstorage_container_public_read_acl_disabled()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,199 @@
"""Tests for objectstorage_container_sync_not_enabled check."""
from unittest import mock
from prowler.providers.openstack.services.objectstorage.objectstorage_service import (
ObjectStorageContainer,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_objectstorage_container_sync_not_enabled:
"""Test suite for objectstorage_container_sync_not_enabled check."""
def test_no_containers(self):
"""Test when no containers exist."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled import (
objectstorage_container_sync_not_enabled,
)
check = objectstorage_container_sync_not_enabled()
result = check.execute()
assert len(result) == 0
def test_container_no_sync(self):
"""Test container without sync (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-1",
name="no-sync",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=10,
bytes_used=1024,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled import (
objectstorage_container_sync_not_enabled,
)
check = objectstorage_container_sync_not_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container no-sync does not have container sync enabled."
)
assert result[0].resource_id == "container-1"
assert result[0].resource_name == "no-sync"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_with_sync(self):
"""Test container with sync enabled (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-2",
name="synced-container",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="https://other-cluster/v1/AUTH_test/container-2",
sync_key="shared-secret",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled import (
objectstorage_container_sync_not_enabled,
)
check = objectstorage_container_sync_not_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Container synced-container has container sync enabled (sync target: https://other-cluster/v1/AUTH_test/container-2)."
)
assert result[0].resource_id == "container-2"
assert result[0].resource_name == "synced-container"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_multiple_containers_mixed(self):
"""Test multiple containers with mixed sync status."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-pass",
name="Pass",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
ObjectStorageContainer(
id="container-fail",
name="Fail",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="https://external/v1/AUTH_test/container",
sync_key="key",
metadata={},
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_sync_not_enabled.objectstorage_container_sync_not_enabled import (
objectstorage_container_sync_not_enabled,
)
check = objectstorage_container_sync_not_enabled()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,249 @@
"""Tests for objectstorage_container_versioning_enabled check."""
from unittest import mock
from prowler.providers.openstack.services.objectstorage.objectstorage_service import (
ObjectStorageContainer,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_objectstorage_container_versioning_enabled:
"""Test suite for objectstorage_container_versioning_enabled check."""
def test_no_containers(self):
"""Test when no containers exist."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import (
objectstorage_container_versioning_enabled,
)
check = objectstorage_container_versioning_enabled()
result = check.execute()
assert len(result) == 0
def test_container_versioning_enabled_versions_location(self):
"""Test container with versioning enabled via X-Versions-Location (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-1",
name="versioned-container",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=10,
bytes_used=1024,
read_ACL="",
write_ACL="",
versioning_enabled=True,
versions_location="versioned-container_versions",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import (
objectstorage_container_versioning_enabled,
)
check = objectstorage_container_versioning_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container versioned-container has versioning enabled (versions location: versioned-container_versions)."
)
assert result[0].resource_id == "container-1"
assert result[0].resource_name == "versioned-container"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_versioning_enabled_history_location(self):
"""Test container with versioning enabled via X-History-Location (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-1",
name="history-container",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=10,
bytes_used=1024,
read_ACL="",
write_ACL="",
versioning_enabled=True,
versions_location="",
history_location="history-container_versions",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import (
objectstorage_container_versioning_enabled,
)
check = objectstorage_container_versioning_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container history-container has versioning enabled (history location: history-container_versions)."
)
assert result[0].resource_id == "container-1"
assert result[0].resource_name == "history-container"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_versioning_disabled(self):
"""Test container without versioning (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-2",
name="no-versioning",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import (
objectstorage_container_versioning_enabled,
)
check = objectstorage_container_versioning_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Container no-versioning does not have versioning enabled."
)
assert result[0].resource_id == "container-2"
assert result[0].resource_name == "no-versioning"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_multiple_containers_mixed(self):
"""Test multiple containers with mixed versioning status."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-pass",
name="Pass",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=True,
versions_location="Pass_versions",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
ObjectStorageContainer(
id="container-fail",
name="Fail",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_versioning_enabled.objectstorage_container_versioning_enabled import (
objectstorage_container_versioning_enabled,
)
check = objectstorage_container_versioning_enabled()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,329 @@
"""Tests for objectstorage_container_write_acl_restricted check."""
from unittest import mock
from prowler.providers.openstack.services.objectstorage.objectstorage_service import (
ObjectStorageContainer,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_objectstorage_container_write_acl_restricted:
"""Test suite for objectstorage_container_write_acl_restricted check."""
def test_no_containers(self):
"""Test when no containers exist."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import (
objectstorage_container_write_acl_restricted,
)
check = objectstorage_container_write_acl_restricted()
result = check.execute()
assert len(result) == 0
def test_container_restricted_write(self):
"""Test container with restricted write ACL (PASS)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-1",
name="restricted-write",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=10,
bytes_used=1024,
read_ACL="",
write_ACL="project-123:user-456",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import (
objectstorage_container_write_acl_restricted,
)
check = objectstorage_container_write_acl_restricted()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Container restricted-write has restricted write ACL."
)
assert result[0].resource_id == "container-1"
assert result[0].resource_name == "restricted-write"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_unrestricted_write_star_colon_star(self):
"""Test container with *:* write ACL (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-2",
name="unrestricted-write",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=5,
bytes_used=512,
read_ACL="",
write_ACL="*:*",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import (
objectstorage_container_write_acl_restricted,
)
check = objectstorage_container_write_acl_restricted()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Container unrestricted-write has unrestricted write ACL allowing all authenticated users to write."
)
assert result[0].resource_id == "container-2"
assert result[0].resource_name == "unrestricted-write"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_container_unrestricted_write_star_only(self):
"""Test container with * write ACL (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-3",
name="star-write",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="*",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import (
objectstorage_container_write_acl_restricted,
)
check = objectstorage_container_write_acl_restricted()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
def test_container_star_in_multi_entry_acl(self):
"""Test container with * in multi-entry write ACL (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-4",
name="star-multi-entry",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="*,project-123:user-456",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import (
objectstorage_container_write_acl_restricted,
)
check = objectstorage_container_write_acl_restricted()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Container star-multi-entry has unrestricted write ACL allowing all authenticated users to write."
)
def test_container_star_colon_star_in_multi_entry_acl(self):
"""Test container with *:* in multi-entry write ACL (FAIL)."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-5",
name="star-colon-multi",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="project-123:user-456,*:*",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import (
objectstorage_container_write_acl_restricted,
)
check = objectstorage_container_write_acl_restricted()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
def test_multiple_containers_mixed(self):
"""Test multiple containers with mixed write ACLs."""
objectstorage_client = mock.MagicMock()
objectstorage_client.containers = [
ObjectStorageContainer(
id="container-pass",
name="Pass",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
ObjectStorageContainer(
id="container-fail",
name="Fail",
region=OPENSTACK_REGION,
project_id=OPENSTACK_PROJECT_ID,
object_count=0,
bytes_used=0,
read_ACL="",
write_ACL="*:*",
versioning_enabled=False,
versions_location="",
history_location="",
sync_to="",
sync_key="",
metadata={},
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted.objectstorage_client",
new=objectstorage_client,
),
):
from prowler.providers.openstack.services.objectstorage.objectstorage_container_write_acl_restricted.objectstorage_container_write_acl_restricted import (
objectstorage_container_write_acl_restricted,
)
check = objectstorage_container_write_acl_restricted()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,403 @@
"""Tests for OpenStack ObjectStorage service."""
from unittest.mock import MagicMock, patch
from openstack import exceptions as openstack_exceptions
from prowler.providers.openstack.services.objectstorage.objectstorage_service import (
ObjectStorage,
ObjectStorageContainer,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class TestObjectStorageService:
"""Test suite for ObjectStorage service."""
def test_objectstorage_service_initialization(self):
"""Test ObjectStorage service initializes correctly."""
provider = set_mocked_openstack_provider()
with patch.object(
ObjectStorage, "_list_containers", return_value=[]
) as mock_list:
service = ObjectStorage(provider)
assert service.service_name == "ObjectStorage"
assert service.provider == provider
assert service.connection == provider.connection
assert service.regional_connections == provider.regional_connections
assert service.audited_regions == [OPENSTACK_REGION]
assert service.region == OPENSTACK_REGION
assert service.project_id == OPENSTACK_PROJECT_ID
assert service.containers == []
mock_list.assert_called_once()
def test_objectstorage_list_containers_success(self):
"""Test listing containers successfully."""
provider = set_mocked_openstack_provider()
mock_container1 = MagicMock()
mock_container1.name = "container-1"
mock_container1.count = 10
mock_container1.bytes = 1024
mock_container1.read_ACL = ".r:*,.rlistings"
mock_container1.write_ACL = "*:*"
mock_container1.versions_location = "container-1_versions"
mock_container1.history_location = ""
mock_container1.sync_to = "https://other-cluster/v1/AUTH_test/container-1"
mock_container1.sync_key = "shared-secret"
mock_container1.metadata = {"environment": "production"}
mock_container2 = MagicMock()
mock_container2.name = "container-2"
mock_container2.count = 0
mock_container2.bytes = 0
mock_container2.read_ACL = ""
mock_container2.write_ACL = ""
mock_container2.versions_location = ""
mock_container2.history_location = ""
mock_container2.sync_to = ""
mock_container2.sync_key = ""
mock_container2.metadata = {}
provider.connection.object_store.containers.return_value = [
mock_container1,
mock_container2,
]
# get_container_metadata returns the detailed mock for each container
def mock_get_metadata(name):
return {"container-1": mock_container1, "container-2": mock_container2}[
name
]
provider.connection.object_store.get_container_metadata.side_effect = (
mock_get_metadata
)
service = ObjectStorage(provider)
assert len(service.containers) == 2
assert isinstance(service.containers[0], ObjectStorageContainer)
assert service.containers[0].id == "container-1"
assert service.containers[0].name == "container-1"
assert service.containers[0].region == OPENSTACK_REGION
assert service.containers[0].project_id == OPENSTACK_PROJECT_ID
assert service.containers[0].object_count == 10
assert service.containers[0].bytes_used == 1024
assert service.containers[0].read_ACL == ".r:*,.rlistings"
assert service.containers[0].write_ACL == "*:*"
assert service.containers[0].versioning_enabled is True
assert service.containers[0].versions_location == "container-1_versions"
assert service.containers[0].history_location == ""
assert (
service.containers[0].sync_to
== "https://other-cluster/v1/AUTH_test/container-1"
)
assert service.containers[0].sync_key == "shared-secret"
assert service.containers[0].metadata == {"environment": "production"}
assert service.containers[1].id == "container-2"
assert service.containers[1].versioning_enabled is False
assert service.containers[1].sync_to == ""
assert service.containers[1].metadata == {}
def test_objectstorage_list_containers_empty(self):
"""Test listing containers when none exist."""
provider = set_mocked_openstack_provider()
provider.connection.object_store.containers.return_value = []
service = ObjectStorage(provider)
assert service.containers == []
def test_objectstorage_list_containers_missing_attributes(self):
"""Test listing containers with missing attributes uses fallback to list data."""
provider = set_mocked_openstack_provider()
mock_container = MagicMock()
mock_container.name = "container-1"
del mock_container.count
del mock_container.bytes
del mock_container.read_ACL
del mock_container.write_ACL
del mock_container.versions_location
del mock_container.history_location
del mock_container.sync_to
del mock_container.sync_key
del mock_container.metadata
provider.connection.object_store.containers.return_value = [mock_container]
# HEAD also returns missing attributes (same mock)
provider.connection.object_store.get_container_metadata.return_value = (
mock_container
)
service = ObjectStorage(provider)
assert len(service.containers) == 1
assert service.containers[0].id == "container-1"
assert service.containers[0].name == "container-1"
assert service.containers[0].object_count == 0
assert service.containers[0].bytes_used == 0
assert service.containers[0].read_ACL == ""
assert service.containers[0].write_ACL == ""
assert service.containers[0].versioning_enabled is False
assert service.containers[0].versions_location == ""
assert service.containers[0].history_location == ""
assert service.containers[0].sync_to == ""
assert service.containers[0].sync_key == ""
assert service.containers[0].metadata == {}
def test_objectstorage_list_containers_head_failure_falls_back(self):
"""Test that HEAD failure falls back to list data gracefully."""
provider = set_mocked_openstack_provider()
mock_container = MagicMock()
mock_container.name = "container-1"
mock_container.count = 5
mock_container.bytes = 256
mock_container.read_ACL = None
mock_container.write_ACL = None
mock_container.versions_location = None
mock_container.history_location = None
mock_container.sync_to = None
mock_container.sync_key = None
mock_container.metadata = {}
provider.connection.object_store.containers.return_value = [mock_container]
provider.connection.object_store.get_container_metadata.side_effect = Exception(
"HEAD failed"
)
service = ObjectStorage(provider)
# Should still create the container using list data as fallback
assert len(service.containers) == 1
assert service.containers[0].name == "container-1"
assert service.containers[0].object_count == 5
assert service.containers[0].bytes_used == 256
def test_objectstorage_list_containers_sdk_exception(self):
"""Test handling SDKException when listing containers."""
provider = set_mocked_openstack_provider()
provider.connection.object_store.containers.side_effect = (
openstack_exceptions.SDKException("API error")
)
service = ObjectStorage(provider)
assert service.containers == []
def test_objectstorage_list_containers_generic_exception(self):
"""Test handling generic exception when listing containers."""
provider = set_mocked_openstack_provider()
provider.connection.object_store.containers.side_effect = Exception(
"Unexpected error"
)
service = ObjectStorage(provider)
assert service.containers == []
def test_objectstorage_container_dataclass_attributes(self):
"""Test ObjectStorageContainer dataclass has all required attributes."""
container = ObjectStorageContainer(
id="container-1",
name="container-1",
region="RegionOne",
project_id="project-1",
object_count=10,
bytes_used=1024,
read_ACL=".r:*",
write_ACL="*:*",
versioning_enabled=True,
versions_location="container-1_versions",
history_location="",
sync_to="https://other-cluster/v1/AUTH_test/container-1",
sync_key="shared-secret",
metadata={"environment": "production"},
)
assert container.id == "container-1"
assert container.name == "container-1"
assert container.region == "RegionOne"
assert container.project_id == "project-1"
assert container.object_count == 10
assert container.bytes_used == 1024
assert container.read_ACL == ".r:*"
assert container.write_ACL == "*:*"
assert container.versioning_enabled is True
assert container.versions_location == "container-1_versions"
assert container.history_location == ""
assert container.sync_to == "https://other-cluster/v1/AUTH_test/container-1"
assert container.sync_key == "shared-secret"
assert container.metadata == {"environment": "production"}
def test_objectstorage_service_inherits_from_base(self):
"""Test ObjectStorage service inherits from OpenStackService."""
provider = set_mocked_openstack_provider()
with patch.object(ObjectStorage, "_list_containers", return_value=[]):
service = ObjectStorage(provider)
assert hasattr(service, "service_name")
assert hasattr(service, "provider")
assert hasattr(service, "connection")
assert hasattr(service, "regional_connections")
assert hasattr(service, "audited_regions")
assert hasattr(service, "session")
assert hasattr(service, "region")
assert hasattr(service, "project_id")
assert hasattr(service, "identity")
assert hasattr(service, "audit_config")
assert hasattr(service, "fixer_config")
def test_objectstorage_list_containers_multi_region(self):
"""Test listing containers across multiple regions."""
provider = set_mocked_openstack_provider()
# Create two mock connections for two regions
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_container_uk = MagicMock()
mock_container_uk.name = "container-uk"
mock_container_uk.count = 5
mock_container_uk.bytes = 512
mock_container_uk.read_ACL = ""
mock_container_uk.write_ACL = ""
mock_container_uk.versions_location = ""
mock_container_uk.history_location = ""
mock_container_uk.sync_to = ""
mock_container_uk.sync_key = ""
mock_container_uk.metadata = {}
mock_container_de = MagicMock()
mock_container_de.name = "container-de"
mock_container_de.count = 10
mock_container_de.bytes = 1024
mock_container_de.read_ACL = ".r:*"
mock_container_de.write_ACL = ""
mock_container_de.versions_location = ""
mock_container_de.history_location = ""
mock_container_de.sync_to = ""
mock_container_de.sync_key = ""
mock_container_de.metadata = {}
mock_conn_uk1.object_store.containers.return_value = [mock_container_uk]
mock_conn_uk1.object_store.get_container_metadata.return_value = (
mock_container_uk
)
mock_conn_de1.object_store.containers.return_value = [mock_container_de]
mock_conn_de1.object_store.get_container_metadata.return_value = (
mock_container_de
)
service = ObjectStorage(provider)
assert len(service.containers) == 2
uk_container = next(c for c in service.containers if c.id == "container-uk")
de_container = next(c for c in service.containers if c.id == "container-de")
assert uk_container.region == "UK1"
assert de_container.region == "DE1"
def test_objectstorage_list_containers_multi_region_partial_failure(self):
"""Test that a failing region doesn't prevent other regions from being listed."""
provider = set_mocked_openstack_provider()
mock_conn_ok = MagicMock()
mock_conn_fail = MagicMock()
provider.regional_connections = {"UK1": mock_conn_ok, "DE1": mock_conn_fail}
mock_container = MagicMock()
mock_container.name = "container-uk"
mock_container.count = 5
mock_container.bytes = 512
mock_container.read_ACL = ""
mock_container.write_ACL = ""
mock_container.versions_location = ""
mock_container.history_location = ""
mock_container.sync_to = ""
mock_container.sync_key = ""
mock_container.metadata = {}
mock_conn_ok.object_store.containers.return_value = [mock_container]
mock_conn_ok.object_store.get_container_metadata.return_value = mock_container
mock_conn_fail.object_store.containers.side_effect = (
openstack_exceptions.SDKException("API error in DE1")
)
service = ObjectStorage(provider)
assert len(service.containers) == 1
assert service.containers[0].id == "container-uk"
assert service.containers[0].region == "UK1"
def test_objectstorage_list_containers_multi_region_one_empty(self):
"""Test multi-region where one region has containers and the other is empty."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_container = MagicMock()
mock_container.name = "container-uk"
mock_container.count = 5
mock_container.bytes = 512
mock_container.read_ACL = ""
mock_container.write_ACL = ""
mock_container.versions_location = ""
mock_container.history_location = ""
mock_container.sync_to = ""
mock_container.sync_key = ""
mock_container.metadata = {}
mock_conn_uk1.object_store.containers.return_value = [mock_container]
mock_conn_uk1.object_store.get_container_metadata.return_value = mock_container
mock_conn_de1.object_store.containers.return_value = []
service = ObjectStorage(provider)
assert len(service.containers) == 1
assert service.containers[0].id == "container-uk"
assert service.containers[0].region == "UK1"
def test_objectstorage_list_containers_history_location_versioning(self):
"""Test that history_location (X-History-Location) enables versioning."""
provider = set_mocked_openstack_provider()
mock_container = MagicMock()
mock_container.name = "history-container"
mock_container.count = 3
mock_container.bytes = 256
mock_container.read_ACL = ""
mock_container.write_ACL = ""
mock_container.versions_location = ""
mock_container.history_location = "history-container_versions"
mock_container.sync_to = ""
mock_container.sync_key = ""
mock_container.metadata = {}
provider.connection.object_store.containers.return_value = [mock_container]
provider.connection.object_store.get_container_metadata.return_value = (
mock_container
)
service = ObjectStorage(provider)
assert len(service.containers) == 1
assert service.containers[0].versioning_enabled is True
assert service.containers[0].versions_location == ""
assert service.containers[0].history_location == "history-container_versions"