mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
feat(oraclecloud): support regionless SDK setup
This commit is contained in:
@@ -126,6 +126,10 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- A broken built-in provider no longer aborts the CLI when a different provider was invoked [(#11618)](https://github.com/prowler-cloud/prowler/pull/11618)
|
||||
- GCP organization scans with `--organization-id` no longer silently fall back to the credentials' host project when the Cloud Asset API call fails [(#11280)](https://github.com/prowler-cloud/prowler/pull/11280)
|
||||
|
||||
### 🔄 Changed
|
||||
|
||||
- Oracle Cloud API key authentication now uses an internal bootstrap region when no explicit scan region filter is provided [(#11565)](https://github.com/prowler-cloud/prowler/pull/11565)
|
||||
|
||||
---
|
||||
|
||||
## [5.30.0] (Prowler v5.30.0)
|
||||
|
||||
@@ -68,12 +68,13 @@ class OraclecloudProvider(Provider):
|
||||
_mutelist: OCIMutelist
|
||||
audit_metadata: Audit_Metadata
|
||||
_home_region: str = "us-ashburn-1"
|
||||
_bootstrap_region: str = "us-ashburn-1"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
oci_config_file: str = None,
|
||||
profile: str = None,
|
||||
region: set = set(),
|
||||
region: set = None,
|
||||
compartment_ids: list = None,
|
||||
config_path: str = None,
|
||||
config_content: dict = None,
|
||||
@@ -131,8 +132,11 @@ class OraclecloudProvider(Provider):
|
||||
|
||||
# Check if the configuration is scanning a single region
|
||||
single_region = None
|
||||
if region:
|
||||
single_region = list(region)[0] if len(region) == 1 else None
|
||||
if isinstance(region, str):
|
||||
single_region = region
|
||||
elif region:
|
||||
single_region = sorted(region)[0]
|
||||
bootstrap_region = single_region or self._bootstrap_region
|
||||
|
||||
# Setup OCI Session
|
||||
logger.info("Setting up OCI session ...")
|
||||
@@ -145,7 +149,7 @@ class OraclecloudProvider(Provider):
|
||||
key_file=key_file,
|
||||
key_content=key_content,
|
||||
tenancy=tenancy,
|
||||
region=single_region,
|
||||
region=bootstrap_region,
|
||||
pass_phrase=pass_phrase,
|
||||
)
|
||||
|
||||
@@ -155,7 +159,7 @@ class OraclecloudProvider(Provider):
|
||||
logger.info("Validating OCI credentials ...")
|
||||
self._identity = self.set_identity(
|
||||
session=self._session,
|
||||
region=single_region,
|
||||
region=bootstrap_region,
|
||||
compartment_ids=compartment_ids,
|
||||
)
|
||||
logger.info("OCI credentials validated")
|
||||
@@ -165,7 +169,13 @@ class OraclecloudProvider(Provider):
|
||||
# Determine the tenancy home region from the full subscription list, independent of
|
||||
# the --region filter, so tenancy-level APIs (e.g. the Audit configuration) always
|
||||
# target the home region instead of a filtered, non-home region.
|
||||
all_subscribed_regions = self.get_regions_to_audit()
|
||||
try:
|
||||
all_subscribed_regions = self.get_regions_to_audit()
|
||||
except OCISetUpSessionError:
|
||||
if single_region and len(self._regions) == 1:
|
||||
all_subscribed_regions = self._regions
|
||||
else:
|
||||
raise
|
||||
self._home_region = next(
|
||||
(region.key for region in all_subscribed_regions if region.is_home_region),
|
||||
self._regions[0].key if self._regions else "us-ashburn-1",
|
||||
@@ -284,7 +294,7 @@ class OraclecloudProvider(Provider):
|
||||
signer = None
|
||||
|
||||
# If API key credentials are provided directly, create config from them
|
||||
if user and fingerprint and tenancy and region:
|
||||
if user and fingerprint and tenancy:
|
||||
import base64
|
||||
|
||||
logger.info("Using API key credentials from direct parameters")
|
||||
@@ -294,7 +304,7 @@ class OraclecloudProvider(Provider):
|
||||
"user": user,
|
||||
"fingerprint": fingerprint,
|
||||
"tenancy": tenancy,
|
||||
"region": region,
|
||||
"region": region or OraclecloudProvider._bootstrap_region,
|
||||
}
|
||||
|
||||
# Handle private key
|
||||
@@ -565,6 +575,12 @@ class OraclecloudProvider(Provider):
|
||||
"""
|
||||
regions = []
|
||||
|
||||
explicit_regions = None
|
||||
if isinstance(region_set, str):
|
||||
explicit_regions = {region_set}
|
||||
elif region_set:
|
||||
explicit_regions = set(region_set)
|
||||
|
||||
# Audit all subscribed regions
|
||||
try:
|
||||
# Create identity client with proper authentication handling
|
||||
@@ -581,11 +597,9 @@ class OraclecloudProvider(Provider):
|
||||
).data
|
||||
|
||||
# Check if auditing specific region or all
|
||||
regions_check = (
|
||||
region_set
|
||||
if region_set
|
||||
else [sub.region_name for sub in region_subscriptions]
|
||||
)
|
||||
regions_check = explicit_regions or [
|
||||
sub.region_name for sub in region_subscriptions
|
||||
]
|
||||
|
||||
for region_sub in region_subscriptions:
|
||||
if region_sub.region_name in regions_check:
|
||||
@@ -600,11 +614,21 @@ class OraclecloudProvider(Provider):
|
||||
)
|
||||
logger.info(f"Found {len(regions)} subscribed regions")
|
||||
except Exception as error:
|
||||
if not explicit_regions or len(explicit_regions) != 1:
|
||||
raise OCISetUpSessionError(
|
||||
original_exception=error,
|
||||
message=(
|
||||
"Could not retrieve OCI subscribed regions. "
|
||||
"Configure an explicit region to preserve legacy single-region scans, "
|
||||
"or fix the credentials/permissions required to list region subscriptions."
|
||||
),
|
||||
) from error
|
||||
|
||||
config_region = next(iter(explicit_regions))
|
||||
logger.warning(
|
||||
f"Could not retrieve region subscriptions: {error}. Using configured region."
|
||||
f"Could not retrieve region subscriptions: {error}. "
|
||||
f"Using explicitly configured region {config_region}."
|
||||
)
|
||||
# Fallback to configured region
|
||||
config_region = self._session.config.get("region", "us-ashburn-1")
|
||||
regions.append(
|
||||
OCIRegion(
|
||||
key=config_region,
|
||||
@@ -855,7 +879,7 @@ class OraclecloudProvider(Provider):
|
||||
session = None
|
||||
|
||||
# If API key credentials are provided directly, create config from them
|
||||
if user and fingerprint and tenancy and region:
|
||||
if user and fingerprint and tenancy:
|
||||
import base64
|
||||
|
||||
logger.info("Using API key credentials from direct parameters")
|
||||
@@ -865,7 +889,7 @@ class OraclecloudProvider(Provider):
|
||||
"user": user,
|
||||
"fingerprint": fingerprint,
|
||||
"tenancy": tenancy,
|
||||
"region": region,
|
||||
"region": region or OraclecloudProvider._bootstrap_region,
|
||||
}
|
||||
|
||||
# Handle private key
|
||||
@@ -914,7 +938,7 @@ class OraclecloudProvider(Provider):
|
||||
|
||||
identity = OraclecloudProvider.set_identity(
|
||||
session=session,
|
||||
region=region,
|
||||
region=region or OraclecloudProvider._bootstrap_region,
|
||||
)
|
||||
|
||||
# Validate provider_id if provided
|
||||
|
||||
@@ -5,6 +5,7 @@ import pytest
|
||||
from prowler.providers.oraclecloud.exceptions.exceptions import (
|
||||
OCIAuthenticationError,
|
||||
OCIInvalidConfigError,
|
||||
OCISetUpSessionError,
|
||||
)
|
||||
from prowler.providers.oraclecloud.models import OCIIdentityInfo, OCIRegion, OCISession
|
||||
from prowler.providers.oraclecloud.oraclecloud_provider import OraclecloudProvider
|
||||
@@ -200,6 +201,41 @@ MIIEpQIBAAKCAQEA0Z3VS5JJcds3xfn/ygWyF8n0sMcD/QHWCJ7yGSEtLN2T
|
||||
|
||||
assert connection.is_connected is True
|
||||
|
||||
def test_test_connection_direct_credentials_without_region_uses_bootstrap_region(
|
||||
self,
|
||||
):
|
||||
"""Direct API key auth should not fall back to config-file auth without a region."""
|
||||
import base64
|
||||
|
||||
valid_key = (
|
||||
"-----BEGIN RSA PRIVATE KEY-----\nfake\n-----END RSA PRIVATE KEY-----"
|
||||
)
|
||||
encoded_key = base64.b64encode(valid_key.encode("utf-8")).decode("utf-8")
|
||||
|
||||
with (
|
||||
patch("oci.config.validate_config") as mock_validate_config,
|
||||
patch("oci.identity.IdentityClient") as mock_identity_client,
|
||||
):
|
||||
mock_tenancy = MagicMock()
|
||||
mock_tenancy.name = "test-tenancy"
|
||||
mock_response = MagicMock()
|
||||
mock_response.data = mock_tenancy
|
||||
mock_client_instance = MagicMock()
|
||||
mock_client_instance.get_tenancy.return_value = mock_response
|
||||
mock_identity_client.return_value = mock_client_instance
|
||||
|
||||
connection = OraclecloudProvider.test_connection(
|
||||
key_content=encoded_key,
|
||||
user="ocid1.user.oc1..aaaaaaaexample",
|
||||
fingerprint="aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
|
||||
tenancy="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
provider_id="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
raise_on_exception=False,
|
||||
)
|
||||
|
||||
assert connection.is_connected is True
|
||||
assert mock_validate_config.call_args.args[0]["region"] == "us-ashburn-1"
|
||||
|
||||
|
||||
class TestOraclecloudProviderInit:
|
||||
"""Tests for OraclecloudProvider initialization"""
|
||||
@@ -256,6 +292,159 @@ class TestOraclecloudProviderInit:
|
||||
assert provider.home_region == "us-ashburn-1"
|
||||
mock_set_global.assert_called_once_with(provider)
|
||||
|
||||
def test_init_with_multiple_regions_uses_deterministic_session_region(self):
|
||||
mock_session = OCISession(
|
||||
config={"region": "us-ashburn-1"}, signer=None, profile="DEFAULT"
|
||||
)
|
||||
mock_identity = OCIIdentityInfo(
|
||||
tenancy_id="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
tenancy_name="test-tenancy",
|
||||
user_id="ocid1.user.oc1..aaaaaaaexample",
|
||||
region="us-ashburn-1",
|
||||
profile="DEFAULT",
|
||||
audited_regions=set(),
|
||||
audited_compartments=[],
|
||||
)
|
||||
audited_regions = [
|
||||
OCIRegion(key="us-ashburn-1", name="us-ashburn-1", is_home_region=True),
|
||||
OCIRegion(key="us-phoenix-1", name="us-phoenix-1", is_home_region=False),
|
||||
]
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.setup_session",
|
||||
return_value=mock_session,
|
||||
) as mock_setup_session,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.set_identity",
|
||||
return_value=mock_identity,
|
||||
) as mock_set_identity,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_regions_to_audit",
|
||||
return_value=audited_regions,
|
||||
) as mock_get_regions_to_audit,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_compartments_to_audit",
|
||||
return_value=["ocid1.compartment.oc1..aaaaaaaexample"],
|
||||
),
|
||||
patch("prowler.providers.common.provider.Provider.set_global_provider"),
|
||||
):
|
||||
provider = OraclecloudProvider(
|
||||
region={"us-phoenix-1", "us-ashburn-1"},
|
||||
user="ocid1.user.oc1..aaaaaaaexample",
|
||||
fingerprint="aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
|
||||
key_content="fake-base64-key-content",
|
||||
tenancy="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
config_content={"dummy": True},
|
||||
mutelist_content={"Accounts": {}},
|
||||
)
|
||||
|
||||
assert mock_setup_session.call_args.kwargs["region"] == "us-ashburn-1"
|
||||
assert mock_set_identity.call_args.kwargs["region"] == "us-ashburn-1"
|
||||
assert mock_get_regions_to_audit.call_args_list[0].args == (
|
||||
{"us-phoenix-1", "us-ashburn-1"},
|
||||
)
|
||||
assert provider.regions == audited_regions
|
||||
|
||||
def test_init_with_legacy_region_string_uses_full_region_for_identity(self):
|
||||
mock_session = OCISession(
|
||||
config={"region": "us-ashburn-1"}, signer=None, profile="DEFAULT"
|
||||
)
|
||||
mock_identity = OCIIdentityInfo(
|
||||
tenancy_id="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
tenancy_name="test-tenancy",
|
||||
user_id="ocid1.user.oc1..aaaaaaaexample",
|
||||
region="us-ashburn-1",
|
||||
profile="DEFAULT",
|
||||
audited_regions=set(),
|
||||
audited_compartments=[],
|
||||
)
|
||||
audited_regions = [
|
||||
OCIRegion(key="us-ashburn-1", name="us-ashburn-1", is_home_region=True),
|
||||
]
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.setup_session",
|
||||
return_value=mock_session,
|
||||
) as mock_setup_session,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.set_identity",
|
||||
return_value=mock_identity,
|
||||
) as mock_set_identity,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_regions_to_audit",
|
||||
return_value=audited_regions,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_compartments_to_audit",
|
||||
return_value=["ocid1.compartment.oc1..aaaaaaaexample"],
|
||||
),
|
||||
patch("prowler.providers.common.provider.Provider.set_global_provider"),
|
||||
):
|
||||
OraclecloudProvider(
|
||||
region="us-ashburn-1",
|
||||
user="ocid1.user.oc1..aaaaaaaexample",
|
||||
fingerprint="aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
|
||||
key_content="fake-base64-key-content",
|
||||
tenancy="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
config_content={"dummy": True},
|
||||
mutelist_content={"Accounts": {}},
|
||||
)
|
||||
|
||||
assert mock_setup_session.call_args.kwargs["region"] == "us-ashburn-1"
|
||||
assert mock_set_identity.call_args.kwargs["region"] == "us-ashburn-1"
|
||||
|
||||
def test_init_without_region_uses_bootstrap_region_without_scan_filter(self):
|
||||
mock_session = OCISession(
|
||||
config={"region": "us-ashburn-1"}, signer=None, profile="DEFAULT"
|
||||
)
|
||||
mock_identity = OCIIdentityInfo(
|
||||
tenancy_id="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
tenancy_name="test-tenancy",
|
||||
user_id="ocid1.user.oc1..aaaaaaaexample",
|
||||
region="us-ashburn-1",
|
||||
profile="DEFAULT",
|
||||
audited_regions=set(),
|
||||
audited_compartments=[],
|
||||
)
|
||||
all_subscribed_regions = [
|
||||
OCIRegion(key="us-ashburn-1", name="us-ashburn-1", is_home_region=True),
|
||||
OCIRegion(key="us-phoenix-1", name="us-phoenix-1", is_home_region=False),
|
||||
]
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.setup_session",
|
||||
return_value=mock_session,
|
||||
) as mock_setup_session,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.set_identity",
|
||||
return_value=mock_identity,
|
||||
) as mock_set_identity,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_regions_to_audit",
|
||||
return_value=all_subscribed_regions,
|
||||
) as mock_get_regions_to_audit,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_compartments_to_audit",
|
||||
return_value=["ocid1.compartment.oc1..aaaaaaaexample"],
|
||||
),
|
||||
patch("prowler.providers.common.provider.Provider.set_global_provider"),
|
||||
):
|
||||
provider = OraclecloudProvider(
|
||||
user="ocid1.user.oc1..aaaaaaaexample",
|
||||
fingerprint="aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
|
||||
key_content="fake-base64-key-content",
|
||||
tenancy="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
config_content={"dummy": True},
|
||||
mutelist_content={"Accounts": {}},
|
||||
)
|
||||
|
||||
assert mock_setup_session.call_args.kwargs["region"] == "us-ashburn-1"
|
||||
assert mock_set_identity.call_args.kwargs["region"] == "us-ashburn-1"
|
||||
assert mock_get_regions_to_audit.call_args_list[0].args == (None,)
|
||||
assert provider.regions == all_subscribed_regions
|
||||
|
||||
def test_home_region_uses_full_subscription_list_not_region_filter(self):
|
||||
"""Home region must come from the full subscription list, not the --region filter.
|
||||
|
||||
@@ -313,3 +502,105 @@ class TestOraclecloudProviderInit:
|
||||
|
||||
assert provider.regions == audited_regions
|
||||
assert provider.home_region == "us-ashburn-1"
|
||||
|
||||
def test_init_with_legacy_single_region_preserves_fallback_for_home_region(self):
|
||||
mock_session = OCISession(
|
||||
config={"region": "us-phoenix-1"}, signer=None, profile="DEFAULT"
|
||||
)
|
||||
mock_identity = OCIIdentityInfo(
|
||||
tenancy_id="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
tenancy_name="test-tenancy",
|
||||
user_id="ocid1.user.oc1..aaaaaaaexample",
|
||||
region="us-phoenix-1",
|
||||
profile="DEFAULT",
|
||||
audited_regions=set(),
|
||||
audited_compartments=[],
|
||||
)
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.setup_session",
|
||||
return_value=mock_session,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.set_identity",
|
||||
return_value=mock_identity,
|
||||
),
|
||||
patch("oci.identity.IdentityClient") as mock_identity_client,
|
||||
patch(
|
||||
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_compartments_to_audit",
|
||||
return_value=["ocid1.compartment.oc1..aaaaaaaexample"],
|
||||
),
|
||||
patch("prowler.providers.common.provider.Provider.set_global_provider"),
|
||||
):
|
||||
mock_identity_client.return_value.list_region_subscriptions.side_effect = (
|
||||
Exception("discovery failed")
|
||||
)
|
||||
|
||||
provider = OraclecloudProvider(
|
||||
region="us-phoenix-1",
|
||||
config_content={"dummy": True},
|
||||
mutelist_content={"Accounts": {}},
|
||||
)
|
||||
|
||||
assert [region.key for region in provider.regions] == ["us-phoenix-1"]
|
||||
assert provider.home_region == "us-phoenix-1"
|
||||
|
||||
|
||||
class TestGetRegionsToAudit:
|
||||
def _provider_with_identity(self):
|
||||
provider = OraclecloudProvider.__new__(OraclecloudProvider)
|
||||
provider._session = OCISession(
|
||||
config={"region": "us-ashburn-1"}, signer=None, profile="DEFAULT"
|
||||
)
|
||||
provider._identity = OCIIdentityInfo(
|
||||
tenancy_id="ocid1.tenancy.oc1..aaaaaaaexample",
|
||||
tenancy_name="test-tenancy",
|
||||
user_id="ocid1.user.oc1..aaaaaaaexample",
|
||||
region="us-ashburn-1",
|
||||
profile="DEFAULT",
|
||||
audited_regions=set(),
|
||||
audited_compartments=[],
|
||||
)
|
||||
return provider
|
||||
|
||||
def test_regionless_scan_raises_when_region_subscription_discovery_fails(self):
|
||||
provider = self._provider_with_identity()
|
||||
|
||||
with patch("oci.identity.IdentityClient") as mock_identity_client:
|
||||
mock_identity_client.return_value.list_region_subscriptions.side_effect = (
|
||||
Exception("discovery failed")
|
||||
)
|
||||
|
||||
with pytest.raises(OCISetUpSessionError) as exc_info:
|
||||
provider.get_regions_to_audit()
|
||||
|
||||
assert "Could not retrieve OCI subscribed regions" in str(exc_info.value)
|
||||
|
||||
def test_single_explicit_region_falls_back_when_region_subscription_discovery_fails(
|
||||
self,
|
||||
):
|
||||
provider = self._provider_with_identity()
|
||||
|
||||
with patch("oci.identity.IdentityClient") as mock_identity_client:
|
||||
mock_identity_client.return_value.list_region_subscriptions.side_effect = (
|
||||
Exception("discovery failed")
|
||||
)
|
||||
|
||||
regions = provider.get_regions_to_audit("us-phoenix-1")
|
||||
|
||||
assert len(regions) == 1
|
||||
assert regions[0].key == "us-phoenix-1"
|
||||
|
||||
def test_multiple_explicit_regions_raise_when_region_subscription_discovery_fails(
|
||||
self,
|
||||
):
|
||||
provider = self._provider_with_identity()
|
||||
|
||||
with patch("oci.identity.IdentityClient") as mock_identity_client:
|
||||
mock_identity_client.return_value.list_region_subscriptions.side_effect = (
|
||||
Exception("discovery failed")
|
||||
)
|
||||
|
||||
with pytest.raises(OCISetUpSessionError):
|
||||
provider.get_regions_to_audit({"us-ashburn-1", "us-phoenix-1"})
|
||||
|
||||
Reference in New Issue
Block a user