diff --git a/docs/user-guide/providers/oci/authentication.mdx b/docs/user-guide/providers/oci/authentication.mdx index 2ae73fb3af..7f0c493e76 100644 --- a/docs/user-guide/providers/oci/authentication.mdx +++ b/docs/user-guide/providers/oci/authentication.mdx @@ -164,7 +164,7 @@ prowler oci --profile PRODUCTION Use a config file from a custom location: ```bash -prowler oci --config-file /path/to/custom/config +prowler oci --oci-config-file /path/to/custom/config ``` ### Setting Up API Keys @@ -377,7 +377,7 @@ ls -la ~/.oci/config mkdir -p ~/.oci # Specify custom location -prowler oci --config-file /path/to/config +prowler oci --oci-config-file /path/to/config ``` #### Error: "InvalidKeyOrSignature" diff --git a/docs/user-guide/providers/oci/getting-started-oci.mdx b/docs/user-guide/providers/oci/getting-started-oci.mdx index 80c008480d..8affa2f992 100644 --- a/docs/user-guide/providers/oci/getting-started-oci.mdx +++ b/docs/user-guide/providers/oci/getting-started-oci.mdx @@ -122,7 +122,7 @@ prowler oci --profile production ##### Using a Custom Config File ```bash -prowler oci --config-file /path/to/custom/config +prowler oci --oci-config-file /path/to/custom/config ``` #### Instance Principal Authentication diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 5fd9ca66f4..7fd5b6b734 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -42,6 +42,13 @@ All notable changes to the **Prowler SDK** are documented in this file. --- +## [5.16.2] (Prowler v5.16.2) (UNRELEASED) + +### Fixed +- Fix OCI authentication error handling and validation [(#9738)](https://github.com/prowler-cloud/prowler/pull/9738) + +--- + ## [5.16.1] (Prowler v5.16.1) ### Fixed diff --git a/prowler/providers/oraclecloud/oraclecloud_provider.py b/prowler/providers/oraclecloud/oraclecloud_provider.py index 08d654d580..3ff83e7c22 100644 --- a/prowler/providers/oraclecloud/oraclecloud_provider.py +++ b/prowler/providers/oraclecloud/oraclecloud_provider.py @@ -266,7 +266,6 @@ class OraclecloudProvider(Provider): # If API key credentials are provided directly, create config from them if user and fingerprint and tenancy and region: import base64 - import tempfile logger.info("Using API key credentials from direct parameters") @@ -280,21 +279,19 @@ class OraclecloudProvider(Provider): # Handle private key if key_content: - # Decode base64 key content and write to temp file + # Decode base64 key content try: key_data = base64.b64decode(key_content) - temp_key_file = tempfile.NamedTemporaryFile( - mode="wb", delete=False, suffix=".pem" - ) - temp_key_file.write(key_data) - temp_key_file.close() - config["key_file"] = temp_key_file.name + decoded_key = key_data.decode("utf-8") except Exception as decode_error: logger.error(f"Failed to decode key_content: {decode_error}") raise OCIInvalidConfigError( file=pathlib.Path(__file__).name, message="Failed to decode key_content. Ensure it is base64 encoded.", ) + + # Use OCI SDK's native key_content support + config["key_content"] = decoded_key elif key_file: config["key_file"] = os.path.expanduser(key_file) else: @@ -428,78 +425,85 @@ class OraclecloudProvider(Provider): Raises: - OCIAuthenticationError: If authentication fails. """ - try: - # Get tenancy from config - tenancy_id = session.config.get("tenancy") + # Get tenancy from config + tenancy_id = session.config.get("tenancy") - if not tenancy_id: - raise OCINoCredentialsError( - file=pathlib.Path(__file__).name, - message="Tenancy ID not found in configuration", - ) - - # Validate tenancy OCID format - if not OraclecloudProvider.validate_ocid(tenancy_id, "tenancy"): - raise OCIInvalidTenancyError( - file=pathlib.Path(__file__).name, - message=f"Invalid tenancy OCID format: {tenancy_id}", - ) - - # Get user from config (not available in instance principal) - user_id = session.config.get("user", "instance-principal") - - # Get region from config or use provided region - if not region: - region = session.config.get("region", "us-ashburn-1") - - # Validate region - if region not in OCI_REGIONS: - raise OCIInvalidRegionError( - file=pathlib.Path(__file__).name, - message=f"Invalid region: {region}", - ) - - # Get tenancy name using Identity service - tenancy_name = "unknown" - try: - # Create identity client with proper authentication handling - if session.signer: - identity_client = oci.identity.IdentityClient( - config=session.config, signer=session.signer - ) - else: - identity_client = oci.identity.IdentityClient(config=session.config) - - tenancy = identity_client.get_tenancy(tenancy_id).data - tenancy_name = tenancy.name - logger.info(f"Tenancy Name: {tenancy_name}") - except Exception as error: - logger.warning( - f"Could not retrieve tenancy name: {error}. Using 'unknown'" - ) - - logger.info(f"OCI Tenancy ID: {tenancy_id}") - logger.info(f"OCI User ID: {user_id}") - logger.info(f"OCI Region: {region}") - - return OCIIdentityInfo( - tenancy_id=tenancy_id, - tenancy_name=tenancy_name, - user_id=user_id, - region=region, - profile=session.profile, - audited_regions=set([region]) if region else set(), - audited_compartments=compartment_ids if compartment_ids else [], + if not tenancy_id: + raise OCINoCredentialsError( + file=pathlib.Path(__file__).name, + message="Tenancy ID not found in configuration", ) - except Exception as error: + # Validate tenancy OCID format + if not OraclecloudProvider.validate_ocid(tenancy_id, "tenancy"): + raise OCIInvalidTenancyError( + file=pathlib.Path(__file__).name, + message=f"Invalid tenancy OCID format: {tenancy_id}", + ) + + # Get user from config (not available in instance principal) + user_id = session.config.get("user", "instance-principal") + + # Get region from config or use provided region + if not region: + region = session.config.get("region", "us-ashburn-1") + + # Validate region + if region not in OCI_REGIONS: + raise OCIInvalidRegionError( + file=pathlib.Path(__file__).name, + message=f"Invalid region: {region}", + ) + + # Validate credentials by calling OCI Identity service + try: + if session.signer: + identity_client = oci.identity.IdentityClient( + config=session.config, signer=session.signer + ) + else: + identity_client = oci.identity.IdentityClient(config=session.config) + + tenancy = identity_client.get_tenancy(tenancy_id).data + tenancy_name = tenancy.name + logger.info(f"Tenancy Name: {tenancy_name}") + except oci.exceptions.ServiceError as error: logger.critical( - f"OCIAuthenticationError[{error.__traceback__.tb_lineno}]: {error}" + f"OCI credential validation failed (HTTP {error.status}): {error.message}" ) raise OCIAuthenticationError( - original_exception=error, file=pathlib.Path(__file__).name, + message=f"OCI credential validation failed: {error.message}. Please verify your credentials and try again.", + original_exception=error, ) + except oci.exceptions.InvalidPrivateKey as error: + logger.critical(f"Invalid OCI private key: {error}") + raise OCIAuthenticationError( + file=pathlib.Path(__file__).name, + message="Invalid OCI private key format. Ensure the key is a valid PEM-encoded private key.", + original_exception=error, + ) + except Exception as error: + logger.critical(f"OCI authentication error: {error}") + raise OCIAuthenticationError( + file=pathlib.Path(__file__).name, + message=f"Failed to authenticate with OCI: {error}", + original_exception=error, + ) + + logger.info(f"OCI Tenancy ID: {tenancy_id}") + logger.info(f"OCI User ID: {user_id}") + logger.info(f"OCI Region: {region}") + + return OCIIdentityInfo( + tenancy_id=tenancy_id, + tenancy_name=tenancy_name, + user_id=user_id, + region=region, + profile=session.profile, + audited_regions=set([region]) if region else set(), + audited_compartments=compartment_ids if compartment_ids else [], + ) @staticmethod def validate_ocid(ocid: str, resource_type: str = None) -> bool: @@ -838,7 +842,6 @@ class OraclecloudProvider(Provider): # If API key credentials are provided directly, create config from them if user and fingerprint and tenancy and region: import base64 - import tempfile logger.info("Using API key credentials from direct parameters") @@ -852,21 +855,19 @@ class OraclecloudProvider(Provider): # Handle private key if key_content: - # Decode base64 key content and write to temp file + # Decode base64 key content try: key_data = base64.b64decode(key_content) - temp_key_file = tempfile.NamedTemporaryFile( - mode="wb", delete=False, suffix=".pem" - ) - temp_key_file.write(key_data) - temp_key_file.close() - config["key_file"] = temp_key_file.name + decoded_key = key_data.decode("utf-8") except Exception as decode_error: logger.error(f"Failed to decode key_content: {decode_error}") raise OCIInvalidConfigError( file=pathlib.Path(__file__).name, message="Failed to decode key_content. Ensure it is base64 encoded.", ) + + # Use OCI SDK's native key_content support + config["key_content"] = decoded_key elif key_file: config["key_file"] = os.path.expanduser(key_file) else: diff --git a/tests/providers/oraclecloud/oraclecloud_provider_test.py b/tests/providers/oraclecloud/oraclecloud_provider_test.py new file mode 100644 index 0000000000..dd3b7b7d27 --- /dev/null +++ b/tests/providers/oraclecloud/oraclecloud_provider_test.py @@ -0,0 +1,201 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from prowler.providers.oraclecloud.exceptions.exceptions import ( + OCIAuthenticationError, + OCIInvalidConfigError, +) +from prowler.providers.oraclecloud.models import OCISession +from prowler.providers.oraclecloud.oraclecloud_provider import OraclecloudProvider + + +class TestSetIdentityAuthenticationErrors: + """Tests for authentication error handling in set_identity()""" + + @pytest.fixture + def mock_session(self): + """Create a mock OCI session.""" + session = OCISession( + config={ + "tenancy": "ocid1.tenancy.oc1..aaaaaaaexample", + "user": "ocid1.user.oc1..aaaaaaaexample", + "region": "us-ashburn-1", + "fingerprint": "aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99", + }, + signer=None, + profile="DEFAULT", + ) + return session + + def test_authentication_error_401_raises_exception(self, mock_session): + """Test 401 error raises OCIAuthenticationError.""" + with patch("oci.identity.IdentityClient") as mock_identity_client: + mock_client_instance = MagicMock() + mock_client_instance.get_tenancy.side_effect = self._create_service_error( + 401, "Authentication failed" + ) + mock_identity_client.return_value = mock_client_instance + + with pytest.raises(OCIAuthenticationError) as exc_info: + OraclecloudProvider.set_identity(mock_session) + + assert "OCI credential validation failed" in str(exc_info.value) + + def test_authentication_error_403_raises_exception(self, mock_session): + """Test 403 error raises OCIAuthenticationError.""" + with patch("oci.identity.IdentityClient") as mock_identity_client: + mock_client_instance = MagicMock() + mock_client_instance.get_tenancy.side_effect = self._create_service_error( + 403, "Forbidden access" + ) + mock_identity_client.return_value = mock_client_instance + + with pytest.raises(OCIAuthenticationError) as exc_info: + OraclecloudProvider.set_identity(mock_session) + + assert "OCI credential validation failed" in str(exc_info.value) + + def test_authentication_error_404_raises_exception(self, mock_session): + """Test 404 error raises OCIAuthenticationError.""" + with patch("oci.identity.IdentityClient") as mock_identity_client: + mock_client_instance = MagicMock() + mock_client_instance.get_tenancy.side_effect = self._create_service_error( + 404, "Resource not found" + ) + mock_identity_client.return_value = mock_client_instance + + with pytest.raises(OCIAuthenticationError) as exc_info: + OraclecloudProvider.set_identity(mock_session) + + assert "OCI credential validation failed" in str(exc_info.value) + + def test_service_error_500_raises_exception(self, mock_session): + """Test 500 error raises OCIAuthenticationError (can't validate credentials).""" + with patch("oci.identity.IdentityClient") as mock_identity_client: + mock_client_instance = MagicMock() + mock_client_instance.get_tenancy.side_effect = self._create_service_error( + 500, "Internal server error" + ) + mock_identity_client.return_value = mock_client_instance + + with pytest.raises(OCIAuthenticationError) as exc_info: + OraclecloudProvider.set_identity(mock_session) + + assert "OCI credential validation failed" in str(exc_info.value) + + def test_invalid_private_key_raises_exception(self, mock_session): + """Test InvalidPrivateKey exception raises OCIAuthenticationError.""" + with patch("oci.identity.IdentityClient") as mock_identity_client: + import oci + + mock_client_instance = MagicMock() + mock_client_instance.get_tenancy.side_effect = ( + oci.exceptions.InvalidPrivateKey("Invalid private key") + ) + mock_identity_client.return_value = mock_client_instance + + with pytest.raises(OCIAuthenticationError) as exc_info: + OraclecloudProvider.set_identity(mock_session) + + assert "Invalid OCI private key format" in str(exc_info.value) + + def test_generic_exception_raises_authentication_error(self, mock_session): + """Test generic exception raises OCIAuthenticationError.""" + with patch("oci.identity.IdentityClient") as mock_identity_client: + mock_client_instance = MagicMock() + mock_client_instance.get_tenancy.side_effect = Exception("Unexpected error") + mock_identity_client.return_value = mock_client_instance + + with pytest.raises(OCIAuthenticationError) as exc_info: + OraclecloudProvider.set_identity(mock_session) + + assert "Failed to authenticate with OCI" in str(exc_info.value) + + def test_successful_authentication(self, mock_session): + """Test successful authentication returns identity info.""" + with patch("oci.identity.IdentityClient") as mock_identity_client: + mock_tenancy = MagicMock() + mock_tenancy.name = "test-tenancy" + mock_response = MagicMock() + mock_response.data = mock_tenancy + + mock_client_instance = MagicMock() + mock_client_instance.get_tenancy.return_value = mock_response + mock_identity_client.return_value = mock_client_instance + + identity = OraclecloudProvider.set_identity(mock_session) + + assert identity.tenancy_name == "test-tenancy" + assert identity.tenancy_id == "ocid1.tenancy.oc1..aaaaaaaexample" + assert identity.user_id == "ocid1.user.oc1..aaaaaaaexample" + assert identity.region == "us-ashburn-1" + + @staticmethod + def _create_service_error(status, message): + """Helper to create an OCI ServiceError.""" + import oci + + error = oci.exceptions.ServiceError( + status=status, + code="TestError", + headers={}, + message=message, + ) + return error + + +class TestTestConnectionKeyValidation: + """Tests for key_content validation in test_connection()""" + + def test_test_connection_invalid_base64_key_raises_error(self): + """Test invalid base64 key content raises OCIInvalidConfigError.""" + with pytest.raises(OCIInvalidConfigError) as exc_info: + OraclecloudProvider.test_connection( + oci_config_file=None, + profile=None, + key_content="not-valid-base64!!!", + user="ocid1.user.oc1..aaaaaaaexample", + fingerprint="aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99", + tenancy="ocid1.tenancy.oc1..aaaaaaaexample", + region="us-ashburn-1", + ) + + assert "Failed to decode key_content" in str(exc_info.value) + + def test_test_connection_valid_key_content_proceeds(self): + """Test valid base64 key content proceeds to authentication.""" + import base64 + + # The SDK will validate the actual key format during authentication + valid_key = """-----BEGIN RSA PRIVATE KEY----- +MIIEpQIBAAKCAQEA0Z3VS5JJcds3xfn/ygWyF8n0sMcD/QHWCJ7yGSEtLN2T +...key content... +-----END RSA PRIVATE KEY-----""" + encoded_key = base64.b64encode(valid_key.encode("utf-8")).decode("utf-8") + + with ( + patch("oci.config.validate_config"), + patch("oci.identity.IdentityClient") as mock_identity_client, + ): + mock_tenancy = MagicMock() + mock_tenancy.name = "test-tenancy" + mock_response = MagicMock() + mock_response.data = mock_tenancy + + mock_client_instance = MagicMock() + mock_client_instance.get_tenancy.return_value = mock_response + mock_identity_client.return_value = mock_client_instance + + connection = OraclecloudProvider.test_connection( + oci_config_file=None, + profile=None, + key_content=encoded_key, + user="ocid1.user.oc1..aaaaaaaexample", + fingerprint="aa:bb:cc:dd:ee:ff:00:11:22:33:44:55:66:77:88:99", + tenancy="ocid1.tenancy.oc1..aaaaaaaexample", + region="us-ashburn-1", + raise_on_exception=False, + ) + + assert connection.is_connected is True