Compare commits

..

8 Commits

Author SHA1 Message Date
Prowler Bot 1192d94648 chore(release): Bump versions to v5.30.2 (#11571)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-06-12 13:48:44 +02:00
Prowler Bot a578f4af34 chore: prepare API and UI changelogs for 5.30.1 release (#11566)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-06-12 12:16:15 +02:00
Prowler Bot d6528b674e fix(ui): show threat map data for okta and google workspace accounts (#11563)
Co-authored-by: Alejandro Bailo <59607668+alejandrobailo@users.noreply.github.com>
2026-06-12 10:18:43 +02:00
Prowler Bot 75decbbedf fix(api): drop_subgraph deletes relationships then nodes to cut Neo4j memory (#11561)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-06-12 09:47:41 +02:00
Prowler Bot 4a14559a5f fix(compliance): resolve provider from scan in attributes endp (#11560)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2026-06-12 09:18:11 +02:00
Prowler Bot c6f8620a0d fix(api): normalize OCI scan region credentials (#11559)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-06-11 17:55:26 +02:00
Prowler Bot ca4889b43e chore(release): Bump versions to v5.30.1 (#11547)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-06-11 15:28:54 +02:00
Prowler Bot 057d061c7e chore(api): Update prowler dependency to v5.30 for release 5.30.0 (#11543)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-06-11 11:15:18 +02:00
52 changed files with 43 additions and 3054 deletions
+1 -1
View File
@@ -145,7 +145,7 @@ SENTRY_RELEASE=local
NEXT_PUBLIC_SENTRY_ENVIRONMENT=${SENTRY_ENVIRONMENT}
#### Prowler release version ####
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.31.0
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.30.2
# Social login credentials
SOCIAL_GOOGLE_OAUTH_CALLBACK_URL="${AUTH_URL}/api/auth/callback/google"
+2 -2
View File
@@ -43,7 +43,7 @@ dependencies = [
"defusedxml==0.7.1",
"gunicorn==23.0.0",
"lxml==6.1.0",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@v5.30",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (==1.3.0)",
"sentry-sdk[django] (==2.56.0)",
@@ -68,7 +68,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.32.0"
version = "1.31.2"
[tool.uv]
# Transitive pins matching master to avoid silent drift; bump deliberately.
+1 -1
View File
@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.32.0
version: 1.31.2
description: |-
Prowler API specification.
Generated
+3 -3
View File
@@ -4416,7 +4416,7 @@ wheels = [
[[package]]
name = "prowler"
version = "5.30.0"
source = { git = "https://github.com/prowler-cloud/prowler.git?rev=master#f1d741214a60df17158c3fdc97804fd1fde64f3a" }
source = { git = "https://github.com/prowler-cloud/prowler.git?rev=v5.30#f1d741214a60df17158c3fdc97804fd1fde64f3a" }
dependencies = [
{ name = "alibabacloud-actiontrail20200706" },
{ name = "alibabacloud-credentials" },
@@ -4504,7 +4504,7 @@ dependencies = [
[[package]]
name = "prowler-api"
version = "1.32.0"
version = "1.31.2"
source = { virtual = "." }
dependencies = [
{ name = "cartography" },
@@ -4600,7 +4600,7 @@ requires-dist = [
{ name = "matplotlib", specifier = "==3.10.8" },
{ name = "neo4j", specifier = "==6.1.0" },
{ name = "openai", specifier = "==1.109.1" },
{ name = "prowler", git = "https://github.com/prowler-cloud/prowler.git?rev=master" },
{ name = "prowler", git = "https://github.com/prowler-cloud/prowler.git?rev=v5.30" },
{ name = "psycopg2-binary", specifier = "==2.9.9" },
{ name = "pytest-celery", extras = ["redis"], specifier = "==1.3.0" },
{ name = "reportlab", specifier = "==4.4.10" },
@@ -1,7 +1,7 @@
# Build command
# docker build --platform=linux/amd64 --no-cache -t prowler:latest .
ARG PROWLER_VERSION=latest@sha256:aa48c4b5fad9ba70be794fe574166a335309e9688af0f3d35921f1ea36e26713
ARG PROWLER_VERSION=latest
FROM toniblyx/prowler:${PROWLER_VERSION}
@@ -16,7 +16,7 @@
services:
nginx:
image: nginx:alpine@sha256:8b1e78743a03dbb2c95171cc58639fef29abc8816598e27fb910ed2e621e589a
image: nginx:alpine
container_name: prowler-nginx
restart: unless-stopped
ports:
+4 -4
View File
@@ -1,6 +1,6 @@
services:
api-dev-init:
image: busybox:1.37.0@sha256:9532d8c39891ca2ecde4d30d7710e01fb739c87a8b9299685c63704296b16028
image: busybox:1.37.0
volumes:
- ./_data/api:/data
command: ["sh", "-c", "chown -R 1000:1000 /data"]
@@ -64,7 +64,7 @@ services:
condition: service_healthy
postgres:
image: postgres:16.3-alpine3.20@sha256:36ed71227ae36305d26382657c0b96cbaf298427b3f1eaeb10d77a6dea3eec41
image: postgres:16.3-alpine3.20
hostname: "postgres-db"
volumes:
- ./_data/postgres:/var/lib/postgresql/data
@@ -88,7 +88,7 @@ services:
retries: 5
valkey:
image: valkey/valkey:7-alpine3.19@sha256:4054fe7fc607b9326ac7c4691ed26e9670d2ff17a9fb28c2577adecf928acbcc
image: valkey/valkey:7-alpine3.19
hostname: "valkey"
volumes:
- ./_data/valkey:/data
@@ -104,7 +104,7 @@ services:
retries: 3
neo4j:
image: graphstack/dozerdb:5.26.3.0@sha256:a77526ea3918fdc46d1fff70c4aea7d71d3874a26ecec059179d6775845b1247
image: graphstack/dozerdb:5.26.3.0
hostname: "neo4j"
volumes:
- ./_data/neo4j:/data
+4 -4
View File
@@ -6,7 +6,7 @@
#
services:
api-init:
image: busybox:1.37.0@sha256:9532d8c39891ca2ecde4d30d7710e01fb739c87a8b9299685c63704296b16028
image: busybox:1.37.0
volumes:
- ./_data/api:/data
command: ["sh", "-c", "chown -R 1000:1000 /data"]
@@ -60,7 +60,7 @@ services:
start_period: 60s
postgres:
image: postgres:16.3-alpine3.20@sha256:36ed71227ae36305d26382657c0b96cbaf298427b3f1eaeb10d77a6dea3eec41
image: postgres:16.3-alpine3.20
hostname: "postgres-db"
volumes:
- ./_data/postgres:/var/lib/postgresql/data
@@ -80,7 +80,7 @@ services:
retries: 5
valkey:
image: valkey/valkey:7-alpine3.19@sha256:4054fe7fc607b9326ac7c4691ed26e9670d2ff17a9fb28c2577adecf928acbcc
image: valkey/valkey:7-alpine3.19
hostname: "valkey"
volumes:
- ./_data/valkey:/data
@@ -96,7 +96,7 @@ services:
retries: 3
neo4j:
image: graphstack/dozerdb:5.26.3.0@sha256:a77526ea3918fdc46d1fff70c4aea7d71d3874a26ecec059179d6775845b1247
image: graphstack/dozerdb:5.26.3.0
hostname: "neo4j"
volumes:
- ./_data/neo4j:/data
@@ -128,8 +128,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.30.0"
PROWLER_API_VERSION="5.30.0"
PROWLER_UI_VERSION="5.29.0"
PROWLER_API_VERSION="5.29.0"
```
<Note>
+2 -2
View File
@@ -1,7 +1,7 @@
# =============================================================================
# Build stage - Install dependencies and build the application
# =============================================================================
FROM ghcr.io/astral-sh/uv:python3.13-alpine@sha256:f09cc61ffc001f202701fdeae14dbdd50f6ca4cfcf248f41fd3234a302c8534f AS builder
FROM ghcr.io/astral-sh/uv:python3.13-alpine@sha256:8f53782bb232ab0b5558f3071e86e2bbfde884e18815f2b19cc57f2d336e9ee2 AS builder
WORKDIR /app
@@ -25,7 +25,7 @@ RUN --mount=type=cache,target=/root/.cache/uv \
# =============================================================================
# Final stage - Minimal runtime environment
# =============================================================================
FROM python:3.13-alpine@sha256:db66119d6609a3a941a9433b225f4e13d33c459cede097cf3ec2fc4d1bd314b2
FROM python:3.13-alpine@sha256:bb1f2fdb1065c85468775c9d680dcd344f6442a2d1181ef7916b60a623f11d40
LABEL maintainer="https://github.com/prowler-cloud"
-13
View File
@@ -2,19 +2,6 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.31.0] (Prowler UNRELEASED)
### 🚀 Added
- `securityhub_delegated_admin_enabled_all_regions` check for AWS provider, verifying that Security Hub has a delegated administrator, is active in all opted-in regions, and has organization auto-enable on [(#11259)](https://github.com/prowler-cloud/prowler/pull/11259)
- `config_delegated_admin_and_org_aggregator_all_regions` check for AWS provider, verifying that AWS Config has a delegated administrator and an organization aggregator covering all AWS regions [(#11259)](https://github.com/prowler-cloud/prowler/pull/11259)
- `sagemaker_clarify_exists` check for AWS provider [(#11211)](https://github.com/prowler-cloud/prowler/pull/11211)
- `cloudsql_instance_high_availability_enabled` check for GCP provider, verifying Cloud SQL primary instances use `REGIONAL` availability for automatic zone failover [(#11024)](https://github.com/prowler-cloud/prowler/pull/11024)
- `identity_storage_service_level_admins_scoped` check for OCI provider CIS 3.1 control 1.15, ensuring storage service-level administrators exclude delete permissions [(#11523)](https://github.com/prowler-cloud/prowler/pull/11523)
- `cosmosdb_account_automatic_failover_enabled` check for Azure provider [(#11031)](https://github.com/prowler-cloud/prowler/pull/11031)
---
## [5.30.0] (Prowler v5.30.0)
### 🚀 Added
@@ -1293,8 +1293,7 @@
"storage_ensure_private_endpoints_in_storage_accounts",
"storage_secure_transfer_required_is_enabled",
"vm_ensure_using_managed_disks",
"vm_trusted_launch_enabled",
"cosmosdb_account_automatic_failover_enabled"
"vm_trusted_launch_enabled"
]
},
{
+1 -2
View File
@@ -1087,8 +1087,7 @@
"storage_blob_versioning_is_enabled",
"storage_geo_redundant_enabled",
"vm_scaleset_associated_with_load_balancer",
"vm_scaleset_not_empty",
"cosmosdb_account_automatic_failover_enabled"
"vm_scaleset_not_empty"
],
"gcp": [
"compute_instance_automatic_restart_enabled",
@@ -302,9 +302,7 @@
{
"Id": "1.15",
"Description": "Ensure storage service-level admins cannot delete resources they manage",
"Checks": [
"identity_storage_service_level_admins_scoped"
],
"Checks": [],
"Attributes": [
{
"Section": "1. Identity and Access Management",
+1 -1
View File
@@ -49,7 +49,7 @@ class _MutableTimestamp:
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
prowler_version = "5.31.0"
prowler_version = "5.30.2"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
+9 -10
View File
@@ -58,17 +58,16 @@ def print_prowler_cloud_banner(provider: str = None):
bar = f"{banner_color}{Style.RESET_ALL}"
print(
f"""
{bar} {Style.BRIGHT}You're getting a snapshot 📸. Prowler Cloud gives you the full picture:{Style.RESET_ALL}
{bar} {Style.BRIGHT}You're getting a snapshot. Prowler Cloud gives you the full picture.{Style.RESET_ALL}
{bar}
{bar} {check} {Style.BRIGHT}Continuous Security Monitoring{Style.RESET_ALL} - scheduled scans with history, trends and alerts.
{bar} {check} {Style.BRIGHT}Lighthouse AI + MCP{Style.RESET_ALL} - autonomous triage, custom dashboards, prioritization with prevention and remediation.
{bar} {check} {Style.BRIGHT}Alerts{Style.RESET_ALL} - get notified when anything you want is happening.
{bar} {check} {Style.BRIGHT}Live Compliance{Style.RESET_ALL} - dashboards for 50+ frameworks, always up to date.
{bar} {check} {Style.BRIGHT}Remediation{Style.RESET_ALL} - complete guided remediation including Autonomous remediation with Lighthouse AI.
{bar} {check} {Style.BRIGHT}Attack Path Visualization{Style.RESET_ALL} - see how attackers chain risks to reach your crown jewels.
{bar} {check} {Style.BRIGHT}Bulk Provisioning{Style.RESET_ALL} - add your entire AWS Organization in seconds.
{bar} {check} {Style.BRIGHT}Integrations{Style.RESET_ALL} - Anything with our MCP + Jira, Slack, AWS Security Hub, Amazon S3, SSO and RBAC.
{bar} {check} {Style.BRIGHT}Attack Path Visualization{Style.RESET_ALL} - see how attackers chain risks to reach your crown jewels
{bar} {check} {Style.BRIGHT}Lighthouse AI + MCP{Style.RESET_ALL} - autonomous triage, prioritization and remediation
{bar} {check} {Style.BRIGHT}Organizations{Style.RESET_ALL} - all your AWS accounts under one organization
{bar} {check} {Style.BRIGHT}Continuous scanning{Style.RESET_ALL} - scheduled scans with history, trends and alerts
{bar} {check} {Style.BRIGHT}Integrations{Style.RESET_ALL} - Jira, Slack, AWS Security Hub, Amazon S3, SSO and RBAC
{bar} {check} {Style.BRIGHT}Reports{Style.RESET_ALL} - download ready-to-share PDF reports
{bar} {check} {Style.BRIGHT}Live compliance{Style.RESET_ALL} - dashboards for 50+ frameworks, always up to date
{bar}
{bar} {Fore.BLUE}Start free at 👉 cloud.prowler.com{Style.RESET_ALL}
{bar} {Fore.BLUE}Start free at cloud.prowler.com{Style.RESET_ALL}
"""
)
@@ -2582,7 +2582,6 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -2592,9 +2591,6 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -2608,7 +2604,6 @@
"il-central-1",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -7349,7 +7344,6 @@
"lightsail": {
"regions": {
"aws": [
"ap-east-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-south-1",
@@ -7360,11 +7354,9 @@
"ca-central-1",
"eu-central-1",
"eu-north-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -8277,9 +8269,7 @@
"cn-north-1",
"cn-northwest-1"
],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
@@ -9230,7 +9220,6 @@
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -9997,8 +9986,6 @@
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"eu-central-1",
"eu-south-2",
@@ -1,44 +0,0 @@
{
"Provider": "aws",
"CheckID": "config_delegated_admin_and_org_aggregator_all_regions",
"CheckTitle": "AWS Config has a delegated administrator and an organization aggregator covering all AWS regions",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices",
"Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices"
],
"ServiceName": "config",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "AwsConfigConfigurationAggregator",
"ResourceGroup": "governance",
"Description": "**AWS Config** has a delegated administrator registered via AWS Organizations and at least one Configuration Aggregator with an OrganizationAggregationSource that covers all AWS regions, ensuring centralized org-wide configuration visibility.",
"Risk": "Without an org-wide **AWS Config** aggregator and a delegated administrator, configuration data is fragmented across accounts and regions, **compliance reporting** is incomplete, and **drift detection** is delayed. Adversaries or misconfigurations can persist in unmonitored accounts, eroding **audit readiness** and **regulatory posture**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/config/latest/developerguide/aggregate-data.html",
"https://docs.aws.amazon.com/config/latest/developerguide/set-up-aggregator-cli.html",
"https://docs.aws.amazon.com/organizations/latest/userguide/services-that-can-integrate-config.html"
],
"Remediation": {
"Code": {
"CLI": "aws organizations register-delegated-administrator --account-id <ADMIN_ACCOUNT_ID> --service-principal config.amazonaws.com && aws configservice put-configuration-aggregator --configuration-aggregator-name org-aggregator --organization-aggregation-source RoleArn=<ROLE_ARN>,AllAwsRegions=true",
"NativeIaC": "",
"Other": "1. From the AWS Organizations management account, register the delegated administrator for config.amazonaws.com\n2. In the delegated admin account, open AWS Config\n3. Create a Configuration Aggregator and select Add my organization as the source\n4. Enable Include all AWS Regions\n5. Confirm an IAM role with AWSConfigRoleForOrganizations is attached\n6. Verify the aggregator status reaches SUCCEEDED for all member accounts",
"Terraform": ""
},
"Recommendation": {
"Text": "Register a **delegated administrator** for AWS Config via AWS Organizations and create at least one **Configuration Aggregator** with an OrganizationAggregationSource that covers **all AWS regions**. This centralizes configuration data across the organization for unified compliance and audit reporting.",
"Url": "https://hub.prowler.com/check/config_delegated_admin_and_org_aggregator_all_regions"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [
"config_recorder_all_regions_enabled",
"guardduty_delegated_admin_enabled_all_regions"
],
"Notes": "This check requires execution from the organization management account or delegated administrator account to access organization-level APIs."
}
@@ -1,115 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.config.config_client import config_client
from prowler.providers.aws.services.config.config_service import Aggregator
class config_delegated_admin_and_org_aggregator_all_regions(Check):
"""Ensure AWS Config has a delegated admin and an org aggregator covering all regions.
This check verifies that:
1. A delegated administrator is registered for the config.amazonaws.com
service principal via AWS Organizations.
2. At least one AWS Config Configuration Aggregator exists with an
OrganizationAggregationSource that covers all AWS regions
(AllAwsRegions=true).
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the check logic.
Returns:
A list of reports containing the result of the check. One finding per
aggregator-region, or a single synthetic FAIL when no aggregators
exist in any region.
"""
findings = []
has_delegated_admin = (
bool(config_client.delegated_administrators)
and not config_client.delegated_administrators_lookup_failed
)
delegated_admin_unknown = config_client.delegated_administrators_lookup_failed
# No aggregators in any region: emit one synthetic FAIL anchored to the
# audited account in the default region.
if not config_client.aggregators:
synthetic = Aggregator(
name="unknown",
arn=config_client.get_unknown_arn(
region=config_client.region,
resource_type="config-aggregator",
),
region=config_client.region,
all_aws_regions=False,
aws_regions=None,
organization_aggregation_source_present=False,
)
report = Check_Report_AWS(metadata=self.metadata(), resource=synthetic)
if delegated_admin_unknown:
delegated_state = (
"delegated administrator status could not be determined"
)
elif has_delegated_admin:
delegated_state = "delegated administrator configured"
else:
delegated_state = (
"no delegated administrator registered for config.amazonaws.com"
)
report.status = "FAIL"
report.status_extended = (
f"AWS Config has no Organization Aggregator configured in any "
f"region ({delegated_state})."
)
findings.append(report)
return findings
for region, aggregators_in_region in config_client.aggregators.items():
for aggregator in aggregators_in_region:
report = Check_Report_AWS(metadata=self.metadata(), resource=aggregator)
org_aware = aggregator.organization_aggregation_source_present
covers_all = aggregator.all_aws_regions
issues = []
if delegated_admin_unknown:
issues.append(
"delegated administrator status for config.amazonaws.com "
"could not be determined"
)
elif not has_delegated_admin:
issues.append(
"no delegated administrator registered for config.amazonaws.com"
)
if not org_aware:
issues.append(
f"aggregator {aggregator.name} is not an organization aggregator"
)
elif not covers_all:
issues.append(
f"aggregator {aggregator.name} does not cover all AWS regions"
)
if issues:
report.status = "FAIL"
report.status_extended = (
f"AWS Config aggregator {aggregator.name} in region "
f"{region} has issues: {', '.join(issues)}."
)
else:
report.status = "PASS"
report.status_extended = (
f"AWS Config aggregator {aggregator.name} in region "
f"{region} is an organization aggregator covering all "
f"AWS regions with delegated admin configured."
)
# Support muting non-default regions if configured
if report.status == "FAIL" and (
config_client.audit_config.get("mute_non_default_regions", False)
and region != config_client.region
):
report.muted = True
findings.append(report)
return findings
@@ -1,6 +1,5 @@
from typing import Optional
from botocore.client import ClientError
from pydantic.v1 import BaseModel
from prowler.lib.logger import logger
@@ -13,16 +12,10 @@ class Config(AWSService):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.recorders = {}
self.aggregators: dict[str, list] = {}
self.delegated_administrators: list = []
self.delegated_administrators_lookup_failed: bool = False
self.__threading_call__(self.describe_configuration_recorders)
self.__threading_call__(
self._describe_configuration_recorder_status, self.recorders.values()
)
self.__threading_call__(self._describe_configuration_aggregators)
# Organizations API is not regional; single call.
self._list_config_delegated_administrators()
def _get_recorder_arn_template(self, region):
return f"arn:{self.audited_partition}:config:{region}:{self.audited_account}:recorder"
@@ -80,108 +73,6 @@ class Config(AWSService):
f"{recorder.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_configuration_aggregators(self, regional_client):
"""Describe AWS Config configuration aggregators per region.
An aggregator counts as organization-aware when its
OrganizationAggregationSource key is present in the response.
"""
logger.info("Config - Describing Configuration Aggregators...")
try:
paginator = regional_client.get_paginator(
"describe_configuration_aggregators"
)
region_aggregators: list = []
for page in paginator.paginate():
for aggregator in page.get("ConfigurationAggregators", []):
name = aggregator.get("ConfigurationAggregatorName", "")
arn = aggregator.get("ConfigurationAggregatorArn", "")
org_source = aggregator.get("OrganizationAggregationSource")
org_aware = org_source is not None
all_aws_regions = False
aws_regions: Optional[list] = None
if org_aware:
all_aws_regions = org_source.get("AllAwsRegions", False)
aws_regions = org_source.get("AwsRegions")
if not self.audit_resources or (
is_resource_filtered(arn, self.audit_resources)
):
region_aggregators.append(
Aggregator(
name=name,
arn=arn,
region=regional_client.region,
all_aws_regions=all_aws_regions,
aws_regions=aws_regions,
organization_aggregation_source_present=org_aware,
)
)
if region_aggregators:
self.aggregators[regional_client.region] = region_aggregators
except ClientError as error:
if error.response["Error"]["Code"] in (
"AccessDeniedException",
"AccessDenied",
):
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_config_delegated_administrators(self):
"""List delegated administrators for the AWS Config service principal.
Uses the Organizations API directly (not regional). Sets
delegated_administrators_lookup_failed to True on AccessDenied so callers
can surface the unknown delegated-admin state in findings.
"""
logger.info(
"Config - Listing delegated administrators for config.amazonaws.com..."
)
try:
org_client = self.session.client("organizations")
paginator = org_client.get_paginator("list_delegated_administrators")
for page in paginator.paginate(ServicePrincipal="config.amazonaws.com"):
for admin in page.get("DelegatedAdministrators", []):
self.delegated_administrators.append(
ConfigDelegatedAdministrator(
id=admin.get("Id", ""),
arn=admin.get("Arn", ""),
name=admin.get("Name", ""),
email=admin.get("Email", ""),
status=admin.get("Status", ""),
joined_method=admin.get("JoinedMethod", ""),
)
)
except ClientError as error:
error_code = error.response["Error"]["Code"]
if error_code in (
"AccessDeniedException",
"AccessDenied",
"AWSOrganizationsNotInUseException",
):
self.delegated_administrators_lookup_failed = True
logger.warning(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
self.delegated_administrators_lookup_failed = True
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
self.delegated_administrators_lookup_failed = True
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Recorder(BaseModel):
name: str
@@ -189,25 +80,3 @@ class Recorder(BaseModel):
recording: Optional[bool]
last_status: Optional[str]
region: str
class Aggregator(BaseModel):
"""Represents an AWS Config Configuration Aggregator."""
name: str
arn: str
region: str
all_aws_regions: bool = False
aws_regions: Optional[list] = None
organization_aggregation_source_present: bool = False
class ConfigDelegatedAdministrator(BaseModel):
"""Represents a delegated administrator registered for config.amazonaws.com."""
id: str
arn: str
name: str
email: str
status: str
joined_method: str
@@ -1,39 +0,0 @@
{
"Provider": "aws",
"CheckID": "sagemaker_clarify_exists",
"CheckTitle": "Amazon SageMaker Clarify processing jobs exist in the region",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "sagemaker",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Other",
"ResourceGroup": "ai_ml",
"Description": "**SageMaker Clarify** provides bias detection and model explainability for ML workloads.\n\nThis check verifies that at least one SageMaker processing job using the AWS-managed Clarify container image exists in each successfully scanned region. The absence of Clarify jobs indicates that responsible-AI controls such as bias detection and explainability are not in place.",
"Risk": "Without **SageMaker Clarify** processing jobs, ML models may be deployed without bias analysis or explainability reports. This can lead to:\n- **Regulatory non-compliance** with AI governance frameworks\n- **Undetected bias** in model predictions affecting protected groups\n- **Lack of accountability** for ML model decisions in production",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-configure-processing-jobs.html",
"https://docs.aws.amazon.com/sagemaker/latest/dg-ecr-paths/sagemaker-algo-docker-registry-paths.html"
],
"Remediation": {
"Code": {
"CLI": "aws sagemaker create-processing-job --processing-job-name clarify-bias-check --app-specification ImageUri=<clarify-image-uri> --role-arn <role-arn> --processing-resources 'ClusterConfig={InstanceCount=1,InstanceType=ml.m5.xlarge,VolumeSizeInGB=20}'",
"NativeIaC": "",
"Other": "1. Open the AWS Console and go to Amazon SageMaker\n2. Navigate to Processing > Processing jobs\n3. Click Create processing job\n4. Select the SageMaker Clarify container image for your region\n5. Configure input/output paths and the analysis configuration\n6. Click Create processing job",
"Terraform": ""
},
"Recommendation": {
"Text": "Create SageMaker Clarify processing jobs to evaluate models for bias and explainability before deployment. Integrate Clarify into your ML pipeline to ensure responsible AI practices.",
"Url": "https://hub.prowler.com/check/sagemaker_clarify_exists"
}
},
"Categories": [
"gen-ai"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Results are generated per scanned region. Regions where `ListProcessingJobs` cannot be queried are omitted from the findings."
}
@@ -1,54 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.sagemaker.sagemaker_client import sagemaker_client
class sagemaker_clarify_exists(Check):
"""Check whether at least one SageMaker Clarify processing job exists per region.
A region is reported only when ListProcessingJobs succeeded for it; regions
where the API call failed (e.g. AccessDenied, unsupported region) are
skipped at the service layer and produce no finding.
- PASS: At least one processing job uses the AWS-managed Clarify container
image in the region (one finding per job).
- FAIL: No processing job uses the Clarify container image in the region
(one finding per region).
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the SageMaker Clarify exists check.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for region in sorted(sagemaker_client.processing_jobs_scanned_regions):
clarify_jobs = sorted(
(
job
for job in sagemaker_client.sagemaker_processing_jobs
if job.region == region
and job.image_uri
and "sagemaker-clarify-processing" in job.image_uri
),
key=lambda job: job.name,
)
if clarify_jobs:
for job in clarify_jobs:
report = Check_Report_AWS(metadata=self.metadata(), resource=job)
report.status = "PASS"
report.status_extended = f"SageMaker Clarify processing job {job.name} exists in region {region}."
findings.append(report)
else:
report = Check_Report_AWS(metadata=self.metadata(), resource={})
report.region = region
report.resource_id = "sagemaker-clarify"
report.resource_arn = f"arn:{sagemaker_client.audited_partition}:sagemaker:{region}:{sagemaker_client.audited_account}:processing-job"
report.status = "FAIL"
report.status_extended = (
f"No SageMaker Clarify processing jobs found in region {region}."
)
findings.append(report)
return findings
@@ -15,8 +15,6 @@ class SageMaker(AWSService):
self.sagemaker_notebook_instances = []
self.sagemaker_models = []
self.sagemaker_training_jobs = []
self.sagemaker_processing_jobs = []
self.processing_jobs_scanned_regions = set()
self.sagemaker_domains = []
self.endpoint_configs = {}
self.sagemaker_model_registries = []
@@ -26,7 +24,6 @@ class SageMaker(AWSService):
self.__threading_call__(self._list_notebook_instances)
self.__threading_call__(self._list_models)
self.__threading_call__(self._list_training_jobs)
self.__threading_call__(self._list_processing_jobs)
self.__threading_call__(self._list_endpoint_configs)
self.__threading_call__(self._list_domains)
self.__threading_call__(self._list_model_package_groups)
@@ -40,9 +37,6 @@ class SageMaker(AWSService):
self.__threading_call__(
self._describe_training_job, self.sagemaker_training_jobs
)
self.__threading_call__(
self._describe_processing_job, self.sagemaker_processing_jobs
)
self.__threading_call__(
self._describe_endpoint_config, list(self.endpoint_configs.values())
)
@@ -57,9 +51,6 @@ class SageMaker(AWSService):
self.__threading_call__(
self._list_tags_for_resource, self.sagemaker_training_jobs
)
self.__threading_call__(
self._list_tags_for_resource, self.sagemaker_processing_jobs
)
self.__threading_call__(
self._list_tags_for_resource, list(self.endpoint_configs.values())
)
@@ -137,66 +128,6 @@ class SageMaker(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_processing_jobs(self, regional_client):
"""List SageMaker processing jobs in a region.
Populates ``self.sagemaker_processing_jobs`` with `ProcessingJob`
entries and adds ``regional_client.region`` to
``self.processing_jobs_scanned_regions`` once pagination succeeds, so
regions where ``ListProcessingJobs`` fails are skipped by checks that
consume that set.
Args:
regional_client: Regional SageMaker boto3 client.
"""
logger.info("SageMaker - listing processing jobs...")
try:
list_processing_jobs_paginator = regional_client.get_paginator(
"list_processing_jobs"
)
for page in list_processing_jobs_paginator.paginate():
for processing_job in page["ProcessingJobSummaries"]:
if not self.audit_resources or (
is_resource_filtered(
processing_job["ProcessingJobArn"], self.audit_resources
)
):
self.sagemaker_processing_jobs.append(
ProcessingJob(
name=processing_job["ProcessingJobName"],
region=regional_client.region,
arn=processing_job["ProcessingJobArn"],
)
)
self.processing_jobs_scanned_regions.add(regional_client.region)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_processing_job(self, processing_job):
"""Describe a SageMaker processing job and enrich its image metadata.
Reads ``AppSpecification.ImageUri`` from ``DescribeProcessingJob`` and
stores it on ``processing_job.image_uri``. Errors are logged and
swallowed so a failure in one job does not abort the scan.
Args:
processing_job: ProcessingJob model to enrich in-place.
"""
logger.info("SageMaker - describing processing job...")
try:
regional_client = self.regional_clients[processing_job.region]
describe_processing_job = regional_client.describe_processing_job(
ProcessingJobName=processing_job.name
)
app_spec = describe_processing_job.get("AppSpecification", {})
processing_job.image_uri = app_spec.get("ImageUri")
except Exception as error:
logger.error(
f"{processing_job.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_notebook_instance(self, notebook_instance):
logger.info("SageMaker - describing notebook instances...")
try:
@@ -520,25 +451,6 @@ class TrainingJob(BaseModel):
tags: Optional[list] = []
class ProcessingJob(BaseModel):
"""Represents a SageMaker processing job.
Attributes:
name: Processing job name.
region: AWS region where the job lives.
arn: Processing job ARN.
image_uri: Container image URI from `AppSpecification.ImageUri`,
populated by `_describe_processing_job`.
tags: Resource tags, populated by `_list_tags_for_resource`.
"""
name: str
region: str
arn: str
image_uri: Optional[str] = None
tags: Optional[list] = []
class ProductionVariant(BaseModel):
name: str
initial_instance_count: int
@@ -1,44 +0,0 @@
{
"Provider": "aws",
"CheckID": "securityhub_delegated_admin_enabled_all_regions",
"CheckTitle": "Security Hub has delegated admin configured and is enabled in all regions with organization auto-enable",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices",
"Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices"
],
"ServiceName": "securityhub",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "AwsSecurityHubHub",
"ResourceGroup": "security",
"Description": "**AWS Security Hub** has a delegated administrator configured at the organization level, hubs are active in all opted-in regions, and organization auto-enable is active so that new member accounts are automatically enrolled.",
"Risk": "Without org-wide **AWS Security Hub** configuration, findings can be aggregated inconsistently, delegated admin may be missing in some regions, and new accounts will not be auto-enrolled. This fragments **security posture visibility**, delays **incident response**, and lets misconfigurations and compliance drift go undetected across the organization.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/securityhub/latest/userguide/designate-orgs-admin-account.html",
"https://docs.aws.amazon.com/securityhub/latest/userguide/accounts-orgs-auto-enable.html",
"https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-regions.html"
],
"Remediation": {
"Code": {
"CLI": "aws securityhub enable-organization-admin-account --admin-account-id <ADMIN_ACCOUNT_ID> && aws securityhub update-organization-configuration --auto-enable --auto-enable-standards DEFAULT",
"NativeIaC": "",
"Other": "1. Sign in to the AWS Organizations management account\n2. Open the AWS Organizations console\n3. Navigate to Services > AWS Security Hub\n4. Click Register delegated administrator and enter the security account ID\n5. Switch to the delegated admin account\n6. In Security Hub console, go to Settings > Accounts\n7. Enable auto-enable for new organization accounts\n8. Repeat hub enablement for all opted-in regions",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure a **delegated administrator** for AWS Security Hub via AWS Organizations. Enable Security Hub in **all opted-in regions** and turn on **auto-enable** so new member accounts are automatically enrolled. This ensures uniform security posture monitoring across the entire organization.",
"Url": "https://hub.prowler.com/check/securityhub_delegated_admin_enabled_all_regions"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [
"securityhub_enabled",
"guardduty_delegated_admin_enabled_all_regions"
],
"Notes": "This check requires execution from the organization management account or delegated administrator account to access organization-level APIs."
}
@@ -1,84 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.securityhub.securityhub_client import (
securityhub_client,
)
class securityhub_delegated_admin_enabled_all_regions(Check):
"""Ensure Security Hub has a delegated admin and is enabled in all regions.
This check verifies that:
1. A delegated administrator account is configured for Security Hub
2. Security Hub is active (ACTIVE status) in each region
3. Organization auto-enable is configured for new member accounts
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the check logic.
Returns:
A list of reports containing the result of the check for each region.
"""
findings = []
# Build a set of regions that have an organization admin account configured
regions_with_admin = {
admin.region
for admin in securityhub_client.organization_admin_accounts
if admin.admin_status == "ENABLED"
}
admin_lookup_failed = securityhub_client.organization_admin_lookup_failed
for securityhub in securityhub_client.securityhubs:
report = Check_Report_AWS(metadata=self.metadata(), resource=securityhub)
# Check if this region has a delegated admin
has_delegated_admin = securityhub.region in regions_with_admin
# Check if hub is active
hub_active = securityhub.status == "ACTIVE"
# Check if auto-enable is configured for organization members
auto_enable_on = securityhub.organization_auto_enable
# Determine overall status
issues = []
if admin_lookup_failed:
issues.append("delegated administrator status could not be determined")
elif not has_delegated_admin:
issues.append("no delegated administrator configured")
if not hub_active:
issues.append("Security Hub not enabled")
if (
hub_active
and securityhub.organization_config_available
and not auto_enable_on
):
# Only report auto-enable issue if hub is active and org config data
# is available (i.e., we could actually read AutoEnable from the API).
issues.append("organization auto-enable not configured")
if issues:
report.status = "FAIL"
report.status_extended = (
f"Security Hub in region {securityhub.region} has issues: "
f"{', '.join(issues)}."
)
else:
report.status = "PASS"
report.status_extended = (
f"Security Hub in region {securityhub.region} has delegated "
f"admin configured with hub active and organization auto-enable "
f"enabled."
)
# Support muting non-default regions if configured
if report.status == "FAIL" and (
securityhub_client.audit_config.get("mute_non_default_regions", False)
and securityhub.region != securityhub_client.region
):
report.muted = True
findings.append(report)
return findings
@@ -13,14 +13,8 @@ class SecurityHub(AWSService):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.securityhubs = []
self.organization_admin_accounts = []
self.organization_admin_lookup_failed: bool = False
self.__threading_call__(self._describe_hub)
self.__threading_call__(self._list_tags, self.securityhubs)
self.__threading_call__(self._list_organization_admin_accounts)
self.__threading_call__(
self._describe_organization_configuration, self.securityhubs
)
def _describe_hub(self, regional_client):
logger.info("SecurityHub - Describing Hub...")
@@ -110,95 +104,6 @@ class SecurityHub(AWSService):
f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_organization_admin_accounts(self, regional_client):
"""List Security Hub delegated administrator accounts for the organization.
This API is only available to the organization management account or
a delegated administrator account.
"""
logger.info("SecurityHub - listing organization admin accounts...")
try:
paginator = regional_client.get_paginator(
"list_organization_admin_accounts"
)
for page in paginator.paginate():
for admin in page.get("AdminAccounts", []):
admin_account = OrganizationAdminAccount(
admin_account_id=admin.get("AdminAccountId"),
admin_status=admin.get("AdminStatus"),
region=regional_client.region,
)
# Avoid duplicates across regions for the same admin account
if not any(
existing.admin_account_id == admin_account.admin_account_id
and existing.region == admin_account.region
for existing in self.organization_admin_accounts
):
self.organization_admin_accounts.append(admin_account)
except ClientError as error:
self.organization_admin_lookup_failed = True
if error.response["Error"]["Code"] in (
"AccessDeniedException",
"InvalidAccessException",
"BadRequestException",
):
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
self.organization_admin_lookup_failed = True
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _describe_organization_configuration(self, securityhub):
"""Describe the organization configuration for a Security Hub instance.
This provides information about auto-enable settings for the organization.
Only invoked for hubs in ACTIVE status.
"""
logger.info("SecurityHub - describing organization configuration...")
try:
if securityhub.status != "ACTIVE":
return
regional_client = self.regional_clients[securityhub.region]
org_config = regional_client.describe_organization_configuration()
securityhub.organization_auto_enable = org_config.get("AutoEnable", False)
securityhub.auto_enable_standards = org_config.get(
"AutoEnableStandards", "NONE"
)
securityhub.organization_config_available = True
except ClientError as error:
if error.response["Error"]["Code"] in (
"AccessDeniedException",
"InvalidAccessException",
"BadRequestException",
):
# Expected when not running from management or delegated admin account
logger.warning(
f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class OrganizationAdminAccount(BaseModel):
"""Represents a Security Hub delegated administrator account."""
admin_account_id: str
admin_status: str # ENABLED or DISABLE_IN_PROGRESS
region: str
class SecurityHubHub(BaseModel):
arn: str
@@ -207,8 +112,4 @@ class SecurityHubHub(BaseModel):
standards: str
integrations: str
region: str
tags: Optional[list] = []
# Organization configuration fields
organization_auto_enable: bool = False
auto_enable_standards: str = "NONE"
organization_config_available: bool = False
tags: Optional[list]
@@ -1,38 +0,0 @@
{
"Provider": "azure",
"CheckID": "cosmosdb_account_automatic_failover_enabled",
"CheckTitle": "Cosmos DB account has automatic failover enabled",
"CheckType": [],
"ServiceName": "cosmosdb",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "microsoft.documentdb/databaseaccounts",
"ResourceGroup": "database",
"Description": "**Azure Cosmos DB accounts** are evaluated for **automatic failover** configuration. When enabled, Cosmos DB automatically promotes a secondary region to primary during a regional outage, ensuring continuous availability without manual intervention.",
"Risk": "Without **automatic failover**, a regional outage requires **manual failover** which delays recovery and risks data unavailability. Applications dependent on the primary region experience downtime until an operator intervenes.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/azure/cosmos-db/how-to-manage-database-account#automatic-failover",
"https://learn.microsoft.com/en-us/azure/cosmos-db/high-availability",
"https://learn.microsoft.com/en-us/azure/cosmos-db/distribute-data-globally"
],
"Remediation": {
"Code": {
"CLI": "az cosmosdb update --name <COSMOS_ACCOUNT_NAME> --resource-group <RESOURCE_GROUP> --enable-automatic-failover true",
"NativeIaC": "```bicep\n// Bicep: Enable automatic failover on a Cosmos DB account\nresource account 'Microsoft.DocumentDB/databaseAccounts@2025-10-15' = {\n name: '<example_resource_name>'\n location: resourceGroup().location\n kind: 'GlobalDocumentDB'\n properties: {\n databaseAccountOfferType: 'Standard'\n locations: [\n { locationName: '<primary_region>', failoverPriority: 0 }\n { locationName: '<secondary_region>', failoverPriority: 1 }\n ]\n enableAutomaticFailover: true // Critical: Promotes a secondary region during a primary region outage\n }\n}\n```",
"Other": "1. Sign in to the Azure portal and open your Cosmos DB account\n2. In the left menu, select Replicate data globally\n3. Click Automatic Failover\n4. Toggle Enable Automatic Failover to On\n5. Set failover priorities for each region\n6. Click Save",
"Terraform": "```hcl\n# Terraform: Enable automatic failover on a Cosmos DB account\nresource \"azurerm_cosmosdb_account\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n resource_group_name = \"<example_resource_name>\"\n location = \"<primary_region>\"\n offer_type = \"Standard\"\n kind = \"GlobalDocumentDB\"\n\n geo_location {\n location = \"<primary_region>\"\n failover_priority = 0\n }\n\n geo_location {\n location = \"<secondary_region>\"\n failover_priority = 1\n }\n\n enable_automatic_failover = true # Critical: Promotes a secondary region during a primary region outage\n}\n```"
},
"Recommendation": {
"Text": "Enable **automatic failover** on Cosmos DB accounts with **multi-region** deployments so a secondary region is promoted automatically when the primary region becomes unavailable. Configure **failover priorities** to reflect your recovery strategy, validate **RTO/RPO** expectations with periodic failover drills, and combine with **multi-region writes** where active-active is required.",
"Url": "https://hub.prowler.com/check/cosmosdb_account_automatic_failover_enabled"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -1,29 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.cosmosdb.cosmosdb_client import cosmosdb_client
class cosmosdb_account_automatic_failover_enabled(Check):
"""Ensure that Cosmos DB accounts have automatic failover enabled."""
def execute(self) -> Check_Report_Azure:
"""Execute the Cosmos DB automatic failover check.
Iterates over every Cosmos DB account fetched by the service and reports
PASS when `enableAutomaticFailover` is True, FAIL otherwise.
Returns:
A list of Check_Report_Azure with one report per Cosmos DB account.
"""
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
for account in accounts:
report = Check_Report_Azure(metadata=self.metadata(), resource=account)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} does not have automatic failover enabled."
if account.enable_automatic_failover:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} has automatic failover enabled."
findings.append(report)
return findings
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import List, Optional
from typing import List
from azure.mgmt.cosmosdb import CosmosDBManagementClient
@@ -36,29 +36,14 @@ class CosmosDB(AzureService):
name=private_endpoint_connection.name,
type=private_endpoint_connection.type,
)
for private_endpoint_connection in (
getattr(account, "private_endpoint_connections", [])
or []
for private_endpoint_connection in getattr(
account, "private_endpoint_connections", []
)
if private_endpoint_connection
],
disable_local_auth=getattr(
account, "disable_local_auth", False
),
enable_automatic_failover=getattr(
account, "enable_automatic_failover", False
),
backup_policy_type=getattr(
getattr(account, "backup_policy", None),
"type",
None,
),
public_network_access=getattr(
account, "public_network_access", None
),
minimal_tls_version=getattr(
account, "minimal_tls_version", None
),
)
)
except Exception as error:
@@ -86,7 +71,3 @@ class Account:
location: str
private_endpoint_connections: List[PrivateEndpointConnection]
disable_local_auth: bool = False
enable_automatic_failover: bool = False
backup_policy_type: Optional[str] = None
public_network_access: Optional[str] = None
minimal_tls_version: Optional[str] = None
@@ -1,38 +0,0 @@
{
"Provider": "gcp",
"CheckID": "cloudsql_instance_high_availability_enabled",
"CheckTitle": "Cloud SQL instance has high availability (REGIONAL) configured",
"CheckType": [],
"ServiceName": "cloudsql",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "sqladmin.googleapis.com/Instance",
"Description": "Ensures that Cloud SQL instances have high availability configured by setting availabilityType to REGIONAL. A REGIONAL instance maintains a standby replica in a different zone within the same region and automatically fails over on zone-level outages.",
"Risk": "Instances with ZONAL availability have no standby replica. A zone-level outage will cause database downtime until manual recovery, violating availability requirements and potentially breaching SLAs and ISMS-P 2.12.1 disaster preparedness controls.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://cloud.google.com/sql/docs/postgres/high-availability",
"https://cloud.google.com/sql/docs/sqlserver/high-availability"
],
"Remediation": {
"Code": {
"CLI": "gcloud sql instances patch <INSTANCE_NAME> --availability-type=REGIONAL",
"NativeIaC": "",
"Other": "1. Go to Google Cloud Console > SQL > Instances.\n2. Click the instance name, then Edit.\n3. Under Availability, select Multiple zones (Highly available).\n4. Click Save.",
"Terraform": "```hcl\nresource \"google_sql_database_instance\" \"example\" {\n name = \"<instance_name>\"\n database_version = \"POSTGRES_15\"\n region = \"<region>\"\n\n settings {\n tier = \"db-custom-2-7680\"\n\n availability_type = \"REGIONAL\" # Critical: enables HA standby replica\n\n backup_configuration {\n enabled = true\n start_time = \"02:00\"\n }\n }\n}\n```"
},
"Recommendation": {
"Text": "Set availabilityType to REGIONAL for all production Cloud SQL instances. This creates a standby replica in a different zone and enables automatic failover, reducing RTO in the event of a zone outage.",
"Url": "https://hub.prowler.com/check/cloudsql_instance_high_availability_enabled"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [
"cloudsql_instance_automated_backups"
],
"Notes": "Enabling HA increases instance cost approximately 2x due to the standby replica. ZONAL instances are acceptable for non-production workloads where downtime is tolerable."
}
@@ -1,41 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_GCP
from prowler.providers.gcp.services.cloudsql.cloudsql_client import cloudsql_client
class cloudsql_instance_high_availability_enabled(Check):
"""Check that Cloud SQL primary instances are configured for high availability.
Verifies that each Cloud SQL primary instance has `availabilityType` set to
`REGIONAL`, which provisions a standby replica in a different zone within
the same region and enables automatic failover on zone-level outages. Read
replicas are skipped because they inherit availability from their primary.
"""
def execute(self) -> list[Check_Report_GCP]:
"""Execute the high availability check across all Cloud SQL instances.
Returns:
A list of `Check_Report_GCP` findings, one per Cloud SQL primary
instance. Status is `PASS` when `availability_type == "REGIONAL"`
and `FAIL` otherwise.
"""
findings = []
for instance in cloudsql_client.instances:
if instance.instance_type != "CLOUD_SQL_INSTANCE":
continue
report = Check_Report_GCP(metadata=self.metadata(), resource=instance)
if instance.availability_type == "REGIONAL":
report.status = "PASS"
report.status_extended = (
f"Database instance {instance.name} has high availability "
f"(REGIONAL) configured."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Database instance {instance.name} does not have high "
f"availability configured (current: "
f"{instance.availability_type})."
)
findings.append(report)
return findings
@@ -46,9 +46,6 @@ class CloudSQL(GCPService):
"authorizedNetworks", []
),
flags=settings.get("databaseFlags", []),
availability_type=settings.get(
"availabilityType", "ZONAL"
),
instance_type=instance.get(
"instanceType", "CLOUD_SQL_INSTANCE"
),
@@ -79,7 +76,6 @@ class Instance(BaseModel):
ssl_mode: str
automated_backups: bool
flags: list
availability_type: str = "ZONAL"
instance_type: str = "CLOUD_SQL_INSTANCE"
cmek_key_name: Optional[str] = None
project_id: str
@@ -1,39 +0,0 @@
{
"Provider": "oraclecloud",
"CheckID": "identity_storage_service_level_admins_scoped",
"CheckTitle": "OCI IAM storage service-level admin policies exclude delete permissions",
"CheckType": [],
"ServiceName": "identity",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "Policy",
"ResourceGroup": "IAM",
"Description": "**OCI IAM policies** are reviewed to ensure storage service-level administrator statements that grant `manage` permissions exclude the relevant storage delete permissions with `request.permission`. This supports CIS OCI 3.1 control 1.15 separation of duties for Block Volume, File Storage, and Object Storage administrators.",
"Risk": "Storage service-level administrators with unrestricted `manage` permissions can delete the resources they administer, including volumes, backups, file systems, mount targets, export sets, objects, or buckets. This weakens separation of duties and can lead to data loss, service disruption, or destructive insider activity.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.oracle.com/en-us/iaas/Content/Identity/policyreference/policyreference.htm",
"https://docs.oracle.com/en-us/iaas/Content/Block/home.htm",
"https://docs.oracle.com/en-us/iaas/Content/File/home.htm",
"https://docs.oracle.com/en-us/iaas/Content/Object/home.htm"
],
"Remediation": {
"Code": {
"CLI": "oci iam policy update --policy-id <policy-ocid> --statements \"[\\\"Allow group VolumeUsers to manage volumes in tenancy where request.permission!='VOLUME_DELETE'\\\"]\"",
"NativeIaC": "",
"Other": "1. In OCI Console, go to Identity & Security > Policies\n2. Open each active policy that grants storage service-level administrators `manage` permissions\n3. Edit storage manage statements to exclude the relevant delete permission with `request.permission`\n4. Example: Allow group BucketUsers to manage buckets in tenancy where request.permission!='BUCKET_DELETE'\n5. Save changes",
"Terraform": "```hcl\nresource \"oci_identity_policy\" \"storage_admins\" {\n compartment_id = var.compartment_id\n name = \"storage-admins\"\n description = \"Storage administrators without delete permissions\"\n\n statements = [\n \"Allow group VolumeUsers to manage volumes in tenancy where request.permission!='VOLUME_DELETE'\",\n \"Allow group VolumeUsers to manage volume-backups in tenancy where request.permission!='VOLUME_BACKUP_DELETE'\",\n \"Allow group FileUsers to manage file-systems in tenancy where request.permission!='FILE_SYSTEM_DELETE'\",\n \"Allow group FileUsers to manage mount-targets in tenancy where request.permission!='MOUNT_TARGET_DELETE'\",\n \"Allow group FileUsers to manage export-sets in tenancy where request.permission!='EXPORT_SET_DELETE'\",\n \"Allow group BucketUsers to manage objects in tenancy where request.permission!='OBJECT_DELETE'\",\n \"Allow group BucketUsers to manage buckets in tenancy where request.permission!='BUCKET_DELETE'\"\n ]\n}\n```"
},
"Recommendation": {
"Text": "Exclude delete permissions from storage service-level administrator policies. Use `request.permission!='VOLUME_DELETE'`, `request.permission!='VOLUME_BACKUP_DELETE'`, `request.permission!='FILE_SYSTEM_DELETE'`, `request.permission!='MOUNT_TARGET_DELETE'`, `request.permission!='EXPORT_SET_DELETE'`, `request.permission!='OBJECT_DELETE'`, and `request.permission!='BUCKET_DELETE'` as appropriate for each storage manage statement.",
"Url": "https://hub.prowler.com/check/identity_storage_service_level_admins_scoped"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -1,176 +0,0 @@
"""Check storage service-level administrators cannot delete managed resources."""
import re
from prowler.lib.check.models import Check, Check_Report_OCI
from prowler.providers.oraclecloud.services.identity.identity_client import (
identity_client,
)
STORAGE_DELETE_PERMISSIONS_BY_RESOURCE = {
"volumes": {"VOLUME_DELETE"},
"volume-backups": {"VOLUME_BACKUP_DELETE"},
"file-systems": {"FILE_SYSTEM_DELETE"},
"mount-targets": {"MOUNT_TARGET_DELETE"},
"export-sets": {"EXPORT_SET_DELETE"},
"objects": {"OBJECT_DELETE"},
"buckets": {"BUCKET_DELETE"},
"volume-family": {"VOLUME_DELETE", "VOLUME_BACKUP_DELETE"},
"file-family": {"FILE_SYSTEM_DELETE", "MOUNT_TARGET_DELETE", "EXPORT_SET_DELETE"},
"object-family": {"OBJECT_DELETE", "BUCKET_DELETE"},
}
ALL_STORAGE_DELETE_PERMISSIONS = set().union(
*STORAGE_DELETE_PERMISSIONS_BY_RESOURCE.values()
)
STORAGE_DELETE_PERMISSIONS_BY_RESOURCE["all-resources"] = ALL_STORAGE_DELETE_PERMISSIONS
MANAGE_STATEMENT_PATTERN = re.compile(
r"\ballow\s+group\b.+?\bto\s+manage\s+(?P<resource>[a-z-]+)\b",
re.IGNORECASE,
)
QUOTED_LITERAL_PATTERN = re.compile(r"'(?:\\.|[^'\\])*'|\"(?:\\.|[^\"\\])*\"")
def _normalize_statement(statement: str) -> str:
"""Collapse whitespace in an OCI policy statement."""
return " ".join(statement.strip().split())
def _has_disjunctive_condition(statement: str) -> bool:
"""Return True when the WHERE condition can allow alternate branches."""
condition = re.split(r"\bwhere\b", statement, flags=re.IGNORECASE, maxsplit=1)
if len(condition) != 2:
return False
condition_without_literals = QUOTED_LITERAL_PATTERN.sub("", condition[1])
return bool(
re.search(r"\b(any|or)\b|\|\|", condition_without_literals, re.IGNORECASE)
)
def _storage_manage_resource(statement: str) -> str | None:
"""Return the managed storage resource in a policy statement, if any."""
normalized_statement = _normalize_statement(statement)
match = MANAGE_STATEMENT_PATTERN.search(normalized_statement)
if not match:
return None
resource = match.group("resource").lower()
if resource not in STORAGE_DELETE_PERMISSIONS_BY_RESOURCE:
return None
return resource
def _excluded_permissions(statement: str) -> set[str]:
"""Return delete permissions explicitly excluded with request.permission != value."""
if _has_disjunctive_condition(statement):
return set()
exclusions = set()
for permission in ALL_STORAGE_DELETE_PERMISSIONS:
pattern = re.compile(
rf"\brequest\.permission\s*!=\s*['\"]?{re.escape(permission)}['\"]?\b",
re.IGNORECASE,
)
if pattern.search(statement):
exclusions.add(permission)
return exclusions
def _missing_delete_exclusions(statement: str) -> tuple[str, set[str]] | None:
"""Return the storage resource and missing delete exclusions for a statement."""
normalized_statement = _normalize_statement(statement)
resource = _storage_manage_resource(normalized_statement)
if not resource:
return None
required_permissions = STORAGE_DELETE_PERMISSIONS_BY_RESOURCE[resource]
excluded_permissions = _excluded_permissions(normalized_statement)
missing_permissions = required_permissions - excluded_permissions
if not missing_permissions:
return None
return resource, missing_permissions
class identity_storage_service_level_admins_scoped(Check):
"""Ensure storage service-level admins cannot delete resources they manage."""
def execute(self) -> list[Check_Report_OCI]:
"""Execute the storage service-level administrators scoped check.
Returns:
A list of OCI check reports for active non-tenant-admin policies.
"""
findings = []
for policy in identity_client.policies:
if policy.lifecycle_state != "ACTIVE":
continue
if policy.name.upper() == "TENANT ADMIN POLICY":
continue
region = policy.region if hasattr(policy, "region") else "global"
violations = []
has_storage_manage_statement = False
for statement in policy.statements:
if _storage_manage_resource(statement):
has_storage_manage_statement = True
missing_result = _missing_delete_exclusions(statement)
if not missing_result:
continue
resource, missing_permissions = missing_result
violations.append(
f"statement `{_normalize_statement(statement)}` manages {resource} without excluding: {', '.join(sorted(missing_permissions))}"
)
if not has_storage_manage_statement:
continue
report = Check_Report_OCI(
metadata=self.metadata(),
resource=policy,
region=region,
resource_id=policy.id,
resource_name=policy.name,
compartment_id=policy.compartment_id,
)
if violations:
report.status = "FAIL"
report.status_extended = (
f"Policy '{policy.name}' allows storage service-level administrators to manage storage resources without explicitly excluding required delete permissions: "
+ "; ".join(violations)
+ "."
)
else:
report.status = "PASS"
report.status_extended = f"Policy '{policy.name}' excludes required storage delete permissions from storage manage statements."
findings.append(report)
if not findings:
region = (
identity_client.audited_regions[0].key
if identity_client.audited_regions
else "global"
)
report = Check_Report_OCI(
metadata=self.metadata(),
resource={},
region=region,
resource_id=identity_client.audited_tenancy,
resource_name="Tenancy",
compartment_id=identity_client.audited_tenancy,
)
report.status = "PASS"
report.status_extended = "No active storage service-level administrator policies grant manage permissions without excluding delete permissions."
findings.append(report)
return findings
+1 -1
View File
@@ -124,7 +124,7 @@ maintainers = [{name = "Prowler Engineering", email = "engineering@prowler.com"}
name = "prowler"
readme = "README.md"
requires-python = ">=3.10,<3.13"
version = "5.31.0"
version = "5.30.2"
[project.scripts]
prowler = "prowler.__main__:prowler"
@@ -1,491 +0,0 @@
from unittest.mock import patch
import botocore
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
orig = botocore.client.BaseClient._make_api_call
AGG_ARN_TEMPLATE = (
"arn:aws:config:{region}:" + AWS_ACCOUNT_NUMBER + ":config-aggregator/{name}"
)
def _aggregator_payload(
name, region, *, org_aware=True, all_regions=True, aws_regions=None
):
payload = {
"ConfigurationAggregatorName": name,
"ConfigurationAggregatorArn": AGG_ARN_TEMPLATE.format(region=region, name=name),
}
if org_aware:
org_source = {
"RoleArn": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/AWSConfigRoleForOrganizations",
"AllAwsRegions": all_regions,
}
if not all_regions and aws_regions:
org_source["AwsRegions"] = aws_regions
payload["OrganizationAggregationSource"] = org_source
return payload
def make_mock_no_aggregators_no_admin():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {"ConfigurationAggregators": []}
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_aggregator_not_org_aware():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"legacy-agg",
AWS_REGION_EU_WEST_1,
org_aware=False,
)
]
}
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_org_aggregator_not_all_regions_with_admin():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"partial-org-agg",
AWS_REGION_EU_WEST_1,
org_aware=True,
all_regions=False,
aws_regions=[AWS_REGION_EU_WEST_1],
)
]
}
if operation_name == "ListDelegatedAdministrators":
return {
"DelegatedAdministrators": [
{
"Id": "123456789012",
"Arn": f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/o-abc123/123456789012",
"Email": "admin@example.com",
"Name": "Security",
"Status": "ACTIVE",
"JoinedMethod": "CREATED",
}
]
}
return orig(self, operation_name, api_params)
return _mock
def make_mock_full_pass():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"org-aggregator",
AWS_REGION_EU_WEST_1,
org_aware=True,
all_regions=True,
)
]
}
if operation_name == "ListDelegatedAdministrators":
return {
"DelegatedAdministrators": [
{
"Id": "123456789012",
"Arn": f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/o-abc123/123456789012",
"Email": "admin@example.com",
"Name": "Security",
"Status": "ACTIVE",
"JoinedMethod": "CREATED",
}
]
}
return orig(self, operation_name, api_params)
return _mock
def make_mock_access_denied_on_orgs():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"org-aggregator",
AWS_REGION_EU_WEST_1,
org_aware=True,
all_regions=True,
)
]
}
if operation_name == "ListDelegatedAdministrators":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "AccessDeniedException",
"Message": "User is not authorized to perform: organizations:ListDelegatedAdministrators",
}
},
operation_name,
)
return orig(self, operation_name, api_params)
return _mock
def make_mock_aggregators_access_denied():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "AccessDeniedException",
"Message": "denied",
}
},
operation_name,
)
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_aggregators_other_client_error():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "InternalServerError",
"Message": "boom",
}
},
operation_name,
)
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_aggregators_unexpected_exception():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
raise RuntimeError("simulated transient error")
if operation_name == "ListDelegatedAdministrators":
return {"DelegatedAdministrators": []}
return orig(self, operation_name, api_params)
return _mock
def make_mock_delegated_admins_unexpected_exception():
def _mock(self, operation_name, api_params):
if operation_name == "DescribeConfigurationAggregators":
return {
"ConfigurationAggregators": [
_aggregator_payload(
"org-aggregator",
AWS_REGION_EU_WEST_1,
org_aware=True,
all_regions=True,
)
]
}
if operation_name == "ListDelegatedAdministrators":
raise RuntimeError("simulated transient error")
return orig(self, operation_name, api_params)
return _mock
class Test_config_delegated_admin_and_org_aggregator_all_regions:
@mock_aws
def test_no_aggregators_no_admin(self):
"""Test when no aggregators exist in any region and no delegated admin is set."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_no_aggregators_no_admin(),
):
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
"no Organization Aggregator configured in any region"
in result[0].status_extended
)
assert (
"no delegated administrator registered for config.amazonaws.com"
in result[0].status_extended
)
@mock_aws
def test_aggregator_not_org_aware(self):
"""Test when an aggregator exists but is not an organization aggregator."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_aggregator_not_org_aware(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"is not an organization aggregator"
in eu_west_1_result.status_extended
)
@mock_aws
def test_org_aggregator_not_all_regions_with_admin(self):
"""Test org aggregator that doesn't cover all AWS regions (delegated admin set)."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_org_aggregator_not_all_regions_with_admin(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"does not cover all AWS regions" in eu_west_1_result.status_extended
)
@mock_aws
def test_full_pass(self):
"""Test PASS: delegated admin set and org aggregator covering all AWS regions."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_full_pass(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "PASS"
assert (
"is an organization aggregator covering all AWS regions"
in eu_west_1_result.status_extended
)
assert "delegated admin configured" in eu_west_1_result.status_extended
assert eu_west_1_result.resource_arn == AGG_ARN_TEMPLATE.format(
region=AWS_REGION_EU_WEST_1, name="org-aggregator"
)
@mock_aws
def test_access_denied_on_organizations(self):
"""Test that AccessDenied on Organizations is reported as unknown admin state."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_access_denied_on_orgs(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client",
new=Config(aws_provider),
),
):
from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import (
config_delegated_admin_and_org_aggregator_all_regions,
)
check = config_delegated_admin_and_org_aggregator_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
# The check still runs; aggregator coverage is satisfied but the
# delegated-admin status is unknown, so it must FAIL.
assert eu_west_1_result.status == "FAIL"
assert (
"delegated administrator status for config.amazonaws.com could not be determined"
in eu_west_1_result.status_extended
)
@mock_aws
def test_aggregators_access_denied(self):
"""AccessDenied on DescribeConfigurationAggregators is swallowed: no aggregators recorded for that region."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_aggregators_access_denied(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
service = Config(aws_provider)
assert service.aggregators == {}
@mock_aws
def test_aggregators_other_client_error(self):
"""Non-access ClientError on DescribeConfigurationAggregators is logged at error level."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_aggregators_other_client_error(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
service = Config(aws_provider)
assert service.aggregators == {}
@mock_aws
def test_aggregators_unexpected_exception(self):
"""Non-ClientError on DescribeConfigurationAggregators is caught by bare except."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_aggregators_unexpected_exception(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
service = Config(aws_provider)
assert service.aggregators == {}
@mock_aws
def test_delegated_admins_unexpected_exception(self):
"""Non-ClientError on ListDelegatedAdministrators must still set lookup_failed."""
with patch(
"botocore.client.BaseClient._make_api_call",
new=make_mock_delegated_admins_unexpected_exception(),
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.config.config_service import Config
service = Config(aws_provider)
assert service.delegated_administrators_lookup_failed is True
assert service.delegated_administrators == []
@@ -1,247 +0,0 @@
from unittest import mock
from prowler.providers.aws.services.sagemaker.sagemaker_service import ProcessingJob
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
CLARIFY_IMAGE_URI = f"{AWS_ACCOUNT_NUMBER}.dkr.ecr.{AWS_REGION_US_EAST_1}.amazonaws.com/sagemaker-clarify-processing:1.0"
NON_CLARIFY_IMAGE_URI = f"{AWS_ACCOUNT_NUMBER}.dkr.ecr.{AWS_REGION_US_EAST_1}.amazonaws.com/sagemaker-xgboost:1.0"
CUSTOM_CLARIFY_IMAGE_URI = f"{AWS_ACCOUNT_NUMBER}.dkr.ecr.{AWS_REGION_US_EAST_1}.amazonaws.com/my-clarify-thing:1.0"
PROCESSING_JOB_ARN = f"arn:aws:sagemaker:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:processing-job/clarify-job"
class Test_sagemaker_clarify_exists:
def test_no_processing_jobs_no_scanned_regions(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = []
sagemaker_client.processing_jobs_scanned_regions = set()
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 0
def test_no_processing_jobs_region_scanned(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = []
sagemaker_client.processing_jobs_scanned_regions = {AWS_REGION_US_EAST_1}
sagemaker_client.audited_partition = "aws"
sagemaker_client.audited_account = AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SageMaker Clarify processing jobs found in region {AWS_REGION_US_EAST_1}."
)
assert result[0].resource_id == "sagemaker-clarify"
def test_non_clarify_processing_job(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = [
ProcessingJob(
name="xgboost-job",
arn=f"arn:aws:sagemaker:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:processing-job/xgboost-job",
region=AWS_REGION_US_EAST_1,
image_uri=NON_CLARIFY_IMAGE_URI,
)
]
sagemaker_client.processing_jobs_scanned_regions = {AWS_REGION_US_EAST_1}
sagemaker_client.audited_partition = "aws"
sagemaker_client.audited_account = AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SageMaker Clarify processing jobs found in region {AWS_REGION_US_EAST_1}."
)
def test_custom_image_with_clarify_in_name_does_not_match(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = [
ProcessingJob(
name="my-clarify-thing-job",
arn=f"arn:aws:sagemaker:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:processing-job/my-clarify-thing-job",
region=AWS_REGION_US_EAST_1,
image_uri=CUSTOM_CLARIFY_IMAGE_URI,
)
]
sagemaker_client.processing_jobs_scanned_regions = {AWS_REGION_US_EAST_1}
sagemaker_client.audited_partition = "aws"
sagemaker_client.audited_account = AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"No SageMaker Clarify processing jobs found in region {AWS_REGION_US_EAST_1}."
)
def test_clarify_processing_job_exists(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = [
ProcessingJob(
name="clarify-job",
arn=PROCESSING_JOB_ARN,
region=AWS_REGION_US_EAST_1,
image_uri=CLARIFY_IMAGE_URI,
)
]
sagemaker_client.processing_jobs_scanned_regions = {AWS_REGION_US_EAST_1}
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SageMaker Clarify processing job clarify-job exists in region {AWS_REGION_US_EAST_1}."
)
assert result[0].resource_id == "clarify-job"
assert result[0].resource_arn == PROCESSING_JOB_ARN
def test_mixed_regions(self):
sagemaker_client = mock.MagicMock
sagemaker_client.sagemaker_processing_jobs = [
ProcessingJob(
name="clarify-job",
arn=PROCESSING_JOB_ARN,
region=AWS_REGION_US_EAST_1,
image_uri=CLARIFY_IMAGE_URI,
)
]
sagemaker_client.processing_jobs_scanned_regions = {
AWS_REGION_US_EAST_1,
AWS_REGION_EU_WEST_1,
}
sagemaker_client.audited_partition = "aws"
sagemaker_client.audited_account = AWS_ACCOUNT_NUMBER
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists.sagemaker_client",
sagemaker_client,
),
):
from prowler.providers.aws.services.sagemaker.sagemaker_clarify_exists.sagemaker_clarify_exists import (
sagemaker_clarify_exists,
)
check = sagemaker_clarify_exists()
result = check.execute()
assert len(result) == 2
results_by_region = {r.region: r for r in result}
us_result = results_by_region[AWS_REGION_US_EAST_1]
assert us_result.status == "PASS"
assert (
us_result.status_extended
== f"SageMaker Clarify processing job clarify-job exists in region {AWS_REGION_US_EAST_1}."
)
eu_result = results_by_region[AWS_REGION_EU_WEST_1]
assert eu_result.status == "FAIL"
assert (
eu_result.status_extended
== f"No SageMaker Clarify processing jobs found in region {AWS_REGION_EU_WEST_1}."
)
@@ -396,13 +396,13 @@ class Test_SageMaker_Service:
sagemaker_service = SageMaker(audit_info)
# Check that __threading_call__ was called for _list_tags_for_resource
# (one for each resource type: models, notebooks, training jobs, processing jobs, endpoint configs, domains)
# (one for each resource type: models, notebooks, training jobs, endpoint configs, domains)
tag_calls = [
c
for c in mock_threading_call.call_args_list
if c[0][0] == sagemaker_service._list_tags_for_resource
]
assert len(tag_calls) == 6
assert len(tag_calls) == 5
# Test SageMaker list model package groups
def test_list_model_package_groups(self):
@@ -1,512 +0,0 @@
from unittest.mock import patch
import botocore
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
orig = botocore.client.BaseClient._make_api_call
HUB_ARN = f"arn:aws:securityhub:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:hub/default"
def _active_hub_responses(operation_name):
"""Return a moto-friendly response for hub-describing API calls.
Returns None if the operation is not one of the hub APIs (so the caller
can fall back to the default behavior).
"""
if operation_name == "DescribeHub":
return {
"HubArn": HUB_ARN,
"SubscribedAt": "2024-01-01T00:00:00.000Z",
"AutoEnableControls": True,
}
if operation_name == "GetEnabledStandards":
return {"StandardsSubscriptions": []}
if operation_name == "ListEnabledProductsForImport":
return {"ProductSubscriptions": []}
if operation_name == "ListTagsForResource":
return {"Tags": {}}
return None
def mock_make_api_call_org_admin_and_config(self, operation_name, api_params):
"""Mock organization admin accounts and configuration APIs - PASS scenario."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {
"AdminAccounts": [
{
"AdminAccountId": "123456789012",
"AdminStatus": "ENABLED",
}
]
}
if operation_name == "DescribeOrganizationConfiguration":
return {
"AutoEnable": True,
"AutoEnableStandards": "DEFAULT",
}
return orig(self, operation_name, api_params)
def mock_make_api_call_org_admin_no_auto_enable(self, operation_name, api_params):
"""Mock organization admin configured but auto-enable disabled."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {
"AdminAccounts": [
{
"AdminAccountId": "123456789012",
"AdminStatus": "ENABLED",
}
]
}
if operation_name == "DescribeOrganizationConfiguration":
return {
"AutoEnable": False,
"AutoEnableStandards": "NONE",
}
return orig(self, operation_name, api_params)
def mock_make_api_call_no_org_admin(self, operation_name, api_params):
"""Mock no organization admin configured."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {"AdminAccounts": []}
if operation_name == "DescribeOrganizationConfiguration":
return {
"AutoEnable": False,
"AutoEnableStandards": "NONE",
}
return orig(self, operation_name, api_params)
def mock_make_api_call_securityhub_not_subscribed(self, operation_name, api_params):
"""Simulate Security Hub not subscribed in the account (InvalidAccessException)."""
if operation_name == "DescribeHub":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "InvalidAccessException",
"Message": "Account is not subscribed to AWS Security Hub",
}
},
operation_name,
)
if operation_name == "ListOrganizationAdminAccounts":
return {"AdminAccounts": []}
return orig(self, operation_name, api_params)
def mock_make_api_call_admin_lookup_access_denied(self, operation_name, api_params):
"""Hub is ACTIVE but ListOrganizationAdminAccounts is denied — lookup-failed path."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
raise botocore.exceptions.ClientError(
{
"Error": {
"Code": "AccessDeniedException",
"Message": "User is not authorized to perform: securityhub:ListOrganizationAdminAccounts",
}
},
operation_name,
)
if operation_name == "DescribeOrganizationConfiguration":
return {"AutoEnable": True, "AutoEnableStandards": "DEFAULT"}
return orig(self, operation_name, api_params)
def mock_make_api_call_admin_lookup_unexpected(self, operation_name, api_params):
"""ListOrganizationAdminAccounts raises a non-ClientError — bare Exception branch."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
raise RuntimeError("simulated transient error")
if operation_name == "DescribeOrganizationConfiguration":
return {"AutoEnable": True, "AutoEnableStandards": "DEFAULT"}
return orig(self, operation_name, api_params)
def mock_make_api_call_describe_org_config_other_client_error(
self, operation_name, api_params
):
"""DescribeOrganizationConfiguration raises a non-access ClientError — else branch."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {
"AdminAccounts": [
{"AdminAccountId": "123456789012", "AdminStatus": "ENABLED"}
]
}
if operation_name == "DescribeOrganizationConfiguration":
raise botocore.exceptions.ClientError(
{"Error": {"Code": "InternalServerError", "Message": "boom"}},
operation_name,
)
return orig(self, operation_name, api_params)
def mock_make_api_call_describe_org_config_unexpected(self, operation_name, api_params):
"""DescribeOrganizationConfiguration raises a non-ClientError — bare Exception branch."""
hub_resp = _active_hub_responses(operation_name)
if hub_resp is not None:
return hub_resp
if operation_name == "ListOrganizationAdminAccounts":
return {
"AdminAccounts": [
{"AdminAccountId": "123456789012", "AdminStatus": "ENABLED"}
]
}
if operation_name == "DescribeOrganizationConfiguration":
raise RuntimeError("simulated transient error")
return orig(self, operation_name, api_params)
class Test_securityhub_delegated_admin_enabled_all_regions:
def teardown_method(self):
"""Evict cached securityhub modules so legacy mock.patch-based tests
in the same session see a fresh import path."""
import sys
for mod in (
"prowler.providers.aws.services.securityhub.securityhub_client",
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions",
):
sys.modules.pop(mod, None)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_securityhub_not_subscribed,
)
@mock_aws
def test_no_securityhub(self):
"""Test when Security Hub is not subscribed in any region."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
# Should have findings for each region (with NOT_AVAILABLE hubs)
assert len(result) > 0
# All should fail since hub is not enabled
for finding in result:
assert finding.status == "FAIL"
assert "Security Hub not enabled" in finding.status_extended
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_no_org_admin,
)
@mock_aws
def test_securityhub_enabled_no_delegated_admin(self):
"""Test when Security Hub is enabled but no delegated admin is configured."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"no delegated administrator configured"
in eu_west_1_result.status_extended
)
assert eu_west_1_result.resource_arn == HUB_ARN
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_org_admin_no_auto_enable,
)
@mock_aws
def test_securityhub_enabled_with_admin_no_auto_enable(self):
"""Test when Security Hub is enabled with delegated admin but auto-enable is off."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"organization auto-enable not configured"
in eu_west_1_result.status_extended
)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_org_admin_and_config,
)
@mock_aws
def test_securityhub_enabled_with_admin_and_auto_enable(self):
"""Test when Security Hub is enabled with delegated admin and auto-enable on (PASS)."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "PASS"
assert "delegated admin configured" in eu_west_1_result.status_extended
assert "auto-enable" in eu_west_1_result.status_extended
assert eu_west_1_result.resource_arn == HUB_ARN
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_admin_lookup_access_denied,
)
@mock_aws
def test_admin_lookup_access_denied(self):
"""AccessDenied on ListOrganizationAdminAccounts must FAIL with unknown-admin message."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=SecurityHub(aws_provider),
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
check = securityhub_delegated_admin_enabled_all_regions()
result = check.execute()
eu_west_1_result = None
for finding in result:
if finding.region == AWS_REGION_EU_WEST_1:
eu_west_1_result = finding
break
assert eu_west_1_result is not None
assert eu_west_1_result.status == "FAIL"
assert (
"delegated administrator status could not be determined"
in eu_west_1_result.status_extended
)
assert (
"no delegated administrator configured"
not in eu_west_1_result.status_extended
)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_admin_lookup_unexpected,
)
@mock_aws
def test_admin_lookup_unexpected_exception(self):
"""Non-ClientError raised from ListOrganizationAdminAccounts still sets lookup_failed."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
service = SecurityHub(aws_provider)
assert service.organization_admin_lookup_failed is True
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=service,
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
result = securityhub_delegated_admin_enabled_all_regions().execute()
assert result and result[0].status == "FAIL"
assert (
"delegated administrator status could not be determined"
in result[0].status_extended
)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_describe_org_config_other_client_error,
)
@mock_aws
def test_describe_org_config_other_client_error(self):
"""Non-access ClientError on DescribeOrganizationConfiguration is logged at error level."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
service = SecurityHub(aws_provider)
# organization_config_available stays False, so the auto-enable issue is suppressed
assert service.securityhubs[0].organization_config_available is False
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=service,
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
result = securityhub_delegated_admin_enabled_all_regions().execute()
# Admin is configured and hub is active; with org config unavailable the
# check should PASS because there are no other detectable issues.
assert result and result[0].status == "PASS"
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_describe_org_config_unexpected,
)
@mock_aws
def test_describe_org_config_unexpected_exception(self):
"""Non-ClientError on DescribeOrganizationConfiguration is caught by bare except."""
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.securityhub.securityhub_service import (
SecurityHub,
)
service = SecurityHub(aws_provider)
assert service.securityhubs[0].organization_config_available is False
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
patch(
"prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client",
new=service,
),
):
from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import (
securityhub_delegated_admin_enabled_all_regions,
)
result = securityhub_delegated_admin_enabled_all_regions().execute()
assert result and result[0].status == "PASS"
@@ -1,115 +0,0 @@
from unittest import mock
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION_ID,
set_mocked_azure_provider,
)
class Test_cosmosdb_account_automatic_failover_enabled:
def test_no_subscriptions(self):
cosmosdb_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled.cosmosdb_client",
new=cosmosdb_client,
),
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled import (
cosmosdb_account_automatic_failover_enabled,
)
cosmosdb_client.accounts = {}
check = cosmosdb_account_automatic_failover_enabled()
result = check.execute()
assert len(result) == 0
def test_pass(self):
cosmosdb_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled.cosmosdb_client",
new=cosmosdb_client,
),
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled import (
cosmosdb_account_automatic_failover_enabled,
)
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import (
Account,
)
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION_ID: [
Account(
id="/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.DocumentDB/databaseAccounts/test-account",
name="test-account",
kind="GlobalDocumentDB",
type="Microsoft.DocumentDB/databaseAccounts",
tags={},
is_virtual_network_filter_enabled=False,
location="eastus",
private_endpoint_connections=[],
disable_local_auth=False,
enable_automatic_failover=True,
)
]
}
check = cosmosdb_account_automatic_failover_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
def test_fail(self):
cosmosdb_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled.cosmosdb_client",
new=cosmosdb_client,
),
):
from prowler.providers.azure.services.cosmosdb.cosmosdb_account_automatic_failover_enabled.cosmosdb_account_automatic_failover_enabled import (
cosmosdb_account_automatic_failover_enabled,
)
from prowler.providers.azure.services.cosmosdb.cosmosdb_service import (
Account,
)
cosmosdb_client.accounts = {
AZURE_SUBSCRIPTION_ID: [
Account(
id="/subscriptions/sub1/resourceGroups/rg1/providers/Microsoft.DocumentDB/databaseAccounts/test-account",
name="test-account",
kind="GlobalDocumentDB",
type="Microsoft.DocumentDB/databaseAccounts",
tags={},
is_virtual_network_filter_enabled=False,
location="eastus",
private_endpoint_connections=[],
disable_local_auth=False,
enable_automatic_failover=False,
)
]
}
check = cosmosdb_account_automatic_failover_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
-2
View File
@@ -722,7 +722,6 @@ def mock_api_instances_calls(client: MagicMock, service: str):
},
"backupConfiguration": {"enabled": True},
"databaseFlags": [],
"availabilityType": "REGIONAL",
},
},
{
@@ -738,7 +737,6 @@ def mock_api_instances_calls(client: MagicMock, service: str):
},
"backupConfiguration": {"enabled": False},
"databaseFlags": [],
"availabilityType": "ZONAL",
},
},
]
@@ -1,205 +0,0 @@
from unittest import mock
from tests.providers.gcp.gcp_fixtures import (
GCP_EU1_LOCATION,
GCP_PROJECT_ID,
set_mocked_gcp_provider,
)
class Test_cloudsql_instance_high_availability_enabled:
"""Tests for the cloudsql_instance_high_availability_enabled check."""
def test_no_instances(self):
"""No Cloud SQL instances → no findings."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
cloudsql_client.instances = []
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 0
def test_instance_ha_enabled(self):
"""A REGIONAL primary instance → PASS."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="db-ha",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
availability_type="REGIONAL",
)
]
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "db-ha"
assert result[0].location == GCP_EU1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID
def test_instance_ha_disabled(self):
"""A ZONAL primary instance → FAIL with current availability in status_extended."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="db-zonal",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
availability_type="ZONAL",
)
]
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "ZONAL" in result[0].status_extended
assert result[0].resource_id == "db-zonal"
assert result[0].location == GCP_EU1_LOCATION
assert result[0].project_id == GCP_PROJECT_ID
def test_read_replica_skipped(self):
"""Read replicas (instance_type != CLOUD_SQL_INSTANCE) are skipped."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="db-replica",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
availability_type="ZONAL",
instance_type="READ_REPLICA_INSTANCE",
)
]
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 0
def test_instance_default_availability_type_fails(self):
"""An instance missing availabilityType defaults to ZONAL (service layer) and must FAIL."""
cloudsql_client = mock.MagicMock()
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_gcp_provider(),
),
mock.patch(
"prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled.cloudsql_client",
new=cloudsql_client,
),
):
from prowler.providers.gcp.services.cloudsql.cloudsql_instance_high_availability_enabled.cloudsql_instance_high_availability_enabled import (
cloudsql_instance_high_availability_enabled,
)
from prowler.providers.gcp.services.cloudsql.cloudsql_service import (
Instance,
)
cloudsql_client.instances = [
Instance(
name="db-default",
version="POSTGRES_15",
ip_addresses=[],
region=GCP_EU1_LOCATION,
public_ip=False,
require_ssl=False,
ssl_mode="ENCRYPTED_ONLY",
automated_backups=True,
authorized_networks=[],
flags=[],
project_id=GCP_PROJECT_ID,
# availability_type omitted → model default "ZONAL"
)
]
check = cloudsql_instance_high_availability_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "ZONAL" in result[0].status_extended
@@ -1,326 +0,0 @@
from datetime import datetime
from unittest import mock
import pytest
from prowler.lib.check.models import Check_Report_OCI
from prowler.providers.oraclecloud.services.identity.identity_service import Policy
from tests.providers.oraclecloud.oci_fixtures import (
OCI_COMPARTMENT_ID,
OCI_REGION,
OCI_TENANCY_ID,
set_mocked_oraclecloud_provider,
)
CHECK_PATH = "prowler.providers.oraclecloud.services.identity.identity_storage_service_level_admins_scoped.identity_storage_service_level_admins_scoped"
def _policy(
name: str, statements: list[str], lifecycle_state: str = "ACTIVE"
) -> Policy:
return Policy(
id=f"ocid1.policy.oc1..{name.lower().replace(' ', '-')}",
name=name,
description="Test policy",
compartment_id=OCI_COMPARTMENT_ID,
statements=statements,
time_created=datetime.now(),
lifecycle_state=lifecycle_state,
region=OCI_REGION,
)
def _identity_client(policies: list[Policy]) -> mock.MagicMock:
identity_client = mock.MagicMock()
identity_client.policies = policies
identity_client.audited_tenancy = OCI_TENANCY_ID
identity_client.audited_regions = [mock.MagicMock(key=OCI_REGION)]
return identity_client
def _run_check(policies: list[Policy]) -> list[Check_Report_OCI]:
identity_client = _identity_client(policies)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_oraclecloud_provider(),
),
mock.patch(f"{CHECK_PATH}.identity_client", new=identity_client),
):
from prowler.providers.oraclecloud.services.identity.identity_storage_service_level_admins_scoped.identity_storage_service_level_admins_scoped import (
identity_storage_service_level_admins_scoped,
)
return identity_storage_service_level_admins_scoped().execute()
class Test_identity_storage_service_level_admins_scoped:
def test_no_policies_passes_with_tenancy_finding(self):
result = _run_check([])
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == OCI_TENANCY_ID
assert result[0].resource_name == "Tenancy"
assert (
result[0].status_extended
== "No active storage service-level administrator policies grant manage permissions without excluding delete permissions."
)
def test_manage_volumes_without_delete_exclusion_fails(self):
result = _run_check(
[
_policy(
"Volume Admins",
["Allow group VolumeUsers to manage volumes in tenancy"],
)
]
)
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_name == "Volume Admins"
assert "VOLUME_DELETE" in result[0].status_extended
assert (
"Allow group VolumeUsers to manage volumes in tenancy"
in result[0].status_extended
)
def test_manage_volumes_with_delete_exclusion_passes(self):
result = _run_check(
[
_policy(
"Volume Admins",
[
"Allow group VolumeUsers to manage volumes in tenancy where request.permission!='VOLUME_DELETE'"
],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Policy 'Volume Admins' excludes required storage delete permissions from storage manage statements."
)
def test_delete_exclusion_parser_is_case_and_whitespace_insensitive(self):
result = _run_check(
[
_policy(
"Volume Admins",
[
" allow group VolumeUsers TO manage volumes in tenancy WHERE request.permission != 'volume_delete' "
],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
def test_generic_where_clause_does_not_pass(self):
result = _run_check(
[
_policy(
"Bucket Admins",
[
"Allow group BucketUsers to manage buckets in tenancy where request.region='iad'"
],
)
]
)
assert len(result) == 1
assert result[0].status == "FAIL"
assert "BUCKET_DELETE" in result[0].status_extended
assert "request.region='iad'" in result[0].status_extended
@pytest.mark.parametrize(
"statement",
[
"Allow group BucketUsers to manage buckets in tenancy where ANY {request.permission!='BUCKET_DELETE', request.region='iad'}",
"Allow group BucketUsers to manage buckets in tenancy where request.permission!='BUCKET_DELETE' OR request.region='iad'",
],
)
def test_disjunctive_delete_exclusion_does_not_pass(self, statement):
result = _run_check([_policy("Bucket Admins", [statement])])
assert len(result) == 1
assert result[0].status == "FAIL"
assert "BUCKET_DELETE" in result[0].status_extended
def test_quoted_literals_do_not_make_delete_exclusion_disjunctive(self):
result = _run_check(
[
_policy(
"Bucket Admins",
[
"Allow group BucketUsers to manage buckets in tenancy where request.permission!='BUCKET_DELETE' and target.tag.namespace='any-tag'"
],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
@pytest.mark.parametrize(
"resource,permission",
[
("file-systems", "FILE_SYSTEM_DELETE"),
("mount-targets", "MOUNT_TARGET_DELETE"),
("export-sets", "EXPORT_SET_DELETE"),
("volumes", "VOLUME_DELETE"),
("volume-backups", "VOLUME_BACKUP_DELETE"),
("objects", "OBJECT_DELETE"),
("buckets", "BUCKET_DELETE"),
],
)
def test_storage_resources_require_matching_delete_exclusion(
self, resource, permission
):
fail_result = _run_check(
[
_policy(
"Storage Admins",
[f"Allow group StorageUsers to manage {resource} in tenancy"],
)
]
)
pass_result = _run_check(
[
_policy(
"Storage Admins",
[
f"Allow group StorageUsers to manage {resource} in tenancy where request.permission != '{permission}'"
],
)
]
)
assert len(fail_result) == 1
assert fail_result[0].status == "FAIL"
assert permission in fail_result[0].status_extended
assert len(pass_result) == 1
assert pass_result[0].status == "PASS"
def test_file_family_fails_until_all_delete_permissions_are_excluded(self):
partial_result = _run_check(
[
_policy(
"File Admins",
[
"Allow group FileUsers to manage file-family in tenancy where ALL {request.permission!='FILE_SYSTEM_DELETE', request.permission!='MOUNT_TARGET_DELETE'}"
],
)
]
)
complete_result = _run_check(
[
_policy(
"File Admins",
[
"Allow group FileUsers to manage file-family in tenancy where ALL {request.permission!='FILE_SYSTEM_DELETE', request.permission!='MOUNT_TARGET_DELETE', request.permission!='EXPORT_SET_DELETE'}"
],
)
]
)
assert len(partial_result) == 1
assert partial_result[0].status == "FAIL"
assert "EXPORT_SET_DELETE" in partial_result[0].status_extended
assert len(complete_result) == 1
assert complete_result[0].status == "PASS"
@pytest.mark.parametrize(
"family,missing_permission,statement",
[
(
"volume-family",
"VOLUME_BACKUP_DELETE",
"Allow group VolumeUsers to manage volume-family in tenancy where request.permission!='VOLUME_DELETE'",
),
(
"object-family",
"BUCKET_DELETE",
"Allow group BucketUsers to manage object-family in tenancy where request.permission!='OBJECT_DELETE'",
),
(
"all-resources",
"BUCKET_DELETE",
"Allow group StorageUsers to manage all-resources in tenancy where ALL {request.permission!='VOLUME_DELETE', request.permission!='VOLUME_BACKUP_DELETE', request.permission!='FILE_SYSTEM_DELETE', request.permission!='MOUNT_TARGET_DELETE', request.permission!='EXPORT_SET_DELETE', request.permission!='OBJECT_DELETE'}",
),
],
)
def test_families_and_all_resources_fail_unless_all_delete_permissions_are_excluded(
self, family, missing_permission, statement
):
result = _run_check([_policy("Storage Admins", [statement])])
assert len(result) == 1
assert result[0].status == "FAIL"
assert family in result[0].status_extended
assert missing_permission in result[0].status_extended
def test_all_resources_passes_when_all_storage_delete_permissions_are_excluded(
self,
):
result = _run_check(
[
_policy(
"Storage Admins",
[
"Allow group StorageUsers to manage all-resources in tenancy where ALL {request.permission!='VOLUME_DELETE', request.permission!='VOLUME_BACKUP_DELETE', request.permission!='FILE_SYSTEM_DELETE', request.permission!='MOUNT_TARGET_DELETE', request.permission!='EXPORT_SET_DELETE', request.permission!='OBJECT_DELETE', request.permission!='BUCKET_DELETE'}"
],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
def test_inactive_policies_are_ignored(self):
result = _run_check(
[
_policy(
"Inactive Volume Admins",
["Allow group VolumeUsers to manage volumes in tenancy"],
lifecycle_state="INACTIVE",
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "Tenancy"
def test_tenant_admin_policy_is_ignored(self):
result = _run_check(
[
_policy(
"Tenant Admin Policy",
["Allow group Administrators to manage all-resources in tenancy"],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "Tenancy"
def test_policies_without_storage_manage_statements_are_ignored(self):
result = _run_check(
[
_policy(
"Network Admins",
["Allow group NetworkUsers to manage vcns in tenancy"],
)
]
)
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_name == "Tenancy"
Generated
+1 -1
View File
@@ -3245,7 +3245,7 @@ wheels = [
[[package]]
name = "prowler"
version = "5.31.0"
version = "5.30.2"
source = { editable = "." }
dependencies = [
{ name = "alibabacloud-actiontrail20200706" },