mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-04-12 12:48:47 +00:00
Compare commits
1 Commits
fix-beat-a
...
feat/prowl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
96f94aba43 |
@@ -21,6 +21,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- `entra_conditional_access_policy_device_registration_mfa_required` check and `entra_intune_enrollment_sign_in_frequency_every_time` enhancement for M365 provider [(#10222)](https://github.com/prowler-cloud/prowler/pull/10222)
|
||||
- `entra_conditional_access_policy_block_elevated_insider_risk` check for M365 provider [(#10234)](https://github.com/prowler-cloud/prowler/pull/10234)
|
||||
- `Vercel` provider support with 30 checks [(#10189)](https://github.com/prowler-cloud/prowler/pull/10189)
|
||||
- `api_token_ip_restriction_enabled` check for cloudflare provider [(#10624)](https://github.com/prowler-cloud/prowler/pull/10624)
|
||||
|
||||
### 🔄 Changed
|
||||
|
||||
|
||||
4
prowler/providers/cloudflare/services/api/api_client.py
Normal file
4
prowler/providers/cloudflare/services/api/api_client.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from prowler.providers.cloudflare.services.api.api_service import API
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
api_client = API(Provider.get_global_provider())
|
||||
66
prowler/providers/cloudflare/services/api/api_service.py
Normal file
66
prowler/providers/cloudflare/services/api/api_service.py
Normal file
@@ -0,0 +1,66 @@
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.cloudflare.lib.service.service import CloudflareService
|
||||
|
||||
|
||||
class API(CloudflareService):
|
||||
"""Retrieve Cloudflare API tokens for the authenticated user."""
|
||||
|
||||
def __init__(self, provider):
|
||||
super().__init__(__class__.__name__, provider)
|
||||
self.tokens: list["CloudflareAPIToken"] = []
|
||||
self._list_api_tokens()
|
||||
|
||||
def _list_api_tokens(self) -> None:
|
||||
"""List all API tokens for the authenticated user."""
|
||||
logger.info("APIToken - Listing API tokens...")
|
||||
try:
|
||||
seen_token_ids: set[str] = set()
|
||||
for token in self.client.user.tokens.list():
|
||||
token_id = getattr(token, "id", None)
|
||||
# Prevent infinite loop
|
||||
if token_id in seen_token_ids:
|
||||
break
|
||||
seen_token_ids.add(token_id)
|
||||
|
||||
# Extract IP condition details
|
||||
condition = getattr(token, "condition", None)
|
||||
request_ip = getattr(condition, "request_ip", None) if condition else None
|
||||
ip_in = getattr(request_ip, "in_", None) if request_ip else None
|
||||
ip_not_in = getattr(request_ip, "not_in", None) if request_ip else None
|
||||
|
||||
self.tokens.append(
|
||||
CloudflareAPIToken(
|
||||
id=token_id,
|
||||
name=getattr(token, "name", None),
|
||||
status=getattr(token, "status", None),
|
||||
ip_allow_list=ip_in or [],
|
||||
ip_deny_list=ip_not_in or [],
|
||||
expires_on=getattr(token, "expires_on", None),
|
||||
issued_on=getattr(token, "issued_on", None),
|
||||
last_used_on=getattr(token, "last_used_on", None),
|
||||
modified_on=getattr(token, "modified_on", None),
|
||||
)
|
||||
)
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
|
||||
class CloudflareAPIToken(BaseModel):
|
||||
"""Cloudflare API token representation."""
|
||||
|
||||
id: str
|
||||
name: Optional[str] = None
|
||||
status: Optional[str] = None
|
||||
ip_allow_list: list[str] = []
|
||||
ip_deny_list: list[str] = []
|
||||
expires_on: Optional[datetime] = None
|
||||
issued_on: Optional[datetime] = None
|
||||
last_used_on: Optional[datetime] = None
|
||||
modified_on: Optional[datetime] = None
|
||||
@@ -0,0 +1,36 @@
|
||||
{
|
||||
"Provider": "cloudflare",
|
||||
"CheckID": "api_token_ip_restriction_enabled",
|
||||
"CheckTitle": "IP address filtering restricts API token usage to authorized networks",
|
||||
"CheckType": [],
|
||||
"ServiceName": "api",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Other",
|
||||
"ResourceGroup": "IAM",
|
||||
"Description": "**Cloudflare API tokens** are assessed for **client IP address filtering** by checking whether an allow list or deny list of IP addresses/CIDR ranges is configured to restrict which networks can use the token.",
|
||||
"Risk": "Without **IP address filtering**, a compromised API token can be used from **any network location**.\n- **Confidentiality**: attackers with a stolen token can access account resources from any IP\n- **Integrity**: unauthorized modifications can be made from uncontrolled networks\n- Restricting tokens to known IP ranges limits the blast radius of credential theft",
|
||||
"RelatedUrl": "",
|
||||
"AdditionalURLs": [
|
||||
"https://developers.cloudflare.com/fundamentals/api/get-started/create-token/"
|
||||
],
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "1. Log in to the Cloudflare dashboard\n2. Go to My Profile > API Tokens\n3. Select the token to edit and click the three-dot menu > Edit\n4. Under Client IP Address Filtering, add the IP addresses or CIDR ranges that should be allowed or denied\n5. Save the token",
|
||||
"Terraform": "```hcl\nresource \"cloudflare_api_token\" \"example_resource\" {\n name = \"example_resource\"\n\n policy {\n permission_groups = [\"<PERMISSION_GROUP_ID>\"]\n resources = { \"com.cloudflare.api.account.*\" = \"*\" }\n }\n\n # Restrict token to specific IP ranges\n condition {\n request_ip {\n in = [\"192.0.2.0/24\", \"198.51.100.0/24\"]\n }\n }\n}\n```"
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Configure **client IP address filtering** on all API tokens following the **principle of least privilege**.\n- Restrict tokens to known corporate or CI/CD IP ranges\n- Use CIDR notation to allow specific networks rather than broad ranges\n- Regularly review and update allowed IP lists as infrastructure changes",
|
||||
"Url": "https://hub.prowler.com/checks/cloudflare/api_token_ip_restriction_enabled"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"identity-access"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "API tokens can have both allow lists and deny lists for IP filtering. This check verifies that at least one form of IP restriction is configured. Tokens used by CI/CD pipelines should be restricted to the IP ranges of the pipeline runners."
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
from prowler.lib.check.models import Check, CheckReportCloudflare
|
||||
from prowler.providers.cloudflare.services.api.api_client import api_client
|
||||
|
||||
|
||||
class api_token_ip_restriction_enabled(Check):
|
||||
"""Ensure that Cloudflare API tokens have client IP address filtering configured.
|
||||
|
||||
API tokens with IP address filtering restrict token usage to requests originating
|
||||
from specific IP addresses or CIDR ranges. Without IP filtering, a compromised
|
||||
token can be used from any network location.
|
||||
|
||||
- PASS: The API token has at least one IP address restriction configured.
|
||||
- FAIL: The API token has no IP address filtering configured.
|
||||
"""
|
||||
|
||||
def execute(self) -> list[CheckReportCloudflare]:
|
||||
"""Execute the API token IP restriction check.
|
||||
|
||||
Iterates through all Cloudflare API tokens and verifies that each token
|
||||
has client IP address filtering configured via allow or deny lists.
|
||||
|
||||
Returns:
|
||||
A list of CheckReportCloudflare objects with PASS status if IP
|
||||
filtering is configured, or FAIL status if no IP restrictions exist.
|
||||
"""
|
||||
findings = []
|
||||
for token in api_client.tokens:
|
||||
report = CheckReportCloudflare(
|
||||
metadata=self.metadata(),
|
||||
resource=token,
|
||||
)
|
||||
|
||||
has_ip_restriction = bool(token.ip_allow_list) or bool(
|
||||
token.ip_deny_list
|
||||
)
|
||||
|
||||
if has_ip_restriction:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"API token {token.name} has client IP address filtering configured."
|
||||
)
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"API token {token.name} does not have client IP address filtering configured."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
0
tests/providers/cloudflare/services/api/__init__.py
Normal file
0
tests/providers/cloudflare/services/api/__init__.py
Normal file
@@ -0,0 +1,211 @@
|
||||
from unittest import mock
|
||||
|
||||
from prowler.providers.cloudflare.services.api.api_service import (
|
||||
CloudflareAPIToken,
|
||||
)
|
||||
from tests.providers.cloudflare.cloudflare_fixtures import (
|
||||
set_mocked_cloudflare_provider,
|
||||
)
|
||||
|
||||
TOKEN_ID = "test-token-id"
|
||||
TOKEN_NAME = "Test API Token"
|
||||
|
||||
|
||||
class Test_api_token_ip_restriction_enabled:
|
||||
"""Tests for the api_token_ip_restriction_enabled check."""
|
||||
|
||||
def test_no_tokens(self):
|
||||
api_client = mock.MagicMock
|
||||
api_client.tokens = []
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_cloudflare_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled.api_client",
|
||||
new=api_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled import (
|
||||
api_token_ip_restriction_enabled,
|
||||
)
|
||||
|
||||
check = api_token_ip_restriction_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 0
|
||||
|
||||
def test_token_with_ip_allow_list(self):
|
||||
api_client = mock.MagicMock
|
||||
api_client.tokens = [
|
||||
CloudflareAPIToken(
|
||||
id=TOKEN_ID,
|
||||
name=TOKEN_NAME,
|
||||
status="active",
|
||||
ip_allow_list=["192.0.2.0/24", "198.51.100.0/24"],
|
||||
ip_deny_list=[],
|
||||
)
|
||||
]
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_cloudflare_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled.api_client",
|
||||
new=api_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled import (
|
||||
api_token_ip_restriction_enabled,
|
||||
)
|
||||
|
||||
check = api_token_ip_restriction_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].resource_id == TOKEN_ID
|
||||
assert result[0].resource_name == TOKEN_NAME
|
||||
assert result[0].status == "PASS"
|
||||
assert "has client IP address filtering configured" in result[0].status_extended
|
||||
|
||||
def test_token_with_ip_deny_list(self):
|
||||
api_client = mock.MagicMock
|
||||
api_client.tokens = [
|
||||
CloudflareAPIToken(
|
||||
id=TOKEN_ID,
|
||||
name=TOKEN_NAME,
|
||||
status="active",
|
||||
ip_allow_list=[],
|
||||
ip_deny_list=["10.0.0.0/8"],
|
||||
)
|
||||
]
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_cloudflare_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled.api_client",
|
||||
new=api_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled import (
|
||||
api_token_ip_restriction_enabled,
|
||||
)
|
||||
|
||||
check = api_token_ip_restriction_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].resource_id == TOKEN_ID
|
||||
assert result[0].status == "PASS"
|
||||
assert "has client IP address filtering configured" in result[0].status_extended
|
||||
|
||||
def test_token_with_both_allow_and_deny_lists(self):
|
||||
api_client = mock.MagicMock
|
||||
api_client.tokens = [
|
||||
CloudflareAPIToken(
|
||||
id=TOKEN_ID,
|
||||
name=TOKEN_NAME,
|
||||
status="active",
|
||||
ip_allow_list=["192.0.2.0/24"],
|
||||
ip_deny_list=["10.0.0.0/8"],
|
||||
)
|
||||
]
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_cloudflare_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled.api_client",
|
||||
new=api_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled import (
|
||||
api_token_ip_restriction_enabled,
|
||||
)
|
||||
|
||||
check = api_token_ip_restriction_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert "has client IP address filtering configured" in result[0].status_extended
|
||||
|
||||
def test_token_without_ip_restriction(self):
|
||||
api_client = mock.MagicMock
|
||||
api_client.tokens = [
|
||||
CloudflareAPIToken(
|
||||
id=TOKEN_ID,
|
||||
name=TOKEN_NAME,
|
||||
status="active",
|
||||
ip_allow_list=[],
|
||||
ip_deny_list=[],
|
||||
)
|
||||
]
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_cloudflare_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled.api_client",
|
||||
new=api_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled import (
|
||||
api_token_ip_restriction_enabled,
|
||||
)
|
||||
|
||||
check = api_token_ip_restriction_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].resource_id == TOKEN_ID
|
||||
assert result[0].resource_name == TOKEN_NAME
|
||||
assert result[0].status == "FAIL"
|
||||
assert "does not have client IP address filtering configured" in result[0].status_extended
|
||||
|
||||
def test_multiple_tokens_mixed(self):
|
||||
api_client = mock.MagicMock
|
||||
api_client.tokens = [
|
||||
CloudflareAPIToken(
|
||||
id="token-1",
|
||||
name="Restricted Token",
|
||||
status="active",
|
||||
ip_allow_list=["192.0.2.0/24"],
|
||||
ip_deny_list=[],
|
||||
),
|
||||
CloudflareAPIToken(
|
||||
id="token-2",
|
||||
name="Unrestricted Token",
|
||||
status="active",
|
||||
ip_allow_list=[],
|
||||
ip_deny_list=[],
|
||||
),
|
||||
]
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_cloudflare_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
"prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled.api_client",
|
||||
new=api_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.cloudflare.services.api.api_token_ip_restriction_enabled.api_token_ip_restriction_enabled import (
|
||||
api_token_ip_restriction_enabled,
|
||||
)
|
||||
|
||||
check = api_token_ip_restriction_enabled()
|
||||
result = check.execute()
|
||||
assert len(result) == 2
|
||||
assert result[0].resource_id == "token-1"
|
||||
assert result[0].status == "PASS"
|
||||
assert result[1].resource_id == "token-2"
|
||||
assert result[1].status == "FAIL"
|
||||
@@ -0,0 +1,79 @@
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from prowler.providers.cloudflare.services.api.api_service import (
|
||||
CloudflareAPIToken,
|
||||
)
|
||||
|
||||
|
||||
class Test_CloudflareAPIToken_Model:
|
||||
def test_cloudflare_api_token_model(self):
|
||||
token = CloudflareAPIToken(
|
||||
id="token-123",
|
||||
name="My API Token",
|
||||
status="active",
|
||||
ip_allow_list=["192.0.2.0/24"],
|
||||
ip_deny_list=["10.0.0.0/8"],
|
||||
expires_on=datetime(2026, 12, 31, tzinfo=timezone.utc),
|
||||
issued_on=datetime(2026, 1, 1, tzinfo=timezone.utc),
|
||||
last_used_on=datetime(2026, 4, 1, tzinfo=timezone.utc),
|
||||
modified_on=datetime(2026, 3, 15, tzinfo=timezone.utc),
|
||||
)
|
||||
assert token.id == "token-123"
|
||||
assert token.name == "My API Token"
|
||||
assert token.status == "active"
|
||||
assert token.ip_allow_list == ["192.0.2.0/24"]
|
||||
assert token.ip_deny_list == ["10.0.0.0/8"]
|
||||
assert token.expires_on == datetime(2026, 12, 31, tzinfo=timezone.utc)
|
||||
assert token.issued_on == datetime(2026, 1, 1, tzinfo=timezone.utc)
|
||||
assert token.last_used_on == datetime(2026, 4, 1, tzinfo=timezone.utc)
|
||||
assert token.modified_on == datetime(2026, 3, 15, tzinfo=timezone.utc)
|
||||
|
||||
def test_cloudflare_api_token_defaults(self):
|
||||
token = CloudflareAPIToken(id="token-456")
|
||||
assert token.id == "token-456"
|
||||
assert token.name is None
|
||||
assert token.status is None
|
||||
assert token.ip_allow_list == []
|
||||
assert token.ip_deny_list == []
|
||||
assert token.expires_on is None
|
||||
assert token.issued_on is None
|
||||
assert token.last_used_on is None
|
||||
assert token.modified_on is None
|
||||
|
||||
def test_cloudflare_api_token_with_ip_allow_list_only(self):
|
||||
token = CloudflareAPIToken(
|
||||
id="token-789",
|
||||
name="Restricted Token",
|
||||
status="active",
|
||||
ip_allow_list=["192.0.2.0/24", "198.51.100.0/24"],
|
||||
)
|
||||
assert token.ip_allow_list == ["192.0.2.0/24", "198.51.100.0/24"]
|
||||
assert token.ip_deny_list == []
|
||||
|
||||
def test_cloudflare_api_token_with_ip_deny_list_only(self):
|
||||
token = CloudflareAPIToken(
|
||||
id="token-101",
|
||||
name="Deny-list Token",
|
||||
status="active",
|
||||
ip_deny_list=["10.0.0.0/8", "172.16.0.0/12"],
|
||||
)
|
||||
assert token.ip_allow_list == []
|
||||
assert token.ip_deny_list == ["10.0.0.0/8", "172.16.0.0/12"]
|
||||
|
||||
def test_cloudflare_api_token_disabled_status(self):
|
||||
token = CloudflareAPIToken(
|
||||
id="token-disabled",
|
||||
name="Disabled Token",
|
||||
status="disabled",
|
||||
)
|
||||
assert token.status == "disabled"
|
||||
|
||||
def test_cloudflare_api_token_expired_status(self):
|
||||
token = CloudflareAPIToken(
|
||||
id="token-expired",
|
||||
name="Expired Token",
|
||||
status="expired",
|
||||
expires_on=datetime(2025, 1, 1, tzinfo=timezone.utc),
|
||||
)
|
||||
assert token.status == "expired"
|
||||
assert token.expires_on == datetime(2025, 1, 1, tzinfo=timezone.utc)
|
||||
Reference in New Issue
Block a user