fix(oci): validate credentials before scanning (#9738)

This commit is contained in:
Andoni Alonso
2026-01-08 15:47:26 +01:00
committed by GitHub
parent 795220e290
commit 27492cbd42
5 changed files with 292 additions and 83 deletions

View File

@@ -164,7 +164,7 @@ prowler oci --profile PRODUCTION
Use a config file from a custom location:
```bash
prowler oci --config-file /path/to/custom/config
prowler oci --oci-config-file /path/to/custom/config
```
### Setting Up API Keys
@@ -377,7 +377,7 @@ ls -la ~/.oci/config
mkdir -p ~/.oci
# Specify custom location
prowler oci --config-file /path/to/config
prowler oci --oci-config-file /path/to/config
```
#### Error: "InvalidKeyOrSignature"

View File

@@ -122,7 +122,7 @@ prowler oci --profile production
##### Using a Custom Config File
```bash
prowler oci --config-file /path/to/custom/config
prowler oci --oci-config-file /path/to/custom/config
```
#### Instance Principal Authentication

View File

@@ -42,6 +42,13 @@ All notable changes to the **Prowler SDK** are documented in this file.
---
## [5.16.2] (Prowler v5.16.2) (UNRELEASED)
### Fixed
- Fix OCI authentication error handling and validation [(#9738)](https://github.com/prowler-cloud/prowler/pull/9738)
---
## [5.16.1] (Prowler v5.16.1)
### Fixed

View File

@@ -266,7 +266,6 @@ class OraclecloudProvider(Provider):
# If API key credentials are provided directly, create config from them
if user and fingerprint and tenancy and region:
import base64
import tempfile
logger.info("Using API key credentials from direct parameters")
@@ -280,21 +279,19 @@ class OraclecloudProvider(Provider):
# Handle private key
if key_content:
# Decode base64 key content and write to temp file
# Decode base64 key content
try:
key_data = base64.b64decode(key_content)
temp_key_file = tempfile.NamedTemporaryFile(
mode="wb", delete=False, suffix=".pem"
)
temp_key_file.write(key_data)
temp_key_file.close()
config["key_file"] = temp_key_file.name
decoded_key = key_data.decode("utf-8")
except Exception as decode_error:
logger.error(f"Failed to decode key_content: {decode_error}")
raise OCIInvalidConfigError(
file=pathlib.Path(__file__).name,
message="Failed to decode key_content. Ensure it is base64 encoded.",
)
# Use OCI SDK's native key_content support
config["key_content"] = decoded_key
elif key_file:
config["key_file"] = os.path.expanduser(key_file)
else:
@@ -428,78 +425,85 @@ class OraclecloudProvider(Provider):
Raises:
- OCIAuthenticationError: If authentication fails.
"""
try:
# Get tenancy from config
tenancy_id = session.config.get("tenancy")
# Get tenancy from config
tenancy_id = session.config.get("tenancy")
if not tenancy_id:
raise OCINoCredentialsError(
file=pathlib.Path(__file__).name,
message="Tenancy ID not found in configuration",
)
# Validate tenancy OCID format
if not OraclecloudProvider.validate_ocid(tenancy_id, "tenancy"):
raise OCIInvalidTenancyError(
file=pathlib.Path(__file__).name,
message=f"Invalid tenancy OCID format: {tenancy_id}",
)
# Get user from config (not available in instance principal)
user_id = session.config.get("user", "instance-principal")
# Get region from config or use provided region
if not region:
region = session.config.get("region", "us-ashburn-1")
# Validate region
if region not in OCI_REGIONS:
raise OCIInvalidRegionError(
file=pathlib.Path(__file__).name,
message=f"Invalid region: {region}",
)
# Get tenancy name using Identity service
tenancy_name = "unknown"
try:
# Create identity client with proper authentication handling
if session.signer:
identity_client = oci.identity.IdentityClient(
config=session.config, signer=session.signer
)
else:
identity_client = oci.identity.IdentityClient(config=session.config)
tenancy = identity_client.get_tenancy(tenancy_id).data
tenancy_name = tenancy.name
logger.info(f"Tenancy Name: {tenancy_name}")
except Exception as error:
logger.warning(
f"Could not retrieve tenancy name: {error}. Using 'unknown'"
)
logger.info(f"OCI Tenancy ID: {tenancy_id}")
logger.info(f"OCI User ID: {user_id}")
logger.info(f"OCI Region: {region}")
return OCIIdentityInfo(
tenancy_id=tenancy_id,
tenancy_name=tenancy_name,
user_id=user_id,
region=region,
profile=session.profile,
audited_regions=set([region]) if region else set(),
audited_compartments=compartment_ids if compartment_ids else [],
if not tenancy_id:
raise OCINoCredentialsError(
file=pathlib.Path(__file__).name,
message="Tenancy ID not found in configuration",
)
except Exception as error:
# Validate tenancy OCID format
if not OraclecloudProvider.validate_ocid(tenancy_id, "tenancy"):
raise OCIInvalidTenancyError(
file=pathlib.Path(__file__).name,
message=f"Invalid tenancy OCID format: {tenancy_id}",
)
# Get user from config (not available in instance principal)
user_id = session.config.get("user", "instance-principal")
# Get region from config or use provided region
if not region:
region = session.config.get("region", "us-ashburn-1")
# Validate region
if region not in OCI_REGIONS:
raise OCIInvalidRegionError(
file=pathlib.Path(__file__).name,
message=f"Invalid region: {region}",
)
# Validate credentials by calling OCI Identity service
try:
if session.signer:
identity_client = oci.identity.IdentityClient(
config=session.config, signer=session.signer
)
else:
identity_client = oci.identity.IdentityClient(config=session.config)
tenancy = identity_client.get_tenancy(tenancy_id).data
tenancy_name = tenancy.name
logger.info(f"Tenancy Name: {tenancy_name}")
except oci.exceptions.ServiceError as error:
logger.critical(
f"OCIAuthenticationError[{error.__traceback__.tb_lineno}]: {error}"
f"OCI credential validation failed (HTTP {error.status}): {error.message}"
)
raise OCIAuthenticationError(
original_exception=error,
file=pathlib.Path(__file__).name,
message=f"OCI credential validation failed: {error.message}. Please verify your credentials and try again.",
original_exception=error,
)
except oci.exceptions.InvalidPrivateKey as error:
logger.critical(f"Invalid OCI private key: {error}")
raise OCIAuthenticationError(
file=pathlib.Path(__file__).name,
message="Invalid OCI private key format. Ensure the key is a valid PEM-encoded private key.",
original_exception=error,
)
except Exception as error:
logger.critical(f"OCI authentication error: {error}")
raise OCIAuthenticationError(
file=pathlib.Path(__file__).name,
message=f"Failed to authenticate with OCI: {error}",
original_exception=error,
)
logger.info(f"OCI Tenancy ID: {tenancy_id}")
logger.info(f"OCI User ID: {user_id}")
logger.info(f"OCI Region: {region}")
return OCIIdentityInfo(
tenancy_id=tenancy_id,
tenancy_name=tenancy_name,
user_id=user_id,
region=region,
profile=session.profile,
audited_regions=set([region]) if region else set(),
audited_compartments=compartment_ids if compartment_ids else [],
)
@staticmethod
def validate_ocid(ocid: str, resource_type: str = None) -> bool:
@@ -838,7 +842,6 @@ class OraclecloudProvider(Provider):
# If API key credentials are provided directly, create config from them
if user and fingerprint and tenancy and region:
import base64
import tempfile
logger.info("Using API key credentials from direct parameters")
@@ -852,21 +855,19 @@ class OraclecloudProvider(Provider):
# Handle private key
if key_content:
# Decode base64 key content and write to temp file
# Decode base64 key content
try:
key_data = base64.b64decode(key_content)
temp_key_file = tempfile.NamedTemporaryFile(
mode="wb", delete=False, suffix=".pem"
)
temp_key_file.write(key_data)
temp_key_file.close()
config["key_file"] = temp_key_file.name
decoded_key = key_data.decode("utf-8")
except Exception as decode_error:
logger.error(f"Failed to decode key_content: {decode_error}")
raise OCIInvalidConfigError(
file=pathlib.Path(__file__).name,
message="Failed to decode key_content. Ensure it is base64 encoded.",
)
# Use OCI SDK's native key_content support
config["key_content"] = decoded_key
elif key_file:
config["key_file"] = os.path.expanduser(key_file)
else:

View File

@@ -0,0 +1,201 @@
from unittest.mock import MagicMock, patch
import pytest
from prowler.providers.oraclecloud.exceptions.exceptions import (
OCIAuthenticationError,
OCIInvalidConfigError,
)
from prowler.providers.oraclecloud.models import OCISession
from prowler.providers.oraclecloud.oraclecloud_provider import OraclecloudProvider
class TestSetIdentityAuthenticationErrors:
"""Tests for authentication error handling in set_identity()"""
@pytest.fixture
def mock_session(self):
"""Create a mock OCI session."""
session = OCISession(
config={
"tenancy": "ocid1.tenancy.oc1..aaaaaaaexample",
"user": "ocid1.user.oc1..aaaaaaaexample",
"region": "us-ashburn-1",
"fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
},
signer=None,
profile="DEFAULT",
)
return session
def test_authentication_error_401_raises_exception(self, mock_session):
"""Test 401 error raises OCIAuthenticationError."""
with patch("oci.identity.IdentityClient") as mock_identity_client:
mock_client_instance = MagicMock()
mock_client_instance.get_tenancy.side_effect = self._create_service_error(
401, "Authentication failed"
)
mock_identity_client.return_value = mock_client_instance
with pytest.raises(OCIAuthenticationError) as exc_info:
OraclecloudProvider.set_identity(mock_session)
assert "OCI credential validation failed" in str(exc_info.value)
def test_authentication_error_403_raises_exception(self, mock_session):
"""Test 403 error raises OCIAuthenticationError."""
with patch("oci.identity.IdentityClient") as mock_identity_client:
mock_client_instance = MagicMock()
mock_client_instance.get_tenancy.side_effect = self._create_service_error(
403, "Forbidden access"
)
mock_identity_client.return_value = mock_client_instance
with pytest.raises(OCIAuthenticationError) as exc_info:
OraclecloudProvider.set_identity(mock_session)
assert "OCI credential validation failed" in str(exc_info.value)
def test_authentication_error_404_raises_exception(self, mock_session):
"""Test 404 error raises OCIAuthenticationError."""
with patch("oci.identity.IdentityClient") as mock_identity_client:
mock_client_instance = MagicMock()
mock_client_instance.get_tenancy.side_effect = self._create_service_error(
404, "Resource not found"
)
mock_identity_client.return_value = mock_client_instance
with pytest.raises(OCIAuthenticationError) as exc_info:
OraclecloudProvider.set_identity(mock_session)
assert "OCI credential validation failed" in str(exc_info.value)
def test_service_error_500_raises_exception(self, mock_session):
"""Test 500 error raises OCIAuthenticationError (can't validate credentials)."""
with patch("oci.identity.IdentityClient") as mock_identity_client:
mock_client_instance = MagicMock()
mock_client_instance.get_tenancy.side_effect = self._create_service_error(
500, "Internal server error"
)
mock_identity_client.return_value = mock_client_instance
with pytest.raises(OCIAuthenticationError) as exc_info:
OraclecloudProvider.set_identity(mock_session)
assert "OCI credential validation failed" in str(exc_info.value)
def test_invalid_private_key_raises_exception(self, mock_session):
"""Test InvalidPrivateKey exception raises OCIAuthenticationError."""
with patch("oci.identity.IdentityClient") as mock_identity_client:
import oci
mock_client_instance = MagicMock()
mock_client_instance.get_tenancy.side_effect = (
oci.exceptions.InvalidPrivateKey("Invalid private key")
)
mock_identity_client.return_value = mock_client_instance
with pytest.raises(OCIAuthenticationError) as exc_info:
OraclecloudProvider.set_identity(mock_session)
assert "Invalid OCI private key format" in str(exc_info.value)
def test_generic_exception_raises_authentication_error(self, mock_session):
"""Test generic exception raises OCIAuthenticationError."""
with patch("oci.identity.IdentityClient") as mock_identity_client:
mock_client_instance = MagicMock()
mock_client_instance.get_tenancy.side_effect = Exception("Unexpected error")
mock_identity_client.return_value = mock_client_instance
with pytest.raises(OCIAuthenticationError) as exc_info:
OraclecloudProvider.set_identity(mock_session)
assert "Failed to authenticate with OCI" in str(exc_info.value)
def test_successful_authentication(self, mock_session):
"""Test successful authentication returns identity info."""
with patch("oci.identity.IdentityClient") as mock_identity_client:
mock_tenancy = MagicMock()
mock_tenancy.name = "test-tenancy"
mock_response = MagicMock()
mock_response.data = mock_tenancy
mock_client_instance = MagicMock()
mock_client_instance.get_tenancy.return_value = mock_response
mock_identity_client.return_value = mock_client_instance
identity = OraclecloudProvider.set_identity(mock_session)
assert identity.tenancy_name == "test-tenancy"
assert identity.tenancy_id == "ocid1.tenancy.oc1..aaaaaaaexample"
assert identity.user_id == "ocid1.user.oc1..aaaaaaaexample"
assert identity.region == "us-ashburn-1"
@staticmethod
def _create_service_error(status, message):
"""Helper to create an OCI ServiceError."""
import oci
error = oci.exceptions.ServiceError(
status=status,
code="TestError",
headers={},
message=message,
)
return error
class TestTestConnectionKeyValidation:
"""Tests for key_content validation in test_connection()"""
def test_test_connection_invalid_base64_key_raises_error(self):
"""Test invalid base64 key content raises OCIInvalidConfigError."""
with pytest.raises(OCIInvalidConfigError) as exc_info:
OraclecloudProvider.test_connection(
oci_config_file=None,
profile=None,
key_content="not-valid-base64!!!",
user="ocid1.user.oc1..aaaaaaaexample",
fingerprint="aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
tenancy="ocid1.tenancy.oc1..aaaaaaaexample",
region="us-ashburn-1",
)
assert "Failed to decode key_content" in str(exc_info.value)
def test_test_connection_valid_key_content_proceeds(self):
"""Test valid base64 key content proceeds to authentication."""
import base64
# The SDK will validate the actual key format during authentication
valid_key = """-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEA0Z3VS5JJcds3xfn/ygWyF8n0sMcD/QHWCJ7yGSEtLN2T
...key content...
-----END RSA PRIVATE KEY-----"""
encoded_key = base64.b64encode(valid_key.encode("utf-8")).decode("utf-8")
with (
patch("oci.config.validate_config"),
patch("oci.identity.IdentityClient") as mock_identity_client,
):
mock_tenancy = MagicMock()
mock_tenancy.name = "test-tenancy"
mock_response = MagicMock()
mock_response.data = mock_tenancy
mock_client_instance = MagicMock()
mock_client_instance.get_tenancy.return_value = mock_response
mock_identity_client.return_value = mock_client_instance
connection = OraclecloudProvider.test_connection(
oci_config_file=None,
profile=None,
key_content=encoded_key,
user="ocid1.user.oc1..aaaaaaaexample",
fingerprint="aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99",
tenancy="ocid1.tenancy.oc1..aaaaaaaexample",
region="us-ashburn-1",
raise_on_exception=False,
)
assert connection.is_connected is True