Compare commits

..

7 Commits

Author SHA1 Message Date
Prowler Bot a578f4af34 chore: prepare API and UI changelogs for 5.30.1 release (#11566)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-06-12 12:16:15 +02:00
Prowler Bot d6528b674e fix(ui): show threat map data for okta and google workspace accounts (#11563)
Co-authored-by: Alejandro Bailo <59607668+alejandrobailo@users.noreply.github.com>
2026-06-12 10:18:43 +02:00
Prowler Bot 75decbbedf fix(api): drop_subgraph deletes relationships then nodes to cut Neo4j memory (#11561)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-06-12 09:47:41 +02:00
Prowler Bot 4a14559a5f fix(compliance): resolve provider from scan in attributes endp (#11560)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2026-06-12 09:18:11 +02:00
Prowler Bot c6f8620a0d fix(api): normalize OCI scan region credentials (#11559)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-06-11 17:55:26 +02:00
Prowler Bot ca4889b43e chore(release): Bump versions to v5.30.1 (#11547)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-06-11 15:28:54 +02:00
Prowler Bot 057d061c7e chore(api): Update prowler dependency to v5.30 for release 5.30.0 (#11543)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-06-11 11:15:18 +02:00
51 changed files with 101 additions and 3642 deletions
+1 -1
View File
@@ -145,7 +145,7 @@ SENTRY_RELEASE=local
NEXT_PUBLIC_SENTRY_ENVIRONMENT=${SENTRY_ENVIRONMENT}
#### Prowler release version ####
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.31.0
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.30.1
# Social login credentials
SOCIAL_GOOGLE_OAUTH_CALLBACK_URL="${AUTH_URL}/api/auth/callback/google"
-8
View File
@@ -2,14 +2,6 @@
All notable changes to the **Prowler API** are documented in this file.
## [1.32.0] (Prowler UNRELEASED)
### 🚀 Added
- Provider group filters for API endpoints that support cloud provider filtering, including exact and `__in` variants [(#11573)](https://github.com/prowler-cloud/prowler/pull/11573)
---
## [1.31.1] (Prowler v5.30.1)
### 🐞 Fixed
+2 -2
View File
@@ -43,7 +43,7 @@ dependencies = [
"defusedxml==0.7.1",
"gunicorn==23.0.0",
"lxml==6.1.0",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@v5.30",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (==1.3.0)",
"sentry-sdk[django] (==2.56.0)",
@@ -68,7 +68,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.32.0"
version = "1.31.1"
[tool.uv]
# Transitive pins matching master to avoid silent drift; bump deliberately.
+2 -108
View File
@@ -102,7 +102,7 @@ class BaseProviderFilter(FilterSet):
"""
Abstract base filter for models with direct FK to Provider.
Provides standard provider_id, provider_type, and provider_groups filters.
Provides standard provider_id and provider_type filters.
Subclasses must define Meta.model.
"""
@@ -116,16 +116,6 @@ class BaseProviderFilter(FilterSet):
choices=Provider.ProviderChoices.choices,
lookup_expr="in",
)
provider_groups = UUIDFilter(
field_name="provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
class Meta:
abstract = True
@@ -136,7 +126,7 @@ class BaseScanProviderFilter(FilterSet):
"""
Abstract base filter for models with FK to Scan (and Scan has FK to Provider).
Provides standard provider_id, provider_type, and provider_groups filters via scan relationship.
Provides standard provider_id and provider_type filters via scan relationship.
Subclasses must define Meta.model.
"""
@@ -150,16 +140,6 @@ class BaseScanProviderFilter(FilterSet):
choices=Provider.ProviderChoices.choices,
lookup_expr="in",
)
provider_groups = UUIDFilter(
field_name="scan__provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="scan__provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
class Meta:
abstract = True
@@ -180,16 +160,6 @@ class CommonFindingFilters(FilterSet):
provider_type__in = ChoiceInFilter(
choices=Provider.ProviderChoices.choices, field_name="scan__provider__provider"
)
provider_groups = UUIDFilter(
field_name="scan__provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="scan__provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
provider_uid = CharFilter(field_name="scan__provider__uid", lookup_expr="exact")
provider_uid__in = CharInFilter(field_name="scan__provider__uid", lookup_expr="in")
provider_uid__icontains = CharFilter(
@@ -400,12 +370,6 @@ class ProviderFilter(FilterSet):
choices=Provider.ProviderChoices.choices,
lookup_expr="in",
)
provider_groups = UUIDFilter(
field_name="provider_groups__id", lookup_expr="exact", distinct=True
)
provider_groups__in = UUIDInFilter(
field_name="provider_groups__id", lookup_expr="in", distinct=True
)
class Meta:
model = Provider
@@ -431,16 +395,6 @@ class ProviderRelationshipFilterSet(FilterSet):
provider_type__in = ChoiceInFilter(
choices=Provider.ProviderChoices.choices, field_name="provider__provider"
)
provider_groups = UUIDFilter(
field_name="provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
provider_uid = CharFilter(field_name="provider__uid", lookup_expr="exact")
provider_uid__in = CharInFilter(field_name="provider__uid", lookup_expr="in")
provider_uid__icontains = CharFilter(
@@ -1047,16 +1001,6 @@ class FindingGroupSummaryFilter(_CheckTitleToCheckIdMixin, FilterSet):
field_name="provider__provider", choices=Provider.ProviderChoices.choices
)
provider_type__in = CharInFilter(field_name="provider__provider", lookup_expr="in")
provider_groups = UUIDFilter(
field_name="provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
class Meta:
model = FindingGroupDailySummary
@@ -1157,16 +1101,6 @@ class LatestFindingGroupSummaryFilter(_CheckTitleToCheckIdMixin, FilterSet):
field_name="provider__provider", choices=Provider.ProviderChoices.choices
)
provider_type__in = CharInFilter(field_name="provider__provider", lookup_expr="in")
provider_groups = UUIDFilter(
field_name="provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
class Meta:
model = FindingGroupDailySummary
@@ -1372,16 +1306,6 @@ class ScanSummaryFilter(FilterSet):
provider_type__in = ChoiceInFilter(
field_name="scan__provider__provider", choices=Provider.ProviderChoices.choices
)
provider_groups = UUIDFilter(
field_name="scan__provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="scan__provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
region = CharFilter(field_name="region")
class Meta:
@@ -1405,16 +1329,6 @@ class DailySeveritySummaryFilter(FilterSet):
provider_type__in = ChoiceInFilter(
field_name="provider__provider", choices=Provider.ProviderChoices.choices
)
provider_groups = UUIDFilter(
field_name="provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
date_from = DateFilter(method="filter_noop")
date_to = DateFilter(method="filter_noop")
@@ -1671,16 +1585,6 @@ class ThreatScoreSnapshotFilter(FilterSet):
choices=Provider.ProviderChoices.choices,
lookup_expr="in",
)
provider_groups = UUIDFilter(
field_name="provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
compliance_id = CharFilter(field_name="compliance_id", lookup_expr="exact")
compliance_id__in = CharInFilter(field_name="compliance_id", lookup_expr="in")
@@ -1724,16 +1628,6 @@ class ResourceGroupOverviewFilter(FilterSet):
choices=Provider.ProviderChoices.choices,
lookup_expr="in",
)
provider_groups = UUIDFilter(
field_name="scan__provider__provider_groups__id",
lookup_expr="exact",
distinct=True,
)
provider_groups__in = UUIDInFilter(
field_name="scan__provider__provider_groups__id",
lookup_expr="in",
distinct=True,
)
resource_group = CharFilter(field_name="resource_group", lookup_expr="exact")
resource_group__in = CharInFilter(field_name="resource_group", lookup_expr="in")
+1 -1
View File
@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.32.0
version: 1.31.1
description: |-
Prowler API specification.
+67 -458
View File
@@ -1411,42 +1411,6 @@ class TestProviderViewSet:
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_providers_filter_provider_groups(
self,
authenticated_client,
tenants_fixture,
providers_fixture,
provider_groups_fixture,
):
tenant = tenants_fixture[0]
provider1, provider2, *_ = providers_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider2, provider_group=group2
)
response = authenticated_client.get(
reverse("provider-list"), {"filter[provider_groups]": str(group1.id)}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert [item["id"] for item in data] == [str(provider1.id)]
response = authenticated_client.get(
reverse("provider-list"),
{"filter[provider_groups__in]": f"{group1.id},{group2.id}"},
)
assert response.status_code == status.HTTP_200_OK
provider_ids = {item["id"] for item in response.json()["data"]}
assert provider_ids == {str(provider1.id), str(provider2.id)}
assert len(response.json()["data"]) == 2
def test_providers_disable_pagination(
self, authenticated_client, providers_fixture, tenants_fixture
):
@@ -1508,9 +1472,9 @@ class TestProviderViewSet:
included_data = response.json()["included"]
for expected_type in expected_resources:
assert any(d.get("type") == expected_type for d in included_data), (
f"Expected type '{expected_type}' not found in included data"
)
assert any(
d.get("type") == expected_type for d in included_data
), f"Expected type '{expected_type}' not found in included data"
def test_providers_retrieve(self, authenticated_client, providers_fixture):
provider1, *_ = providers_fixture
@@ -3751,41 +3715,6 @@ class TestScanViewSet:
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == expected_count
def test_scans_filter_provider_groups(
self,
authenticated_client,
tenants_fixture,
scans_fixture,
provider_groups_fixture,
):
tenant = tenants_fixture[0]
scan1, scan2, *_ = scans_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=scan1.provider, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=scan1.provider, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=scan2.provider, provider_group=group2
)
response = authenticated_client.get(
reverse("scan-list"), {"filter[provider_groups]": str(group1.id)}
)
assert response.status_code == status.HTTP_200_OK
assert {item["id"] for item in response.json()["data"]} == {str(scan1.id)}
response = authenticated_client.get(
reverse("scan-list"),
{"filter[provider_groups__in]": f"{group1.id},{group2.id}"},
)
assert response.status_code == status.HTTP_200_OK
scan_ids = {item["id"] for item in response.json()["data"]}
assert scan_ids == {str(scan1.id), str(scan2.id), str(scans_fixture[2].id)}
assert len(response.json()["data"]) == 3
@pytest.mark.parametrize(
"filter_name",
[
@@ -5785,13 +5714,13 @@ class TestAttackPathsScanViewSet:
content_type=API_JSON_CONTENT_TYPE,
)
if i < 10:
assert response.status_code == status.HTTP_200_OK, (
f"Request {i + 1} should succeed with 200 OK, got {response.status_code}"
)
assert (
response.status_code == status.HTTP_200_OK
), f"Request {i + 1} should succeed with 200 OK, got {response.status_code}"
else:
assert response.status_code == status.HTTP_429_TOO_MANY_REQUESTS, (
f"Request {i + 1} should be throttled"
)
assert (
response.status_code == status.HTTP_429_TOO_MANY_REQUESTS
), f"Request {i + 1} should be throttled"
# -- Timeout simulation -------------------------------------------------------
@@ -5994,9 +5923,9 @@ class TestResourceViewSet:
included_data = response.json()["included"]
for expected_type in expected_resources:
assert any(d.get("type") == expected_type for d in included_data), (
f"Expected type '{expected_type}' not found in included data"
)
assert any(
d.get("type") == expected_type for d in included_data
), f"Expected type '{expected_type}' not found in included data"
@pytest.mark.parametrize(
"filter_name, filter_value, expected_count",
@@ -6067,49 +5996,6 @@ class TestResourceViewSet:
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == expected_count
def test_resource_filter_provider_groups(
self,
authenticated_client,
tenants_fixture,
resources_fixture,
provider_groups_fixture,
):
tenant = tenants_fixture[0]
resource1, resource2, resource3, *_ = resources_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=resource1.provider, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=resource1.provider, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=resource3.provider, provider_group=group2
)
response = authenticated_client.get(
reverse("resource-list"),
{"filter[updated_at]": TODAY, "filter[provider_groups]": str(group1.id)},
)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == 2
assert {item["id"] for item in response.json()["data"]} == {
str(resource1.id),
str(resource2.id),
}
response = authenticated_client.get(
reverse("resource-list"),
{
"filter[updated_at]": TODAY,
"filter[provider_groups__in]": f"{group1.id},{group2.id}",
},
)
assert response.status_code == status.HTTP_200_OK
resource_ids = {item["id"] for item in response.json()["data"]}
assert resource_ids == {str(resource1.id), str(resource2.id), str(resource3.id)}
assert len(response.json()["data"]) == 3
def test_resource_filter_by_scan_id(
self, authenticated_client, resources_fixture, scans_fixture
):
@@ -6588,9 +6474,9 @@ class TestResourceViewSet:
(e for e in errors if e["source"]["parameter"] == expected_invalid_param),
None,
)
assert error is not None, (
f"Expected error for parameter '{expected_invalid_param}'"
)
assert (
error is not None
), f"Expected error for parameter '{expected_invalid_param}'"
assert error["code"] == "invalid"
assert error["status"] == "400" # Must be string per JSON:API spec
assert expected_invalid_param in error["detail"]
@@ -7122,16 +7008,16 @@ class TestResourceViewSet:
# Test with completely malformed token
client.credentials(HTTP_AUTHORIZATION="Bearer not.a.valid.jwt.token")
response = client.get(reverse("resource-events", kwargs={"pk": resource.id}))
assert response.status_code == status.HTTP_401_UNAUTHORIZED, (
f"Expected 401 for malformed token but got {response.status_code}"
)
assert (
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"Expected 401 for malformed token but got {response.status_code}"
# Test with empty bearer token
client.credentials(HTTP_AUTHORIZATION="Bearer ")
response = client.get(reverse("resource-events", kwargs={"pk": resource.id}))
assert response.status_code == status.HTTP_401_UNAUTHORIZED, (
f"Expected 401 for empty bearer token but got {response.status_code}"
)
assert (
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"Expected 401 for empty bearer token but got {response.status_code}"
@pytest.mark.django_db
@@ -7266,9 +7152,9 @@ class TestFindingViewSet:
included_data = response.json()["included"]
for expected_type in expected_resources:
assert any(d.get("type") == expected_type for d in included_data), (
f"Expected type '{expected_type}' not found in included data"
)
assert any(
d.get("type") == expected_type for d in included_data
), f"Expected type '{expected_type}' not found in included data"
@pytest.mark.parametrize(
"filter_name, filter_value, expected_count",
@@ -7422,40 +7308,6 @@ class TestFindingViewSet:
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == 2
def test_finding_filter_provider_groups(
self,
authenticated_client,
tenants_fixture,
findings_fixture,
provider_groups_fixture,
):
tenant = tenants_fixture[0]
finding1, finding2, *_ = findings_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=finding1.scan.provider, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=finding1.scan.provider, provider_group=group2
)
response = authenticated_client.get(
reverse("finding-list"),
{"filter[inserted_at]": TODAY, "filter[provider_groups]": str(group1.id)},
)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == 2
response = authenticated_client.get(
reverse("finding-list"),
{
"filter[inserted_at]": TODAY,
"filter[provider_groups__in]": f"{group1.id},{group2.id}",
},
)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == 2
@pytest.mark.parametrize(
"filter_name",
(
@@ -7867,9 +7719,9 @@ class TestJWTFields:
reverse("token-obtain"), data, format="json"
)
assert response.status_code == status.HTTP_200_OK, (
f"Unexpected status code: {response.status_code}"
)
assert (
response.status_code == status.HTTP_200_OK
), f"Unexpected status code: {response.status_code}"
access_token = response.data["attributes"]["access"]
payload = jwt.decode(access_token, options={"verify_signature": False})
@@ -7883,23 +7735,23 @@ class TestJWTFields:
# Verify expected fields
for field in expected_fields:
assert field in payload, f"The field '{field}' is not in the JWT"
assert payload[field] == expected_fields[field], (
f"The value of '{field}' does not match"
)
assert (
payload[field] == expected_fields[field]
), f"The value of '{field}' does not match"
# Verify time fields are integers
for time_field in ["exp", "iat", "nbf"]:
assert time_field in payload, f"The field '{time_field}' is not in the JWT"
assert isinstance(payload[time_field], int), (
f"The field '{time_field}' is not an integer"
)
assert isinstance(
payload[time_field], int
), f"The field '{time_field}' is not an integer"
# Verify identification fields are non-empty strings
for id_field in ["jti", "sub", "tenant_id"]:
assert id_field in payload, f"The field '{id_field}' is not in the JWT"
assert isinstance(payload[id_field], str) and payload[id_field], (
f"The field '{id_field}' is not a valid string"
)
assert (
isinstance(payload[id_field], str) and payload[id_field]
), f"The field '{id_field}' is not a valid string"
@pytest.mark.django_db
@@ -10726,87 +10578,6 @@ class TestOverviewViewSet:
assert combined_attributes["muted"] == 3
assert combined_attributes["total"] == 14
def test_overview_findings_provider_groups_filter(
self,
authenticated_client,
tenants_fixture,
providers_fixture,
provider_groups_fixture,
):
tenant = tenants_fixture[0]
provider1, provider2, *_ = providers_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider2, provider_group=group2
)
scan1 = Scan.objects.create(
name="scan-provider-group-one",
provider=provider1,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant=tenant,
)
scan2 = Scan.objects.create(
name="scan-provider-group-two",
provider=provider2,
trigger=Scan.TriggerChoices.MANUAL,
state=StateChoices.COMPLETED,
tenant=tenant,
)
ScanSummary.objects.create(
tenant=tenant,
scan=scan1,
check_id="check-provider-group-one",
service="service-a",
severity="high",
region="region-a",
_pass=5,
fail=1,
muted=2,
total=8,
)
ScanSummary.objects.create(
tenant=tenant,
scan=scan2,
check_id="check-provider-group-two",
service="service-b",
severity="medium",
region="region-b",
_pass=2,
fail=3,
muted=1,
total=6,
)
response = authenticated_client.get(
reverse("overview-findings"),
{"filter[provider_groups]": str(group1.id)},
)
assert response.status_code == status.HTTP_200_OK
attributes = response.json()["data"]["attributes"]
assert attributes["pass"] == 5
assert attributes["fail"] == 1
assert attributes["muted"] == 2
assert attributes["total"] == 8
response = authenticated_client.get(
reverse("overview-findings"),
{"filter[provider_groups__in]": f"{group1.id},{group2.id}"},
)
assert response.status_code == status.HTTP_200_OK
attributes = response.json()["data"]["attributes"]
assert attributes["pass"] == 7
assert attributes["fail"] == 4
assert attributes["muted"] == 3
assert attributes["total"] == 14
def test_overview_findings_severity_provider_id_in_filter(
self, authenticated_client, tenants_fixture, providers_fixture
):
@@ -11575,21 +11346,9 @@ class TestOverviewViewSet:
@pytest.mark.parametrize(
"filter_key,filter_value_fn,expected_total,expected_failed",
[
("filter[provider_id]", lambda p1, *_: str(p1.id), 10, 5),
("filter[provider_id]", lambda p1, _: str(p1.id), 10, 5),
("filter[provider_type]", lambda *_: "aws", 10, 5),
("filter[provider_type__in]", lambda *_: "aws,gcp", 30, 20),
(
"filter[provider_groups]",
lambda p1, _, group1, __: str(group1.id),
10,
5,
),
(
"filter[provider_groups__in]",
lambda p1, _, group1, group2: f"{group1.id},{group2.id}",
30,
20,
),
],
)
def test_overview_categories_filters(
@@ -11597,7 +11356,6 @@ class TestOverviewViewSet:
authenticated_client,
tenants_fixture,
providers_fixture,
provider_groups_fixture,
create_scan_category_summary,
filter_key,
filter_value_fn,
@@ -11606,16 +11364,6 @@ class TestOverviewViewSet:
):
tenant = tenants_fixture[0]
provider1, _, gcp_provider, *_ = providers_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=gcp_provider, provider_group=group2
)
scan1 = Scan.objects.create(
name="categories-scan-1",
@@ -11641,7 +11389,7 @@ class TestOverviewViewSet:
response = authenticated_client.get(
reverse("overview-categories"),
{filter_key: filter_value_fn(provider1, gcp_provider, group1, group2)},
{filter_key: filter_value_fn(provider1, gcp_provider)},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
@@ -11815,22 +11563,10 @@ class TestOverviewViewSet:
@pytest.mark.parametrize(
"filter_key,filter_value_fn,expected_total,expected_failed",
[
("filter[provider_id]", lambda p1, *_: str(p1.id), 10, 5),
("filter[provider_id__in]", lambda p1, p2, *_: f"{p1.id},{p2.id}", 25, 12),
("filter[provider_type]", lambda *_: "aws", 10, 5),
("filter[provider_type__in]", lambda *_: "aws,gcp", 25, 12),
(
"filter[provider_groups]",
lambda p1, p2, group1, group2: str(group1.id),
10,
5,
),
(
"filter[provider_groups__in]",
lambda p1, p2, group1, group2: f"{group1.id},{group2.id}",
25,
12,
),
("filter[provider_id]", lambda p1, p2: str(p1.id), 10, 5),
("filter[provider_id__in]", lambda p1, p2: f"{p1.id},{p2.id}", 25, 12),
("filter[provider_type]", lambda p1, p2: "aws", 10, 5),
("filter[provider_type__in]", lambda p1, p2: "aws,gcp", 25, 12),
],
)
def test_overview_groups_provider_filters(
@@ -11838,7 +11574,6 @@ class TestOverviewViewSet:
authenticated_client,
tenants_fixture,
providers_fixture,
provider_groups_fixture,
create_scan_resource_group_summary,
filter_key,
filter_value_fn,
@@ -11848,16 +11583,6 @@ class TestOverviewViewSet:
tenant = tenants_fixture[0]
provider1 = providers_fixture[0] # AWS
gcp_provider = providers_fixture[2] # GCP
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=gcp_provider, provider_group=group2
)
scan1 = Scan.objects.create(
name="aws-rg-scan",
@@ -11883,7 +11608,7 @@ class TestOverviewViewSet:
response = authenticated_client.get(
reverse("overview-resource-groups"),
{filter_key: filter_value_fn(provider1, gcp_provider, group1, group2)},
{filter_key: filter_value_fn(provider1, gcp_provider)},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
@@ -12058,49 +11783,6 @@ class TestOverviewViewSet:
data = response.json()["data"]
assert len(data) >= 1
def test_compliance_watchlist_provider_groups_filter(
self,
authenticated_client,
provider_compliance_scores_fixture,
providers_fixture,
provider_groups_fixture,
tenants_fixture,
):
tenant = tenants_fixture[0]
provider1, provider2, *_ = providers_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider2, provider_group=group2
)
response = authenticated_client.get(
reverse("overview-compliance-watchlist"),
{"filter[provider_groups]": str(group1.id)},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
by_id = {item["id"]: item["attributes"] for item in data}
assert by_id["aws_cis_2.0"]["requirements_passed"] == 1
assert by_id["aws_cis_2.0"]["requirements_failed"] == 1
assert by_id["aws_cis_2.0"]["requirements_manual"] == 1
response = authenticated_client.get(
reverse("overview-compliance-watchlist"),
{"filter[provider_groups__in]": f"{group1.id},{group2.id}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
by_id = {item["id"]: item["attributes"] for item in data}
assert by_id["aws_cis_2.0"]["requirements_passed"] == 0
assert by_id["aws_cis_2.0"]["requirements_failed"] == 2
assert by_id["aws_cis_2.0"]["requirements_manual"] == 1
def test_compliance_watchlist_empty_result(self, authenticated_client):
response = authenticated_client.get(reverse("overview-compliance-watchlist"))
assert response.status_code == status.HTTP_200_OK
@@ -12235,9 +11917,9 @@ class TestIntegrationViewSet:
included_data = response.json()["included"]
for expected_type in expected_resources:
assert any(d.get("type") == expected_type for d in included_data), (
f"Expected type '{expected_type}' not found in included data"
)
assert any(
d.get("type") == expected_type for d in included_data
), f"Expected type '{expected_type}' not found in included data"
@pytest.mark.parametrize(
"integration_type, configuration, credentials",
@@ -13674,9 +13356,9 @@ class TestLighthouseConfigViewSet:
)
# Check that API key is masked with asterisks only
masked_api_key = data["attributes"]["api_key"]
assert all(c == "*" for c in masked_api_key), (
"API key should contain only asterisks"
)
assert all(
c == "*" for c in masked_api_key
), "API key should contain only asterisks"
@pytest.mark.parametrize(
"field_name, invalid_value",
@@ -17253,44 +16935,6 @@ class TestFindingGroupViewSet:
# All fixture findings are from AWS provider
assert len(response.json()["data"]) == 5
def test_finding_groups_provider_groups_filter(
self,
authenticated_client,
tenants_fixture,
finding_groups_fixture,
providers_fixture,
provider_groups_fixture,
):
tenant = tenants_fixture[0]
provider1, provider2, *_ = providers_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider2, provider_group=group2
)
response = authenticated_client.get(
reverse("finding-group-list"),
{"filter[inserted_at]": TODAY, "filter[provider_groups]": str(group1.id)},
)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == 4
response = authenticated_client.get(
reverse("finding-group-list"),
{
"filter[inserted_at]": TODAY,
"filter[provider_groups__in]": f"{group1.id},{group2.id}",
},
)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == 5
def test_finding_groups_check_id_filter(
self, authenticated_client, finding_groups_fixture
):
@@ -17465,9 +17109,9 @@ class TestFindingGroupViewSet:
assert len(data) == 2
for item in data:
resource = item["attributes"]["resource"]
assert resource["resource_group"] == "storage", (
"resource_group must be 'storage'"
)
assert (
resource["resource_group"] == "storage"
), "resource_group must be 'storage'"
def test_resources_name_icontains(
self, authenticated_client, finding_groups_fixture
@@ -17781,12 +17425,12 @@ class TestFindingGroupViewSet:
assert response_p1.status_code == status.HTTP_200_OK
p1_check_ids = {item["id"] for item in response_p1.json()["data"]}
# Provider1 has scan1 with 4 checks
assert len(p1_check_ids) == 4, (
f"Provider1 should have 4 checks, got {len(p1_check_ids)}"
)
assert "cloudtrail_enabled" not in p1_check_ids, (
"cloudtrail_enabled should NOT be in provider1"
)
assert (
len(p1_check_ids) == 4
), f"Provider1 should have 4 checks, got {len(p1_check_ids)}"
assert (
"cloudtrail_enabled" not in p1_check_ids
), "cloudtrail_enabled should NOT be in provider1"
# Get finding groups for provider2 only
response_p2 = authenticated_client.get(
@@ -17796,12 +17440,12 @@ class TestFindingGroupViewSet:
assert response_p2.status_code == status.HTTP_200_OK
p2_check_ids = {item["id"] for item in response_p2.json()["data"]}
# Provider2 has scan2 with 1 check
assert len(p2_check_ids) == 1, (
f"Provider2 should have 1 check, got {len(p2_check_ids)}"
)
assert "cloudtrail_enabled" in p2_check_ids, (
"cloudtrail_enabled should be in provider2"
)
assert (
len(p2_check_ids) == 1
), f"Provider2 should have 1 check, got {len(p2_check_ids)}"
assert (
"cloudtrail_enabled" in p2_check_ids
), "cloudtrail_enabled should be in provider2"
# Test provider_type filter actually filters data
def test_finding_groups_provider_type_filter_actually_filters(
@@ -17824,9 +17468,9 @@ class TestFindingGroupViewSet:
{"filter[inserted_at]": TODAY, "filter[provider_type]": "gcp"},
)
assert response_gcp.status_code == status.HTTP_200_OK
assert len(response_gcp.json()["data"]) == 0, (
"GCP filter should return 0 results"
)
assert (
len(response_gcp.json()["data"]) == 0
), "GCP filter should return 0 results"
def test_finding_groups_pagination(
self, authenticated_client, finding_groups_fixture
@@ -18201,41 +17845,6 @@ class TestFindingGroupViewSet:
# All providers in fixture are AWS
assert len(data) == 5
def test_finding_groups_latest_provider_groups_filter(
self,
authenticated_client,
tenants_fixture,
finding_groups_fixture,
providers_fixture,
provider_groups_fixture,
):
tenant = tenants_fixture[0]
provider1, provider2, *_ = providers_fixture
group1, group2, *_ = provider_groups_fixture
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group1
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider1, provider_group=group2
)
ProviderGroupMembership.objects.create(
tenant=tenant, provider=provider2, provider_group=group2
)
response = authenticated_client.get(
reverse("finding-group-latest"),
{"filter[provider_groups]": str(group1.id)},
)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == 4
response = authenticated_client.get(
reverse("finding-group-latest"),
{"filter[provider_groups__in]": f"{group1.id},{group2.id}"},
)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) == 5
def test_finding_groups_latest_check_id_filter(
self, authenticated_client, finding_groups_fixture
):
+1 -26
View File
@@ -5509,14 +5509,6 @@ class OverviewViewSet(BaseRLSViewSet):
)
filters["provider__provider__in"] = types
provider_groups = params.get("filter[provider_groups]")
if provider_groups:
filters["provider__provider_groups__id"] = provider_groups
provider_groups_in = params.get("filter[provider_groups__in]")
if provider_groups_in:
filters["provider__provider_groups__id__in"] = provider_groups_in.split(",")
return filters
@action(detail=False, methods=["get"], url_name="providers")
@@ -5836,18 +5828,6 @@ class OverviewViewSet(BaseRLSViewSet):
location=OpenApiParameter.QUERY,
description="Filter by multiple provider types (comma-separated)",
),
OpenApiParameter(
name="provider_groups",
type=OpenApiTypes.UUID,
location=OpenApiParameter.QUERY,
description="Filter by provider group ID",
),
OpenApiParameter(
name="provider_groups__in",
type=OpenApiTypes.STR,
location=OpenApiParameter.QUERY,
description="Filter by multiple provider group IDs (comma-separated UUIDs)",
),
],
)
@action(detail=False, methods=["get"], url_name="threatscore")
@@ -6189,8 +6169,6 @@ class OverviewViewSet(BaseRLSViewSet):
"provider_id__in",
"provider_type",
"provider_type__in",
"provider_groups",
"provider_groups__in",
}
filtered_queryset = self._apply_filterset(
base_queryset, CategoryOverviewFilter, exclude_keys=provider_filter_keys
@@ -6260,8 +6238,6 @@ class OverviewViewSet(BaseRLSViewSet):
"provider_id__in",
"provider_type",
"provider_type__in",
"provider_groups",
"provider_groups__in",
}
filtered_queryset = self._apply_filterset(
base_queryset,
@@ -8508,10 +8484,9 @@ class FindingGroupViewSet(BaseRLSViewSet):
This endpoint returns finding groups without requiring date filters,
automatically using the latest available data per check_id.
Provider, provider group, check, and computed filters are still supported.
All other filters (provider_id, provider_type, check_id) are still supported.
""",
tags=["Finding Groups"],
filters=True,
)
@action(detail=False, methods=["get"], url_name="latest")
def latest(self, request):
Generated
+3 -3
View File
@@ -4416,7 +4416,7 @@ wheels = [
[[package]]
name = "prowler"
version = "5.30.0"
source = { git = "https://github.com/prowler-cloud/prowler.git?rev=master#f1d741214a60df17158c3fdc97804fd1fde64f3a" }
source = { git = "https://github.com/prowler-cloud/prowler.git?rev=v5.30#f1d741214a60df17158c3fdc97804fd1fde64f3a" }
dependencies = [
{ name = "alibabacloud-actiontrail20200706" },
{ name = "alibabacloud-credentials" },
@@ -4504,7 +4504,7 @@ dependencies = [
[[package]]
name = "prowler-api"
version = "1.32.0"
version = "1.31.1"
source = { virtual = "." }
dependencies = [
{ name = "cartography" },
@@ -4600,7 +4600,7 @@ requires-dist = [
{ name = "matplotlib", specifier = "==3.10.8" },
{ name = "neo4j", specifier = "==6.1.0" },
{ name = "openai", specifier = "==1.109.1" },
{ name = "prowler", git = "https://github.com/prowler-cloud/prowler.git?rev=master" },
{ name = "prowler", git = "https://github.com/prowler-cloud/prowler.git?rev=v5.30" },
{ name = "psycopg2-binary", specifier = "==2.9.9" },
{ name = "pytest-celery", extras = ["redis"], specifier = "==1.3.0" },
{ name = "reportlab", specifier = "==4.4.10" },
@@ -128,8 +128,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.30.0"
PROWLER_API_VERSION="5.30.0"
PROWLER_UI_VERSION="5.29.0"
PROWLER_API_VERSION="5.29.0"
```
<Note>
-13
View File
@@ -2,19 +2,6 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.31.0] (Prowler UNRELEASED)
### 🚀 Added
- `securityhub_delegated_admin_enabled_all_regions` check for AWS provider, verifying that Security Hub has a delegated administrator, is active in all opted-in regions, and has organization auto-enable on [(#11259)](https://github.com/prowler-cloud/prowler/pull/11259)
- `config_delegated_admin_and_org_aggregator_all_regions` check for AWS provider, verifying that AWS Config has a delegated administrator and an organization aggregator covering all AWS regions [(#11259)](https://github.com/prowler-cloud/prowler/pull/11259)
- `sagemaker_clarify_exists` check for AWS provider [(#11211)](https://github.com/prowler-cloud/prowler/pull/11211)
- `cloudsql_instance_high_availability_enabled` check for GCP provider, verifying Cloud SQL primary instances use `REGIONAL` availability for automatic zone failover [(#11024)](https://github.com/prowler-cloud/prowler/pull/11024)
- `identity_storage_service_level_admins_scoped` check for OCI provider CIS 3.1 control 1.15, ensuring storage service-level administrators exclude delete permissions [(#11523)](https://github.com/prowler-cloud/prowler/pull/11523)
- `cosmosdb_account_automatic_failover_enabled` check for Azure provider [(#11031)](https://github.com/prowler-cloud/prowler/pull/11031)
---
## [5.30.0] (Prowler v5.30.0)
### 🚀 Added
@@ -1293,8 +1293,7 @@
"storage_ensure_private_endpoints_in_storage_accounts",
"storage_secure_transfer_required_is_enabled",
"vm_ensure_using_managed_disks",
"vm_trusted_launch_enabled",
"cosmosdb_account_automatic_failover_enabled"
"vm_trusted_launch_enabled"
]
},
{
+1 -2
View File
@@ -1087,8 +1087,7 @@
"storage_blob_versioning_is_enabled",
"storage_geo_redundant_enabled",
"vm_scaleset_associated_with_load_balancer",
"vm_scaleset_not_empty",
"cosmosdb_account_automatic_failover_enabled"
"vm_scaleset_not_empty"
],
"gcp": [
"compute_instance_automatic_restart_enabled",
@@ -302,9 +302,7 @@
{
"Id": "1.15",
"Description": "Ensure storage service-level admins cannot delete resources they manage",
"Checks": [
"identity_storage_service_level_admins_scoped"
],
"Checks": [],
"Attributes": [
{
"Section": "1. Identity and Access Management",
+1 -1
View File
@@ -49,7 +49,7 @@ class _MutableTimestamp:
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
prowler_version = "5.31.0"
prowler_version = "5.30.1"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
+9 -10
View File
@@ -58,17 +58,16 @@ def print_prowler_cloud_banner(provider: str = None):
bar = f"{banner_color}{Style.RESET_ALL}"
print(
f"""
{bar} {Style.BRIGHT}You're getting a snapshot 📸. Prowler Cloud gives you the full picture:{Style.RESET_ALL}
{bar} {Style.BRIGHT}You're getting a snapshot. Prowler Cloud gives you the full picture.{Style.RESET_ALL}
{bar}
{bar} {check} {Style.BRIGHT}Continuous Security Monitoring{Style.RESET_ALL} - scheduled scans with history, trends and alerts.
{bar} {check} {Style.BRIGHT}Lighthouse AI + MCP{Style.RESET_ALL} - autonomous triage, custom dashboards, prioritization with prevention and remediation.
{bar} {check} {Style.BRIGHT}Alerts{Style.RESET_ALL} - get notified when anything you want is happening.
{bar} {check} {Style.BRIGHT}Live Compliance{Style.RESET_ALL} - dashboards for 50+ frameworks, always up to date.
{bar} {check} {Style.BRIGHT}Remediation{Style.RESET_ALL} - complete guided remediation including Autonomous remediation with Lighthouse AI.
{bar} {check} {Style.BRIGHT}Attack Path Visualization{Style.RESET_ALL} - see how attackers chain risks to reach your crown jewels.
{bar} {check} {Style.BRIGHT}Bulk Provisioning{Style.RESET_ALL} - add your entire AWS Organization in seconds.
{bar} {check} {Style.BRIGHT}Integrations{Style.RESET_ALL} - Anything with our MCP + Jira, Slack, AWS Security Hub, Amazon S3, SSO and RBAC.
{bar} {check} {Style.BRIGHT}Attack Path Visualization{Style.RESET_ALL} - see how attackers chain risks to reach your crown jewels
{bar} {check} {Style.BRIGHT}Lighthouse AI + MCP{Style.RESET_ALL} - autonomous triage, prioritization and remediation
{bar} {check} {Style.BRIGHT}Organizations{Style.RESET_ALL} - all your AWS accounts under one organization
{bar} {check} {Style.BRIGHT}Continuous scanning{Style.RESET_ALL} - scheduled scans with history, trends and alerts
{bar} {check} {Style.BRIGHT}Integrations{Style.RESET_ALL} - Jira, Slack, AWS Security Hub, Amazon S3, SSO and RBAC
{bar} {check} {Style.BRIGHT}Reports{Style.RESET_ALL} - download ready-to-share PDF reports
{bar} {check} {Style.BRIGHT}Live compliance{Style.RESET_ALL} - dashboards for 50+ frameworks, always up to date
{bar}
{bar} {Fore.BLUE}Start free at 👉 cloud.prowler.com{Style.RESET_ALL}
{bar} {Fore.BLUE}Start free at cloud.prowler.com{Style.RESET_ALL}
"""
)
@@ -2582,7 +2582,6 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -2592,9 +2591,6 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -2608,7 +2604,6 @@
"il-central-1",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -7349,7 +7344,6 @@
"lightsail": {
"regions": {
"aws": [
"ap-east-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-south-1",
@@ -7360,11 +7354,9 @@
"ca-central-1",
"eu-central-1",
"eu-north-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -8277,9 +8269,7 @@
"cn-north-1",
"cn-northwest-1"
],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
@@ -9230,7 +9220,6 @@
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -9997,8 +9986,6 @@
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"eu-central-1",
"eu-south-2",
@@ -1,44 +0,0 @@
{
"Provider": "aws",
"CheckID": "config_delegated_admin_and_org_aggregator_all_regions",
"CheckTitle": "AWS Config has a delegated administrator and an organization aggregator covering all AWS regions",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices",
"Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices"
],
"ServiceName": "config",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "AwsConfigConfigurationAggregator",
"ResourceGroup": "governance",
"Description": "**AWS Config** has a delegated administrator registered via AWS Organizations and at least one Configuration Aggregator with an OrganizationAggregationSource that covers all AWS regions, ensuring centralized org-wide configuration visibility.",
"Risk": "Without an org-wide **AWS Config** aggregator and a delegated administrator, configuration data is fragmented across accounts and regions, **compliance reporting** is incomplete, and **drift detection** is delayed. Adversaries or misconfigurations can persist in unmonitored accounts, eroding **audit readiness** and **regulatory posture**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/config/latest/developerguide/aggregate-data.html",
"https://docs.aws.amazon.com/config/latest/developerguide/set-up-aggregator-cli.html",
"https://docs.aws.amazon.com/organizations/latest/userguide/services-that-can-integrate-config.html"
],
"Remediation": {
"Code": {
"CLI": "aws organizations register-delegated-administrator --account-id <ADMIN_ACCOUNT_ID> --service-principal config.amazonaws.com && aws configservice put-configuration-aggregator --configuration-aggregator-name org-aggregator --organization-aggregation-source RoleArn=<ROLE_ARN>,AllAwsRegions=true",
"NativeIaC": "",
"Other": "1. From the AWS Organizations management account, register the delegated administrator for config.amazonaws.com\n2. In the delegated admin account, open AWS Config\n3. Create a Configuration Aggregator and select Add my organization as the source\n4. Enable Include all AWS Regions\n5. Confirm an IAM role with AWSConfigRoleForOrganizations is attached\n6. Verify the aggregator status reaches SUCCEEDED for all member accounts",
"Terraform": ""
},
"Recommendation": {
"Text": "Register a **delegated administrator** for AWS Config via AWS Organizations and create at least one **Configuration Aggregator** with an OrganizationAggregationSource that covers **all AWS regions**. This centralizes configuration data across the organization for unified compliance and audit reporting.",
"Url": "https://hub.prowler.com/check/config_delegated_admin_and_org_aggregator_all_regions"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [
"config_recorder_all_regions_enabled",
"guardduty_delegated_admin_enabled_all_regions"
],
"Notes": "This check requires execution from the organization management account or delegated administrator account to access organization-level APIs."
}
@@ -1,115 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.config.config_client import config_client
from prowler.providers.aws.services.config.config_service import Aggregator
class config_delegated_admin_and_org_aggregator_all_regions(Check):
"""Ensure AWS Config has a delegated admin and an org aggregator covering all regions.
This check verifies that:
1. A delegated administrator is registered for the config.amazonaws.com
service principal via AWS Organizations.
2. At least one AWS Config Configuration Aggregator exists with an
OrganizationAggregationSource that covers all AWS regions
(AllAwsRegions=true).
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the check logic.
Returns:
A list of reports containing the result of the check. One finding per
aggregator-region, or a single synthetic FAIL when no aggregators
exist in any region.
"""
findings = []
has_delegated_admin = (
bool(config_client.delegated_administrators)
and not config_client.delegated_administrators_lookup_failed
)
delegated_admin_unknown = config_client.delegated_administrators_lookup_failed
# No aggregators in any region: emit one synthetic FAIL anchored to the
# audited account in the default region.
if not config_client.aggregators:
synthetic = Aggregator(
name="unknown",
arn=config_client.get_unknown_arn(
region=config_client.region,
resource_type="config-aggregator",
),
region=config_client.region,
all_aws_regions=False,
aws_regions=None,
organization_aggregation_source_present=False,
)
report = Check_Report_AWS(metadata=self.metadata(), resource=synthetic)
if delegated_admin_unknown:
delegated_state = (
"delegated administrator status could not be determined"
)
elif has_delegated_admin:
delegated_state = "delegated administrator configured"
else:
delegated_state = (
"no delegated administrator registered for config.amazonaws.com"
)
report.status = "FAIL"
report.status_extended = (
f"AWS Config has no Organization Aggregator configured in any "
f"region ({delegated_state})."
)
findings.append(report)
return findings
for region, aggregators_in_region in config_client.aggregators.items():
for aggregator in aggregators_in_region:
report = Check_Report_AWS(metadata=self.metadata(), resource=aggregator)
org_aware = aggregator.organization_aggregation_source_present
covers_all = aggregator.all_aws_regions
issues = []
if delegated_admin_unknown:
issues.append(
"delegated administrator status for config.amazonaws.com "
"could not be determined"
)
elif not has_delegated_admin:
issues.append(
"no delegated administrator registered for config.amazonaws.com"
)
if not org_aware:
issues.append(
f"aggregator {aggregator.name} is not an organization aggregator"
)
elif not covers_all:
issues.append(
f"aggregator {aggregator.name} does not cover all AWS regions"
)
if issues:
report.status = "FAIL"
report.status_extended = (
f"AWS Config aggregator {aggregator.name} in region "
f"{region} has issues: {', '.join(issues)}."
)
else:
report.status = "PASS"
report.status_extended = (
f"AWS Config aggregator {aggregator.name} in region "
f"{region} is an organization aggregator covering all "
f"AWS regions with delegated admin configured."
)
# Support muting non-default regions if configured
if report.status == "FAIL" and (
config_client.audit_config.get("mute_non_default_regions", False)
and region != config_client.region
):
report.muted = True
findings.append(report)
return findings
@@ -1,6 +1,5 @@
from typing import Optional
from botocore.client import ClientError
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
@@ -13,16 +12,10 @@ class Config(AWSService):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.recorders = {}
self.aggregators: dict[str, list] = {}
self.delegated_administrators: list = []
self.delegated_administrators_lookup_failed: bool = False
self.__threading_call__(self.describe_configuration_recorders)
self.__threading_call__(
self._describe_configuration_recorder_status, self.recorders.values()
)
self.__threading_call__(self._describe_configuration_aggregators)
# Organizations API is not regional; single call.
self._list_config_delegated_administrators()
def _get_recorder_arn_template(self, region):
return f"arn:{self.audited_partition}:config:{region}:{self.audited_account}:recorder"
@@ -80,108 +73,6 @@ class Config(AWSService):
f"{recorder.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_configuration_aggregators(self, regional_client):
"""Describe AWS Config configuration aggregators per region.
An aggregator counts as organization-aware when its
OrganizationAggregationSource key is present in the response.
"""
logger.info("Config - Describing Configuration Aggregators...")
try:
paginator = regional_client.get_paginator(
"describe_configuration_aggregators"
)
region_aggregators: list = []
for page in paginator.paginate():
for aggregator in page.get("ConfigurationAggregators", []):
name = aggregator.get("ConfigurationAggregatorName", "")
arn = aggregator.get("ConfigurationAggregatorArn", "")
org_source = aggregator.get("OrganizationAggregationSource")
org_aware = org_source is not None
all_aws_regions = False
aws_regions: Optional[list] = None
if org_aware:
all_aws_regions = org_source.get("AllAwsRegions", False)
aws_regions = org_source.get("AwsRegions")
if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources)
):
region_aggregators.append(
Aggregator(
name=name,
arn=arn,
region=regional_client.region,
all_aws_regions=all_aws_regions,
aws_regions=aws_regions,
organization_aggregation_source_present=org_aware,
)
)
if region_aggregators:
self.aggregators[regional_client.region] = region_aggregators
except ClientError as error:
if error.response["Error"]["Code"] in (
"AccessDeniedException",
"AccessDenied",
):
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_config_delegated_administrators(self):
"""List delegated administrators for the AWS Config service principal.
Uses the Organizations API directly (not regional). Sets
delegated_administrators_lookup_failed to True on AccessDenied so callers
can surface the unknown delegated-admin state in findings.
"""
logger.info(
"Config - Listing delegated administrators for config.amazonaws.com..."
)
try:
org_client = self.session.client("organizations")
paginator = org_client.get_paginator("list_delegated_administrators")
for page in paginator.paginate(ServicePrincipal="config.amazonaws.com"):
for admin in page.get("DelegatedAdministrators", []):
self.delegated_administrators.append(
ConfigDelegatedAdministrator(
id=admin.get("Id", ""),
arn=admin.get("Arn", ""),
name=admin.get("Name", ""),
email=admin.get("Email", ""),
status=admin.get("Status", ""),
joined_method=admin.get("JoinedMethod", ""),
)
)
except ClientError as error:
error_code = error.response["Error"]["Code"]
if error_code in (
"AccessDeniedException",
"AccessDenied",
"AWSOrganizationsNotInUseException",
):
self.delegated_administrators_lookup_failed = True
logger.warning(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
self.delegated_administrators_lookup_failed = True
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
self.delegated_administrators_lookup_failed = True
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Recorder(BaseModel):
name: str
@@ -189,25 +80,3 @@ class Recorder(BaseModel):
recording: Optional[bool]
last_status: Optional[str]
region: str
class Aggregator(BaseModel):
"""Represents an AWS Config Configuration Aggregator."""
name: str
arn: str
region: str
all_aws_regions: bool = False
aws_regions: Optional[list] = None
organization_aggregation_source_present: bool = False
class ConfigDelegatedAdministrator(BaseModel):
"""Represents a delegated administrator registered for config.amazonaws.com."""
id: str
arn: str
name: str
email: str
status: str
joined_method: str
@@ -1,39 +0,0 @@
{
"Provider": "aws",
"CheckID": "sagemaker_clarify_exists",
"CheckTitle": "Amazon SageMaker Clarify processing jobs exist in the region",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "sagemaker",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Other",
"ResourceGroup": "ai_ml",
"Description": "**SageMaker Clarify** provides bias detection and model explainability for ML workloads.\n\nThis check verifies that at least one SageMaker processing job using the AWS-managed Clarify container image exists in each successfully scanned region. The absence of Clarify jobs indicates that responsible-AI controls such as bias detection and explainability are not in place.",
"Risk": "Without **SageMaker Clarify** processing jobs, ML models may be deployed without bias analysis or explainability reports. This can lead to:\n- **Regulatory non-compliance** with AI governance frameworks\n- **Undetected bias** in model predictions affecting protected groups\n- **Lack of accountability** for ML model decisions in production",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-configure-processing-jobs.html",
"https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/sagemaker-algo-docker-registry-paths.html"
],
"Remediation": {
"Code": {
"CLI": "aws sagemaker create-processing-job --processing-job-name clarify-bias-check --app-specification ImageUri=<clarify-image-uri> --role-arn <role-arn> --processing-resources 'ClusterConfig={InstanceCount=1,InstanceType=ml.m5.xlarge,VolumeSizeInGB=20}'",
"NativeIaC": "",
"Other": "1. Open the AWS Console and go to Amazon SageMaker\n2. Navigate to Processing > Processing jobs\n3. Click Create processing job\n4. Select the SageMaker Clarify container image for your region\n5. Configure input/output paths and the analysis configuration\n6. Click Create processing job",
"Terraform": ""
},
"Recommendation": {
"Text": "Create SageMaker Clarify processing jobs to evaluate models for bias and explainability before deployment. Integrate Clarify into your ML pipeline to ensure responsible AI practices.",
"Url": "https://hub.prowler.com/check/sagemaker_clarify_exists"
}
},
"Categories": [
"gen-ai"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Results are generated per scanned region. Regions where `ListProcessingJobs` cannot be queried are omitted from the findings."
}
@@ -1,54 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.sagemaker.sagemaker_client import sagemaker_client
class sagemaker_clarify_exists(Check):
"""Check whether at least one SageMaker Clarify processing job exists per region.
A region is reported only when ListProcessingJobs succeeded for it; regions
where the API call failed (e.g. AccessDenied, unsupported region) are
skipped at the service layer and produce no finding.
- PASS: At least one processing job uses the AWS-managed Clarify container
image in the region (one finding per job).
- FAIL: No processing job uses the Clarify container image in the region
(one finding per region).
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the SageMaker Clarify exists check.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for region in sorted(sagemaker_client.processing_jobs_scanned_regions):
clarify_jobs = sorted(
(
job
for job in sagemaker_client.sagemaker_processing_jobs
if job.region == region
and job.image_uri
and "sagemaker-clarify-processing" in job.image_uri
),
key=lambda job: job.name,
)
if clarify_jobs:
for job in clarify_jobs:
report = Check_Report_AWS(metadata=self.metadata(), resource=job)
report.status = "PASS"
report.status_extended = f"SageMaker Clarify processing job {job.name} exists in region {region}."
findings.append(report)
else:
report = Check_Report_AWS(metadata=self.metadata(), resource={})
report.region = region
report.resource_id = "sagemaker-clarify"
report.resource_arn = f"arn:{sagemaker_client.audited_partition}:sagemaker:{region}:{sagemaker_client.audited_account}:processing-job"
report.status = "FAIL"
report.status_extended = (
f"No SageMaker Clarify processing jobs found in region {region}."
)
findings.append(report)
return findings
@@ -15,8 +15,6 @@ class SageMaker(AWSService):
self.sagemaker_notebook_instances = []
self.sagemaker_models = []
self.sagemaker_training_jobs = []
self.sagemaker_processing_jobs = []
self.processing_jobs_scanned_regions = set()
self.sagemaker_domains = []
self.endpoint_configs = {}
self.sagemaker_model_registries = []
@@ -26,7 +24,6 @@ class SageMaker(AWSService):
self.__threading_call__(self._list_notebook_instances)
self.__threading_call__(self._list_models)
self.__threading_call__(self._list_training_jobs)
self.__threading_call__(self._list_processing_jobs)
self.__threading_call__(self._list_endpoint_configs)
self.__threading_call__(self._list_domains)
self.__threading_call__(self._list_model_package_groups)
@@ -40,9 +37,6 @@ class SageMaker(AWSService):
self.__threading_call__(
self._describe_training_job, self.sagemaker_training_jobs
)
self.__threading_call__(
self._describe_processing_job, self.sagemaker_processing_jobs
)
self.__threading_call__(
self._describe_endpoint_config, list(self.endpoint_configs.values())
)
@@ -57,9 +51,6 @@ class SageMaker(AWSService):
self.__threading_call__(
self._list_tags_for_resource, self.sagemaker_training_jobs
)
self.__threading_call__(
self._list_tags_for_resource, self.sagemaker_processing_jobs
)
self.__threading_call__(
self._list_tags_for_resource, list(self.endpoint_configs.values())
)
@@ -137,66 +128,6 @@ class SageMaker(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_processing_jobs(self, regional_client):
"""List SageMaker processing jobs in a region.
Populates ``self.sagemaker_processing_jobs`` with `ProcessingJob`
entries and adds ``regional_client.region`` to
``self.processing_jobs_scanned_regions`` once pagination succeeds, so
regions where ``ListProcessingJobs`` fails are skipped by checks that
consume that set.
Args:
regional_client: Regional SageMaker boto3 client.
"""
logger.info("SageMaker - listing processing jobs...")
try:
list_processing_jobs_paginator = regional_client.get_paginator(
"list_processing_jobs"
)
for page in list_processing_jobs_paginator.paginate():
for processing_job in page["ProcessingJobSummaries"]:
if not self.audit_resources or (
is_resource_filtered(
processing_job["ProcessingJobArn"], self.audit_resources
)
):
self.sagemaker_processing_jobs.append(
ProcessingJob(
name=processing_job["ProcessingJobName"],
region=regional_client.region,
arn=processing_job["ProcessingJobArn"],
)
)
self.processing_jobs_scanned_regions.add(regional_client.region)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_processing_job(self, processing_job):
"""Describe a SageMaker processing job and enrich its image metadata.
Reads ``AppSpecification.ImageUri`` from ``DescribeProcessingJob`` and
stores it on ``processing_job.image_uri``. Errors are logged and
swallowed so a failure in one job does not abort the scan.
Args:
processing_job: ProcessingJob model to enrich in-place.
"""
logger.info("SageMaker - describing processing job...")
try:
regional_client = self.regional_clients[processing_job.region]
describe_processing_job = regional_client.describe_processing_job(
ProcessingJobName=processing_job.name
)
app_spec = describe_processing_job.get("AppSpecification", {})
processing_job.image_uri = app_spec.get("ImageUri")
except Exception as error:
logger.error(
f"{processing_job.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_notebook_instance(self, notebook_instance):
logger.info("SageMaker - describing notebook instances...")
try:
@@ -520,25 +451,6 @@ class TrainingJob(BaseModel):
tags: Optional[list] = []
class ProcessingJob(BaseModel):
"""Represents a SageMaker processing job.
Attributes:
name: Processing job name.
region: AWS region where the job lives.
arn: Processing job ARN.
image_uri: Container image URI from `AppSpecification.ImageUri`,
populated by `_describe_processing_job`.
tags: Resource tags, populated by `_list_tags_for_resource`.
"""
name: str
region: str
arn: str
image_uri: Optional[str] = None
tags: Optional[list] = []
class ProductionVariant(BaseModel):
name: str
initial_instance_count: int
@@ -1,44 +0,0 @@
{
"Provider": "aws",
"CheckID": "securityhub_delegated_admin_enabled_all_regions",
"CheckTitle": "Security Hub has delegated admin configured and is enabled in all regions with organization auto-enable",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices",
"Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices"
],
"ServiceName": "securityhub",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "AwsSecurityHubHub",
"ResourceGroup": "security",
"Description": "**AWS Security Hub** has a delegated administrator configured at the organization level, hubs are active in all opted-in regions, and organization auto-enable is active so that new member accounts are automatically enrolled.",
"Risk": "Without org-wide **AWS Security Hub** configuration, findings can be aggregated inconsistently, delegated admin may be missing in some regions, and new accounts will not be auto-enrolled. This fragments **security posture visibility**, delays **incident response**, and lets misconfigurations and compliance drift go undetected across the organization.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/securityhub/latest/userguide/designate-orgs-admin-account.html",
"https://docs.aws.amazon.com/securityhub/latest/userguide/accounts-orgs-auto-enable.html",
"https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-regions.html"
],
"Remediation": {
"Code": {
"CLI": "aws securityhub enable-organization-admin-account --admin-account-id <ADMIN_ACCOUNT_ID> && aws securityhub update-organization-configuration --auto-enable --auto-enable-standards DEFAULT",
"NativeIaC": "",
"Other": "1. Sign in to the AWS Organizations management account\n2. Open the AWS Organizations console\n3. Navigate to Services > AWS Security Hub\n4. Click Register delegated administrator and enter the security account ID\n5. Switch to the delegated admin account\n6. In Security Hub console, go to Settings > Accounts\n7. Enable auto-enable for new organization accounts\n8. Repeat hub enablement for all opted-in regions",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure a **delegated administrator** for AWS Security Hub via AWS Organizations. Enable Security Hub in **all opted-in regions** and turn on **auto-enable** so new member accounts are automatically enrolled. This ensures uniform security posture monitoring across the entire organization.",
"Url": "https://hub.prowler.com/check/securityhub_delegated_admin_enabled_all_regions"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [
"securityhub_enabled",
"guardduty_delegated_admin_enabled_all_regions"
],
"Notes": "This check requires execution from the organization management account or delegated administrator account to access organization-level APIs."
}
@@ -1,84 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.securityhub.securityhub_client import (
securityhub_client,
)
class securityhub_delegated_admin_enabled_all_regions(Check):
"""Ensure Security Hub has a delegated admin and is enabled in all regions.
This check verifies that:
1. A delegated administrator account is configured for Security Hub
2. Security Hub is active (ACTIVE status) in each region
3. Organization auto-enable is configured for new member accounts
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the check logic.
Returns:
A list of reports containing the result of the check for each region.
"""
findings = []
# Build a set of regions that have an organization admin account configured
regions_with_admin = {
admin.region
for admin in securityhub_client.organization_admin_accounts
if admin.admin_status == "ENABLED"
}
admin_lookup_failed = securityhub_client.organization_admin_lookup_failed
for securityhub in securityhub_client.securityhubs:
report = Check_Report_AWS(metadata=self.metadata(), resource=securityhub)
# Check if this region has a delegated admin
has_delegated_admin = securityhub.region in regions_with_admin
# Check if hub is active
hub_active = securityhub.status == "ACTIVE"
# Check if auto-enable is configured for organization members
auto_enable_on = securityhub.organization_auto_enable
# Determine overall status
issues = []
if admin_lookup_failed:
issues.append("delegated administrator status could not be determined")
elif not has_delegated_admin:
issues.append("no delegated administrator configured")
if not hub_active:
issues.append("Security Hub not enabled")
if (
hub_active
and securityhub.organization_config_available
and not auto_enable_on
):
# Only report auto-enable issue if hub is active and org config data
# is available (i.e., we could actually read AutoEnable from the API).
issues.append("organization auto-enable not configured")
if issues:
report.status = "FAIL"
report.status_extended = (
f"Security Hub in region {securityhub.region} has issues: "
f"{', '.join(issues)}."
)
else:
report.status = "PASS"
report.status_extended = (
f"Security Hub in region {securityhub.region} has delegated "
f"admin configured with hub active and organization auto-enable "
f"enabled."
)
# Support muting non-default regions if configured
if report.status == "FAIL" and (
securityhub_client.audit_config.get("mute_non_default_regions", False)
and securityhub.region != securityhub_client.region
):
report.muted = True
findings.append(report)
return findings
@@ -13,14 +13,8 @@ class SecurityHub(AWSService):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.securityhubs = []
self.organization_admin_accounts = []
self.organization_admin_lookup_failed: bool = False
self.__threading_call__(self._describe_hub)
self.__threading_call__(self._list_tags, self.securityhubs)
self.__threading_call__(self._list_organization_admin_accounts)
self.__threading_call__(
self._describe_organization_configuration, self.securityhubs
)
def _describe_hub(self, regional_client):
logger.info("SecurityHub - Describing Hub...")
@@ -110,95 +104,6 @@ class SecurityHub(AWSService):
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_organization_admin_accounts(self, regional_client):
"""List Security Hub delegated administrator accounts for the organization.
This API is only available to the organization management account or
a delegated administrator account.
"""
logger.info("SecurityHub - listing organization admin accounts...")
try:
paginator = regional_client.get_paginator(
"list_organization_admin_accounts"
)
for page in paginator.paginate():
for admin in page.get("AdminAccounts", []):
admin_account = OrganizationAdminAccount(
admin_account_id=admin.get("AdminAccountId"),
admin_status=admin.get("AdminStatus"),
region=regional_client.region,
)
# Avoid duplicates across regions for the same admin account
if not any(
existing.admin_account_id == admin_account.admin_account_id
and existing.region == admin_account.region
for existing in self.organization_admin_accounts
):
self.organization_admin_accounts.append(admin_account)
except ClientError as error:
self.organization_admin_lookup_failed = True
if error.response["Error"]["Code"] in (
"AccessDeniedException",
"InvalidAccessException",
"BadRequestException",
):
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
self.organization_admin_lookup_failed = True
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_organization_configuration(self, securityhub):
"""Describe the organization configuration for a Security Hub instance.
This provides information about auto-enable settings for the organization.
Only invoked for hubs in ACTIVE status.
"""
logger.info("SecurityHub - describing organization configuration...")
try:
if securityhub.status != "ACTIVE":
return
regional_client = self.regional_clients[securityhub.region]
org_config = regional_client.describe_organization_configuration()
securityhub.organization_auto_enable = org_config.get("AutoEnable", False)
securityhub.auto_enable_standards = org_config.get(
"AutoEnableStandards", "NONE"
)
securityhub.organization_config_available = True
except ClientError as error:
if error.response["Error"]["Code"] in (
"AccessDeniedException",
"InvalidAccessException",
"BadRequestException",
):
# Expected when not running from management or delegated admin account
logger.warning(
f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class OrganizationAdminAccount(BaseModel):
"""Represents a Security Hub delegated administrator account."""
admin_account_id: str
admin_status: str # ENABLED or DISABLE_IN_PROGRESS
region: str
class SecurityHubHub(BaseModel):
arn: str
@@ -207,8 +112,4 @@ class SecurityHubHub(BaseModel):
standards: str
integrations: str
region: str
tags: Optional[list] = []
# Organization configuration fields
organization_auto_enable: bool = False
auto_enable_standards: str = "NONE"
organization_config_available: bool = False
tags: Optional[list]
@@ -1,38 +0,0 @@
{
"Provider": "azure",
"CheckID": "cosmosdb_account_automatic_failover_enabled",
"CheckTitle": "Cosmos DB account has automatic failover enabled",
"CheckType": [],
"ServiceName": "cosmosdb",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "microsoft.documentdb/databaseaccounts",
"ResourceGroup": "database",
"Description": "**Azure Cosmos DB accounts** are evaluated for **automatic failover** configuration. When enabled, Cosmos DB automatically promotes a secondary region to primary during a regional outage, ensuring continuous availability without manual intervention.",
"Risk": "Without **automatic failover**, a regional outage requires **manual failover** which delays recovery and risks data unavailability. Applications dependent on the primary region experience downtime until an operator intervenes.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/azure/cosmos-db/how-to-manage-database-account#automatic-failover",
"https://learn.microsoft.com/en-us/azure/cosmos-db/high-availability",
"https://learn.microsoft.com/en-us/azure/cosmos-db/distribute-data-globally"
],
"Remediation": {
"Code": {
"CLI": "az cosmosdb update --name <COSMOS_ACCOUNT_NAME> --resource-group <RESOURCE_GROUP> --enable-automatic-failover true",
"NativeIaC": "```bicep\n// Bicep: Enable automatic failover on a Cosmos DB account\nresource account 'Microsoft.DocumentDB/databaseAccounts@2025-10-15' = {\n name: '<example_resource_name>'\n location: resourceGroup().location\n kind: 'GlobalDocumentDB'\n properties: {\n databaseAccountOfferType: 'Standard'\n locations: [\n { locationName: '<primary_region>', failoverPriority: 0 }\n { locationName: '<secondary_region>', failoverPriority: 1 }\n ]\n enableAutomaticFailover: true // Critical: Promotes a secondary region during a primary region outage\n }\n}\n```",
"Other": "1. Sign in to the Azure portal and open your Cosmos DB account\n2. In the left menu, select Replicate data globally\n3. Click Automatic Failover\n4. Toggle Enable Automatic Failover to On\n5. Set failover priorities for each region\n6. Click Save",
"Terraform": "```hcl\n# Terraform: Enable automatic failover on a Cosmos DB account\nresource \"azurerm_cosmosdb_account\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n resource_group_name = \"<example_resource_name>\"\n location = \"<primary_region>\"\n offer_type = \"Standard\"\n kind = \"GlobalDocumentDB\"\n\n geo_location {\n location = \"<primary_region>\"\n failover_priority = 0\n }\n\n geo_location {\n location = \"<secondary_region>\"\n failover_priority = 1\n }\n\n enable_automatic_failover = true # Critical: Promotes a secondary region during a primary region outage\n}\n```"
},
"Recommendation": {
"Text": "Enable **automatic failover** on Cosmos DB accounts with **multi-region** deployments so a secondary region is promoted automatically when the primary region becomes unavailable. Configure **failover priorities** to reflect your recovery strategy, validate **RTO/RPO** expectations with periodic failover drills, and combine with **multi-region writes** where active-active is required.",
"Url": "https://hub.prowler.com/check/cosmosdb_account_automatic_failover_enabled"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -1,29 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.cosmosdb.cosmosdb_client import cosmosdb_client
class cosmosdb_account_automatic_failover_enabled(Check):
"""Ensure that Cosmos DB accounts have automatic failover enabled."""
def execute(self) -> Check_Report_Azure:
"""Execute the Cosmos DB automatic failover check.
Iterates over every Cosmos DB account fetched by the service and reports
PASS when `enableAutomaticFailover` is True, FAIL otherwise.
Returns:
A list of Check_Report_Azure with one report per Cosmos DB account.
"""
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
for account in accounts:
report = Check_Report_Azure(metadata=self.metadata(), resource=account)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} does not have automatic failover enabled."
if account.enable_automatic_failover:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} has automatic failover enabled."
findings.append(report)
return findings
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Optional
from typing import List
from azure.mgmt.cosmosdb import CosmosDBManagementClient
@@ -36,29 +36,14 @@ class CosmosDB(AzureService):
name=private_endpoint_connection.name,
type=private_endpoint_connection.type,
)
for private_endpoint_connection in (
getattr(account, "private_endpoint_connections", [])
or []
for private_endpoint_connection in getattr(
account, "private_endpoint_connections", []
)
if private_endpoint_connection
],
disable_local_auth=getattr(
account, "disable_local_auth", False
),
enable_automatic_failover=getattr(
account, "enable_automatic_failover", False
),
backup_policy_type=getattr(
getattr(account, "backup_policy", None),
"type",
None,
),
public_network_access=getattr(
account, "public_network_access", None
),
minimal_tls_version=getattr(
account, "minimal_tls_version", None
),
)
)
except Exception as error:
@@ -86,7 +71,3 @@ class Account:
location: str
private_endpoint_connections: List[PrivateEndpointConnection]
disable_local_auth: bool = False
enable_automatic_failover: bool = False
backup_policy_type: Optional[str] = None
public_network_access: Optional[str] = None
minimal_tls_version: Optional[str] = None
@@ -1,38 +0,0 @@
{
"Provider": "gcp",
"CheckID": "cloudsql_instance_high_availability_enabled",
"CheckTitle": "Cloud SQL instance has high availability (REGIONAL) configured",
"CheckType": [],
"ServiceName": "cloudsql",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "sqladmin.googleapis.com/Instance",
"Description": "Ensures that Cloud SQL instances have high availability configured by setting availabilityType to REGIONAL. A REGIONAL instance maintains a standby replica in a different zone within the same region and automatically fails over on zone-level outages.",
"Risk": "Instances with ZONAL availability have no standby replica. A zone-level outage will cause database downtime until manual recovery, violating availability requirements and potentially breaching SLAs and ISMS-P 2.12.1 disaster preparedness controls.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://cloud.google.com/sql/docs/postgres/high-availability",
"https://cloud.google.com/sql/docs/sqlserver/high-availability"
],
"Remediation": {
"Code": {
"CLI": "gcloud sql instances patch <INSTANCE_NAME> --availability-type=REGIONAL",
"NativeIaC": "",
"Other": "1. Go to Google Cloud Console > SQL > Instances.\n2. Click the instance name, then Edit.\n3. Under Availability, select Multiple zones (Highly available).\n4. Click Save.",
"Terraform": "```hcl\nresource \"google_sql_database_instance\" \"example\" {\n name = \"<instance_name>\"\n database_version = \"POSTGRES_15\"\n region = \"<region>\"\n\n settings {\n tier = \"db-custom-2-7680\"\n\n availability_type = \"REGIONAL\" # Critical: enables HA standby replica\n\n backup_configuration {\n enabled = true\n start_time = \"02:00\"\n }\n }\n}\n```"
},
"Recommendation": {
"Text": "Set availabilityType to REGIONAL for all production Cloud SQL instances. This creates a standby replica in a different zone and enables automatic failover, reducing RTO in the event of a zone outage.",
"Url": "https://hub.prowler.com/check/cloudsql_instance_high_availability_enabled"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [
"cloudsql_instance_automated_backups"
],
"Notes": "Enabling HA increases instance cost approximately 2x due to the standby replica. ZONAL instances are acceptable for non-production workloads where downtime is tolerable."
}
@@ -1,41 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.cloudsql.cloudsql_client import cloudsql_client
class cloudsql_instance_high_availability_enabled(Check):
"""Check that Cloud SQL primary instances are configured for high availability.
Verifies that each Cloud SQL primary instance has `availabilityType` set to
`REGIONAL`, which provisions a standby replica in a different zone within
the same region and enables automatic failover on zone-level outages. Read
replicas are skipped because they inherit availability from their primary.
"""
def execute(self) -> list[Check_Report_GCP]:
"""Execute the high availability check across all Cloud SQL instances.
Returns:
A list of `Check_Report_GCP` findings, one per Cloud SQL primary
instance. Status is `PASS` when `availability_type == "REGIONAL"`
and `FAIL` otherwise.
"""
findings = []
for instance in cloudsql_client.instances:
if instance.instance_type != "CLOUD_SQL_INSTANCE":
continue
report = Check_Report_GCP(metadata=self.metadata(), resource=instance)
if instance.availability_type == "REGIONAL":
report.status = "PASS"
report.status_extended = (
f"Database instance {instance.name} has high availability "
f"(REGIONAL) configured."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Database instance {instance.name} does not have high "
f"availability configured (current: "
f"{instance.availability_type})."
)
findings.append(report)
return findings
@@ -46,9 +46,6 @@ class CloudSQL(GCPService):
"authorizedNetworks", []
),
flags=settings.get("databaseFlags", []),
availability_type=settings.get(
"availabilityType", "ZONAL"
),
instance_type=instance.get(
"instanceType", "CLOUD_SQL_INSTANCE"
),
@@ -79,7 +76,6 @@ class Instance(BaseModel):
ssl_mode: str
automated_backups: bool
flags: list
availability_type: str = "ZONAL"
instance_type: str = "CLOUD_SQL_INSTANCE"
cmek_key_name: Optional[str] = None
project_id: str
@@ -1,39 +0,0 @@
{
"Provider": "oraclecloud",
"CheckID": "identity_storage_service_level_admins_scoped",
"CheckTitle": "OCI IAM storage service-level admin policies exclude delete permissions",
"CheckType": [],
"ServiceName": "identity",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Policy",
"ResourceGroup": "IAM",
"Description": "**OCI IAM policies** are reviewed to ensure storage service-level administrator statements that grant `manage` permissions exclude the relevant storage delete permissions with `request.permission`. This supports CIS OCI 3.1 control 1.15 separation of duties for Block Volume, File Storage, and Object Storage administrators.",
"Risk": "Storage service-level administrators with unrestricted `manage` permissions can delete the resources they administer, including volumes, backups, file systems, mount targets, export sets, objects, or buckets. This weakens separation of duties and can lead to data loss, service disruption, or destructive insider activity.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.oracle.com/en-us/iaas/Content/Identity/policyreference/policyreference.htm",
"https://docs.oracle.com/en-us/iaas/Content/Block/home.htm",
"https://docs.oracle.com/en-us/iaas/Content/File/home.htm",
"https://docs.oracle.com/en-us/iaas/Content/Object/home.htm"
],
"Remediation": {
"Code": {
"CLI": "oci iam policy update --policy-id <policy-ocid> --statements \"[\\\"Allow group VolumeUsers to manage volumes in tenancy where request.permission!='VOLUME_DELETE'\\\"]\"",
"NativeIaC": "",
"Other": "1. In OCI Console, go to Identity & Security > Policies\n2. Open each active policy that grants storage service-level administrators `manage` permissions\n3. Edit storage manage statements to exclude the relevant delete permission with `request.permission`\n4. Example: Allow group BucketUsers to manage buckets in tenancy where request.permission!='BUCKET_DELETE'\n5. Save changes",
"Terraform": "```hcl\nresource \"oci_identity_policy\" \"storage_admins\" {\n compartment_id = var.compartment_id\n name = \"storage-admins\"\n description = \"Storage administrators without delete permissions\"\n\n statements = [\n \"Allow group VolumeUsers to manage volumes in tenancy where request.permission!='VOLUME_DELETE'\",\n \"Allow group VolumeUsers to manage volume-backups in tenancy where request.permission!='VOLUME_BACKUP_DELETE'\",\n \"Allow group FileUsers to manage file-systems in tenancy where request.permission!='FILE_SYSTEM_DELETE'\",\n \"Allow group FileUsers to manage mount-targets in tenancy where request.permission!='MOUNT_TARGET_DELETE'\",\n \"Allow group FileUsers to manage export-sets in tenancy where request.permission!='EXPORT_SET_DELETE'\",\n \"Allow group BucketUsers to manage objects in tenancy where request.permission!='OBJECT_DELETE'\",\n \"Allow group BucketUsers to manage buckets in tenancy where request.permission!='BUCKET_DELETE'\"\n ]\n}\n```"
},
"Recommendation": {
"Text": "Exclude delete permissions from storage service-level administrator policies. Use `request.permission!='VOLUME_DELETE'`, `request.permission!='VOLUME_BACKUP_DELETE'`, `request.permission!='FILE_SYSTEM_DELETE'`, `request.permission!='MOUNT_TARGET_DELETE'`, `request.permission!='EXPORT_SET_DELETE'`, `request.permission!='OBJECT_DELETE'`, and `request.permission!='BUCKET_DELETE'` as appropriate for each storage manage statement.",
"Url": "https://hub.prowler.com/check/identity_storage_service_level_admins_scoped"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -1,176 +0,0 @@
"""Check storage service-level administrators cannot delete managed resources."""
import re
from prowler.lib.check.models import Check, Check_Report_OCI
from prowler.providers.oraclecloud.services.identity.identity_client import (
identity_client,
)
STORAGE_DELETE_PERMISSIONS_BY_RESOURCE = {
"volumes": {"VOLUME_DELETE"},
"volume-backups": {"VOLUME_BACKUP_DELETE"},
"file-systems": {"FILE_SYSTEM_DELETE"},
"mount-targets": {"MOUNT_TARGET_DELETE"},
"export-sets": {"EXPORT_SET_DELETE"},
"objects": {"OBJECT_DELETE"},
"buckets": {"BUCKET_DELETE"},
"volume-family": {"VOLUME_DELETE", "VOLUME_BACKUP_DELETE"},
"file-family": {"FILE_SYSTEM_DELETE", "MOUNT_TARGET_DELETE", "EXPORT_SET_DELETE"},
"object-family": {"OBJECT_DELETE", "BUCKET_DELETE"},
}
ALL_STORAGE_DELETE_PERMISSIONS = set().union(
*STORAGE_DELETE_PERMISSIONS_BY_RESOURCE.values()
)
STORAGE_DELETE_PERMISSIONS_BY_RESOURCE["all-resources"] = ALL_STORAGE_DELETE_PERMISSIONS
MANAGE_STATEMENT_PATTERN = re.compile(
r"\ballow\s+group\b.+?\bto\s+manage\s+(?P<resource>[a-z-]+)\b",
re.IGNORECASE,
)
QUOTED_LITERAL_PATTERN = re.compile(r"'(?:\\.|[^'\\])*'|\"(?:\\.|[^\"\\])*\"")
def _normalize_statement(statement: str) -> str:
"""Collapse whitespace in an OCI policy statement."""
return " ".join(statement.strip().split())
def _has_disjunctive_condition(statement: str) -> bool:
"""Return True when the WHERE condition can allow alternate branches."""
condition = re.split(r"\bwhere\b", statement, flags=re.IGNORECASE, maxsplit=1)
if len(condition) != 2:
return False
condition_without_literals = QUOTED_LITERAL_PATTERN.sub("", condition[1])
return bool(
re.search(r"\b(any|or)\b|\|\|", condition_without_literals, re.IGNORECASE)
)
def _storage_manage_resource(statement: str) -> str | None:
"""Return the managed storage resource in a policy statement, if any."""
normalized_statement = _normalize_statement(statement)
match = MANAGE_STATEMENT_PATTERN.search(normalized_statement)
if not match:
return None
resource = match.group("resource").lower()
if resource not in STORAGE_DELETE_PERMISSIONS_BY_RESOURCE:
return None
return resource
def _excluded_permissions(statement: str) -> set[str]:
"""Return delete permissions explicitly excluded with request.permission != value."""
if _has_disjunctive_condition(statement):
return set()
exclusions = set()
for permission in ALL_STORAGE_DELETE_PERMISSIONS:
pattern = re.compile(
rf"\brequest\.permission\s*!=\s*['\"]?{re.escape(permission)}['\"]?\b",
re.IGNORECASE,
)
if pattern.search(statement):
exclusions.add(permission)
return exclusions
def _missing_delete_exclusions(statement: str) -> tuple[str, set[str]] | None:
"""Return the storage resource and missing delete exclusions for a statement."""
normalized_statement = _normalize_statement(statement)
resource = _storage_manage_resource(normalized_statement)
if not resource:
return None
required_permissions = STORAGE_DELETE_PERMISSIONS_BY_RESOURCE[resource]
excluded_permissions = _excluded_permissions(normalized_statement)
missing_permissions = required_permissions - excluded_permissions
if not missing_permissions:
return None
return resource, missing_permissions
class identity_storage_service_level_admins_scoped(Check):
"""Ensure storage service-level admins cannot delete resources they manage."""
def execute(self) -> list[Check_Report_OCI]:
"""Execute the storage service-level administrators scoped check.
Returns:
A list of OCI check reports for active non-tenant-admin policies.
"""
findings = []
for policy in identity_client.policies:
if policy.lifecycle_state != "ACTIVE":
continue
if policy.name.upper() == "TENANT ADMIN POLICY":
continue
region = policy.region if hasattr(policy, "region") else "global"
violations = []
has_storage_manage_statement = False
for statement in policy.statements:
if _storage_manage_resource(statement):
has_storage_manage_statement = True
missing_result = _missing_delete_exclusions(statement)
if not missing_result:
continue
resource, missing_permissions = missing_result
violations.append(
f"statement `{_normalize_statement(statement)}` manages {resource} without excluding: {', '.join(sorted(missing_permissions))}"
)
if not has_storage_manage_statement:
continue
report = Check_Report_OCI(
metadata=self.metadata(),
resource=policy,
region=region,
resource_id=policy.id,
resource_name=policy.name,
compartment_id=policy.compartment_id,
)
if violations:
report.status = "FAIL"
report.status_extended = (
f"Policy '{policy.name}' allows storage service-level administrators to manage storage resources without explicitly excluding required delete permissions: "
+ "; ".join(violations)
+ "."
)
else:
report.status = "PASS"
report.status_extended = f"Policy '{policy.name}' excludes required storage delete permissions from storage manage statements."
findings.append(report)
if not findings:
region = (
identity_client.audited_regions[0].key
if identity_client.audited_regions
else "global"
)
report = Check_Report_OCI(
metadata=self.metadata(),
resource={},
region=region,
resource_id=identity_client.audited_tenancy,
resource_name="Tenancy",
compartment_id=identity_client.audited_tenancy,
)
report.status = "PASS"
report.status_extended = "No active storage service-level administrator policies grant manage permissions without excluding delete permissions."
findings.append(report)
return findings
+1 -1
View File
@@ -124,7 +124,7 @@ maintainers = [{name = "Prowler Engineering", email = "engineering@prowler.com"}
name = "prowler"
readme = "README.md"
requires-python = ">=3.10,<3.13"
version = "5.31.0"
version = "5.30.1"
[project.scripts]
prowler = "prowler.__main__:prowler"
@@ -1,491 +0,0 @@
from unittest.mock import patch
import botocore
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
orig = botocore.client.BaseClient._make_api_call
AGG_ARN_TEMPLATE = (
"arn:aws:config:{region}:" + AWS_ACCOUNT_NUMBER + ":config-aggregator/{name}"
)
def _aggregator_payload(
name, region, *, org_aware=True, all_regions=True, aws_regions=None
):
payload = {
"ConfigurationAggregatorName": name,
"ConfigurationAggregatorArn": AGG_ARN_TEMPLATE.format(region=region, name=name),
}
if org_aware:
org_source = {
"RoleArn": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/AWSConfigRoleForOrganizations",
"AllAwsRegions": all_regions,
}
if not all_regions and aws_regions:
org_source["AwsRegions"] = aws_regions
payload["OrganizationAggregationSource"] = org_source
return payload
def make_mock_no_aggregators_no_admin():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {"ConfigurationAggregators": []}
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_aggregator_not_org_aware():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"legacy-agg",
AWS_REGION_EU_WEST_1,
org_aware=False,
)
]
}
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_org_aggregator_not_all_regions_with_admin():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"partial-org-agg",
AWS_REGION_EU_WEST_1,
org_aware=True,
all_regions=False,
aws_regions=[AWS_REGION_EU_WEST_1],
)
]
}
if operation_name == "ListDelegatedAdministrators":
return {
"DelegatedAdministrators": [
{
"Id": "123456789012",
"Arn": f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/o-abc123/123456789012",
"Email": "admin@example.com",
"Name": "Security",
"Status": "ACTIVE",
"JoinedMethod": "CREATED",
}
]
}
return orig(self, operation_name, api_params)
return _mock
def make_mock_full_pass():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"org-aggregator",
AWS_REGION_EU_WEST_1,
org_aware=True,
all_regions=True,
)
]
}
if operation_name == "ListDelegatedAdministrators":
return {
"DelegatedAdministrators": [
{
"Id": "123456789012",
"Arn": f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/o-abc123/123456789012",
"Email": "admin@example.com",
"Name": "Security",
"Status": "ACTIVE",
"JoinedMethod": "CREATED",
}
]
}
return orig(self, operation_name, api_params)
return _mock
def make_mock_access_denied_on_orgs():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"org-aggregator",
AWS_REGION_EU_WEST_1,
org_aware=True,
all_regions=True,
)
]
}
if operation_name == "ListDelegatedAdministrators":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "AccessDeniedException",
"Message": "User is not authorized to perform: organizations:ListDelegatedAdministrators",
}
},
operation_name,
)
return orig(self, operation_name, api_params)
return _mock
def make_mock_aggregators_access_denied():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "AccessDeniedException",
"Message": "denied",
}
},
operation_name,
)
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_aggregators_other_client_error():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "InternalServerError",
"Message": "boom",
}
},
operation_name,
)
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_aggregators_unexpected_exception():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
raise RuntimeError("simulated transient error")
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_delegated_admins_unexpected_exception():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"org-aggregator",
AWS_REGION_EU_WEST_1,
org_aware=True,
all_regions=True,
)
]
}
if operation_name == "ListDelegatedAdministrators":
raise RuntimeError("simulated transient error")
return orig(self, operation_name, api_params)
return _mock
class Test_config_delegated_admin_and_org_aggregator_all_regions:
@mock_aws
def test_no_aggregators_no_admin(self):
"""Test when no aggregators exist in any region and no delegated admin is set."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_no_aggregators_no_admin(),
):
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"no Organization Aggregator configured in any region"
in result[0].status_extended
)
assert (
"no delegated administrator registered for config.amazonaws.com"
in result[0].status_extended
)
@mock_aws
def test_aggregator_not_org_aware(self):
"""Test when an aggregator exists but is not an organization aggregator."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_aggregator_not_org_aware(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"is not an organization aggregator"
in eu_west_1_result.status_extended
)
@mock_aws
def test_org_aggregator_not_all_regions_with_admin(self):
"""Test org aggregator that doesn't cover all AWS regions (delegated admin set)."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_org_aggregator_not_all_regions_with_admin(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"does not cover all AWS regions" in eu_west_1_result.status_extended
)
@mock_aws
def test_full_pass(self):
"""Test PASS: delegated admin set and org aggregator covering all AWS regions."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_full_pass(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "PASS"
assert (
"is an organization aggregator covering all AWS regions"
in eu_west_1_result.status_extended
)
assert "delegated admin configured" in eu_west_1_result.status_extended
assert eu_west_1_result.resource_arn == AGG_ARN_TEMPLATE.format(
region=AWS_REGION_EU_WEST_1, name="org-aggregator"
)
@mock_aws
def test_access_denied_on_organizations(self):
"""Test that AccessDenied on Organizations is reported as unknown admin state."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_access_denied_on_orgs(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
# The check still runs; aggregator coverage is satisfied but the
# delegated-admin status is unknown, so it must FAIL.
assert eu_west_1_result.status == "FAIL"
assert (
"delegated administrator status for config.amazonaws.com could not be determined"
in eu_west_1_result.status_extended
)
@mock_aws
def test_aggregators_access_denied(self):
"""AccessDenied on DescribeConfigurationAggregators is swallowed: no aggregators recorded for that region."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_aggregators_access_denied(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
service = Config(aws_provider)
assert service.aggregators == {}
@mock_aws
def test_aggregators_other_client_error(self):
"""Non-access ClientError on DescribeConfigurationAggregators is logged at error level."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_aggregators_other_client_error(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
service = Config(aws_provider)
assert service.aggregators == {}
@mock_aws
def test_aggregators_unexpected_exception(self):
"""Non-ClientError on DescribeConfigurationAggregators is caught by bare except."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_aggregators_unexpected_exception(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
service = Config(aws_provider)
assert service.aggregators == {}
@mock_aws
def test_delegated_admins_unexpected_exception(self):
"""Non-ClientError on ListDelegatedAdministrators must still set lookup_failed."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_delegated_admins_unexpected_exception(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
service = Config(aws_provider)
assert service.delegated_administrators_lookup_failed is True
assert service.delegated_administrators == []
@@ -1,247 +0,0 @@
from unittest import mock
from prowler.providers.aws.services.sagemaker.sagemaker_service import ProcessingJob
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
CLARIFY_IMAGE_URI = f"{AWS_ACCOUNT_NUMBER}.dkr.ecr.{AWS_REGION_US_EAST_1}.amazonaws.com/sagemaker-clarify-processing:1.0"
NON_CLARIFY_IMAGE_URI = f"{AWS_ACCOUNT_NUMBER}.dkr.ecr.{AWS_REGION_US_EAST_1}.amazonaws.com/sagemaker-xgboost:1.0"
CUSTOM_CLARIFY_IMAGE_URI = f"{AWS_ACCOUNT_NUMBER}.dkr.ecr.{AWS_REGION_US_EAST_1}.amazonaws.com/my-clarify-thing:1.0"
PROCESSING_JOB_ARN = f"arn:aws:sagemaker:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:processing-job/clarify-job"
class Test_sagemaker_clarify_exists:
def test_no_processing_jobs_no_scanned_regions(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = []
sagemaker_client.processing_jobs_scanned_regions = set()
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 0
def test_no_processing_jobs_region_scanned(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = []
sagemaker_client.processing_jobs_scanned_regions = {AWS_REGION_US_EAST_1}
sagemaker_client.audited_partition = "aws"
sagemaker_client.audited_account = AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SageMaker Clarify processing jobs found in region {AWS_REGION_US_EAST_1}."
)
assert result[0].resource_id == "sagemaker-clarify"
def test_non_clarify_processing_job(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = [
ProcessingJob(
name="xgboost-job",
arn=f"arn:aws:sagemaker:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:processing-job/xgboost-job",
region=AWS_REGION_US_EAST_1,
image_uri=NON_CLARIFY_IMAGE_URI,
)
]
sagemaker_client.processing_jobs_scanned_regions = {AWS_REGION_US_EAST_1}
sagemaker_client.audited_partition = "aws"
sagemaker_client.audited_account = AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SageMaker Clarify processing jobs found in region {AWS_REGION_US_EAST_1}."
)
def test_custom_image_with_clarify_in_name_does_not_match(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = [
ProcessingJob(
name="my-clarify-thing-job",
arn=f"arn:aws:sagemaker:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:processing-job/my-clarify-thing-job",
region=AWS_REGION_US_EAST_1,
image_uri=CUSTOM_CLARIFY_IMAGE_URI,
)
]
sagemaker_client.processing_jobs_scanned_regions = {AWS_REGION_US_EAST_1}
sagemaker_client.audited_partition = "aws"
sagemaker_client.audited_account = AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SageMaker Clarify processing jobs found in region {AWS_REGION_US_EAST_1}."
)
def test_clarify_processing_job_exists(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = [
ProcessingJob(
name="clarify-job",
arn=PROCESSING_JOB_ARN,
region=AWS_REGION_US_EAST_1,
image_uri=CLARIFY_IMAGE_URI,
)
]
sagemaker_client.processing_jobs_scanned_regions = {AWS_REGION_US_EAST_1}
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SageMaker Clarify processing job clarify-job exists in region {AWS_REGION_US_EAST_1}."
)
assert result[0].resource_id == "clarify-job"
assert result[0].resource_arn == PROCESSING_JOB_ARN
def test_mixed_regions(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = [
ProcessingJob(
name="clarify-job",
arn=PROCESSING_JOB_ARN,
region=AWS_REGION_US_EAST_1,
image_uri=CLARIFY_IMAGE_URI,
)
]
sagemaker_client.processing_jobs_scanned_regions = {
AWS_REGION_US_EAST_1,
AWS_REGION_EU_WEST_1,
}
sagemaker_client.audited_partition = "aws"
sagemaker_client.audited_account = AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 2
results_by_region = {r.region: r for r in result}
us_result = results_by_region[AWS_REGION_US_EAST_1]
assert us_result.status == "PASS"
assert (
us_result.status_extended
== f"SageMaker Clarify processing job clarify-job exists in region {AWS_REGION_US_EAST_1}."
)
eu_result = results_by_region[AWS_REGION_EU_WEST_1]
assert eu_result.status == "FAIL"
assert (
eu_result.status_extended
== f"No SageMaker Clarify processing jobs found in region {AWS_REGION_EU_WEST_1}."
)
@@ -396,13 +396,13 @@ class Test_SageMaker_Service:
sagemaker_service = SageMaker(audit_info)
# Check that __threading_call__ was called for _list_tags_for_resource
# (one for each resource type: models, notebooks, training jobs, processing jobs, endpoint configs, domains)
# (one for each resource type: models, notebooks, training jobs, endpoint configs, domains)
tag_calls = [
c
for c in mock_threading_call.call_args_list
if c[0][0] == sagemaker_service._list_tags_for_resource
]
assert len(tag_calls) == 6
assert len(tag_calls) == 5
# Test SageMaker list model package groups
def test_list_model_package_groups(self):
@@ -1,512 +0,0 @@
from unittest.mock import patch
import botocore
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
orig = botocore.client.BaseClient._make_api_call
HUB_ARN = f"arn:aws:securityhub:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:hub/default"
def _active_hub_responses(operation_name):
"""Return a moto-friendly response for hub-describing API calls.
Returns None if the operation is not one of the hub APIs (so the caller
can fall back to the default behavior).
"""
if operation_name == "DescribeHub":
return {
"HubArn": HUB_ARN,
"SubscribedAt": "2024-01-01T00:00:00.000Z",
"AutoEnableControls": True,
}
if operation_name == "GetEnabledStandards":
return {"StandardsSubscriptions": []}
if operation_name == "ListEnabledProductsForImport":
return {"ProductSubscriptions": []}
if operation_name == "ListTagsForResource":
return {"Tags": {}}
return None
def mock_make_api_call_org_admin_and_config(self, operation_name, api_params):
"""Mock organization admin accounts and configuration APIs - PASS scenario."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {
"AdminAccounts": [
{
"AdminAccountId": "123456789012",
"AdminStatus": "ENABLED",
}
]
}
if operation_name == "DescribeOrganizationConfiguration":
return {
"AutoEnable": True,
"AutoEnableStandards": "DEFAULT",
}
return orig(self, operation_name, api_params)
def mock_make_api_call_org_admin_no_auto_enable(self, operation_name, api_params):
"""Mock organization admin configured but auto-enable disabled."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {
"AdminAccounts": [
{
"AdminAccountId": "123456789012",
"AdminStatus": "ENABLED",
}
]
}
if operation_name == "DescribeOrganizationConfiguration":
return {
"AutoEnable": False,
"AutoEnableStandards": "NONE",
}
return orig(self, operation_name, api_params)
def mock_make_api_call_no_org_admin(self, operation_name, api_params):
"""Mock no organization admin configured."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {"AdminAccounts": []}
if operation_name == "DescribeOrganizationConfiguration":
return {
"AutoEnable": False,
"AutoEnableStandards": "NONE",
}
return orig(self, operation_name, api_params)
def mock_make_api_call_securityhub_not_subscribed(self, operation_name, api_params):
"""Simulate Security Hub not subscribed in the account (InvalidAccessException)."""
if operation_name == "DescribeHub":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "InvalidAccessException",
"Message": "Account is not subscribed to AWS Security Hub",
}
},
operation_name,
)
if operation_name == "ListOrganizationAdminAccounts":
return {"AdminAccounts": []}
return orig(self, operation_name, api_params)
def mock_make_api_call_admin_lookup_access_denied(self, operation_name, api_params):
"""Hub is ACTIVE but ListOrganizationAdminAccounts is denied — lookup-failed path."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "AccessDeniedException",
"Message": "User is not authorized to perform: securityhub:ListOrganizationAdminAccounts",
}
},
operation_name,
)
if operation_name == "DescribeOrganizationConfiguration":
return {"AutoEnable": True, "AutoEnableStandards": "DEFAULT"}
return orig(self, operation_name, api_params)
def mock_make_api_call_admin_lookup_unexpected(self, operation_name, api_params):
"""ListOrganizationAdminAccounts raises a non-ClientError — bare Exception branch."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
raise RuntimeError("simulated transient error")
if operation_name == "DescribeOrganizationConfiguration":
return {"AutoEnable": True, "AutoEnableStandards": "DEFAULT"}
return orig(self, operation_name, api_params)
def mock_make_api_call_describe_org_config_other_client_error(
self, operation_name, api_params
):
"""DescribeOrganizationConfiguration raises a non-access ClientError — else branch."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {
"AdminAccounts": [
{"AdminAccountId": "123456789012", "AdminStatus": "ENABLED"}
]
}
if operation_name == "DescribeOrganizationConfiguration":
raise botocore.exceptions.ClientError(
{"Error": {"Code": "InternalServerError", "Message": "boom"}},
operation_name,
)
return orig(self, operation_name, api_params)
def mock_make_api_call_describe_org_config_unexpected(self, operation_name, api_params):
"""DescribeOrganizationConfiguration raises a non-ClientError — bare Exception branch."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {
"AdminAccounts": [
{"AdminAccountId": "123456789012", "AdminStatus": "ENABLED"}
]
}
if operation_name == "DescribeOrganizationConfiguration":
raise RuntimeError("simulated transient error")
return orig(self, operation_name, api_params)
class Test_securityhub_delegated_admin_enabled_all_regions:
def teardown_method(self):
"""Evict cached securityhub modules so legacy mock.patch-based tests
in the same session see a fresh import path."""
import sys
for mod in (
"prowler.providers.aws.services.securityhub.securityhub_client",
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions",
):
sys.modules.pop(mod, None)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_securityhub_not_subscribed,
)
@mock_aws
def test_no_securityhub(self):
"""Test when Security Hub is not subscribed in any region."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
# Should have findings for each region (with NOT_AVAILABLE hubs)
assert len(result) > 0
# All should fail since hub is not enabled
for finding in result:
assert finding.status == "FAIL"
assert "Security Hub not enabled" in finding.status_extended
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_no_org_admin,
)
@mock_aws
def test_securityhub_enabled_no_delegated_admin(self):
"""Test when Security Hub is enabled but no delegated admin is configured."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"no delegated administrator configured"
in eu_west_1_result.status_extended
)
assert eu_west_1_result.resource_arn == HUB_ARN
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_org_admin_no_auto_enable,
)
@mock_aws
def test_securityhub_enabled_with_admin_no_auto_enable(self):
"""Test when Security Hub is enabled with delegated admin but auto-enable is off."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"organization auto-enable not configured"
in eu_west_1_result.status_extended
)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_org_admin_and_config,
)
@mock_aws
def test_securityhub_enabled_with_admin_and_auto_enable(self):
"""Test when Security Hub is enabled with delegated admin and auto-enable on (PASS)."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "PASS"
assert "delegated admin configured" in eu_west_1_result.status_extended
assert "auto-enable" in eu_west_1_result.status_extended
assert eu_west_1_result.resource_arn == HUB_ARN
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_admin_lookup_access_denied,
)
@mock_aws
def test_admin_lookup_access_denied(self):
"""AccessDenied on ListOrganizationAdminAccounts must FAIL with unknown-admin message."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"delegated administrator status could not be determined"
in eu_west_1_result.status_extended
)
assert (
"no delegated administrator configured"
not in eu_west_1_result.status_extended
)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_admin_lookup_unexpected,
)
@mock_aws
def test_admin_lookup_unexpected_exception(self):
"""Non-ClientError raised from ListOrganizationAdminAccounts still sets lookup_failed."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
service = SecurityHub(aws_provider)
assert service.organization_admin_lookup_failed is True
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=service,
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
result = securityhub_delegated_admin_enabled_all_regions().execute()
assert result and result[0].status == "FAIL"
assert (
"delegated administrator status could not be determined"
in result[0].status_extended
)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_describe_org_config_other_client_error,
)
@mock_aws
def test_describe_org_config_other_client_error(self):
"""Non-access ClientError on DescribeOrganizationConfiguration is logged at error level."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
service = SecurityHub(aws_provider)
# organization_config_available stays False, so the auto-enable issue is suppressed
assert service.securityhubs[0].organization_config_available is False
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=service,
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
result = securityhub_delegated_admin_enabled_all_regions().execute()
# Admin is configured and hub is active; with org config unavailable the
# check should PASS because there are no other detectable issues.
assert result and result[0].status == "PASS"
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_describe_org_config_unexpected,
)
@mock_aws
def test_describe_org_config_unexpected_exception(self):
"""Non-ClientError on DescribeOrganizationConfiguration is caught by bare except."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
service = SecurityHub(aws_provider)
assert service.securityhubs[0].organization_config_available is False
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=service,
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
result = securityhub_delegated_admin_enabled_all_regions().execute()
assert result and result[0].status == "PASS"
@@ -1,115 +0,0 @@
from unittest import mock
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION_ID,
set_mocked_azure_provider,
)
class Test_cosmosdb_account_automatic_failover_enabled:
def test_no_subscriptions(self):
cosmosdb_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled.cosmosdb_client",
new=cosmosdb_client,
),
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled import (
cosmosdb_account_automatic_failover_enabled,
)
cosmosdb_client.accounts = {}
check = cosmosdb_account_automatic_failover_enabled()
result = check.execute()
assert len(result) == 0
def test_pass(self):
cosmosdb_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled.cosmosdb_client",
new=cosmosdb_client,
),
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled import (
cosmosdb_account_automatic_failover_enabled,
)
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import (
Account,
)
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION_ID: [
Account(
id="/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.DocumentDB/databaseAccounts/test-account",
name="test-account",
kind="GlobalDocumentDB",
type="Microsoft.DocumentDB/databaseAccounts",
tags={},
is_virtual_network_filter_enabled=False,
location="eastus",
private_endpoint_connections=[],
disable_local_auth=False,
enable_automatic_failover=True,
)
]
}
check = cosmosdb_account_automatic_failover_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
def test_fail(self):
cosmosdb_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled.cosmosdb_client",
new=cosmosdb_client,
),
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled import (
cosmosdb_account_automatic_failover_enabled,
)
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import (
Account,
)
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION_ID: [
Account(
id="/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.DocumentDB/databaseAccounts/test-account",
name="test-account",
kind="GlobalDocumentDB",
type="Microsoft.DocumentDB/databaseAccounts",
tags={},
is_virtual_network_filter_enabled=False,
location="eastus",
private_endpoint_connections=[],
disable_local_auth=False,
enable_automatic_failover=False,
)
]
}
check = cosmosdb_account_automatic_failover_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
-2
View File
@@ -722,7 +722,6 @@ def mock_api_instances_calls(client: MagicMock, service: str):
},
"backupConfiguration": {"enabled": True},
"databaseFlags": [],
"availabilityType": "REGIONAL",
},
},
{
@@ -738,7 +737,6 @@ def mock_api_instances_calls(client: MagicMock, service: str):
},
"backupConfiguration": {"enabled": False},
"databaseFlags": [],
"availabilityType": "ZONAL",
},
},
]
@@ -1,205 +0,0 @@
from unittest import mock
from tests.providers.gcp.gcp_fixtures import (
GCP_EU1_LOCATION,
GCP_PROJECT_ID,
set_mocked_gcp_provider,
)
class Test_cloudsql_instance_high_availability_enabled:
"""Tests for the cloudsql_instance_high_availability_enabled check."""
def test_no_instances(self):
"""No Cloud SQL instances → no findings."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
cloudsql_client.instances = []
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 0
def test_instance_ha_enabled(self):
"""A REGIONAL primary instance → PASS."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="db-ha",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
availability_type="REGIONAL",
)
]
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "db-ha"
assert result[0].location == GCP_EU1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID
def test_instance_ha_disabled(self):
"""A ZONAL primary instance → FAIL with current availability in status_extended."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="db-zonal",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
availability_type="ZONAL",
)
]
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "ZONAL" in result[0].status_extended
assert result[0].resource_id == "db-zonal"
assert result[0].location == GCP_EU1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID
def test_read_replica_skipped(self):
"""Read replicas (instance_type != CLOUD_SQL_INSTANCE) are skipped."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="db-replica",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
availability_type="ZONAL",
instance_type="READ_REPLICA_INSTANCE",
)
]
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 0
def test_instance_default_availability_type_fails(self):
"""An instance missing availabilityType defaults to ZONAL (service layer) and must FAIL."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="db-default",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
# availability_type omitted → model default "ZONAL"
)
]
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "ZONAL" in result[0].status_extended
@@ -1,326 +0,0 @@
from datetime import datetime
from unittest import mock
import pytest
from prowler.lib.check.models import Check_Report_OCI
from prowler.providers.oraclecloud.services.identity.identity_service import Policy
from tests.providers.oraclecloud.oci_fixtures import (
OCI_COMPARTMENT_ID,
OCI_REGION,
OCI_TENANCY_ID,
set_mocked_oraclecloud_provider,
)
CHECK_PATH = "prowler.providers.oraclecloud.services.identity.identity_storage_service_level_admins_scoped.identity_storage_service_level_admins_scoped"
def _policy(
name: str, statements: list[str], lifecycle_state: str = "ACTIVE"
) -> Policy:
return Policy(
id=f"ocid1.policy.oc1..{name.lower().replace(' ', '-')}",
name=name,
description="Test policy",
compartment_id=OCI_COMPARTMENT_ID,
statements=statements,
time_created=datetime.now(),
lifecycle_state=lifecycle_state,
region=OCI_REGION,
)
def _identity_client(policies: list[Policy]) -> mock.MagicMock:
identity_client = mock.MagicMock()
identity_client.policies = policies
identity_client.audited_tenancy = OCI_TENANCY_ID
identity_client.audited_regions = [mock.MagicMock(key=OCI_REGION)]
return identity_client
def _run_check(policies: list[Policy]) -> list[Check_Report_OCI]:
identity_client = _identity_client(policies)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_oraclecloud_provider(),
),
mock.patch(f"{CHECK_PATH}.identity_client", new=identity_client),
):
from prowler.providers.oraclecloud.services.identity.identity_storage_service_level_admins_scoped.identity_storage_service_level_admins_scoped import (
identity_storage_service_level_admins_scoped,
)
return identity_storage_service_level_admins_scoped().execute()
class Test_identity_storage_service_level_admins_scoped:
def test_no_policies_passes_with_tenancy_finding(self):
result = _run_check([])
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == OCI_TENANCY_ID
assert result[0].resource_name == "Tenancy"
assert (
result[0].status_extended
== "No active storage service-level administrator policies grant manage permissions without excluding delete permissions."
)
def test_manage_volumes_without_delete_exclusion_fails(self):
result = _run_check(
[
_policy(
"Volume Admins",
["Allow group VolumeUsers to manage volumes in tenancy"],
)
]
)
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "Volume Admins"
assert "VOLUME_DELETE" in result[0].status_extended
assert (
"Allow group VolumeUsers to manage volumes in tenancy"
in result[0].status_extended
)
def test_manage_volumes_with_delete_exclusion_passes(self):
result = _run_check(
[
_policy(
"Volume Admins",
[
"Allow group VolumeUsers to manage volumes in tenancy where request.permission!='VOLUME_DELETE'"
],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Policy 'Volume Admins' excludes required storage delete permissions from storage manage statements."
)
def test_delete_exclusion_parser_is_case_and_whitespace_insensitive(self):
result = _run_check(
[
_policy(
"Volume Admins",
[
" allow group VolumeUsers TO manage volumes in tenancy WHERE request.permission != 'volume_delete' "
],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
def test_generic_where_clause_does_not_pass(self):
result = _run_check(
[
_policy(
"Bucket Admins",
[
"Allow group BucketUsers to manage buckets in tenancy where request.region='iad'"
],
)
]
)
assert len(result) == 1
assert result[0].status == "FAIL"
assert "BUCKET_DELETE" in result[0].status_extended
assert "request.region='iad'" in result[0].status_extended
@pytest.mark.parametrize(
"statement",
[
"Allow group BucketUsers to manage buckets in tenancy where ANY {request.permission!='BUCKET_DELETE', request.region='iad'}",
"Allow group BucketUsers to manage buckets in tenancy where request.permission!='BUCKET_DELETE' OR request.region='iad'",
],
)
def test_disjunctive_delete_exclusion_does_not_pass(self, statement):
result = _run_check([_policy("Bucket Admins", [statement])])
assert len(result) == 1
assert result[0].status == "FAIL"
assert "BUCKET_DELETE" in result[0].status_extended
def test_quoted_literals_do_not_make_delete_exclusion_disjunctive(self):
result = _run_check(
[
_policy(
"Bucket Admins",
[
"Allow group BucketUsers to manage buckets in tenancy where request.permission!='BUCKET_DELETE' and target.tag.namespace='any-tag'"
],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
@pytest.mark.parametrize(
"resource,permission",
[
("file-systems", "FILE_SYSTEM_DELETE"),
("mount-targets", "MOUNT_TARGET_DELETE"),
("export-sets", "EXPORT_SET_DELETE"),
("volumes", "VOLUME_DELETE"),
("volume-backups", "VOLUME_BACKUP_DELETE"),
("objects", "OBJECT_DELETE"),
("buckets", "BUCKET_DELETE"),
],
)
def test_storage_resources_require_matching_delete_exclusion(
self, resource, permission
):
fail_result = _run_check(
[
_policy(
"Storage Admins",
[f"Allow group StorageUsers to manage {resource} in tenancy"],
)
]
)
pass_result = _run_check(
[
_policy(
"Storage Admins",
[
f"Allow group StorageUsers to manage {resource} in tenancy where request.permission != '{permission}'"
],
)
]
)
assert len(fail_result) == 1
assert fail_result[0].status == "FAIL"
assert permission in fail_result[0].status_extended
assert len(pass_result) == 1
assert pass_result[0].status == "PASS"
def test_file_family_fails_until_all_delete_permissions_are_excluded(self):
partial_result = _run_check(
[
_policy(
"File Admins",
[
"Allow group FileUsers to manage file-family in tenancy where ALL {request.permission!='FILE_SYSTEM_DELETE', request.permission!='MOUNT_TARGET_DELETE'}"
],
)
]
)
complete_result = _run_check(
[
_policy(
"File Admins",
[
"Allow group FileUsers to manage file-family in tenancy where ALL {request.permission!='FILE_SYSTEM_DELETE', request.permission!='MOUNT_TARGET_DELETE', request.permission!='EXPORT_SET_DELETE'}"
],
)
]
)
assert len(partial_result) == 1
assert partial_result[0].status == "FAIL"
assert "EXPORT_SET_DELETE" in partial_result[0].status_extended
assert len(complete_result) == 1
assert complete_result[0].status == "PASS"
@pytest.mark.parametrize(
"family,missing_permission,statement",
[
(
"volume-family",
"VOLUME_BACKUP_DELETE",
"Allow group VolumeUsers to manage volume-family in tenancy where request.permission!='VOLUME_DELETE'",
),
(
"object-family",
"BUCKET_DELETE",
"Allow group BucketUsers to manage object-family in tenancy where request.permission!='OBJECT_DELETE'",
),
(
"all-resources",
"BUCKET_DELETE",
"Allow group StorageUsers to manage all-resources in tenancy where ALL {request.permission!='VOLUME_DELETE', request.permission!='VOLUME_BACKUP_DELETE', request.permission!='FILE_SYSTEM_DELETE', request.permission!='MOUNT_TARGET_DELETE', request.permission!='EXPORT_SET_DELETE', request.permission!='OBJECT_DELETE'}",
),
],
)
def test_families_and_all_resources_fail_unless_all_delete_permissions_are_excluded(
self, family, missing_permission, statement
):
result = _run_check([_policy("Storage Admins", [statement])])
assert len(result) == 1
assert result[0].status == "FAIL"
assert family in result[0].status_extended
assert missing_permission in result[0].status_extended
def test_all_resources_passes_when_all_storage_delete_permissions_are_excluded(
self,
):
result = _run_check(
[
_policy(
"Storage Admins",
[
"Allow group StorageUsers to manage all-resources in tenancy where ALL {request.permission!='VOLUME_DELETE', request.permission!='VOLUME_BACKUP_DELETE', request.permission!='FILE_SYSTEM_DELETE', request.permission!='MOUNT_TARGET_DELETE', request.permission!='EXPORT_SET_DELETE', request.permission!='OBJECT_DELETE', request.permission!='BUCKET_DELETE'}"
],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
def test_inactive_policies_are_ignored(self):
result = _run_check(
[
_policy(
"Inactive Volume Admins",
["Allow group VolumeUsers to manage volumes in tenancy"],
lifecycle_state="INACTIVE",
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "Tenancy"
def test_tenant_admin_policy_is_ignored(self):
result = _run_check(
[
_policy(
"Tenant Admin Policy",
["Allow group Administrators to manage all-resources in tenancy"],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "Tenancy"
def test_policies_without_storage_manage_statements_are_ignored(self):
result = _run_check(
[
_policy(
"Network Admins",
["Allow group NetworkUsers to manage vcns in tenancy"],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "Tenancy"
Generated
+1 -1
View File
@@ -3245,7 +3245,7 @@ wheels = [
[[package]]
name = "prowler"
version = "5.31.0"
version = "5.30.1"
source = { editable = "." }
dependencies = [
{ name = "alibabacloud-actiontrail20200706" },