feat(ui): add Cloudflare provider support (#9910)

Co-authored-by: Alan Buscaglia <gentlemanprogramming@gmail.com>
Co-authored-by: Alejandro Bailo <59607668+alejandrobailo@users.noreply.github.com>
Co-authored-by: alejandrobailo <alejandrobailo94@gmail.com>
This commit is contained in:
Hugo Pereira Brito
2026-02-23 09:33:17 +01:00
committed by GitHub
parent 9f6121bc05
commit bb5a4371bd
32 changed files with 1163 additions and 67 deletions

View File

@@ -6,6 +6,10 @@ from prowler.providers.cloudflare.cloudflare_provider import CloudflareProvider
from prowler.providers.cloudflare.exceptions.exceptions import (
CloudflareCredentialsError,
CloudflareInvalidAccountError,
CloudflareInvalidAPIKeyError,
CloudflareInvalidAPITokenError,
CloudflareNoAccountsError,
CloudflareUserTokenRequiredError,
)
from prowler.providers.cloudflare.models import (
CloudflareAccount,
@@ -95,69 +99,47 @@ class TestCloudflareProvider:
assert provider.session.api_email == API_EMAIL
def test_cloudflare_provider_test_connection_success(self):
with (
patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
return_value=CloudflareSession(
client=MagicMock(),
api_token=API_TOKEN,
api_key=None,
api_email=None,
),
),
patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_identity",
return_value=CloudflareIdentityInfo(
user_id=USER_ID,
email=USER_EMAIL,
accounts=[
CloudflareAccount(
id=ACCOUNT_ID,
name=ACCOUNT_NAME,
type="standard",
)
],
audited_accounts=[ACCOUNT_ID],
),
mock_client = MagicMock()
# Simulate successful user.get() call
mock_client.user.get.return_value = MagicMock(id=USER_ID, email=USER_EMAIL)
with patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
return_value=CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
),
):
provider = CloudflareProvider()
connection = provider.test_connection()
connection = CloudflareProvider.test_connection(api_token=API_TOKEN)
assert isinstance(connection, Connection)
assert connection.is_connected is True
assert connection.error is None
def test_cloudflare_provider_test_connection_failure(self):
def test_cloudflare_provider_test_connection_failure_no_accounts(self):
mock_client = MagicMock()
mock_client.user.get.side_effect = Exception("Connection failed")
mock_client.accounts.list.return_value = iter([]) # Empty accounts list
with (
patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
return_value=CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
),
),
patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_identity",
return_value=CloudflareIdentityInfo(
user_id=USER_ID,
email=USER_EMAIL,
accounts=[],
audited_accounts=[],
),
with patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
return_value=CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
),
):
provider = CloudflareProvider()
connection = provider.test_connection()
connection = CloudflareProvider.test_connection(
api_token=API_TOKEN, raise_on_exception=False
)
assert isinstance(connection, Connection)
assert connection.is_connected is False
assert connection.error is not None
assert isinstance(connection.error, CloudflareNoAccountsError)
def test_cloudflare_provider_no_credentials_raises_error(self):
with patch(
@@ -305,3 +287,262 @@ class TestCloudflareProvider:
assert provider.audit_config is not None
assert provider.fixer_config is not None
assert provider.mutelist is not None
class TestCloudflareValidateCredentials:
"""Tests for validate_credentials method."""
def test_validate_credentials_success(self):
"""Test successful credential validation."""
mock_client = MagicMock()
mock_client.user.get.return_value = MagicMock(id=USER_ID, email=USER_EMAIL)
session = CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
)
# Should not raise any exception
CloudflareProvider.validate_credentials(session)
mock_client.user.get.assert_called_once()
def test_validate_credentials_user_token_required(self):
"""Test that user token required error is raised for Account tokens."""
mock_client = MagicMock()
# Simulate error code 9109 - user-level authentication required
from cloudflare._exceptions import PermissionDeniedError
mock_client.user.get.side_effect = PermissionDeniedError(
"Error code: 403 - {'errors': [{'code': 9109, 'message': 'Valid user-level authentication not found'}]}",
response=MagicMock(status_code=403),
body=None,
)
session = CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
)
with pytest.raises(CloudflareUserTokenRequiredError):
CloudflareProvider.validate_credentials(session)
def test_validate_credentials_invalid_api_token(self):
"""Test that invalid API token error is raised."""
mock_client = MagicMock()
from cloudflare._exceptions import BadRequestError
mock_client.user.get.side_effect = BadRequestError(
"Error code: 400 - {'errors': [{'code': 6003, 'message': 'Invalid request headers', 'error_chain': [{'code': 6111}]}]}",
response=MagicMock(status_code=400),
body=None,
)
session = CloudflareSession(
client=mock_client,
api_token="invalid_token",
api_key=None,
api_email=None,
)
with pytest.raises(CloudflareInvalidAPITokenError):
CloudflareProvider.validate_credentials(session)
def test_validate_credentials_invalid_api_key(self):
"""Test that invalid API key error is raised (403 with code 9103)."""
mock_client = MagicMock()
from cloudflare._exceptions import PermissionDeniedError
# Real error: 403 with code 9103 "Unknown X-Auth-Key or X-Auth-Email"
mock_client.user.get.side_effect = PermissionDeniedError(
"Error code: 403 - {'success': False, 'errors': [{'code': 9103, 'message': 'Unknown X-Auth-Key or X-Auth-Email'}]}",
response=MagicMock(status_code=403),
body=None,
)
session = CloudflareSession(
client=mock_client,
api_token=None,
api_key="invalid_key",
api_email="invalid@email.com",
)
with pytest.raises(CloudflareInvalidAPIKeyError):
CloudflareProvider.validate_credentials(session)
def test_validate_credentials_invalid_api_key_bad_request(self):
"""Test that invalid API key error is raised when using API Key + Email with 6003 error."""
mock_client = MagicMock()
from cloudflare._exceptions import BadRequestError
# Same error code as token but using API Key + Email auth
mock_client.user.get.side_effect = BadRequestError(
"Error code: 400 - {'errors': [{'code': 6003, 'message': 'Invalid request headers'}]}",
response=MagicMock(status_code=400),
body=None,
)
session = CloudflareSession(
client=mock_client,
api_token=None,
api_key="invalid_key",
api_email="invalid@email.com",
)
# Should raise CloudflareInvalidAPIKeyError, NOT CloudflareInvalidAPITokenError
with pytest.raises(CloudflareInvalidAPIKeyError):
CloudflareProvider.validate_credentials(session)
def test_validate_credentials_fallback_to_accounts_list(self):
"""Test fallback to accounts.list() when user.get() fails with non-auth error."""
mock_client = MagicMock()
# Simulate a non-auth error on user.get()
mock_client.user.get.side_effect = Exception("Some other error")
# accounts.list() returns valid accounts
mock_account = MagicMock()
mock_account.id = ACCOUNT_ID
mock_client.accounts.list.return_value = iter([mock_account])
session = CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
)
# Should not raise - fallback succeeded
CloudflareProvider.validate_credentials(session)
mock_client.accounts.list.assert_called_once()
def test_validate_credentials_no_accounts(self):
"""Test that no accounts error is raised when accounts.list() is empty."""
mock_client = MagicMock()
mock_client.user.get.side_effect = Exception("Some error")
mock_client.accounts.list.return_value = iter([]) # Empty
session = CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
)
with pytest.raises(CloudflareNoAccountsError):
CloudflareProvider.validate_credentials(session)
class TestCloudflareTestConnection:
"""Tests for test_connection method."""
def test_test_connection_returns_prowler_exception(self):
"""Test that test_connection returns Prowler exceptions, not raw SDK errors."""
mock_client = MagicMock()
from cloudflare._exceptions import BadRequestError
mock_client.user.get.side_effect = BadRequestError(
"Error code: 400 - {'errors': [{'code': 6003}]}",
response=MagicMock(status_code=400),
body=None,
)
with patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
return_value=CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
),
):
connection = CloudflareProvider.test_connection(
api_token=API_TOKEN, raise_on_exception=False
)
assert connection.is_connected is False
assert isinstance(connection.error, CloudflareInvalidAPITokenError)
def test_test_connection_user_token_required(self):
"""Test that user token required error is properly returned."""
mock_client = MagicMock()
from cloudflare._exceptions import PermissionDeniedError
mock_client.user.get.side_effect = PermissionDeniedError(
"Error code: 403 - {'errors': [{'code': 9109}]}",
response=MagicMock(status_code=403),
body=None,
)
with patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
return_value=CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
),
):
connection = CloudflareProvider.test_connection(
api_token=API_TOKEN, raise_on_exception=False
)
assert connection.is_connected is False
assert isinstance(connection.error, CloudflareUserTokenRequiredError)
# Verify the error message is user-friendly
assert "User-level API token required" in str(connection.error)
def test_test_connection_invalid_api_key(self):
"""Test that invalid API key error is properly returned."""
mock_client = MagicMock()
from cloudflare._exceptions import BadRequestError
mock_client.user.get.side_effect = BadRequestError(
"Unknown X-Auth-Key or X-Auth-Email",
response=MagicMock(status_code=400),
body=None,
)
with patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
return_value=CloudflareSession(
client=mock_client,
api_token=None,
api_key=API_KEY,
api_email=API_EMAIL,
),
):
connection = CloudflareProvider.test_connection(
api_key=API_KEY, api_email=API_EMAIL, raise_on_exception=False
)
assert connection.is_connected is False
assert isinstance(connection.error, CloudflareInvalidAPIKeyError)
# Verify the error message is user-friendly
assert "Invalid API Key or Email" in str(connection.error)
def test_test_connection_raises_when_requested(self):
"""Test that exceptions are raised when raise_on_exception=True."""
mock_client = MagicMock()
from cloudflare._exceptions import BadRequestError
mock_client.user.get.side_effect = BadRequestError(
"Error code: 400 - {'errors': [{'code': 6003}]}",
response=MagicMock(status_code=400),
body=None,
)
with patch(
"prowler.providers.cloudflare.cloudflare_provider.CloudflareProvider.setup_session",
return_value=CloudflareSession(
client=mock_client,
api_token=API_TOKEN,
api_key=None,
api_email=None,
),
):
with pytest.raises(CloudflareInvalidAPITokenError):
CloudflareProvider.test_connection(
api_token=API_TOKEN, raise_on_exception=True
)