feat(openstack): add blockstorage service with 7 checks (#10120)

Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
This commit is contained in:
Daniel Barranquero
2026-03-02 12:08:08 +01:00
committed by GitHub
parent d3ba93f0c0
commit 8eddb48b16
33 changed files with 3111 additions and 0 deletions

View File

@@ -33,6 +33,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `entra_seamless_sso_disabled` check for m365 provider [(#10086)](https://github.com/prowler-cloud/prowler/pull/10086) - `entra_seamless_sso_disabled` check for m365 provider [(#10086)](https://github.com/prowler-cloud/prowler/pull/10086)
- Registry scan mode for `image` provider: enumerate and scan all images from OCI standard, Docker Hub, and ECR [(#9985)](https://github.com/prowler-cloud/prowler/pull/9985) - Registry scan mode for `image` provider: enumerate and scan all images from OCI standard, Docker Hub, and ECR [(#9985)](https://github.com/prowler-cloud/prowler/pull/9985)
- Add file descriptor limits (`ulimits`) to Docker Compose worker services to prevent `Too many open files` errors [(#10107)](https://github.com/prowler-cloud/prowler/pull/10107) - Add file descriptor limits (`ulimits`) to Docker Compose worker services to prevent `Too many open files` errors [(#10107)](https://github.com/prowler-cloud/prowler/pull/10107)
- Openstack block storage 7 new checks [(#10120)](https://github.com/prowler-cloud/prowler/pull/10120)
- SecNumCloud compliance framework for the AWS provider [(#10117)](https://github.com/prowler-cloud/prowler/pull/10117) - SecNumCloud compliance framework for the AWS provider [(#10117)](https://github.com/prowler-cloud/prowler/pull/10117)
- CIS 6.0 for the AWS provider [(#10127)](https://github.com/prowler-cloud/prowler/pull/10127) - CIS 6.0 for the AWS provider [(#10127)](https://github.com/prowler-cloud/prowler/pull/10127)
- `entra_require_mfa_for_management_api` check for m365 provider [(#10150)](https://github.com/prowler-cloud/prowler/pull/10150) - `entra_require_mfa_for_management_api` check for m365 provider [(#10150)](https://github.com/prowler-cloud/prowler/pull/10150)

View File

@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
BlockStorage,
)
blockstorage_client = BlockStorage(Provider.get_global_provider())

View File

@@ -0,0 +1,172 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List
from openstack import exceptions as openstack_exceptions
from prowler.lib.logger import logger
from prowler.providers.openstack.lib.service.service import OpenStackService
class BlockStorage(OpenStackService):
"""Service wrapper using openstacksdk block storage (Cinder) APIs."""
def __init__(self, provider) -> None:
super().__init__(__class__.__name__, provider)
self.volumes: List[VolumeResource] = []
self.snapshots: List[SnapshotResource] = []
self.backups: List[BackupResource] = []
self._list_volumes()
self._list_snapshots()
self._list_backups()
def _list_volumes(self) -> None:
"""List all block storage volumes across all audited regions."""
logger.info("BlockStorage - Listing volumes...")
for region, conn in self.regional_connections.items():
try:
for volume in conn.block_storage.volumes():
attachments = getattr(volume, "attachments", []) or []
self.volumes.append(
VolumeResource(
id=getattr(volume, "id", ""),
name=getattr(volume, "name", ""),
status=getattr(volume, "status", ""),
size=getattr(volume, "size", 0),
volume_type=getattr(volume, "volume_type", ""),
is_encrypted=getattr(volume, "is_encrypted", False),
is_bootable=str(
getattr(volume, "is_bootable", "false")
).lower()
== "true",
is_multiattach=getattr(volume, "is_multiattach", False),
attachments=attachments,
metadata=getattr(volume, "metadata", {}),
availability_zone=getattr(volume, "availability_zone", ""),
snapshot_id=getattr(volume, "snapshot_id", "") or "",
source_volume_id=getattr(volume, "source_volume_id", "")
or "",
project_id=self.project_id,
region=region,
)
)
except openstack_exceptions.SDKException as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Failed to list block storage volumes in region {region}: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Unexpected error listing block storage volumes in region {region}: {error}"
)
def _list_snapshots(self) -> None:
"""List all block storage snapshots across all audited regions."""
logger.info("BlockStorage - Listing snapshots...")
for region, conn in self.regional_connections.items():
try:
for snapshot in conn.block_storage.snapshots():
self.snapshots.append(
SnapshotResource(
id=getattr(snapshot, "id", ""),
name=getattr(snapshot, "name", ""),
status=getattr(snapshot, "status", ""),
size=getattr(snapshot, "size", 0),
volume_id=getattr(snapshot, "volume_id", ""),
metadata=getattr(snapshot, "metadata", {}),
project_id=self.project_id,
region=region,
)
)
except openstack_exceptions.SDKException as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Failed to list block storage snapshots in region {region}: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Unexpected error listing block storage snapshots in region {region}: {error}"
)
def _list_backups(self) -> None:
"""List all block storage backups across all audited regions."""
logger.info("BlockStorage - Listing backups...")
for region, conn in self.regional_connections.items():
try:
for backup in conn.block_storage.backups():
self.backups.append(
BackupResource(
id=getattr(backup, "id", ""),
name=getattr(backup, "name", ""),
status=getattr(backup, "status", ""),
size=getattr(backup, "size", 0),
volume_id=getattr(backup, "volume_id", ""),
is_incremental=getattr(backup, "is_incremental", False),
availability_zone=getattr(backup, "availability_zone", ""),
project_id=self.project_id,
region=region,
)
)
except openstack_exceptions.SDKException as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Failed to list block storage backups in region {region}: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
f"Unexpected error listing block storage backups in region {region}: {error}"
)
@dataclass
class VolumeResource:
"""Represents an OpenStack block storage volume."""
id: str
name: str
status: str
size: int
volume_type: str
is_encrypted: bool
is_bootable: bool
is_multiattach: bool
attachments: List[Dict]
metadata: Dict[str, str]
availability_zone: str
snapshot_id: str
source_volume_id: str
project_id: str
region: str
@dataclass
class SnapshotResource:
"""Represents an OpenStack block storage snapshot."""
id: str
name: str
status: str
size: int
volume_id: str
metadata: Dict[str, str]
project_id: str
region: str
@dataclass
class BackupResource:
"""Represents an OpenStack block storage backup."""
id: str
name: str
status: str
size: int
volume_id: str
is_incremental: bool
availability_zone: str
project_id: str
region: str

View File

@@ -0,0 +1,40 @@
{
"Provider": "openstack",
"CheckID": "blockstorage_snapshot_metadata_sensitive_data",
"CheckTitle": "Block storage snapshot metadata does not contain sensitive data",
"CheckType": [],
"ServiceName": "blockstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "critical",
"ResourceType": "OS::Cinder::Snapshot",
"ResourceGroup": "storage",
"Description": "**OpenStack block storage snapshot metadata** is evaluated to detect **sensitive data** such as passwords, API keys, secrets, and private keys. Snapshot metadata is accessible via the OpenStack API to any user with access to the project. Storing secrets in metadata exposes them to unauthorized access through the API.",
"Risk": "Snapshot metadata containing sensitive data exposes credentials through the OpenStack API, accessible to any project member. Attackers with project access can enumerate snapshot metadata to extract passwords, API keys, and private keys. Stolen credentials enable unauthorized modifications, privilege escalation, and data breaches.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.openstack.org/cinder/latest/cli/cli-manage-volumes.html",
"https://cheatsheetseries.owasp.org/cheatsheets/Secrets_Management_Cheat_Sheet.html"
],
"Remediation": {
"Code": {
"CLI": "openstack volume snapshot unset --property <sensitive_key> <snapshot_id>",
"NativeIaC": "",
"Other": "1. Navigate to **Block Storage > Snapshots**\n2. Select snapshot with sensitive metadata\n3. Remove sensitive metadata keys using CLI command\n4. Rotate exposed credentials immediately\n5. Store secrets in Barbican or external secrets manager instead",
"Terraform": ""
},
"Recommendation": {
"Text": "Never store secrets in snapshot metadata; use Barbican (OpenStack Key Manager), Vault, or external secrets management instead. Remove any sensitive data currently stored in snapshot metadata and rotate exposed credentials immediately. Implement metadata policies to prevent sensitive data from being added.",
"Url": "https://hub.prowler.com/check/blockstorage_snapshot_metadata_sensitive_data"
}
},
"Categories": [
"secrets",
"encryption"
],
"DependsOn": [],
"RelatedTo": [
"blockstorage_volume_metadata_sensitive_data"
],
"Notes": "This check uses the detect-secrets library to scan for credentials. May produce false positives on metadata keys containing secret-like keywords. Findings should be reviewed manually. The audit_config allows configuring secrets_ignore_patterns to exclude specific patterns and detect_secrets_plugins to customize detection."
}

View File

@@ -0,0 +1,62 @@
import json
from typing import List
from prowler.lib.check.models import Check, CheckReportOpenStack
from prowler.lib.utils.utils import detect_secrets_scan
from prowler.providers.openstack.services.blockstorage.blockstorage_client import (
blockstorage_client,
)
class blockstorage_snapshot_metadata_sensitive_data(Check):
"""Ensure block storage snapshot metadata does not contain sensitive data like passwords or API keys."""
def execute(self) -> List[CheckReportOpenStack]:
findings: List[CheckReportOpenStack] = []
secrets_ignore_patterns = blockstorage_client.audit_config.get(
"secrets_ignore_patterns", []
)
for snapshot in blockstorage_client.snapshots:
report = CheckReportOpenStack(metadata=self.metadata(), resource=snapshot)
report.status = "PASS"
report.status_extended = f"Snapshot {snapshot.name} ({snapshot.id}) metadata does not contain sensitive data."
if snapshot.metadata:
# Build metadata dict and parallel list of keys
dump_metadata = {}
original_metadata_keys = []
for key, value in snapshot.metadata.items():
dump_metadata[key] = value
original_metadata_keys.append(key)
# Convert metadata dict to JSON string for detect-secrets scanning
metadata_json = json.dumps(dump_metadata, indent=2)
detect_secrets_output = detect_secrets_scan(
data=metadata_json,
excluded_secrets=secrets_ignore_patterns,
detect_secrets_plugins=blockstorage_client.audit_config.get(
"detect_secrets_plugins"
),
)
if detect_secrets_output:
# Map line numbers back to metadata keys using the parallel list
# Line numbering: line 1 = "{", line 2 = first key-value, etc.
secrets_string = ", ".join(
[
f"{secret['type']} in metadata key '{original_metadata_keys[secret['line_number'] - 2]}'"
for secret in detect_secrets_output
if 0
<= secret["line_number"] - 2
< len(original_metadata_keys)
]
)
report.status = "FAIL"
report.status_extended = f"Snapshot {snapshot.name} ({snapshot.id}) metadata contains potential secrets -> {secrets_string}."
else:
report.status_extended = f"Snapshot {snapshot.name} ({snapshot.id}) has no metadata (no sensitive data exposure risk)."
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "openstack",
"CheckID": "blockstorage_snapshot_not_orphaned",
"CheckTitle": "Block storage snapshots reference existing volumes",
"CheckType": [],
"ServiceName": "blockstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "OS::Cinder::Snapshot",
"ResourceGroup": "storage",
"Description": "**OpenStack block storage snapshots** are evaluated to verify they **reference existing volumes**. Orphaned snapshots (whose source volumes have been deleted) may contain stale data, incur unnecessary storage costs, and can be overlooked during security reviews. They may also contain sensitive data from deleted volumes that is no longer being managed.",
"Risk": "Orphaned snapshots may contain sensitive data from deleted volumes that is no longer actively managed or monitored. These snapshots continue to consume storage resources and may be restored by unauthorized users to access old data. They can be overlooked during security audits and compliance reviews.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.openstack.org/cinder/latest/cli/cli-manage-volumes.html"
],
"Remediation": {
"Code": {
"CLI": "openstack volume snapshot list\nopenstack volume snapshot delete <snapshot_id>",
"NativeIaC": "",
"Other": "1. Navigate to **Block Storage > Snapshots**\n2. Identify snapshots whose source volumes no longer exist\n3. Review each orphaned snapshot for necessity\n4. Back up data if needed by creating a volume from the snapshot\n5. Delete orphaned snapshots that are no longer needed",
"Terraform": ""
},
"Recommendation": {
"Text": "Review orphaned snapshots regularly and delete those no longer needed. Before deleting a volume, review and clean up associated snapshots. Implement snapshot lifecycle policies to prevent accumulation of orphaned snapshots. Tag snapshots with ownership and purpose metadata.",
"Url": "https://hub.prowler.com/check/blockstorage_snapshot_not_orphaned"
}
},
"Categories": [
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Orphaned snapshots may be intentionally retained for data recovery or compliance purposes. This check identifies snapshots referencing non-existent volumes for review. Organizations should evaluate each orphaned snapshot based on retention policies."
}

View File

@@ -0,0 +1,32 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportOpenStack
from prowler.providers.openstack.services.blockstorage.blockstorage_client import (
blockstorage_client,
)
class blockstorage_snapshot_not_orphaned(Check):
"""Ensure block storage snapshots reference existing volumes."""
def execute(self) -> List[CheckReportOpenStack]:
findings: List[CheckReportOpenStack] = []
# Build set of existing volume IDs
existing_volume_ids = {volume.id for volume in blockstorage_client.volumes}
for snapshot in blockstorage_client.snapshots:
report = CheckReportOpenStack(metadata=self.metadata(), resource=snapshot)
if snapshot.volume_id in existing_volume_ids:
report.status = "PASS"
report.status_extended = f"Snapshot {snapshot.name} ({snapshot.id}) references existing volume {snapshot.volume_id}."
else:
report.status = "FAIL"
report.status_extended = (
f"Snapshot {snapshot.name} ({snapshot.id}) references non-existent volume "
f"{snapshot.volume_id} and may be orphaned."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "openstack",
"CheckID": "blockstorage_volume_backup_exists",
"CheckTitle": "Block storage volumes have at least one backup",
"CheckType": [],
"ServiceName": "blockstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "OS::Cinder::Volume",
"ResourceGroup": "storage",
"Description": "**OpenStack block storage volumes** are evaluated to verify that at least one **backup** exists. Volume backups provide disaster recovery capability by storing volume data in a separate storage backend (e.g., Swift or Ceph). Without backups, data loss from volume corruption, accidental deletion, or infrastructure failure is irrecoverable.",
"Risk": "Volumes without backups are vulnerable to permanent data loss from hardware failures, accidental deletion, software bugs, or ransomware attacks. Without backups, recovery from disasters requires rebuilding data from scratch, which may be impossible for stateful applications.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.openstack.org/cinder/latest/admin/volume-backups.html"
],
"Remediation": {
"Code": {
"CLI": "openstack volume backup create --name <backup_name> <volume_id>",
"NativeIaC": "",
"Other": "1. Navigate to **Block Storage > Volumes**\n2. Select the volume to back up\n3. Click **Create Backup**\n4. Provide a name and optional description\n5. Set up automated backup schedules using cron or orchestration tools",
"Terraform": ""
},
"Recommendation": {
"Text": "Create regular backups for all block storage volumes, especially those containing critical data. Implement automated backup schedules. Use incremental backups to reduce storage costs and backup time. Test backup restoration regularly to ensure recoverability.",
"Url": "https://hub.prowler.com/check/blockstorage_volume_backup_exists"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "The backup service must be enabled in the Cinder configuration. Backups are stored in a separate backend (typically Swift or Ceph). This check verifies the existence of at least one backup, not backup freshness or scheduling."
}

View File

@@ -0,0 +1,37 @@
from collections import Counter
from typing import List
from prowler.lib.check.models import Check, CheckReportOpenStack
from prowler.providers.openstack.services.blockstorage.blockstorage_client import (
blockstorage_client,
)
class blockstorage_volume_backup_exists(Check):
"""Ensure block storage volumes have at least one backup."""
def execute(self) -> List[CheckReportOpenStack]:
findings: List[CheckReportOpenStack] = []
# Build volume_id -> backup count mapping
backup_counts = Counter(
backup.volume_id for backup in blockstorage_client.backups
)
for volume in blockstorage_client.volumes:
report = CheckReportOpenStack(metadata=self.metadata(), resource=volume)
count = backup_counts.get(volume.id, 0)
if count > 0:
report.status = "PASS"
report.status_extended = (
f"Volume {volume.name} ({volume.id}) has {count} backup(s)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Volume {volume.name} ({volume.id}) does not have any backups."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,37 @@
{
"Provider": "openstack",
"CheckID": "blockstorage_volume_encryption_enabled",
"CheckTitle": "Block storage volumes have encryption enabled",
"CheckType": [],
"ServiceName": "blockstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "OS::Cinder::Volume",
"ResourceGroup": "storage",
"Description": "**OpenStack block storage volumes** (Cinder) are evaluated to verify that **encryption** is enabled. Volume encryption protects data at rest by encrypting the entire volume using a key managed by the OpenStack Key Manager (Barbican). Without encryption, data stored on volumes is vulnerable to unauthorized access if the underlying storage is compromised.",
"Risk": "Unencrypted volumes expose data at rest to unauthorized access if physical storage media is compromised, stolen, or improperly decommissioned. Attackers with access to the storage backend can read sensitive data directly. Compliance frameworks (PCI-DSS, HIPAA, SOC2) require encryption of data at rest.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.openstack.org/cinder/latest/configuration/block-storage/volume-encryption.html",
"https://docs.openstack.org/barbican/latest/"
],
"Remediation": {
"Code": {
"CLI": "openstack volume type create --encryption-provider luks --encryption-cipher aes-xts-plain64 --encryption-key-size 256 --encryption-control-location front-end encrypted_type\nopenstack volume create --size 10 --type encrypted_type encrypted-volume",
"NativeIaC": "",
"Other": "1. Navigate to **Block Storage > Volumes**\n2. Create a new volume using an encrypted volume type\n3. Migrate data from unencrypted volumes to encrypted ones\n4. Delete the old unencrypted volumes\n\nNote: Existing volumes cannot be encrypted in-place; data must be migrated.",
"Terraform": "```hcl\nresource \"openstack_blockstorage_volume_v3\" \"encrypted_volume\" {\n name = \"encrypted-volume\"\n size = 10\n volume_type = openstack_blockstorage_volume_type_v3.encrypted.name\n}\n```"
},
"Recommendation": {
"Text": "Enable encryption on all block storage volumes using encrypted volume types backed by Barbican key management. Create encrypted volume types with strong ciphers (aes-xts-plain64) and adequate key sizes (256-bit). Migrate existing unencrypted volumes to encrypted ones.",
"Url": "https://hub.prowler.com/check/blockstorage_volume_encryption_enabled"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Volume encryption requires Barbican (OpenStack Key Manager) to be deployed and configured. Encryption is set at the volume type level and applies to all volumes created with that type. Existing volumes cannot be encrypted in-place."
}

View File

@@ -0,0 +1,28 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportOpenStack
from prowler.providers.openstack.services.blockstorage.blockstorage_client import (
blockstorage_client,
)
class blockstorage_volume_encryption_enabled(Check):
"""Ensure block storage volumes have encryption enabled."""
def execute(self) -> List[CheckReportOpenStack]:
findings: List[CheckReportOpenStack] = []
for volume in blockstorage_client.volumes:
report = CheckReportOpenStack(metadata=self.metadata(), resource=volume)
if volume.is_encrypted:
report.status = "PASS"
report.status_extended = (
f"Volume {volume.name} ({volume.id}) has encryption enabled."
)
else:
report.status = "FAIL"
report.status_extended = f"Volume {volume.name} ({volume.id}) does not have encryption enabled."
findings.append(report)
return findings

View File

@@ -0,0 +1,38 @@
{
"Provider": "openstack",
"CheckID": "blockstorage_volume_metadata_sensitive_data",
"CheckTitle": "Block storage volume metadata does not contain sensitive data",
"CheckType": [],
"ServiceName": "blockstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "critical",
"ResourceType": "OS::Cinder::Volume",
"ResourceGroup": "storage",
"Description": "**OpenStack block storage volume metadata** is evaluated to detect **sensitive data** such as passwords, API keys, secrets, and private keys. Volume metadata is accessible via the OpenStack API to any user with access to the project. Storing secrets in metadata exposes them to unauthorized access through the API.",
"Risk": "Volume metadata containing sensitive data exposes credentials through the OpenStack API, accessible to any project member. Attackers with project access can enumerate volume metadata to extract passwords, API keys, and private keys. Stolen credentials enable unauthorized modifications, privilege escalation, and data breaches.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.openstack.org/cinder/latest/cli/cli-manage-volumes.html",
"https://cheatsheetseries.owasp.org/cheatsheets/Secrets_Management_Cheat_Sheet.html"
],
"Remediation": {
"Code": {
"CLI": "openstack volume unset --property <sensitive_key> <volume_id>",
"NativeIaC": "",
"Other": "1. Navigate to **Block Storage > Volumes**\n2. Select volume with sensitive metadata\n3. Remove sensitive metadata keys using CLI command\n4. Rotate exposed credentials immediately\n5. Store secrets in Barbican or external secrets manager instead",
"Terraform": "```hcl\n# Use Barbican for secrets instead of volume metadata\nresource \"openstack_blockstorage_volume_v3\" \"secure_volume\" {\n name = \"app-data\"\n size = 10\n\n # Safe metadata (non-sensitive labels only)\n metadata = {\n environment = \"production\"\n application = \"web-app\"\n }\n}\n```"
},
"Recommendation": {
"Text": "Never store secrets in volume metadata; use Barbican (OpenStack Key Manager), Vault, or external secrets management instead. Remove any sensitive data currently stored in volume metadata and rotate exposed credentials immediately. Implement metadata policies to prevent sensitive data from being added.",
"Url": "https://hub.prowler.com/check/blockstorage_volume_metadata_sensitive_data"
}
},
"Categories": [
"secrets",
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check uses the detect-secrets library to scan for credentials. May produce false positives on metadata keys containing secret-like keywords. Findings should be reviewed manually. The audit_config allows configuring secrets_ignore_patterns to exclude specific patterns and detect_secrets_plugins to customize detection."
}

View File

@@ -0,0 +1,62 @@
import json
from typing import List
from prowler.lib.check.models import Check, CheckReportOpenStack
from prowler.lib.utils.utils import detect_secrets_scan
from prowler.providers.openstack.services.blockstorage.blockstorage_client import (
blockstorage_client,
)
class blockstorage_volume_metadata_sensitive_data(Check):
"""Ensure block storage volume metadata does not contain sensitive data like passwords or API keys."""
def execute(self) -> List[CheckReportOpenStack]:
findings: List[CheckReportOpenStack] = []
secrets_ignore_patterns = blockstorage_client.audit_config.get(
"secrets_ignore_patterns", []
)
for volume in blockstorage_client.volumes:
report = CheckReportOpenStack(metadata=self.metadata(), resource=volume)
report.status = "PASS"
report.status_extended = f"Volume {volume.name} ({volume.id}) metadata does not contain sensitive data."
if volume.metadata:
# Build metadata dict and parallel list of keys
dump_metadata = {}
original_metadata_keys = []
for key, value in volume.metadata.items():
dump_metadata[key] = value
original_metadata_keys.append(key)
# Convert metadata dict to JSON string for detect-secrets scanning
metadata_json = json.dumps(dump_metadata, indent=2)
detect_secrets_output = detect_secrets_scan(
data=metadata_json,
excluded_secrets=secrets_ignore_patterns,
detect_secrets_plugins=blockstorage_client.audit_config.get(
"detect_secrets_plugins"
),
)
if detect_secrets_output:
# Map line numbers back to metadata keys using the parallel list
# Line numbering: line 1 = "{", line 2 = first key-value, etc.
secrets_string = ", ".join(
[
f"{secret['type']} in metadata key '{original_metadata_keys[secret['line_number'] - 2]}'"
for secret in detect_secrets_output
if 0
<= secret["line_number"] - 2
< len(original_metadata_keys)
]
)
report.status = "FAIL"
report.status_extended = f"Volume {volume.name} ({volume.id}) metadata contains potential secrets -> {secrets_string}."
else:
report.status_extended = f"Volume {volume.name} ({volume.id}) has no metadata (no sensitive data exposure risk)."
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "openstack",
"CheckID": "blockstorage_volume_multiattach_disabled",
"CheckTitle": "Block storage volumes do not have multi-attach enabled",
"CheckType": [],
"ServiceName": "blockstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "OS::Cinder::Volume",
"ResourceGroup": "storage",
"Description": "**OpenStack block storage volumes** are evaluated to verify that **multi-attach** is not enabled. Multi-attach allows a volume to be attached to multiple instances simultaneously, increasing the attack surface and potentially leading to data corruption or unauthorized access from compromised instances.",
"Risk": "Multi-attach volumes can be accessed by multiple instances simultaneously, increasing the blast radius if any attached instance is compromised. Data corruption may occur if applications do not implement proper cluster-aware file systems. Unauthorized modifications from one instance can affect all other attached instances.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.openstack.org/cinder/latest/admin/volume-multiattach.html"
],
"Remediation": {
"Code": {
"CLI": "openstack volume create --size 10 --type <non_multiattach_type> <volume_name>",
"NativeIaC": "",
"Other": "1. Identify volumes with multi-attach enabled\n2. Evaluate if multi-attach is truly required for the workload\n3. For volumes that do not require multi-attach, migrate data to a new volume without multi-attach\n4. Ensure multi-attach volumes use cluster-aware file systems (e.g., GFS2, OCFS2)",
"Terraform": "```hcl\nresource \"openstack_blockstorage_volume_v3\" \"single_attach\" {\n name = \"single-attach-volume\"\n size = 10\n multiattach = false\n}\n```"
},
"Recommendation": {
"Text": "Disable multi-attach on volumes unless specifically required for clustered applications. When multi-attach is necessary, ensure cluster-aware file systems are used and implement strict access controls. Review multi-attach volumes regularly to verify continued business justification.",
"Url": "https://hub.prowler.com/check/blockstorage_volume_multiattach_disabled"
}
},
"Categories": [
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Multi-attach is a legitimate feature for clustered applications using cluster-aware file systems. This check flags volumes with multi-attach enabled for review. Organizations should evaluate whether multi-attach is necessary on a per-volume basis."
}

View File

@@ -0,0 +1,29 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportOpenStack
from prowler.providers.openstack.services.blockstorage.blockstorage_client import (
blockstorage_client,
)
class blockstorage_volume_multiattach_disabled(Check):
"""Ensure block storage volumes do not have multi-attach enabled."""
def execute(self) -> List[CheckReportOpenStack]:
findings: List[CheckReportOpenStack] = []
for volume in blockstorage_client.volumes:
report = CheckReportOpenStack(metadata=self.metadata(), resource=volume)
if not volume.is_multiattach:
report.status = "PASS"
report.status_extended = f"Volume {volume.name} ({volume.id}) does not have multi-attach enabled."
else:
report.status = "FAIL"
report.status_extended = (
f"Volume {volume.name} ({volume.id}) has multi-attach enabled, "
f"allowing simultaneous attachment to multiple instances."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,36 @@
{
"Provider": "openstack",
"CheckID": "blockstorage_volume_not_unattached",
"CheckTitle": "Block storage volumes are attached to instances",
"CheckType": [],
"ServiceName": "blockstorage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "OS::Cinder::Volume",
"ResourceGroup": "storage",
"Description": "**OpenStack block storage volumes** are evaluated to verify they are **attached to at least one instance**. Unattached volumes may indicate orphaned resources that are no longer in use but continue to incur storage costs and may contain sensitive data without active monitoring or access controls.",
"Risk": "Unattached volumes may contain sensitive data that is no longer actively managed or monitored. Orphaned volumes increase storage costs and can be overlooked during security audits. If unattached volumes contain credentials or sensitive data, they may be accessed by unauthorized users who gain project access.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.openstack.org/cinder/latest/cli/cli-manage-volumes.html"
],
"Remediation": {
"Code": {
"CLI": "openstack volume list --status available\nopenstack volume delete <volume_id>",
"NativeIaC": "",
"Other": "1. Navigate to **Block Storage > Volumes**\n2. Filter by status 'available' (unattached)\n3. Review each unattached volume for necessity\n4. Back up data if needed, then delete orphaned volumes\n5. Implement volume lifecycle policies to prevent accumulation",
"Terraform": ""
},
"Recommendation": {
"Text": "Review unattached volumes regularly and delete those no longer needed. Back up important data before deletion. Implement volume lifecycle policies and automated cleanup for orphaned resources. Tag volumes with ownership and purpose metadata for easier management.",
"Url": "https://hub.prowler.com/check/blockstorage_volume_not_unattached"
}
},
"Categories": [
"trust-boundaries"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Some volumes may be intentionally unattached (e.g., data volumes awaiting attachment, backup volumes). This check identifies unattached volumes for review, not automatic remediation. Organizations should evaluate each unattached volume on a case-by-case basis."
}

View File

@@ -0,0 +1,33 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportOpenStack
from prowler.providers.openstack.services.blockstorage.blockstorage_client import (
blockstorage_client,
)
class blockstorage_volume_not_unattached(Check):
"""Ensure block storage volumes are attached to at least one instance."""
def execute(self) -> List[CheckReportOpenStack]:
findings: List[CheckReportOpenStack] = []
for volume in blockstorage_client.volumes:
report = CheckReportOpenStack(metadata=self.metadata(), resource=volume)
attachment_count = len(volume.attachments)
if attachment_count > 0:
report.status = "PASS"
report.status_extended = f"Volume {volume.name} ({volume.id}) is attached to {attachment_count} instance(s)."
elif volume.status != "available":
report.status = "PASS"
report.status_extended = (
f"Volume {volume.name} ({volume.id}) is not attached but is in "
f"'{volume.status}' state (not idle)."
)
else:
report.status = "FAIL"
report.status_extended = f"Volume {volume.name} ({volume.id}) is unattached and may be orphaned."
findings.append(report)
return findings

View File

@@ -0,0 +1,350 @@
"""Tests for blockstorage_snapshot_metadata_sensitive_data check."""
from unittest import mock
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
SnapshotResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_blockstorage_snapshot_metadata_sensitive_data:
"""Test suite for blockstorage_snapshot_metadata_sensitive_data check."""
def test_no_snapshots(self):
"""Test when no snapshots exist."""
blockstorage_client = mock.MagicMock()
blockstorage_client.snapshots = []
blockstorage_client.audit_config = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data import (
blockstorage_snapshot_metadata_sensitive_data,
)
check = blockstorage_snapshot_metadata_sensitive_data()
result = check.execute()
assert len(result) == 0
def test_snapshot_no_metadata(self):
"""Test snapshot with no metadata (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-1",
name="No Metadata",
status="available",
size=50,
volume_id="vol-1",
metadata={},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data import (
blockstorage_snapshot_metadata_sensitive_data,
)
check = blockstorage_snapshot_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Snapshot No Metadata (snap-1) has no metadata (no sensitive data exposure risk)."
)
assert result[0].resource_id == "snap-1"
assert result[0].resource_name == "No Metadata"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_snapshot_safe_metadata(self):
"""Test snapshot with safe metadata (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-2",
name="Safe Metadata",
status="available",
size=50,
volume_id="vol-1",
metadata={"environment": "production", "application": "web-app"},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data import (
blockstorage_snapshot_metadata_sensitive_data,
)
check = blockstorage_snapshot_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Snapshot Safe Metadata (snap-2) metadata does not contain sensitive data."
)
assert result[0].resource_id == "snap-2"
assert result[0].resource_name == "Safe Metadata"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_snapshot_password_in_metadata(self):
"""Test snapshot with password in metadata (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-3",
name="Password Metadata",
status="available",
size=50,
volume_id="vol-1",
metadata={"db_password": "supersecret123"},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data import (
blockstorage_snapshot_metadata_sensitive_data,
)
check = blockstorage_snapshot_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "contains potential secrets" in result[0].status_extended
def test_snapshot_api_key_in_metadata(self):
"""Test snapshot with API key in metadata (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-4",
name="API Key Metadata",
status="available",
size=50,
volume_id="vol-1",
metadata={"api_key": "sk-1234567890"},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data import (
blockstorage_snapshot_metadata_sensitive_data,
)
check = blockstorage_snapshot_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended.startswith(
"Snapshot API Key Metadata (snap-4) metadata contains potential secrets ->"
)
assert result[0].resource_id == "snap-4"
assert result[0].resource_name == "API Key Metadata"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_snapshot_private_key_in_metadata(self):
"""Test snapshot with private key in metadata (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-5",
name="Private Key",
status="available",
size=50,
volume_id="vol-1",
metadata={"ssh_key": "-----BEGIN RSA PRIVATE KEY-----"},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data import (
blockstorage_snapshot_metadata_sensitive_data,
)
check = blockstorage_snapshot_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended.startswith(
"Snapshot Private Key (snap-5) metadata contains potential secrets ->"
)
assert result[0].resource_id == "snap-5"
assert result[0].resource_name == "Private Key"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_multiple_snapshots_mixed(self):
"""Test multiple snapshots with mixed metadata."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-pass",
name="Safe",
status="available",
size=50,
volume_id="vol-1",
metadata={"tier": "web"},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
SnapshotResource(
id="snap-fail",
name="Unsafe",
status="available",
size=50,
volume_id="vol-2",
metadata={"admin_password": "secret123"},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data import (
blockstorage_snapshot_metadata_sensitive_data,
)
check = blockstorage_snapshot_metadata_sensitive_data()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1
def test_snapshot_metadata_key_correct_identification(self):
"""Test that secrets are correctly attributed to the right metadata keys."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-6",
name="Multiple Keys",
status="available",
size=50,
volume_id="vol-1",
metadata={
"environment": "production",
"application": "web-app",
"db_password": "supersecret123",
"region": "us-east",
},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_metadata_sensitive_data.blockstorage_snapshot_metadata_sensitive_data import (
blockstorage_snapshot_metadata_sensitive_data,
)
check = blockstorage_snapshot_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
# Verify the secret is correctly attributed to 'db_password' key
assert "in metadata key 'db_password'" in result[0].status_extended
assert result[0].resource_id == "snap-6"

View File

@@ -0,0 +1,216 @@
"""Tests for blockstorage_snapshot_not_orphaned check."""
from unittest import mock
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
SnapshotResource,
VolumeResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_blockstorage_snapshot_not_orphaned:
"""Test suite for blockstorage_snapshot_not_orphaned check."""
def test_no_snapshots(self):
"""Test when no snapshots exist."""
blockstorage_client = mock.MagicMock()
blockstorage_client.snapshots = []
blockstorage_client.volumes = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_not_orphaned.blockstorage_snapshot_not_orphaned.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_not_orphaned.blockstorage_snapshot_not_orphaned import (
blockstorage_snapshot_not_orphaned,
)
check = blockstorage_snapshot_not_orphaned()
result = check.execute()
assert len(result) == 0
def test_snapshot_with_existing_volume(self):
"""Test snapshot referencing an existing volume (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-1",
name="Existing Volume",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-1",
name="Valid Snapshot",
status="available",
size=100,
volume_id="vol-1",
metadata={},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_not_orphaned.blockstorage_snapshot_not_orphaned.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_not_orphaned.blockstorage_snapshot_not_orphaned import (
blockstorage_snapshot_not_orphaned,
)
check = blockstorage_snapshot_not_orphaned()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Snapshot Valid Snapshot (snap-1) references existing volume vol-1."
)
assert result[0].resource_id == "snap-1"
assert result[0].resource_name == "Valid Snapshot"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_snapshot_orphaned(self):
"""Test snapshot referencing a non-existent volume (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = []
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-2",
name="Orphaned Snapshot",
status="available",
size=100,
volume_id="vol-deleted",
metadata={},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_not_orphaned.blockstorage_snapshot_not_orphaned.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_not_orphaned.blockstorage_snapshot_not_orphaned import (
blockstorage_snapshot_not_orphaned,
)
check = blockstorage_snapshot_not_orphaned()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Snapshot Orphaned Snapshot (snap-2) references non-existent volume vol-deleted and may be orphaned."
)
assert result[0].resource_id == "snap-2"
assert result[0].resource_name == "Orphaned Snapshot"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_multiple_snapshots_mixed(self):
"""Test multiple snapshots with mixed orphan status."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-1",
name="Existing Volume",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
blockstorage_client.snapshots = [
SnapshotResource(
id="snap-pass",
name="Pass",
status="available",
size=100,
volume_id="vol-1",
metadata={},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
SnapshotResource(
id="snap-fail",
name="Fail",
status="available",
size=100,
volume_id="vol-deleted",
metadata={},
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_not_orphaned.blockstorage_snapshot_not_orphaned.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_snapshot_not_orphaned.blockstorage_snapshot_not_orphaned import (
blockstorage_snapshot_not_orphaned,
)
check = blockstorage_snapshot_not_orphaned()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,243 @@
"""Tests for blockstorage_volume_backup_exists check."""
from unittest import mock
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
BackupResource,
VolumeResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_blockstorage_volume_backup_exists:
"""Test suite for blockstorage_volume_backup_exists check."""
def test_no_volumes(self):
"""Test when no volumes exist."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = []
blockstorage_client.backups = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_backup_exists.blockstorage_volume_backup_exists.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_backup_exists.blockstorage_volume_backup_exists import (
blockstorage_volume_backup_exists,
)
check = blockstorage_volume_backup_exists()
result = check.execute()
assert len(result) == 0
def test_volume_with_backup(self):
"""Test volume with backups (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-1",
name="Backed Up Volume",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
blockstorage_client.backups = [
BackupResource(
id="backup-1",
name="Backup 1",
status="available",
size=100,
volume_id="vol-1",
is_incremental=False,
availability_zone="nova",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
BackupResource(
id="backup-2",
name="Backup 2",
status="available",
size=100,
volume_id="vol-1",
is_incremental=True,
availability_zone="nova",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_backup_exists.blockstorage_volume_backup_exists.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_backup_exists.blockstorage_volume_backup_exists import (
blockstorage_volume_backup_exists,
)
check = blockstorage_volume_backup_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Volume Backed Up Volume (vol-1) has 2 backup(s)."
)
assert result[0].resource_id == "vol-1"
assert result[0].resource_name == "Backed Up Volume"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_volume_without_backup(self):
"""Test volume without any backups (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-2",
name="No Backup Volume",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
blockstorage_client.backups = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_backup_exists.blockstorage_volume_backup_exists.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_backup_exists.blockstorage_volume_backup_exists import (
blockstorage_volume_backup_exists,
)
check = blockstorage_volume_backup_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Volume No Backup Volume (vol-2) does not have any backups."
)
assert result[0].resource_id == "vol-2"
assert result[0].resource_name == "No Backup Volume"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_multiple_volumes_mixed(self):
"""Test multiple volumes with mixed backup status."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-pass",
name="Pass",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
VolumeResource(
id="vol-fail",
name="Fail",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
blockstorage_client.backups = [
BackupResource(
id="backup-1",
name="Backup 1",
status="available",
size=100,
volume_id="vol-pass",
is_incremental=False,
availability_zone="nova",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_backup_exists.blockstorage_volume_backup_exists.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_backup_exists.blockstorage_volume_backup_exists import (
blockstorage_volume_backup_exists,
)
check = blockstorage_volume_backup_exists()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,203 @@
"""Tests for blockstorage_volume_encryption_enabled check."""
from unittest import mock
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
VolumeResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_blockstorage_volume_encryption_enabled:
"""Test suite for blockstorage_volume_encryption_enabled check."""
def test_no_volumes(self):
"""Test when no volumes exist."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_encryption_enabled.blockstorage_volume_encryption_enabled.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_encryption_enabled.blockstorage_volume_encryption_enabled import (
blockstorage_volume_encryption_enabled,
)
check = blockstorage_volume_encryption_enabled()
result = check.execute()
assert len(result) == 0
def test_volume_encrypted(self):
"""Test volume with encryption enabled (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-1",
name="Encrypted Volume",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=True,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_encryption_enabled.blockstorage_volume_encryption_enabled.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_encryption_enabled.blockstorage_volume_encryption_enabled import (
blockstorage_volume_encryption_enabled,
)
check = blockstorage_volume_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Volume Encrypted Volume (vol-1) has encryption enabled."
)
assert result[0].resource_id == "vol-1"
assert result[0].resource_name == "Encrypted Volume"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_volume_not_encrypted(self):
"""Test volume without encryption (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-2",
name="Unencrypted Volume",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_encryption_enabled.blockstorage_volume_encryption_enabled.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_encryption_enabled.blockstorage_volume_encryption_enabled import (
blockstorage_volume_encryption_enabled,
)
check = blockstorage_volume_encryption_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Volume Unencrypted Volume (vol-2) does not have encryption enabled."
)
assert result[0].resource_id == "vol-2"
assert result[0].resource_name == "Unencrypted Volume"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_multiple_volumes_mixed(self):
"""Test multiple volumes with mixed encryption status."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-pass",
name="Pass",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=True,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
VolumeResource(
id="vol-fail",
name="Fail",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_encryption_enabled.blockstorage_volume_encryption_enabled.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_encryption_enabled.blockstorage_volume_encryption_enabled import (
blockstorage_volume_encryption_enabled,
)
check = blockstorage_volume_encryption_enabled()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,406 @@
"""Tests for blockstorage_volume_metadata_sensitive_data check."""
from unittest import mock
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
VolumeResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_blockstorage_volume_metadata_sensitive_data:
"""Test suite for blockstorage_volume_metadata_sensitive_data check."""
def test_no_volumes(self):
"""Test when no volumes exist."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = []
blockstorage_client.audit_config = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data import (
blockstorage_volume_metadata_sensitive_data,
)
check = blockstorage_volume_metadata_sensitive_data()
result = check.execute()
assert len(result) == 0
def test_volume_no_metadata(self):
"""Test volume with no metadata (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.volumes = [
VolumeResource(
id="vol-1",
name="No Metadata",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data import (
blockstorage_volume_metadata_sensitive_data,
)
check = blockstorage_volume_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Volume No Metadata (vol-1) has no metadata (no sensitive data exposure risk)."
)
assert result[0].resource_id == "vol-1"
assert result[0].resource_name == "No Metadata"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_volume_safe_metadata(self):
"""Test volume with safe metadata (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.volumes = [
VolumeResource(
id="vol-2",
name="Safe Metadata",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={"environment": "production", "application": "web-app"},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data import (
blockstorage_volume_metadata_sensitive_data,
)
check = blockstorage_volume_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Volume Safe Metadata (vol-2) metadata does not contain sensitive data."
)
assert result[0].resource_id == "vol-2"
assert result[0].resource_name == "Safe Metadata"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_volume_password_in_metadata(self):
"""Test volume with password in metadata (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.volumes = [
VolumeResource(
id="vol-3",
name="Password Metadata",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={"db_password": "supersecret123"},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data import (
blockstorage_volume_metadata_sensitive_data,
)
check = blockstorage_volume_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "contains potential secrets" in result[0].status_extended
def test_volume_api_key_in_metadata(self):
"""Test volume with API key in metadata (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.volumes = [
VolumeResource(
id="vol-4",
name="API Key Metadata",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={"api_key": "sk-1234567890"},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data import (
blockstorage_volume_metadata_sensitive_data,
)
check = blockstorage_volume_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended.startswith(
"Volume API Key Metadata (vol-4) metadata contains potential secrets ->"
)
assert result[0].resource_id == "vol-4"
assert result[0].resource_name == "API Key Metadata"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_volume_private_key_in_metadata(self):
"""Test volume with private key in metadata (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.volumes = [
VolumeResource(
id="vol-5",
name="Private Key",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={"ssh_key": "-----BEGIN RSA PRIVATE KEY-----"},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data import (
blockstorage_volume_metadata_sensitive_data,
)
check = blockstorage_volume_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended.startswith(
"Volume Private Key (vol-5) metadata contains potential secrets ->"
)
assert result[0].resource_id == "vol-5"
assert result[0].resource_name == "Private Key"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_multiple_volumes_mixed(self):
"""Test multiple volumes with mixed metadata."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.volumes = [
VolumeResource(
id="vol-pass",
name="Safe",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={"tier": "web"},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
VolumeResource(
id="vol-fail",
name="Unsafe",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={"admin_password": "secret123"},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data import (
blockstorage_volume_metadata_sensitive_data,
)
check = blockstorage_volume_metadata_sensitive_data()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1
def test_volume_metadata_key_correct_identification(self):
"""Test that secrets are correctly attributed to the right metadata keys."""
blockstorage_client = mock.MagicMock()
blockstorage_client.audit_config = {}
blockstorage_client.volumes = [
VolumeResource(
id="vol-6",
name="Multiple Keys",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={
"environment": "production",
"application": "web-app",
"db_password": "supersecret123",
"region": "us-east",
},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_metadata_sensitive_data.blockstorage_volume_metadata_sensitive_data import (
blockstorage_volume_metadata_sensitive_data,
)
check = blockstorage_volume_metadata_sensitive_data()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
# Verify the secret is correctly attributed to 'db_password' key
assert "in metadata key 'db_password'" in result[0].status_extended
assert result[0].resource_id == "vol-6"

View File

@@ -0,0 +1,203 @@
"""Tests for blockstorage_volume_multiattach_disabled check."""
from unittest import mock
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
VolumeResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_blockstorage_volume_multiattach_disabled:
"""Test suite for blockstorage_volume_multiattach_disabled check."""
def test_no_volumes(self):
"""Test when no volumes exist."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_multiattach_disabled.blockstorage_volume_multiattach_disabled.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_multiattach_disabled.blockstorage_volume_multiattach_disabled import (
blockstorage_volume_multiattach_disabled,
)
check = blockstorage_volume_multiattach_disabled()
result = check.execute()
assert len(result) == 0
def test_volume_without_multiattach(self):
"""Test volume without multi-attach enabled (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-1",
name="Single Attach",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_multiattach_disabled.blockstorage_volume_multiattach_disabled.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_multiattach_disabled.blockstorage_volume_multiattach_disabled import (
blockstorage_volume_multiattach_disabled,
)
check = blockstorage_volume_multiattach_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Volume Single Attach (vol-1) does not have multi-attach enabled."
)
assert result[0].resource_id == "vol-1"
assert result[0].resource_name == "Single Attach"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_volume_with_multiattach(self):
"""Test volume with multi-attach enabled (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-2",
name="Multi Attach",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=True,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_multiattach_disabled.blockstorage_volume_multiattach_disabled.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_multiattach_disabled.blockstorage_volume_multiattach_disabled import (
blockstorage_volume_multiattach_disabled,
)
check = blockstorage_volume_multiattach_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Volume Multi Attach (vol-2) has multi-attach enabled, allowing simultaneous attachment to multiple instances."
)
assert result[0].resource_id == "vol-2"
assert result[0].resource_name == "Multi Attach"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_multiple_volumes_mixed(self):
"""Test multiple volumes with mixed multi-attach status."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-pass",
name="Pass",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
VolumeResource(
id="vol-fail",
name="Fail",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=True,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_multiattach_disabled.blockstorage_volume_multiattach_disabled.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_multiattach_disabled.blockstorage_volume_multiattach_disabled import (
blockstorage_volume_multiattach_disabled,
)
check = blockstorage_volume_multiattach_disabled()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,250 @@
"""Tests for blockstorage_volume_not_unattached check."""
from unittest import mock
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
VolumeResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_blockstorage_volume_not_unattached:
"""Test suite for blockstorage_volume_not_unattached check."""
def test_no_volumes(self):
"""Test when no volumes exist."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached import (
blockstorage_volume_not_unattached,
)
check = blockstorage_volume_not_unattached()
result = check.execute()
assert len(result) == 0
def test_volume_attached(self):
"""Test volume that is attached to instances (PASS)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-1",
name="Attached Volume",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[
{"server_id": "server-1"},
{"server_id": "server-2"},
],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached import (
blockstorage_volume_not_unattached,
)
check = blockstorage_volume_not_unattached()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Volume Attached Volume (vol-1) is attached to 2 instance(s)."
)
assert result[0].resource_id == "vol-1"
assert result[0].resource_name == "Attached Volume"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_volume_unattached_available(self):
"""Test volume that is available and unattached (FAIL)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-2",
name="Orphaned Volume",
status="available",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached import (
blockstorage_volume_not_unattached,
)
check = blockstorage_volume_not_unattached()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Volume Orphaned Volume (vol-2) is unattached and may be orphaned."
)
assert result[0].resource_id == "vol-2"
assert result[0].resource_name == "Orphaned Volume"
assert result[0].region == OPENSTACK_REGION
assert result[0].project_id == OPENSTACK_PROJECT_ID
def test_volume_unattached_non_available_status(self):
"""Test volume that is unattached but in non-available state (PASS - not idle)."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-3",
name="Error Volume",
status="error",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached import (
blockstorage_volume_not_unattached,
)
check = blockstorage_volume_not_unattached()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "'error' state (not idle)" in result[0].status_extended
def test_multiple_volumes_mixed(self):
"""Test multiple volumes with mixed attachment status."""
blockstorage_client = mock.MagicMock()
blockstorage_client.volumes = [
VolumeResource(
id="vol-pass",
name="Pass",
status="in-use",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[{"server_id": "server-1"}],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
VolumeResource(
id="vol-fail",
name="Fail",
status="available",
size=100,
volume_type="standard",
is_encrypted=False,
is_bootable=False,
is_multiattach=False,
attachments=[],
metadata={},
availability_zone="nova",
snapshot_id="",
source_volume_id="",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached.blockstorage_client",
new=blockstorage_client,
),
):
from prowler.providers.openstack.services.blockstorage.blockstorage_volume_not_unattached.blockstorage_volume_not_unattached import (
blockstorage_volume_not_unattached,
)
check = blockstorage_volume_not_unattached()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,519 @@
"""Tests for OpenStack BlockStorage service."""
from unittest.mock import MagicMock, patch
from openstack import exceptions as openstack_exceptions
from prowler.providers.openstack.services.blockstorage.blockstorage_service import (
BackupResource,
BlockStorage,
SnapshotResource,
VolumeResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class TestBlockStorageService:
"""Test suite for BlockStorage service."""
def test_blockstorage_service_initialization(self):
"""Test BlockStorage service initializes correctly."""
provider = set_mocked_openstack_provider()
with (
patch.object(BlockStorage, "_list_volumes", return_value=[]),
patch.object(BlockStorage, "_list_snapshots", return_value=[]),
patch.object(BlockStorage, "_list_backups", return_value=[]),
):
block_storage = BlockStorage(provider)
assert block_storage.service_name == "BlockStorage"
assert block_storage.provider == provider
assert block_storage.connection == provider.connection
assert block_storage.regional_connections == provider.regional_connections
assert block_storage.audited_regions == [OPENSTACK_REGION]
assert block_storage.region == OPENSTACK_REGION
assert block_storage.project_id == OPENSTACK_PROJECT_ID
assert block_storage.volumes == []
assert block_storage.snapshots == []
assert block_storage.backups == []
def test_blockstorage_list_volumes_success(self):
"""Test listing volumes successfully."""
provider = set_mocked_openstack_provider()
mock_volume = MagicMock()
mock_volume.id = "vol-1"
mock_volume.name = "Volume One"
mock_volume.status = "in-use"
mock_volume.size = 100
mock_volume.volume_type = "encrypted"
mock_volume.is_encrypted = True
mock_volume.is_bootable = "true"
mock_volume.is_multiattach = False
mock_volume.attachments = [{"server_id": "server-1", "device": "/dev/vda"}]
mock_volume.metadata = {"environment": "production"}
mock_volume.availability_zone = "nova"
mock_volume.snapshot_id = "snap-1"
mock_volume.source_volume_id = None
provider.connection.block_storage.volumes.return_value = [mock_volume]
provider.connection.block_storage.snapshots.return_value = []
provider.connection.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert len(block_storage.volumes) == 1
assert isinstance(block_storage.volumes[0], VolumeResource)
assert block_storage.volumes[0].id == "vol-1"
assert block_storage.volumes[0].name == "Volume One"
assert block_storage.volumes[0].status == "in-use"
assert block_storage.volumes[0].size == 100
assert block_storage.volumes[0].volume_type == "encrypted"
assert block_storage.volumes[0].is_encrypted is True
assert block_storage.volumes[0].is_bootable is True
assert block_storage.volumes[0].is_multiattach is False
assert len(block_storage.volumes[0].attachments) == 1
assert block_storage.volumes[0].metadata == {"environment": "production"}
assert block_storage.volumes[0].availability_zone == "nova"
assert block_storage.volumes[0].snapshot_id == "snap-1"
assert block_storage.volumes[0].source_volume_id == ""
assert block_storage.volumes[0].project_id == OPENSTACK_PROJECT_ID
assert block_storage.volumes[0].region == OPENSTACK_REGION
def test_blockstorage_list_volumes_empty(self):
"""Test listing volumes when none exist."""
provider = set_mocked_openstack_provider()
provider.connection.block_storage.volumes.return_value = []
provider.connection.block_storage.snapshots.return_value = []
provider.connection.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert block_storage.volumes == []
def test_blockstorage_list_volumes_sdk_exception(self):
"""Test handling SDKException when listing volumes."""
provider = set_mocked_openstack_provider()
provider.connection.block_storage.volumes.side_effect = (
openstack_exceptions.SDKException("API error")
)
provider.connection.block_storage.snapshots.return_value = []
provider.connection.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert block_storage.volumes == []
def test_blockstorage_list_volumes_generic_exception(self):
"""Test handling generic exception when listing volumes."""
provider = set_mocked_openstack_provider()
provider.connection.block_storage.volumes.side_effect = Exception(
"Unexpected error"
)
provider.connection.block_storage.snapshots.return_value = []
provider.connection.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert block_storage.volumes == []
def test_blockstorage_list_snapshots_success(self):
"""Test listing snapshots successfully."""
provider = set_mocked_openstack_provider()
mock_snapshot = MagicMock()
mock_snapshot.id = "snap-1"
mock_snapshot.name = "Snapshot One"
mock_snapshot.status = "available"
mock_snapshot.size = 50
mock_snapshot.volume_id = "vol-1"
mock_snapshot.metadata = {"backup": "daily"}
provider.connection.block_storage.volumes.return_value = []
provider.connection.block_storage.snapshots.return_value = [mock_snapshot]
provider.connection.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert len(block_storage.snapshots) == 1
assert isinstance(block_storage.snapshots[0], SnapshotResource)
assert block_storage.snapshots[0].id == "snap-1"
assert block_storage.snapshots[0].name == "Snapshot One"
assert block_storage.snapshots[0].status == "available"
assert block_storage.snapshots[0].size == 50
assert block_storage.snapshots[0].volume_id == "vol-1"
assert block_storage.snapshots[0].metadata == {"backup": "daily"}
assert block_storage.snapshots[0].project_id == OPENSTACK_PROJECT_ID
assert block_storage.snapshots[0].region == OPENSTACK_REGION
def test_blockstorage_list_snapshots_sdk_exception(self):
"""Test handling SDKException when listing snapshots."""
provider = set_mocked_openstack_provider()
provider.connection.block_storage.volumes.return_value = []
provider.connection.block_storage.snapshots.side_effect = (
openstack_exceptions.SDKException("API error")
)
provider.connection.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert block_storage.snapshots == []
def test_blockstorage_list_backups_success(self):
"""Test listing backups successfully."""
provider = set_mocked_openstack_provider()
mock_backup = MagicMock()
mock_backup.id = "backup-1"
mock_backup.name = "Backup One"
mock_backup.status = "available"
mock_backup.size = 100
mock_backup.volume_id = "vol-1"
mock_backup.is_incremental = True
mock_backup.availability_zone = "nova"
provider.connection.block_storage.volumes.return_value = []
provider.connection.block_storage.snapshots.return_value = []
provider.connection.block_storage.backups.return_value = [mock_backup]
block_storage = BlockStorage(provider)
assert len(block_storage.backups) == 1
assert isinstance(block_storage.backups[0], BackupResource)
assert block_storage.backups[0].id == "backup-1"
assert block_storage.backups[0].name == "Backup One"
assert block_storage.backups[0].status == "available"
assert block_storage.backups[0].size == 100
assert block_storage.backups[0].volume_id == "vol-1"
assert block_storage.backups[0].is_incremental is True
assert block_storage.backups[0].availability_zone == "nova"
assert block_storage.backups[0].project_id == OPENSTACK_PROJECT_ID
assert block_storage.backups[0].region == OPENSTACK_REGION
def test_blockstorage_list_backups_sdk_exception(self):
"""Test handling SDKException when listing backups."""
provider = set_mocked_openstack_provider()
provider.connection.block_storage.volumes.return_value = []
provider.connection.block_storage.snapshots.return_value = []
provider.connection.block_storage.backups.side_effect = (
openstack_exceptions.SDKException("API error")
)
block_storage = BlockStorage(provider)
assert block_storage.backups == []
def test_blockstorage_list_backups_generic_exception(self):
"""Test handling generic exception when listing backups."""
provider = set_mocked_openstack_provider()
provider.connection.block_storage.volumes.return_value = []
provider.connection.block_storage.snapshots.return_value = []
provider.connection.block_storage.backups.side_effect = Exception(
"Unexpected error"
)
block_storage = BlockStorage(provider)
assert block_storage.backups == []
def test_blockstorage_service_inherits_from_base(self):
"""Test BlockStorage service inherits from OpenStackService."""
provider = set_mocked_openstack_provider()
with (
patch.object(BlockStorage, "_list_volumes", return_value=[]),
patch.object(BlockStorage, "_list_snapshots", return_value=[]),
patch.object(BlockStorage, "_list_backups", return_value=[]),
):
block_storage = BlockStorage(provider)
assert hasattr(block_storage, "service_name")
assert hasattr(block_storage, "provider")
assert hasattr(block_storage, "connection")
assert hasattr(block_storage, "session")
assert hasattr(block_storage, "region")
assert hasattr(block_storage, "project_id")
assert hasattr(block_storage, "identity")
assert hasattr(block_storage, "audit_config")
assert hasattr(block_storage, "fixer_config")
def test_volume_resource_dataclass(self):
"""Test VolumeResource dataclass has all required attributes."""
volume = VolumeResource(
id="vol-1",
name="Test Volume",
status="in-use",
size=100,
volume_type="encrypted",
is_encrypted=True,
is_bootable=True,
is_multiattach=False,
attachments=[{"server_id": "server-1"}],
metadata={"env": "prod"},
availability_zone="nova",
snapshot_id="snap-1",
source_volume_id="",
project_id="project-1",
region="RegionOne",
)
assert volume.id == "vol-1"
assert volume.name == "Test Volume"
assert volume.is_encrypted is True
assert volume.is_bootable is True
assert volume.is_multiattach is False
def test_snapshot_resource_dataclass(self):
"""Test SnapshotResource dataclass has all required attributes."""
snapshot = SnapshotResource(
id="snap-1",
name="Test Snapshot",
status="available",
size=50,
volume_id="vol-1",
metadata={},
project_id="project-1",
region="RegionOne",
)
assert snapshot.id == "snap-1"
assert snapshot.volume_id == "vol-1"
def test_backup_resource_dataclass(self):
"""Test BackupResource dataclass has all required attributes."""
backup = BackupResource(
id="backup-1",
name="Test Backup",
status="available",
size=100,
volume_id="vol-1",
is_incremental=True,
availability_zone="nova",
project_id="project-1",
region="RegionOne",
)
assert backup.id == "backup-1"
assert backup.volume_id == "vol-1"
assert backup.is_incremental is True
def test_blockstorage_list_volumes_multi_region(self):
"""Test listing volumes across multiple regions."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_vol_uk = MagicMock()
mock_vol_uk.id = "vol-uk"
mock_vol_uk.name = "Volume UK"
mock_vol_uk.status = "in-use"
mock_vol_uk.size = 100
mock_vol_uk.volume_type = "standard"
mock_vol_uk.is_encrypted = False
mock_vol_uk.is_bootable = "false"
mock_vol_uk.is_multiattach = False
mock_vol_uk.attachments = []
mock_vol_uk.metadata = {}
mock_vol_uk.availability_zone = "nova"
mock_vol_uk.snapshot_id = None
mock_vol_uk.source_volume_id = None
mock_vol_de = MagicMock()
mock_vol_de.id = "vol-de"
mock_vol_de.name = "Volume DE"
mock_vol_de.status = "available"
mock_vol_de.size = 200
mock_vol_de.volume_type = "encrypted"
mock_vol_de.is_encrypted = True
mock_vol_de.is_bootable = "true"
mock_vol_de.is_multiattach = False
mock_vol_de.attachments = []
mock_vol_de.metadata = {}
mock_vol_de.availability_zone = "nova"
mock_vol_de.snapshot_id = None
mock_vol_de.source_volume_id = None
mock_conn_uk1.block_storage.volumes.return_value = [mock_vol_uk]
mock_conn_de1.block_storage.volumes.return_value = [mock_vol_de]
mock_conn_uk1.block_storage.snapshots.return_value = []
mock_conn_de1.block_storage.snapshots.return_value = []
mock_conn_uk1.block_storage.backups.return_value = []
mock_conn_de1.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert len(block_storage.volumes) == 2
uk_vol = next(v for v in block_storage.volumes if v.id == "vol-uk")
de_vol = next(v for v in block_storage.volumes if v.id == "vol-de")
assert uk_vol.region == "UK1"
assert de_vol.region == "DE1"
def test_blockstorage_list_snapshots_multi_region(self):
"""Test listing snapshots across multiple regions."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_snap_uk = MagicMock()
mock_snap_uk.id = "snap-uk"
mock_snap_uk.name = "Snapshot UK"
mock_snap_uk.status = "available"
mock_snap_uk.size = 50
mock_snap_uk.volume_id = "vol-uk"
mock_snap_uk.metadata = {}
mock_snap_de = MagicMock()
mock_snap_de.id = "snap-de"
mock_snap_de.name = "Snapshot DE"
mock_snap_de.status = "available"
mock_snap_de.size = 75
mock_snap_de.volume_id = "vol-de"
mock_snap_de.metadata = {}
mock_conn_uk1.block_storage.volumes.return_value = []
mock_conn_de1.block_storage.volumes.return_value = []
mock_conn_uk1.block_storage.snapshots.return_value = [mock_snap_uk]
mock_conn_de1.block_storage.snapshots.return_value = [mock_snap_de]
mock_conn_uk1.block_storage.backups.return_value = []
mock_conn_de1.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert len(block_storage.snapshots) == 2
uk_snap = next(s for s in block_storage.snapshots if s.id == "snap-uk")
de_snap = next(s for s in block_storage.snapshots if s.id == "snap-de")
assert uk_snap.region == "UK1"
assert de_snap.region == "DE1"
def test_blockstorage_list_backups_multi_region(self):
"""Test listing backups across multiple regions."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_backup_uk = MagicMock()
mock_backup_uk.id = "backup-uk"
mock_backup_uk.name = "Backup UK"
mock_backup_uk.status = "available"
mock_backup_uk.size = 100
mock_backup_uk.volume_id = "vol-uk"
mock_backup_uk.is_incremental = False
mock_backup_uk.availability_zone = "nova"
mock_backup_de = MagicMock()
mock_backup_de.id = "backup-de"
mock_backup_de.name = "Backup DE"
mock_backup_de.status = "available"
mock_backup_de.size = 200
mock_backup_de.volume_id = "vol-de"
mock_backup_de.is_incremental = True
mock_backup_de.availability_zone = "nova"
mock_conn_uk1.block_storage.volumes.return_value = []
mock_conn_de1.block_storage.volumes.return_value = []
mock_conn_uk1.block_storage.snapshots.return_value = []
mock_conn_de1.block_storage.snapshots.return_value = []
mock_conn_uk1.block_storage.backups.return_value = [mock_backup_uk]
mock_conn_de1.block_storage.backups.return_value = [mock_backup_de]
block_storage = BlockStorage(provider)
assert len(block_storage.backups) == 2
uk_backup = next(b for b in block_storage.backups if b.id == "backup-uk")
de_backup = next(b for b in block_storage.backups if b.id == "backup-de")
assert uk_backup.region == "UK1"
assert de_backup.region == "DE1"
def test_blockstorage_multi_region_partial_failure(self):
"""Test that a failing region doesn't prevent other regions from being listed."""
provider = set_mocked_openstack_provider()
mock_conn_ok = MagicMock()
mock_conn_fail = MagicMock()
provider.regional_connections = {"UK1": mock_conn_ok, "DE1": mock_conn_fail}
mock_vol = MagicMock()
mock_vol.id = "vol-uk"
mock_vol.name = "Volume UK"
mock_vol.status = "in-use"
mock_vol.size = 100
mock_vol.volume_type = "standard"
mock_vol.is_encrypted = False
mock_vol.is_bootable = "false"
mock_vol.is_multiattach = False
mock_vol.attachments = []
mock_vol.metadata = {}
mock_vol.availability_zone = "nova"
mock_vol.snapshot_id = None
mock_vol.source_volume_id = None
mock_conn_ok.block_storage.volumes.return_value = [mock_vol]
mock_conn_fail.block_storage.volumes.side_effect = (
openstack_exceptions.SDKException("API error in DE1")
)
mock_conn_ok.block_storage.snapshots.return_value = []
mock_conn_fail.block_storage.snapshots.side_effect = (
openstack_exceptions.SDKException("API error in DE1")
)
mock_conn_ok.block_storage.backups.return_value = []
mock_conn_fail.block_storage.backups.side_effect = (
openstack_exceptions.SDKException("API error in DE1")
)
block_storage = BlockStorage(provider)
assert len(block_storage.volumes) == 1
assert block_storage.volumes[0].id == "vol-uk"
assert block_storage.volumes[0].region == "UK1"
def test_blockstorage_multi_region_one_empty(self):
"""Test multi-region where one region has resources and the other is empty."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_vol = MagicMock()
mock_vol.id = "vol-uk"
mock_vol.name = "Volume UK"
mock_vol.status = "available"
mock_vol.size = 50
mock_vol.volume_type = "standard"
mock_vol.is_encrypted = False
mock_vol.is_bootable = "false"
mock_vol.is_multiattach = False
mock_vol.attachments = []
mock_vol.metadata = {}
mock_vol.availability_zone = "nova"
mock_vol.snapshot_id = None
mock_vol.source_volume_id = None
mock_conn_uk1.block_storage.volumes.return_value = [mock_vol]
mock_conn_de1.block_storage.volumes.return_value = []
mock_conn_uk1.block_storage.snapshots.return_value = []
mock_conn_de1.block_storage.snapshots.return_value = []
mock_conn_uk1.block_storage.backups.return_value = []
mock_conn_de1.block_storage.backups.return_value = []
block_storage = BlockStorage(provider)
assert len(block_storage.volumes) == 1
assert block_storage.volumes[0].id == "vol-uk"
assert block_storage.volumes[0].region == "UK1"