From 030d053c84b1cc09295ac7d6c2472beadb4b0363 Mon Sep 17 00:00:00 2001
From: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com>
Date: Tue, 24 Feb 2026 12:50:52 +0100
Subject: [PATCH] chore(openstack): support multi-region in the same provider
(#10135)
---
api/src/backend/api/utils.py | 7 +-
.../providers/openstack/authentication.mdx | 94 +++
.../openstack/getting-started-openstack.mdx | 30 +
prowler/CHANGELOG.md | 1 +
.../openstack/exceptions/exceptions.py | 48 ++
.../openstack/lib/service/service.py | 8 +-
prowler/providers/openstack/models.py | 23 +-
.../providers/openstack/openstack_provider.py | 107 ++-
.../services/compute/compute_service.py | 223 +++----
.../providers/openstack/openstack_fixtures.py | 3 +
.../openstack/openstack_provider_test.py | 623 ++++++++++++++++++
.../compute/openstack_compute_service_test.py | 155 ++++-
12 files changed, 1192 insertions(+), 130 deletions(-)
diff --git a/api/src/backend/api/utils.py b/api/src/backend/api/utils.py
index 355c3b6e03..6d8cb3f99b 100644
--- a/api/src/backend/api/utils.py
+++ b/api/src/backend/api/utils.py
@@ -216,10 +216,8 @@ def get_prowler_provider_kwargs(
"filter_accounts": [provider.uid],
}
elif provider.provider == Provider.ProviderChoices.OPENSTACK.value:
- # No extra kwargs needed: clouds_yaml_content and clouds_yaml_cloud from the
- # secret are sufficient. Validating project_id (provider.uid) against the
- # clouds.yaml is not feasible because not all auth methods include it and the
- # Keystone API is unavailable on public clouds.
+ # clouds_yaml_content, clouds_yaml_cloud and provider_id are validated
+ # in the provider itself, so it's not needed here.
pass
if mutelist_processor:
@@ -294,6 +292,7 @@ def prowler_provider_connection_test(provider: Provider) -> Connection:
openstack_kwargs = {
"clouds_yaml_content": prowler_provider_kwargs["clouds_yaml_content"],
"clouds_yaml_cloud": prowler_provider_kwargs["clouds_yaml_cloud"],
+ "provider_id": provider.uid,
"raise_on_exception": False,
}
return prowler_provider.test_connection(**openstack_kwargs)
diff --git a/docs/user-guide/providers/openstack/authentication.mdx b/docs/user-guide/providers/openstack/authentication.mdx
index 88db4e221e..8c122f5e26 100644
--- a/docs/user-guide/providers/openstack/authentication.mdx
+++ b/docs/user-guide/providers/openstack/authentication.mdx
@@ -337,6 +337,99 @@ prowler openstack --clouds-yaml-cloud ovh-staging --output-directory ./reports/o
prowler openstack --clouds-yaml-cloud infomaniak-production --output-directory ./reports/infomaniak/
```
+## Multi-Region Scanning
+
+Many OpenStack providers (OVH, Infomaniak, etc.) offer resources across multiple regions within the same project. By default, the `clouds.yaml` file downloaded from Horizon uses `region_name` which targets a **single region**. Prowler supports scanning **all regions** in a single run by using the `regions` key instead.
+
+### Configuring Multi-Region
+
+Replace the `region_name` key with a `regions` list in your `clouds.yaml`:
+
+```yaml
+clouds:
+ ovh-multiregion:
+ auth:
+ auth_url: https://auth.cloud.ovh.net/v3
+ username: user-xxxxxxxxxx
+ password: your-password-here
+ project_id: your-project-id
+ user_domain_name: Default
+ project_domain_name: Default
+ regions:
+ - UK1
+ - DE1
+ identity_api_version: "3"
+```
+
+Then run Prowler as usual:
+
+```bash
+prowler openstack --clouds-yaml-cloud ovh-multiregion
+```
+
+Prowler will create a separate connection to each region and scan all resources across them. Findings in the output will include the region where each resource was found.
+
+
+You must use **either** `region_name` (single region) **or** `regions` (multi-region), not both. Prowler will raise an error if both keys are present in the same cloud configuration.
+
+
+### How It Works
+
+The `region_name` and `regions` keys are part of the [OpenStack SDK configuration format](https://docs.openstack.org/openstacksdk/latest/user/config/configuration.html#site-specific-file-locations). When `regions` is set, the SDK can produce a separate cloud config object for each region — but it does not iterate over them automatically. Prowler uses this to create one authenticated connection per region and iterates over all of them when listing resources. This means:
+
+- **Authentication** is tested against every configured region during connection setup
+- **Resources** from all regions are collected in a single scan
+- **Findings** include the specific region for each resource
+- If a single region fails to connect, the entire scan fails (fail-fast)
+
+### Finding Your Available Regions
+
+To discover which regions are available for your project, use the OpenStack CLI:
+
+```bash
+openstack --os-cloud your-cloud region list
+```
+
+Or check your provider's control panel for a list of available regions.
+
+### Single-Region vs Multi-Region
+
+| Configuration | Key | Behavior |
+|--------------|-----|----------|
+| Single region | `region_name: UK1` | Scans resources in UK1 only |
+| Multi-region | `regions: [UK1, DE1]` | Scans resources in both UK1 and DE1 |
+
+You can keep both configurations as separate cloud entries in the same `clouds.yaml` file:
+
+```yaml
+clouds:
+ # Single region entry — targets UK1 only
+ ovh:
+ auth:
+ auth_url: https://auth.cloud.ovh.net/v3
+ username: user-xxxxxxxxxx
+ password: your-password-here
+ project_id: your-project-id
+ user_domain_name: Default
+ project_domain_name: Default
+ region_name: UK1
+ identity_api_version: "3"
+
+ # Multi-region entry — targets UK1 and DE1
+ ovh-multiregion:
+ auth:
+ auth_url: https://auth.cloud.ovh.net/v3
+ username: user-xxxxxxxxxx
+ password: your-password-here
+ project_id: your-project-id
+ user_domain_name: Default
+ project_domain_name: Default
+ regions:
+ - UK1
+ - DE1
+ identity_api_version: "3"
+```
+
## Creating a User With Reader Role
For security auditing, Prowler only needs **read-only access** to your OpenStack resources.
@@ -534,3 +627,4 @@ Using Public Cloud credentials can limit Keystone API access, so the command abo
- [OpenStack Documentation](https://docs.openstack.org/)
- [OpenStack Security Guide](https://docs.openstack.org/security-guide/)
- [clouds.yaml Format](https://docs.openstack.org/python-openstackclient/latest/configuration/index.html)
+- [OpenStack SDK Configuration (`region_name` / `regions`)](https://docs.openstack.org/openstacksdk/latest/user/config/configuration.html#site-specific-file-locations)
diff --git a/docs/user-guide/providers/openstack/getting-started-openstack.mdx b/docs/user-guide/providers/openstack/getting-started-openstack.mdx
index bfb840d303..e41806059a 100644
--- a/docs/user-guide/providers/openstack/getting-started-openstack.mdx
+++ b/docs/user-guide/providers/openstack/getting-started-openstack.mdx
@@ -180,6 +180,36 @@ prowler openstack --clouds-yaml-cloud production --output-directory ./reports/pr
prowler openstack --clouds-yaml-cloud staging --output-directory ./reports/staging/
```
+**Scan all regions in a single run:**
+
+If your OpenStack project spans multiple regions, replace `region_name` with a `regions` list in your `clouds.yaml`:
+
+```yaml
+clouds:
+ ovh-multiregion:
+ auth:
+ auth_url: https://auth.cloud.ovh.net/v3
+ username: user-xxxxxxxxxx
+ password: your-password-here
+ project_id: your-project-id
+ user_domain_name: Default
+ project_domain_name: Default
+ regions:
+ - UK1
+ - DE1
+ identity_api_version: "3"
+```
+
+```bash
+prowler openstack --clouds-yaml-cloud ovh-multiregion
+```
+
+Prowler will connect to each region and scan resources across all of them. See the [Authentication guide](/user-guide/providers/openstack/authentication#multi-region-scanning) for more details.
+
+
+You must use either `region_name` (single region) or `regions` (multi-region list), not both.
+
+
**Use mutelist to suppress findings:**
Create a mutelist file to suppress known findings:
diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md
index f57911bf28..d10ff5689b 100644
--- a/prowler/CHANGELOG.md
+++ b/prowler/CHANGELOG.md
@@ -28,6 +28,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Registry scan mode for `image` provider: enumerate and scan all images from OCI standard, Docker Hub, and ECR [(#9985)](https://github.com/prowler-cloud/prowler/pull/9985)
- Add file descriptor limits (`ulimits`) to Docker Compose worker services to prevent `Too many open files` errors [(#10107)](https://github.com/prowler-cloud/prowler/pull/10107)
- CIS 6.0 for the AWS provider [(#10127)](https://github.com/prowler-cloud/prowler/pull/10127)
+- OpenStack provider multiple regions support [(#10135)](https://github.com/prowler-cloud/prowler/pull/10135)
### 🐞 Fixed
diff --git a/prowler/providers/openstack/exceptions/exceptions.py b/prowler/providers/openstack/exceptions/exceptions.py
index 941f6b7084..c78e35c475 100644
--- a/prowler/providers/openstack/exceptions/exceptions.py
+++ b/prowler/providers/openstack/exceptions/exceptions.py
@@ -42,6 +42,18 @@ class OpenStackBaseException(ProwlerException):
"message": "Invalid or malformed clouds.yaml configuration file",
"remediation": "Check that the clouds.yaml file is valid YAML and follows the OpenStack configuration format.",
},
+ (10009, "OpenStackInvalidProviderIdError"): {
+ "message": "Provider ID does not match the project_id in clouds.yaml",
+ "remediation": "Ensure the provider_id matches the project_id configured in your clouds.yaml file.",
+ },
+ (10010, "OpenStackNoRegionError"): {
+ "message": "No region configuration found in clouds.yaml",
+ "remediation": "Add either 'region_name' (single region) or 'regions' (list of regions) to your cloud configuration in clouds.yaml.",
+ },
+ (10011, "OpenStackAmbiguousRegionError"): {
+ "message": "Ambiguous region configuration in clouds.yaml",
+ "remediation": "Use either 'region_name' or 'regions' in your cloud configuration, not both.",
+ },
}
def __init__(self, code, file=None, original_exception=None, message=None):
@@ -164,3 +176,39 @@ class OpenStackInvalidConfigError(OpenStackBaseException):
original_exception=original_exception,
message=message,
)
+
+
+class OpenStackInvalidProviderIdError(OpenStackBaseException):
+ """Exception for provider_id not matching project_id in clouds.yaml"""
+
+ def __init__(self, file=None, original_exception=None, message=None):
+ super().__init__(
+ code=10009,
+ file=file,
+ original_exception=original_exception,
+ message=message,
+ )
+
+
+class OpenStackNoRegionError(OpenStackBaseException):
+ """Exception for missing region configuration in clouds.yaml"""
+
+ def __init__(self, file=None, original_exception=None, message=None):
+ super().__init__(
+ code=10010,
+ file=file,
+ original_exception=original_exception,
+ message=message,
+ )
+
+
+class OpenStackAmbiguousRegionError(OpenStackBaseException):
+ """Exception for ambiguous region configuration in clouds.yaml"""
+
+ def __init__(self, file=None, original_exception=None, message=None):
+ super().__init__(
+ code=10011,
+ file=file,
+ original_exception=original_exception,
+ message=message,
+ )
diff --git a/prowler/providers/openstack/lib/service/service.py b/prowler/providers/openstack/lib/service/service.py
index 5190da589a..716c797423 100644
--- a/prowler/providers/openstack/lib/service/service.py
+++ b/prowler/providers/openstack/lib/service/service.py
@@ -9,8 +9,14 @@ class OpenStackService:
self.service_name = service_name
self.provider = provider
self.connection = provider.connection
+ self.regional_connections = provider.regional_connections
+ self.audited_regions = list(provider.regional_connections.keys())
self.session = provider.session
- self.region = provider.session.region_name
+ self.region = (
+ provider.session.region_name
+ or ", ".join(provider.session.regions or [])
+ or "global"
+ )
self.project_id = provider.session.project_id
self.identity = provider.identity
self.audit_config = provider.audit_config
diff --git a/prowler/providers/openstack/models.py b/prowler/providers/openstack/models.py
index aeba6948f1..1b3b750a0f 100644
--- a/prowler/providers/openstack/models.py
+++ b/prowler/providers/openstack/models.py
@@ -1,5 +1,5 @@
import re
-from typing import Optional
+from typing import List, Optional
from pydantic.v1 import BaseModel, Field
@@ -35,27 +35,42 @@ class OpenStackSession(BaseModel):
username: str
password: str
project_id: str
- region_name: str
+ region_name: Optional[str] = None
+ regions: Optional[List[str]] = None
user_domain_name: str = Field(default="Default")
project_domain_name: str = Field(default="Default")
- def as_sdk_config(self) -> dict:
+ def as_sdk_config(self, region_override: Optional[str] = None) -> dict:
"""Return a dict compatible with openstacksdk.connect().
Note: The OpenStack SDK distinguishes between project_id (must be UUID)
and project_name (any string identifier). We accept project_id from users
but internally pass it as project_name to the SDK if it's not a UUID.
This allows compatibility with providers like OVH that use numeric IDs.
+
+ When ``regions`` is set (multi-region), we pass the first region as
+ ``region_name`` for the default connection. The SDK does **not**
+ iterate over a ``regions`` list automatically — callers must create
+ one connection per region via ``regional_connections``.
+
+ Args:
+ region_override: If provided, use this region instead of the
+ session's ``region_name`` / first entry in ``regions``.
"""
config = {
"auth_url": self.auth_url,
"username": self.username,
"password": self.password,
- "region_name": self.region_name,
"project_domain_name": self.project_domain_name,
"user_domain_name": self.user_domain_name,
"identity_api_version": self.identity_api_version,
}
+ # Determine region: explicit override > session region_name > first in regions list
+ region = region_override or self.region_name
+ if region:
+ config["region_name"] = region
+ elif self.regions:
+ config["region_name"] = self.regions[0]
# If project_id is a UUID, pass it as project_id to SDK
# Otherwise, pass it as project_name (e.g., OVH numeric IDs)
if _is_uuid(self.project_id):
diff --git a/prowler/providers/openstack/openstack_provider.py b/prowler/providers/openstack/openstack_provider.py
index d892dc1abb..e81a399478 100644
--- a/prowler/providers/openstack/openstack_provider.py
+++ b/prowler/providers/openstack/openstack_provider.py
@@ -18,11 +18,14 @@ from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
from prowler.providers.openstack.exceptions.exceptions import (
+ OpenStackAmbiguousRegionError,
OpenStackAuthenticationError,
OpenStackCloudNotFoundError,
OpenStackConfigFileNotFoundError,
OpenStackCredentialsError,
OpenStackInvalidConfigError,
+ OpenStackInvalidProviderIdError,
+ OpenStackNoRegionError,
OpenStackSessionError,
)
from prowler.providers.openstack.lib.mutelist.mutelist import OpenStackMutelist
@@ -81,7 +84,22 @@ class OpenstackProvider(Provider):
user_domain_name=user_domain_name,
project_domain_name=project_domain_name,
)
- self._connection = OpenstackProvider._create_connection(self._session)
+
+ # Build per-region connections. When ``regions`` is configured
+ # (multi-region clouds.yaml) we create one connection per region;
+ # otherwise a single connection is created.
+ if self._session.regions:
+ self._regional_connections: dict[str, OpenStackConnection] = {}
+ for region in self._session.regions:
+ self._regional_connections[region] = (
+ OpenstackProvider._create_connection(self._session, region=region)
+ )
+ # Default connection = first region (used for identity setup, etc.)
+ self._connection = next(iter(self._regional_connections.values()))
+ else:
+ self._connection = OpenstackProvider._create_connection(self._session)
+ self._regional_connections = {self._session.region_name: self._connection}
+
self._identity = OpenstackProvider.setup_identity(
self._connection, self._session
)
@@ -132,6 +150,10 @@ class OpenstackProvider(Provider):
def connection(self) -> OpenStackConnection:
return self._connection
+ @property
+ def regional_connections(self) -> dict[str, OpenStackConnection]:
+ return self._regional_connections
+
@staticmethod
def setup_session(
clouds_yaml_file: Optional[str] = None,
@@ -269,13 +291,27 @@ class OpenstackProvider(Provider):
message=f"Missing required fields in clouds.yaml for cloud '{clouds_yaml_cloud}': {', '.join(missing_fields)}",
)
+ # Validate region configuration: must have region_name XOR regions
+ region_name = cloud_config.get("region_name")
+ regions = cloud_config.get("regions")
+
+ if region_name and regions:
+ raise OpenStackAmbiguousRegionError(
+ message=f"Cloud '{clouds_yaml_cloud}' has both 'region_name' and 'regions' configured. Use one or the other.",
+ )
+ if not region_name and not regions:
+ raise OpenStackNoRegionError(
+ message=f"Cloud '{clouds_yaml_cloud}' has neither 'region_name' nor 'regions' configured. Add one to your clouds.yaml.",
+ )
+
return OpenStackSession(
auth_url=auth_dict.get("auth_url"),
identity_api_version=str(cloud_config.get("identity_api_version", "3")),
username=auth_dict.get("username"),
password=auth_dict.get("password"),
project_id=auth_dict.get("project_id") or auth_dict.get("project_name"),
- region_name=cloud_config.get("region_name"),
+ region_name=region_name,
+ regions=regions,
user_domain_name=auth_dict.get("user_domain_name", "Default"),
project_domain_name=auth_dict.get("project_domain_name", "Default"),
)
@@ -360,6 +396,28 @@ class OpenstackProvider(Provider):
message=f"Missing required fields in clouds.yaml for cloud '{clouds_yaml_cloud}': {', '.join(missing_fields)}",
)
+ # Get raw cloud config to validate region configuration.
+ # cloud_config.config is the SDK-processed config (CloudRegion),
+ # which may not preserve the 'regions' key. os_config.cloud_config
+ # holds the original parsed YAML before SDK processing.
+ raw_cloud_config = os_config.cloud_config.get("clouds", {}).get(
+ clouds_yaml_cloud, {}
+ )
+
+ region_name = raw_cloud_config.get("region_name")
+ regions = raw_cloud_config.get("regions")
+
+ if region_name and regions:
+ raise OpenStackAmbiguousRegionError(
+ file=clouds_yaml_file,
+ message=f"Cloud '{clouds_yaml_cloud}' has both 'region_name' and 'regions' configured. Use one or the other.",
+ )
+ if not region_name and not regions:
+ raise OpenStackNoRegionError(
+ file=clouds_yaml_file,
+ message=f"Cloud '{clouds_yaml_cloud}' has neither 'region_name' nor 'regions' configured. Add one to your clouds.yaml.",
+ )
+
# Build OpenStackSession from cloud config
return OpenStackSession(
auth_url=auth_dict.get("auth_url"),
@@ -369,7 +427,8 @@ class OpenstackProvider(Provider):
username=auth_dict.get("username"),
password=auth_dict.get("password"),
project_id=auth_dict.get("project_id") or auth_dict.get("project_name"),
- region_name=cloud_config.config.get("region_name"),
+ region_name=region_name,
+ regions=regions,
user_domain_name=auth_dict.get("user_domain_name", "Default"),
project_domain_name=auth_dict.get("project_domain_name", "Default"),
)
@@ -378,6 +437,8 @@ class OpenstackProvider(Provider):
OpenStackConfigFileNotFoundError,
OpenStackCloudNotFoundError,
OpenStackInvalidConfigError,
+ OpenStackNoRegionError,
+ OpenStackAmbiguousRegionError,
):
# Re-raise our custom exceptions
raise
@@ -395,6 +456,7 @@ class OpenstackProvider(Provider):
@staticmethod
def _create_connection(
session: OpenStackSession,
+ region: str | None = None,
) -> OpenStackConnection:
"""Initialize the OpenStack SDK connection.
@@ -402,13 +464,18 @@ class OpenstackProvider(Provider):
and environment variables to ensure Prowler uses only the credentials
provided through its own configuration mechanisms (CLI args, config file,
or environment variables read by Prowler itself in setup_session()).
+
+ Args:
+ session: The OpenStack session configuration.
+ region: Optional region override — when given, the connection is
+ scoped to this specific region instead of the session default.
"""
try:
# Don't load from clouds.yaml or environment variables, we configure this in setup_session()
conn = connect(
load_yaml_config=False,
load_envvars=False,
- **session.as_sdk_config(),
+ **session.as_sdk_config(region_override=region),
)
conn.authorize()
return conn
@@ -463,7 +530,7 @@ class OpenstackProvider(Provider):
username=user_name,
project_id=project_id,
project_name=project_name,
- region_name=session.region_name,
+ region_name=session.region_name or ", ".join(session.regions or []),
user_domain_name=session.user_domain_name,
project_domain_name=session.project_domain_name,
)
@@ -481,6 +548,7 @@ class OpenstackProvider(Provider):
region_name: Optional[str] = None,
user_domain_name: Optional[str] = None,
project_domain_name: Optional[str] = None,
+ provider_id: Optional[str] = None,
raise_on_exception: bool = True,
) -> Connection:
"""Test connection to OpenStack without creating a full provider instance.
@@ -500,6 +568,7 @@ class OpenstackProvider(Provider):
region_name: OpenStack region name
user_domain_name: User domain name (default: "Default")
project_domain_name: Project domain name (default: "Default")
+ provider_id: OpenStack provider ID for validation (optional)
raise_on_exception: Whether to raise exception on failure (default: True)
Returns:
@@ -547,8 +616,18 @@ class OpenstackProvider(Provider):
project_domain_name=project_domain_name,
)
- # Create and test connection
- OpenstackProvider._create_connection(session)
+ # Validate provider_id matches project_id from config
+ if provider_id and session.project_id != provider_id:
+ raise OpenStackInvalidProviderIdError(
+ message=f"Provider ID '{provider_id}' does not match project_id '{session.project_id}' from clouds.yaml",
+ )
+
+ # Create and test connection(s) — one per region when multi-region
+ if session.regions:
+ for region in session.regions:
+ OpenstackProvider._create_connection(session, region=region)
+ else:
+ OpenstackProvider._create_connection(session)
logger.info("OpenStack provider: Connection test successful")
return Connection(is_connected=True)
@@ -560,6 +639,9 @@ class OpenstackProvider(Provider):
OpenStackConfigFileNotFoundError,
OpenStackCloudNotFoundError,
OpenStackInvalidConfigError,
+ OpenStackInvalidProviderIdError,
+ OpenStackNoRegionError,
+ OpenStackAmbiguousRegionError,
) as error:
logger.error(f"OpenStack connection test failed: {error}")
if raise_on_exception:
@@ -578,18 +660,23 @@ class OpenstackProvider(Provider):
def print_credentials(self) -> None:
"""Output sanitized credential summary."""
- region = self._session.region_name
auth_url = self._session.auth_url
project_id = self._session.project_id
username = self._identity.username
+
+ if self._session.regions:
+ region_display = ", ".join(self._session.regions)
+ else:
+ region_display = self._session.region_name
+
messages = [
f"Auth URL: {auth_url}",
f"Project ID: {project_id}",
f"Username: {username}",
- f"Region: {region}",
+ f"Region: {region_display}",
]
print_boxes(messages, "OpenStack Credentials")
logger.info(
f"Using OpenStack endpoint {Fore.YELLOW}{auth_url}{Style.RESET_ALL} "
- f"in region {Fore.YELLOW}{region}{Style.RESET_ALL}"
+ f"in region {Fore.YELLOW}{region_display}{Style.RESET_ALL}"
)
diff --git a/prowler/providers/openstack/services/compute/compute_service.py b/prowler/providers/openstack/services/compute/compute_service.py
index 65233930b4..7d32c54f69 100644
--- a/prowler/providers/openstack/services/compute/compute_service.py
+++ b/prowler/providers/openstack/services/compute/compute_service.py
@@ -15,125 +15,128 @@ class Compute(OpenStackService):
def __init__(self, provider) -> None:
super().__init__(__class__.__name__, provider)
- self.client = self.connection.compute
self.instances: List[ComputeInstance] = []
self._list_instances()
def _list_instances(self) -> None:
- """List all compute instances in the current project."""
+ """List all compute instances across all audited regions."""
logger.info("Compute - Listing instances...")
- try:
- for server in self.client.servers():
- # Extract security group names (handle None case)
- sg_list = getattr(server, "security_groups", None) or []
- security_groups = [sg.get("name", "") for sg in sg_list]
+ for region, conn in self.regional_connections.items():
+ try:
+ for server in conn.compute.servers():
+ # Extract security group names (handle None case)
+ sg_list = getattr(server, "security_groups", None) or []
+ security_groups = [sg.get("name", "") for sg in sg_list]
- # Extract network information from addresses
- networks_dict = {}
- addresses_attr = getattr(server, "addresses", None)
- if addresses_attr:
- for net_name, addr_list in addresses_attr.items():
- # addr_list is a list of dicts like:
- # [{'version': 4, 'addr': '57.128.163.151', 'OS-EXT-IPS:type': 'fixed'}]
- ip_list = []
- if isinstance(addr_list, list):
- for addr_dict in addr_list:
- if isinstance(addr_dict, dict) and "addr" in addr_dict:
- ip_list.append(addr_dict["addr"])
- elif isinstance(addr_dict, str):
- # Fallback: if it's just a string IP
- ip_list.append(addr_dict)
- elif isinstance(addr_list, str):
- # Fallback: single string IP
- ip_list = [addr_list]
- networks_dict[net_name] = ip_list
+ # Extract network information from addresses
+ networks_dict = {}
+ addresses_attr = getattr(server, "addresses", None)
+ if addresses_attr:
+ for net_name, addr_list in addresses_attr.items():
+ # addr_list is a list of dicts like:
+ # [{'version': 4, 'addr': '57.128.163.151', 'OS-EXT-IPS:type': 'fixed'}]
+ ip_list = []
+ if isinstance(addr_list, list):
+ for addr_dict in addr_list:
+ if (
+ isinstance(addr_dict, dict)
+ and "addr" in addr_dict
+ ):
+ ip_list.append(addr_dict["addr"])
+ elif isinstance(addr_dict, str):
+ # Fallback: if it's just a string IP
+ ip_list.append(addr_dict)
+ elif isinstance(addr_list, str):
+ # Fallback: single string IP
+ ip_list = [addr_list]
+ networks_dict[net_name] = ip_list
- # Extract trusted image certificates
- trusted_certs = (
- getattr(server, "trusted_image_certificates", None) or []
- )
-
- # Get SDK computed properties
- public_v4 = getattr(server, "public_v4", "")
- public_v6 = getattr(server, "public_v6", "")
- private_v4 = getattr(server, "private_v4", "")
- private_v6 = getattr(server, "private_v6", "")
-
- # Fallback: If SDK attributes are not populated, classify IPs from networks
- # This handles clouds where SDK computed properties are not available
- if (
- not (public_v4 or public_v6 or private_v4 or private_v6)
- and networks_dict
- ):
- for network_name, ip_list in networks_dict.items():
- for ip_str in ip_list:
- try:
- ip_obj = ipaddress.ip_address(ip_str)
- # Classify as private or public
- if ip_obj.is_private:
- # Assign first private IP found to appropriate field
- if ip_obj.version == 4 and not private_v4:
- private_v4 = ip_str
- elif ip_obj.version == 6 and not private_v6:
- private_v6 = ip_str
- elif not (
- ip_obj.is_loopback
- or ip_obj.is_link_local
- or ip_obj.is_reserved
- or ip_obj.is_multicast
- ):
- # Assign first public IP found to appropriate field
- if ip_obj.version == 4 and not public_v4:
- public_v4 = ip_str
- elif ip_obj.version == 6 and not public_v6:
- public_v6 = ip_str
- except ValueError:
- # Invalid IP address, skip
- continue
-
- self.instances.append(
- ComputeInstance(
- # Basic instance information
- id=getattr(server, "id", ""),
- name=getattr(server, "name", ""),
- status=getattr(server, "status", ""),
- flavor_id=getattr(server, "flavor", {}).get("id", ""),
- security_groups=security_groups,
- region=self.region,
- project_id=self.project_id,
- # Access Control & Authentication
- is_locked=getattr(server, "is_locked", False),
- locked_reason=getattr(server, "locked_reason", ""),
- key_name=getattr(server, "key_name", ""),
- user_id=getattr(server, "user_id", ""),
- # Network Exposure
- access_ipv4=getattr(server, "access_ipv4", ""),
- access_ipv6=getattr(server, "access_ipv6", ""),
- public_v4=public_v4,
- public_v6=public_v6,
- private_v4=private_v4,
- private_v6=private_v6,
- networks=networks_dict,
- # Configuration Security
- has_config_drive=getattr(server, "has_config_drive", False),
- metadata=getattr(server, "metadata", {}),
- user_data=getattr(server, "user_data", ""),
- # Image Trust
- trusted_image_certificates=(
- trusted_certs if isinstance(trusted_certs, list) else []
- ),
+ # Extract trusted image certificates
+ trusted_certs = (
+ getattr(server, "trusted_image_certificates", None) or []
)
+
+ # Get SDK computed properties
+ public_v4 = getattr(server, "public_v4", "")
+ public_v6 = getattr(server, "public_v6", "")
+ private_v4 = getattr(server, "private_v4", "")
+ private_v6 = getattr(server, "private_v6", "")
+
+ # Fallback: If SDK attributes are not populated, classify IPs from networks
+ # This handles clouds where SDK computed properties are not available
+ if (
+ not (public_v4 or public_v6 or private_v4 or private_v6)
+ and networks_dict
+ ):
+ for network_name, ip_list in networks_dict.items():
+ for ip_str in ip_list:
+ try:
+ ip_obj = ipaddress.ip_address(ip_str)
+ # Classify as private or public
+ if ip_obj.is_private:
+ # Assign first private IP found to appropriate field
+ if ip_obj.version == 4 and not private_v4:
+ private_v4 = ip_str
+ elif ip_obj.version == 6 and not private_v6:
+ private_v6 = ip_str
+ elif not (
+ ip_obj.is_loopback
+ or ip_obj.is_link_local
+ or ip_obj.is_reserved
+ or ip_obj.is_multicast
+ ):
+ # Assign first public IP found to appropriate field
+ if ip_obj.version == 4 and not public_v4:
+ public_v4 = ip_str
+ elif ip_obj.version == 6 and not public_v6:
+ public_v6 = ip_str
+ except ValueError:
+ # Invalid IP address, skip
+ continue
+
+ self.instances.append(
+ ComputeInstance(
+ # Basic instance information
+ id=getattr(server, "id", ""),
+ name=getattr(server, "name", ""),
+ status=getattr(server, "status", ""),
+ flavor_id=getattr(server, "flavor", {}).get("id", ""),
+ security_groups=security_groups,
+ region=region,
+ project_id=self.project_id,
+ # Access Control & Authentication
+ is_locked=getattr(server, "is_locked", False),
+ locked_reason=getattr(server, "locked_reason", ""),
+ key_name=getattr(server, "key_name", ""),
+ user_id=getattr(server, "user_id", ""),
+ # Network Exposure
+ access_ipv4=getattr(server, "access_ipv4", ""),
+ access_ipv6=getattr(server, "access_ipv6", ""),
+ public_v4=public_v4,
+ public_v6=public_v6,
+ private_v4=private_v4,
+ private_v6=private_v6,
+ networks=networks_dict,
+ # Configuration Security
+ has_config_drive=getattr(server, "has_config_drive", False),
+ metadata=getattr(server, "metadata", {}),
+ user_data=getattr(server, "user_data", ""),
+ # Image Trust
+ trusted_image_certificates=(
+ trusted_certs if isinstance(trusted_certs, list) else []
+ ),
+ )
+ )
+ except openstack_exceptions.SDKException as error:
+ logger.error(
+ f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
+ f"Failed to list compute instances in region {region}: {error}"
+ )
+ except Exception as error:
+ logger.error(
+ f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
+ f"Unexpected error listing compute instances in region {region}: {error}"
)
- except openstack_exceptions.SDKException as error:
- logger.error(
- f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
- f"Failed to list compute instances: {error}"
- )
- except Exception as error:
- logger.error(
- f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- "
- f"Unexpected error listing compute instances: {error}"
- )
@dataclass
diff --git a/tests/providers/openstack/openstack_fixtures.py b/tests/providers/openstack/openstack_fixtures.py
index 75b7f5bb6d..fb8b5935f4 100644
--- a/tests/providers/openstack/openstack_fixtures.py
+++ b/tests/providers/openstack/openstack_fixtures.py
@@ -62,6 +62,9 @@ def set_mocked_openstack_provider(
# Mock connection
provider.connection = MagicMock()
+ # Mock regional connections (single-region default)
+ provider.regional_connections = {region_name: provider.connection}
+
# Mock audit config
provider.audit_config = audit_config or {}
provider.fixer_config = {}
diff --git a/tests/providers/openstack/openstack_provider_test.py b/tests/providers/openstack/openstack_provider_test.py
index 8033df03dd..654d109134 100644
--- a/tests/providers/openstack/openstack_provider_test.py
+++ b/tests/providers/openstack/openstack_provider_test.py
@@ -14,11 +14,14 @@ from prowler.config.config import (
)
from prowler.providers.common.models import Connection
from prowler.providers.openstack.exceptions.exceptions import (
+ OpenStackAmbiguousRegionError,
OpenStackAuthenticationError,
OpenStackCloudNotFoundError,
OpenStackConfigFileNotFoundError,
OpenStackCredentialsError,
OpenStackInvalidConfigError,
+ OpenStackInvalidProviderIdError,
+ OpenStackNoRegionError,
)
from prowler.providers.openstack.models import OpenStackIdentityInfo, OpenStackSession
from prowler.providers.openstack.openstack_provider import OpenstackProvider
@@ -1023,3 +1026,623 @@ clouds:
assert provider.session.auth_url == "https://openstack.example.com:5000/v3"
assert provider.session.username == "test-user"
assert provider.session.project_id == "test-project"
+
+
+class TestOpenstackProviderRegionValidation:
+ """Test suite for OpenStack Provider region validation (region_name XOR regions)."""
+
+ @pytest.fixture(autouse=True)
+ def clean_openstack_env(self, monkeypatch):
+ """Ensure clean OpenStack environment for all tests."""
+ openstack_env_vars = [
+ "OS_AUTH_URL",
+ "OS_USERNAME",
+ "OS_PASSWORD",
+ "OS_PROJECT_ID",
+ "OS_REGION_NAME",
+ "OS_CLOUD",
+ "OS_IDENTITY_API_VERSION",
+ "OS_USER_DOMAIN_NAME",
+ "OS_PROJECT_DOMAIN_NAME",
+ ]
+ for env_var in openstack_env_vars:
+ monkeypatch.delenv(env_var, raising=False)
+
+ def test_clouds_yaml_content_with_region_name_only(self):
+ """Test that clouds.yaml content with only region_name produces a valid session."""
+ clouds_yaml_content = """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ region_name: RegionOne
+ identity_api_version: 3
+"""
+ session = OpenstackProvider._setup_session_from_clouds_yaml_content(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="test-cloud",
+ )
+
+ assert session.region_name == "RegionOne"
+ assert session.regions is None
+
+ def test_clouds_yaml_content_with_regions_list_only(self):
+ """Test that clouds.yaml content with only regions list produces a valid session."""
+ clouds_yaml_content = """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ regions:
+ - RegionOne
+ - RegionTwo
+ identity_api_version: 3
+"""
+ session = OpenstackProvider._setup_session_from_clouds_yaml_content(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="test-cloud",
+ )
+
+ assert session.region_name is None
+ assert session.regions == ["RegionOne", "RegionTwo"]
+
+ def test_clouds_yaml_content_with_both_region_name_and_regions(self):
+ """Test that clouds.yaml content with both region_name and regions raises error."""
+ clouds_yaml_content = """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ region_name: RegionOne
+ regions:
+ - RegionOne
+ - RegionTwo
+ identity_api_version: 3
+"""
+ with pytest.raises(OpenStackAmbiguousRegionError) as excinfo:
+ OpenstackProvider._setup_session_from_clouds_yaml_content(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="test-cloud",
+ )
+
+ assert "both 'region_name' and 'regions'" in str(excinfo.value)
+
+ def test_clouds_yaml_content_with_neither_region_name_nor_regions(self):
+ """Test that clouds.yaml content with neither region_name nor regions raises error."""
+ clouds_yaml_content = """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ identity_api_version: 3
+"""
+ with pytest.raises(OpenStackNoRegionError) as excinfo:
+ OpenstackProvider._setup_session_from_clouds_yaml_content(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="test-cloud",
+ )
+
+ assert "neither 'region_name' nor 'regions'" in str(excinfo.value)
+
+ def test_clouds_yaml_file_with_regions_list(self, tmp_path):
+ """Test loading clouds.yaml file with regions list."""
+ clouds_yaml = tmp_path / "clouds.yaml"
+ clouds_yaml.write_text(
+ """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ regions:
+ - RegionOne
+ - RegionTwo
+ identity_api_version: 3
+"""
+ )
+
+ mock_connection = MagicMock()
+ mock_connection.authorize.return_value = None
+ mock_connection.current_user_id = None
+ mock_connection.current_project_id = "test-project-id"
+ mock_connection.identity.get_project.return_value = None
+
+ with patch(
+ "prowler.providers.openstack.openstack_provider.connect"
+ ) as mock_connect:
+ mock_connect.return_value = mock_connection
+
+ provider = OpenstackProvider(
+ clouds_yaml_file=str(clouds_yaml),
+ clouds_yaml_cloud="test-cloud",
+ )
+
+ assert provider.session.region_name is None
+ assert provider.session.regions == ["RegionOne", "RegionTwo"]
+
+ def test_clouds_yaml_file_with_both_regions_raises_error(self, tmp_path):
+ """Test that clouds.yaml file with both region_name and regions raises error."""
+ clouds_yaml = tmp_path / "clouds.yaml"
+ clouds_yaml.write_text(
+ """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ region_name: RegionOne
+ regions:
+ - RegionOne
+ - RegionTwo
+ identity_api_version: 3
+"""
+ )
+
+ with pytest.raises(OpenStackAmbiguousRegionError):
+ OpenstackProvider(
+ clouds_yaml_file=str(clouds_yaml),
+ clouds_yaml_cloud="test-cloud",
+ )
+
+ def test_clouds_yaml_file_with_no_region_raises_error(self, tmp_path):
+ """Test that clouds.yaml file with neither region_name nor regions raises error."""
+ clouds_yaml = tmp_path / "clouds.yaml"
+ clouds_yaml.write_text(
+ """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ identity_api_version: 3
+"""
+ )
+
+ with pytest.raises(OpenStackNoRegionError):
+ OpenstackProvider(
+ clouds_yaml_file=str(clouds_yaml),
+ clouds_yaml_cloud="test-cloud",
+ )
+
+ def test_session_as_sdk_config_with_regions_list(self):
+ """Test OpenStackSession.as_sdk_config() with regions list uses first region."""
+ session = OpenStackSession(
+ auth_url="https://openstack.example.com:5000/v3",
+ identity_api_version="3",
+ username="test-user",
+ password="test-password",
+ project_id="test-project",
+ regions=["RegionOne", "RegionTwo"],
+ user_domain_name="Default",
+ project_domain_name="Default",
+ )
+
+ sdk_config = session.as_sdk_config()
+
+ # SDK does not iterate over regions automatically, so we pass the
+ # first region as region_name for the default connection
+ assert sdk_config["region_name"] == "RegionOne"
+ assert "regions" not in sdk_config
+
+ def test_session_as_sdk_config_with_region_name(self):
+ """Test OpenStackSession.as_sdk_config() with single region_name."""
+ session = OpenStackSession(
+ auth_url="https://openstack.example.com:5000/v3",
+ identity_api_version="3",
+ username="test-user",
+ password="test-password",
+ project_id="test-project",
+ region_name="RegionOne",
+ user_domain_name="Default",
+ project_domain_name="Default",
+ )
+
+ sdk_config = session.as_sdk_config()
+
+ assert sdk_config["region_name"] == "RegionOne"
+ assert "regions" not in sdk_config
+
+
+class TestOpenstackProviderIdValidation:
+ """Test suite for OpenStack Provider ID validation."""
+
+ @pytest.fixture(autouse=True)
+ def clean_openstack_env(self, monkeypatch):
+ """Ensure clean OpenStack environment for all tests."""
+ openstack_env_vars = [
+ "OS_AUTH_URL",
+ "OS_USERNAME",
+ "OS_PASSWORD",
+ "OS_PROJECT_ID",
+ "OS_REGION_NAME",
+ "OS_CLOUD",
+ "OS_IDENTITY_API_VERSION",
+ "OS_USER_DOMAIN_NAME",
+ "OS_PROJECT_DOMAIN_NAME",
+ ]
+ for env_var in openstack_env_vars:
+ monkeypatch.delenv(env_var, raising=False)
+
+ def test_test_connection_provider_id_matches(self):
+ """Test test_connection succeeds when provider_id matches project_id."""
+ mock_connection = MagicMock()
+ mock_connection.authorize.return_value = None
+
+ with patch(
+ "prowler.providers.openstack.openstack_provider.connect"
+ ) as mock_connect:
+ mock_connect.return_value = mock_connection
+
+ connection_result = OpenstackProvider.test_connection(
+ auth_url="https://openstack.example.com:5000/v3",
+ username="test-user",
+ password="test-password",
+ project_id="test-project-id",
+ region_name="RegionOne",
+ provider_id="test-project-id",
+ raise_on_exception=False,
+ )
+
+ assert connection_result.is_connected is True
+ assert connection_result.error is None
+
+ def test_test_connection_provider_id_does_not_match(self):
+ """Test test_connection fails when provider_id doesn't match project_id."""
+ connection_result = OpenstackProvider.test_connection(
+ auth_url="https://openstack.example.com:5000/v3",
+ username="test-user",
+ password="test-password",
+ project_id="actual-project-id",
+ region_name="RegionOne",
+ provider_id="different-project-id",
+ raise_on_exception=False,
+ )
+
+ assert connection_result.is_connected is False
+ assert isinstance(connection_result.error, OpenStackInvalidProviderIdError)
+
+ def test_test_connection_provider_id_mismatch_raises(self):
+ """Test test_connection raises when provider_id doesn't match and raise_on_exception=True."""
+ with pytest.raises(OpenStackInvalidProviderIdError) as excinfo:
+ OpenstackProvider.test_connection(
+ auth_url="https://openstack.example.com:5000/v3",
+ username="test-user",
+ password="test-password",
+ project_id="actual-project-id",
+ region_name="RegionOne",
+ provider_id="different-project-id",
+ raise_on_exception=True,
+ )
+
+ assert "different-project-id" in str(excinfo.value)
+ assert "actual-project-id" in str(excinfo.value)
+
+ def test_test_connection_no_provider_id_skips_validation(self):
+ """Test test_connection skips provider_id validation when not provided."""
+ mock_connection = MagicMock()
+ mock_connection.authorize.return_value = None
+
+ with patch(
+ "prowler.providers.openstack.openstack_provider.connect"
+ ) as mock_connect:
+ mock_connect.return_value = mock_connection
+
+ connection_result = OpenstackProvider.test_connection(
+ auth_url="https://openstack.example.com:5000/v3",
+ username="test-user",
+ password="test-password",
+ project_id="test-project-id",
+ region_name="RegionOne",
+ raise_on_exception=False,
+ )
+
+ assert connection_result.is_connected is True
+
+ def test_test_connection_provider_id_with_clouds_yaml_content(self):
+ """Test test_connection validates provider_id against clouds.yaml content project_id."""
+ clouds_yaml_content = """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: yaml-project-id
+ region_name: RegionOne
+"""
+ connection_result = OpenstackProvider.test_connection(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="test-cloud",
+ provider_id="wrong-project-id",
+ raise_on_exception=False,
+ )
+
+ assert connection_result.is_connected is False
+ assert isinstance(connection_result.error, OpenStackInvalidProviderIdError)
+
+ def test_test_connection_region_error_surfaced(self):
+ """Test test_connection surfaces region validation errors."""
+ clouds_yaml_content = """
+clouds:
+ test-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ identity_api_version: 3
+"""
+ connection_result = OpenstackProvider.test_connection(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="test-cloud",
+ raise_on_exception=False,
+ )
+
+ assert connection_result.is_connected is False
+ assert isinstance(connection_result.error, OpenStackNoRegionError)
+
+
+class TestOpenstackProviderRegionalConnections:
+ """Test suite for OpenStack Provider regional_connections."""
+
+ @pytest.fixture(autouse=True)
+ def clean_openstack_env(self, monkeypatch):
+ """Ensure clean OpenStack environment for all tests."""
+ openstack_env_vars = [
+ "OS_AUTH_URL",
+ "OS_USERNAME",
+ "OS_PASSWORD",
+ "OS_PROJECT_ID",
+ "OS_REGION_NAME",
+ "OS_CLOUD",
+ "OS_IDENTITY_API_VERSION",
+ "OS_USER_DOMAIN_NAME",
+ "OS_PROJECT_DOMAIN_NAME",
+ ]
+ for env_var in openstack_env_vars:
+ monkeypatch.delenv(env_var, raising=False)
+
+ def test_single_region_regional_connections(self):
+ """Test regional_connections has one entry for single-region provider."""
+ mock_connection = MagicMock()
+ mock_connection.authorize.return_value = None
+ mock_connection.current_user_id = None
+ mock_connection.current_project_id = "test-project"
+ mock_connection.identity.get_project.return_value = None
+
+ with patch(
+ "prowler.providers.openstack.openstack_provider.connect"
+ ) as mock_connect:
+ mock_connect.return_value = mock_connection
+
+ provider = OpenstackProvider(
+ auth_url="https://openstack.example.com:5000/v3",
+ username="test-user",
+ password="test-password",
+ project_id="test-project",
+ region_name="RegionOne",
+ )
+
+ assert len(provider.regional_connections) == 1
+ assert "RegionOne" in provider.regional_connections
+ assert provider.regional_connections["RegionOne"] is provider.connection
+ mock_connect.assert_called_once()
+
+ def test_multi_region_regional_connections(self):
+ """Test regional_connections has entries for each region in multi-region setup."""
+ mock_conn_region1 = MagicMock()
+ mock_conn_region1.authorize.return_value = None
+ mock_conn_region1.current_user_id = None
+ mock_conn_region1.current_project_id = "test-project-id"
+ mock_conn_region1.identity.get_project.return_value = None
+
+ mock_conn_region2 = MagicMock()
+ mock_conn_region2.authorize.return_value = None
+
+ clouds_yaml_content = """
+clouds:
+ multi-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ regions:
+ - UK1
+ - DE1
+ identity_api_version: 3
+"""
+ with patch(
+ "prowler.providers.openstack.openstack_provider.connect"
+ ) as mock_connect:
+ mock_connect.side_effect = [mock_conn_region1, mock_conn_region2]
+
+ provider = OpenstackProvider(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="multi-cloud",
+ )
+
+ assert len(provider.regional_connections) == 2
+ assert "UK1" in provider.regional_connections
+ assert "DE1" in provider.regional_connections
+ assert provider.regional_connections["UK1"] is mock_conn_region1
+ assert provider.regional_connections["DE1"] is mock_conn_region2
+ # Default connection should be the first region
+ assert provider.connection is mock_conn_region1
+ assert mock_connect.call_count == 2
+
+ def test_multi_region_test_connection_tests_all_regions(self):
+ """Test test_connection tests connectivity to every region."""
+ mock_connection = MagicMock()
+ mock_connection.authorize.return_value = None
+
+ clouds_yaml_content = """
+clouds:
+ multi-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ regions:
+ - UK1
+ - DE1
+ identity_api_version: 3
+"""
+ with patch(
+ "prowler.providers.openstack.openstack_provider.connect"
+ ) as mock_connect:
+ mock_connect.return_value = mock_connection
+
+ result = OpenstackProvider.test_connection(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="multi-cloud",
+ raise_on_exception=False,
+ )
+
+ assert result.is_connected is True
+ # Should have called connect once per region
+ assert mock_connect.call_count == 2
+
+ def test_multi_region_test_connection_fails_if_one_region_fails(self):
+ """Test test_connection fails if any region fails."""
+ mock_conn_ok = MagicMock()
+ mock_conn_ok.authorize.return_value = None
+
+ mock_conn_fail = MagicMock()
+ mock_conn_fail.authorize.side_effect = openstack_exceptions.SDKException(
+ "Connection failed in DE1"
+ )
+
+ clouds_yaml_content = """
+clouds:
+ multi-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ regions:
+ - UK1
+ - DE1
+ identity_api_version: 3
+"""
+ with patch(
+ "prowler.providers.openstack.openstack_provider.connect"
+ ) as mock_connect:
+ mock_connect.side_effect = [mock_conn_ok, mock_conn_fail]
+
+ result = OpenstackProvider.test_connection(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="multi-cloud",
+ raise_on_exception=False,
+ )
+
+ assert result.is_connected is False
+
+ def test_session_as_sdk_config_region_override(self):
+ """Test as_sdk_config with region_override overrides region_name."""
+ session = OpenStackSession(
+ auth_url="https://openstack.example.com:5000/v3",
+ identity_api_version="3",
+ username="test-user",
+ password="test-password",
+ project_id="test-project",
+ region_name="RegionOne",
+ user_domain_name="Default",
+ project_domain_name="Default",
+ )
+
+ sdk_config = session.as_sdk_config(region_override="RegionTwo")
+ assert sdk_config["region_name"] == "RegionTwo"
+
+ def test_session_as_sdk_config_region_override_with_regions_list(self):
+ """Test as_sdk_config with region_override overrides regions list."""
+ session = OpenStackSession(
+ auth_url="https://openstack.example.com:5000/v3",
+ identity_api_version="3",
+ username="test-user",
+ password="test-password",
+ project_id="test-project",
+ regions=["UK1", "DE1"],
+ user_domain_name="Default",
+ project_domain_name="Default",
+ )
+
+ sdk_config = session.as_sdk_config(region_override="DE1")
+ assert sdk_config["region_name"] == "DE1"
+
+ def test_multi_region_test_connection_provider_id_matches(self):
+ """Test test_connection validates provider_id in multi-region setup."""
+ mock_connection = MagicMock()
+ mock_connection.authorize.return_value = None
+
+ clouds_yaml_content = """
+clouds:
+ multi-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: test-project-id
+ regions:
+ - UK1
+ - DE1
+ identity_api_version: 3
+"""
+ with patch(
+ "prowler.providers.openstack.openstack_provider.connect"
+ ) as mock_connect:
+ mock_connect.return_value = mock_connection
+
+ result = OpenstackProvider.test_connection(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="multi-cloud",
+ provider_id="test-project-id",
+ raise_on_exception=False,
+ )
+
+ assert result.is_connected is True
+
+ def test_multi_region_test_connection_provider_id_mismatch(self):
+ """Test test_connection fails when provider_id doesn't match in multi-region."""
+ clouds_yaml_content = """
+clouds:
+ multi-cloud:
+ auth:
+ auth_url: https://openstack.example.com:5000/v3
+ username: test-user
+ password: test-password
+ project_id: actual-project-id
+ regions:
+ - UK1
+ - DE1
+ identity_api_version: 3
+"""
+ result = OpenstackProvider.test_connection(
+ clouds_yaml_content=clouds_yaml_content,
+ clouds_yaml_cloud="multi-cloud",
+ provider_id="wrong-project-id",
+ raise_on_exception=False,
+ )
+
+ assert result.is_connected is False
+ assert isinstance(result.error, OpenStackInvalidProviderIdError)
diff --git a/tests/providers/openstack/services/compute/openstack_compute_service_test.py b/tests/providers/openstack/services/compute/openstack_compute_service_test.py
index 782572d6a1..9020548a13 100644
--- a/tests/providers/openstack/services/compute/openstack_compute_service_test.py
+++ b/tests/providers/openstack/services/compute/openstack_compute_service_test.py
@@ -28,9 +28,10 @@ class TestComputeService:
assert compute.service_name == "Compute"
assert compute.provider == provider
assert compute.connection == provider.connection
+ assert compute.regional_connections == provider.regional_connections
+ assert compute.audited_regions == [OPENSTACK_REGION]
assert compute.region == OPENSTACK_REGION
assert compute.project_id == OPENSTACK_PROJECT_ID
- assert compute.client == provider.connection.compute
assert compute.instances == []
mock_list.assert_called_once()
@@ -303,6 +304,8 @@ class TestComputeService:
assert hasattr(compute, "service_name")
assert hasattr(compute, "provider")
assert hasattr(compute, "connection")
+ assert hasattr(compute, "regional_connections")
+ assert hasattr(compute, "audited_regions")
assert hasattr(compute, "session")
assert hasattr(compute, "region")
assert hasattr(compute, "project_id")
@@ -343,3 +346,153 @@ class TestComputeService:
assert len(compute.instances) == 1
assert compute.instances[0].id == "instance-1"
assert compute.instances[0].networks == {} # Should default to empty dict
+
+ def test_compute_list_instances_multi_region(self):
+ """Test listing instances across multiple regions."""
+ provider = set_mocked_openstack_provider()
+
+ # Create two mock connections for two regions
+ mock_conn_uk1 = MagicMock()
+ mock_conn_de1 = MagicMock()
+
+ # Set up regional connections
+ provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
+
+ mock_server_uk = MagicMock()
+ mock_server_uk.id = "instance-uk"
+ mock_server_uk.name = "Instance UK"
+ mock_server_uk.status = "ACTIVE"
+ mock_server_uk.flavor = {"id": "flavor-1"}
+ mock_server_uk.security_groups = [{"name": "default"}]
+ mock_server_uk.is_locked = False
+ mock_server_uk.locked_reason = ""
+ mock_server_uk.key_name = ""
+ mock_server_uk.user_id = ""
+ mock_server_uk.access_ipv4 = ""
+ mock_server_uk.access_ipv6 = ""
+ mock_server_uk.public_v4 = ""
+ mock_server_uk.public_v6 = ""
+ mock_server_uk.private_v4 = "10.0.0.1"
+ mock_server_uk.private_v6 = ""
+ mock_server_uk.addresses = {"private": [{"version": 4, "addr": "10.0.0.1"}]}
+ mock_server_uk.has_config_drive = False
+ mock_server_uk.metadata = {}
+ mock_server_uk.user_data = ""
+ mock_server_uk.trusted_image_certificates = []
+
+ mock_server_de = MagicMock()
+ mock_server_de.id = "instance-de"
+ mock_server_de.name = "Instance DE"
+ mock_server_de.status = "ACTIVE"
+ mock_server_de.flavor = {"id": "flavor-2"}
+ mock_server_de.security_groups = [{"name": "default"}]
+ mock_server_de.is_locked = False
+ mock_server_de.locked_reason = ""
+ mock_server_de.key_name = ""
+ mock_server_de.user_id = ""
+ mock_server_de.access_ipv4 = ""
+ mock_server_de.access_ipv6 = ""
+ mock_server_de.public_v4 = ""
+ mock_server_de.public_v6 = ""
+ mock_server_de.private_v4 = "10.0.0.2"
+ mock_server_de.private_v6 = ""
+ mock_server_de.addresses = {"private": [{"version": 4, "addr": "10.0.0.2"}]}
+ mock_server_de.has_config_drive = False
+ mock_server_de.metadata = {}
+ mock_server_de.user_data = ""
+ mock_server_de.trusted_image_certificates = []
+
+ mock_conn_uk1.compute.servers.return_value = [mock_server_uk]
+ mock_conn_de1.compute.servers.return_value = [mock_server_de]
+
+ compute = Compute(provider)
+
+ assert len(compute.instances) == 2
+ # Verify instances have correct region tags
+ uk_instance = next(i for i in compute.instances if i.id == "instance-uk")
+ de_instance = next(i for i in compute.instances if i.id == "instance-de")
+ assert uk_instance.region == "UK1"
+ assert de_instance.region == "DE1"
+
+ def test_compute_list_instances_multi_region_partial_failure(self):
+ """Test that a failing region doesn't prevent other regions from being listed."""
+ provider = set_mocked_openstack_provider()
+
+ mock_conn_ok = MagicMock()
+ mock_conn_fail = MagicMock()
+
+ provider.regional_connections = {"UK1": mock_conn_ok, "DE1": mock_conn_fail}
+
+ mock_server = MagicMock()
+ mock_server.id = "instance-uk"
+ mock_server.name = "Instance UK"
+ mock_server.status = "ACTIVE"
+ mock_server.flavor = {"id": "flavor-1"}
+ mock_server.security_groups = [{"name": "default"}]
+ mock_server.is_locked = False
+ mock_server.locked_reason = ""
+ mock_server.key_name = ""
+ mock_server.user_id = ""
+ mock_server.access_ipv4 = ""
+ mock_server.access_ipv6 = ""
+ mock_server.public_v4 = ""
+ mock_server.public_v6 = ""
+ mock_server.private_v4 = "10.0.0.1"
+ mock_server.private_v6 = ""
+ mock_server.addresses = {}
+ mock_server.has_config_drive = False
+ mock_server.metadata = {}
+ mock_server.user_data = ""
+ mock_server.trusted_image_certificates = []
+
+ mock_conn_ok.compute.servers.return_value = [mock_server]
+ mock_conn_fail.compute.servers.side_effect = openstack_exceptions.SDKException(
+ "API error in DE1"
+ )
+
+ compute = Compute(provider)
+
+ # Should have the instance from UK1, DE1 failure is logged but doesn't crash
+ assert len(compute.instances) == 1
+ assert compute.instances[0].id == "instance-uk"
+ assert compute.instances[0].region == "UK1"
+
+ def test_compute_list_instances_multi_region_one_empty(self):
+ """Test multi-region where one region has instances and the other is empty."""
+ provider = set_mocked_openstack_provider()
+
+ mock_conn_uk1 = MagicMock()
+ mock_conn_de1 = MagicMock()
+
+ provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
+
+ mock_server = MagicMock()
+ mock_server.id = "instance-uk"
+ mock_server.name = "Instance UK"
+ mock_server.status = "ACTIVE"
+ mock_server.flavor = {"id": "flavor-1"}
+ mock_server.security_groups = [{"name": "default"}]
+ mock_server.is_locked = False
+ mock_server.locked_reason = ""
+ mock_server.key_name = ""
+ mock_server.user_id = ""
+ mock_server.access_ipv4 = ""
+ mock_server.access_ipv6 = ""
+ mock_server.public_v4 = ""
+ mock_server.public_v6 = ""
+ mock_server.private_v4 = "10.0.0.1"
+ mock_server.private_v6 = ""
+ mock_server.addresses = {}
+ mock_server.has_config_drive = False
+ mock_server.metadata = {}
+ mock_server.user_data = ""
+ mock_server.trusted_image_certificates = []
+
+ mock_conn_uk1.compute.servers.return_value = [mock_server]
+ mock_conn_de1.compute.servers.return_value = [] # Empty region
+
+ compute = Compute(provider)
+
+ assert len(compute.instances) == 1
+ assert compute.instances[0].id == "instance-uk"
+ assert compute.instances[0].region == "UK1"