From ce27053c2dd2e97b7589402c83bcb78bb08d41e1 Mon Sep 17 00:00:00 2001 From: Zeus Almightee <53497290+ernestprovo23@users.noreply.github.com> Date: Thu, 11 Jun 2026 10:53:28 -0400 Subject: [PATCH] feat(aws): add securityhub + config org-wide delegated admin checks (#11259) Co-authored-by: Lydia Vilchez --- prowler/CHANGELOG.md | 9 + .../__init__.py | 0 ...d_org_aggregator_all_regions.metadata.json | 44 ++ ...ed_admin_and_org_aggregator_all_regions.py | 115 ++++ .../aws/services/config/config_service.py | 131 +++++ .../__init__.py | 0 ...ed_admin_enabled_all_regions.metadata.json | 44 ++ ...hub_delegated_admin_enabled_all_regions.py | 84 +++ .../securityhub/securityhub_service.py | 101 +++- ...min_and_org_aggregator_all_regions_test.py | 491 +++++++++++++++++ ...elegated_admin_enabled_all_regions_test.py | 512 ++++++++++++++++++ 11 files changed, 1530 insertions(+), 1 deletion(-) create mode 100644 prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/__init__.py create mode 100644 prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions.metadata.json create mode 100644 prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions.py create mode 100644 prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/__init__.py create mode 100644 prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions.metadata.json create mode 100644 prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions.py create mode 100644 tests/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions_test.py create mode 100644 tests/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions_test.py diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index bb206f5a96..62fbb35d30 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -2,6 +2,15 @@ All notable changes to the **Prowler SDK** are documented in this file. +## [5.31.0] (Prowler UNRELEASED) + +### 🚀 Added + +- `securityhub_delegated_admin_enabled_all_regions` check for AWS provider, verifying that Security Hub has a delegated administrator, is active in all opted-in regions, and has organization auto-enable on [(#11259)](https://github.com/prowler-cloud/prowler/pull/11259) +- `config_delegated_admin_and_org_aggregator_all_regions` check for AWS provider, verifying that AWS Config has a delegated administrator and an organization aggregator covering all AWS regions [(#11259)](https://github.com/prowler-cloud/prowler/pull/11259) + +--- + ## [5.30.0] (Prowler v5.30.0) ### 🚀 Added diff --git a/prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/__init__.py b/prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions.metadata.json b/prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions.metadata.json new file mode 100644 index 0000000000..cbbd7a66dd --- /dev/null +++ b/prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions.metadata.json @@ -0,0 +1,44 @@ +{ + "Provider": "aws", + "CheckID": "config_delegated_admin_and_org_aggregator_all_regions", + "CheckTitle": "AWS Config has a delegated administrator and an organization aggregator covering all AWS regions", + "CheckType": [ + "Software and Configuration Checks/AWS Security Best Practices", + "Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices" + ], + "ServiceName": "config", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "AwsConfigConfigurationAggregator", + "ResourceGroup": "governance", + "Description": "**AWS Config** has a delegated administrator registered via AWS Organizations and at least one Configuration Aggregator with an OrganizationAggregationSource that covers all AWS regions, ensuring centralized org-wide configuration visibility.", + "Risk": "Without an org-wide **AWS Config** aggregator and a delegated administrator, configuration data is fragmented across accounts and regions, **compliance reporting** is incomplete, and **drift detection** is delayed. Adversaries or misconfigurations can persist in unmonitored accounts, eroding **audit readiness** and **regulatory posture**.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.aws.amazon.com/config/latest/developerguide/aggregate-data.html", + "https://docs.aws.amazon.com/config/latest/developerguide/set-up-aggregator-cli.html", + "https://docs.aws.amazon.com/organizations/latest/userguide/services-that-can-integrate-config.html" + ], + "Remediation": { + "Code": { + "CLI": "aws organizations register-delegated-administrator --account-id --service-principal config.amazonaws.com && aws configservice put-configuration-aggregator --configuration-aggregator-name org-aggregator --organization-aggregation-source RoleArn=,AllAwsRegions=true", + "NativeIaC": "", + "Other": "1. From the AWS Organizations management account, register the delegated administrator for config.amazonaws.com\n2. In the delegated admin account, open AWS Config\n3. Create a Configuration Aggregator and select Add my organization as the source\n4. Enable Include all AWS Regions\n5. Confirm an IAM role with AWSConfigRoleForOrganizations is attached\n6. Verify the aggregator status reaches SUCCEEDED for all member accounts", + "Terraform": "" + }, + "Recommendation": { + "Text": "Register a **delegated administrator** for AWS Config via AWS Organizations and create at least one **Configuration Aggregator** with an OrganizationAggregationSource that covers **all AWS regions**. This centralizes configuration data across the organization for unified compliance and audit reporting.", + "Url": "https://hub.prowler.com/check/config_delegated_admin_and_org_aggregator_all_regions" + } + }, + "Categories": [ + "forensics-ready" + ], + "DependsOn": [], + "RelatedTo": [ + "config_recorder_all_regions_enabled", + "guardduty_delegated_admin_enabled_all_regions" + ], + "Notes": "This check requires execution from the organization management account or delegated administrator account to access organization-level APIs." +} diff --git a/prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions.py b/prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions.py new file mode 100644 index 0000000000..f56ea191f8 --- /dev/null +++ b/prowler/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions.py @@ -0,0 +1,115 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.config.config_client import config_client +from prowler.providers.aws.services.config.config_service import Aggregator + + +class config_delegated_admin_and_org_aggregator_all_regions(Check): + """Ensure AWS Config has a delegated admin and an org aggregator covering all regions. + + This check verifies that: + 1. A delegated administrator is registered for the config.amazonaws.com + service principal via AWS Organizations. + 2. At least one AWS Config Configuration Aggregator exists with an + OrganizationAggregationSource that covers all AWS regions + (AllAwsRegions=true). + """ + + def execute(self) -> list[Check_Report_AWS]: + """Execute the check logic. + + Returns: + A list of reports containing the result of the check. One finding per + aggregator-region, or a single synthetic FAIL when no aggregators + exist in any region. + """ + findings = [] + + has_delegated_admin = ( + bool(config_client.delegated_administrators) + and not config_client.delegated_administrators_lookup_failed + ) + delegated_admin_unknown = config_client.delegated_administrators_lookup_failed + + # No aggregators in any region: emit one synthetic FAIL anchored to the + # audited account in the default region. + if not config_client.aggregators: + synthetic = Aggregator( + name="unknown", + arn=config_client.get_unknown_arn( + region=config_client.region, + resource_type="config-aggregator", + ), + region=config_client.region, + all_aws_regions=False, + aws_regions=None, + organization_aggregation_source_present=False, + ) + report = Check_Report_AWS(metadata=self.metadata(), resource=synthetic) + if delegated_admin_unknown: + delegated_state = ( + "delegated administrator status could not be determined" + ) + elif has_delegated_admin: + delegated_state = "delegated administrator configured" + else: + delegated_state = ( + "no delegated administrator registered for config.amazonaws.com" + ) + report.status = "FAIL" + report.status_extended = ( + f"AWS Config has no Organization Aggregator configured in any " + f"region ({delegated_state})." + ) + findings.append(report) + return findings + + for region, aggregators_in_region in config_client.aggregators.items(): + for aggregator in aggregators_in_region: + report = Check_Report_AWS(metadata=self.metadata(), resource=aggregator) + + org_aware = aggregator.organization_aggregation_source_present + covers_all = aggregator.all_aws_regions + + issues = [] + if delegated_admin_unknown: + issues.append( + "delegated administrator status for config.amazonaws.com " + "could not be determined" + ) + elif not has_delegated_admin: + issues.append( + "no delegated administrator registered for config.amazonaws.com" + ) + if not org_aware: + issues.append( + f"aggregator {aggregator.name} is not an organization aggregator" + ) + elif not covers_all: + issues.append( + f"aggregator {aggregator.name} does not cover all AWS regions" + ) + + if issues: + report.status = "FAIL" + report.status_extended = ( + f"AWS Config aggregator {aggregator.name} in region " + f"{region} has issues: {', '.join(issues)}." + ) + else: + report.status = "PASS" + report.status_extended = ( + f"AWS Config aggregator {aggregator.name} in region " + f"{region} is an organization aggregator covering all " + f"AWS regions with delegated admin configured." + ) + + # Support muting non-default regions if configured + if report.status == "FAIL" and ( + config_client.audit_config.get("mute_non_default_regions", False) + and region != config_client.region + ): + report.muted = True + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/config/config_service.py b/prowler/providers/aws/services/config/config_service.py index 443cee2233..85ab22fd8e 100644 --- a/prowler/providers/aws/services/config/config_service.py +++ b/prowler/providers/aws/services/config/config_service.py @@ -1,5 +1,6 @@ from typing import Optional +from botocore.client import ClientError from pydantic.v1 import BaseModel from prowler.lib.logger import logger @@ -12,10 +13,16 @@ class Config(AWSService): # Call AWSService's __init__ super().__init__(__class__.__name__, provider) self.recorders = {} + self.aggregators: dict[str, list] = {} + self.delegated_administrators: list = [] + self.delegated_administrators_lookup_failed: bool = False self.__threading_call__(self.describe_configuration_recorders) self.__threading_call__( self._describe_configuration_recorder_status, self.recorders.values() ) + self.__threading_call__(self._describe_configuration_aggregators) + # Organizations API is not regional; single call. + self._list_config_delegated_administrators() def _get_recorder_arn_template(self, region): return f"arn:{self.audited_partition}:config:{region}:{self.audited_account}:recorder" @@ -73,6 +80,108 @@ class Config(AWSService): f"{recorder.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _describe_configuration_aggregators(self, regional_client): + """Describe AWS Config configuration aggregators per region. + + An aggregator counts as organization-aware when its + OrganizationAggregationSource key is present in the response. + """ + logger.info("Config - Describing Configuration Aggregators...") + try: + paginator = regional_client.get_paginator( + "describe_configuration_aggregators" + ) + region_aggregators: list = [] + for page in paginator.paginate(): + for aggregator in page.get("ConfigurationAggregators", []): + name = aggregator.get("ConfigurationAggregatorName", "") + arn = aggregator.get("ConfigurationAggregatorArn", "") + org_source = aggregator.get("OrganizationAggregationSource") + org_aware = org_source is not None + all_aws_regions = False + aws_regions: Optional[list] = None + if org_aware: + all_aws_regions = org_source.get("AllAwsRegions", False) + aws_regions = org_source.get("AwsRegions") + if not self.audit_resources or ( + is_resource_filtered(arn, self.audit_resources) + ): + region_aggregators.append( + Aggregator( + name=name, + arn=arn, + region=regional_client.region, + all_aws_regions=all_aws_regions, + aws_regions=aws_regions, + organization_aggregation_source_present=org_aware, + ) + ) + if region_aggregators: + self.aggregators[regional_client.region] = region_aggregators + except ClientError as error: + if error.response["Error"]["Code"] in ( + "AccessDeniedException", + "AccessDenied", + ): + logger.warning( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + else: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def _list_config_delegated_administrators(self): + """List delegated administrators for the AWS Config service principal. + + Uses the Organizations API directly (not regional). Sets + delegated_administrators_lookup_failed to True on AccessDenied so callers + can surface the unknown delegated-admin state in findings. + """ + logger.info( + "Config - Listing delegated administrators for config.amazonaws.com..." + ) + try: + org_client = self.session.client("organizations") + paginator = org_client.get_paginator("list_delegated_administrators") + for page in paginator.paginate(ServicePrincipal="config.amazonaws.com"): + for admin in page.get("DelegatedAdministrators", []): + self.delegated_administrators.append( + ConfigDelegatedAdministrator( + id=admin.get("Id", ""), + arn=admin.get("Arn", ""), + name=admin.get("Name", ""), + email=admin.get("Email", ""), + status=admin.get("Status", ""), + joined_method=admin.get("JoinedMethod", ""), + ) + ) + except ClientError as error: + error_code = error.response["Error"]["Code"] + if error_code in ( + "AccessDeniedException", + "AccessDenied", + "AWSOrganizationsNotInUseException", + ): + self.delegated_administrators_lookup_failed = True + logger.warning( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + else: + self.delegated_administrators_lookup_failed = True + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + self.delegated_administrators_lookup_failed = True + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + class Recorder(BaseModel): name: str @@ -80,3 +189,25 @@ class Recorder(BaseModel): recording: Optional[bool] last_status: Optional[str] region: str + + +class Aggregator(BaseModel): + """Represents an AWS Config Configuration Aggregator.""" + + name: str + arn: str + region: str + all_aws_regions: bool = False + aws_regions: Optional[list] = None + organization_aggregation_source_present: bool = False + + +class ConfigDelegatedAdministrator(BaseModel): + """Represents a delegated administrator registered for config.amazonaws.com.""" + + id: str + arn: str + name: str + email: str + status: str + joined_method: str diff --git a/prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/__init__.py b/prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions.metadata.json b/prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions.metadata.json new file mode 100644 index 0000000000..99748671ae --- /dev/null +++ b/prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions.metadata.json @@ -0,0 +1,44 @@ +{ + "Provider": "aws", + "CheckID": "securityhub_delegated_admin_enabled_all_regions", + "CheckTitle": "Security Hub has delegated admin configured and is enabled in all regions with organization auto-enable", + "CheckType": [ + "Software and Configuration Checks/AWS Security Best Practices", + "Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices" + ], + "ServiceName": "securityhub", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "AwsSecurityHubHub", + "ResourceGroup": "security", + "Description": "**AWS Security Hub** has a delegated administrator configured at the organization level, hubs are active in all opted-in regions, and organization auto-enable is active so that new member accounts are automatically enrolled.", + "Risk": "Without org-wide **AWS Security Hub** configuration, findings can be aggregated inconsistently, delegated admin may be missing in some regions, and new accounts will not be auto-enrolled. This fragments **security posture visibility**, delays **incident response**, and lets misconfigurations and compliance drift go undetected across the organization.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.aws.amazon.com/securityhub/latest/userguide/designate-orgs-admin-account.html", + "https://docs.aws.amazon.com/securityhub/latest/userguide/accounts-orgs-auto-enable.html", + "https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-regions.html" + ], + "Remediation": { + "Code": { + "CLI": "aws securityhub enable-organization-admin-account --admin-account-id && aws securityhub update-organization-configuration --auto-enable --auto-enable-standards DEFAULT", + "NativeIaC": "", + "Other": "1. Sign in to the AWS Organizations management account\n2. Open the AWS Organizations console\n3. Navigate to Services > AWS Security Hub\n4. Click Register delegated administrator and enter the security account ID\n5. Switch to the delegated admin account\n6. In Security Hub console, go to Settings > Accounts\n7. Enable auto-enable for new organization accounts\n8. Repeat hub enablement for all opted-in regions", + "Terraform": "" + }, + "Recommendation": { + "Text": "Configure a **delegated administrator** for AWS Security Hub via AWS Organizations. Enable Security Hub in **all opted-in regions** and turn on **auto-enable** so new member accounts are automatically enrolled. This ensures uniform security posture monitoring across the entire organization.", + "Url": "https://hub.prowler.com/check/securityhub_delegated_admin_enabled_all_regions" + } + }, + "Categories": [ + "forensics-ready" + ], + "DependsOn": [], + "RelatedTo": [ + "securityhub_enabled", + "guardduty_delegated_admin_enabled_all_regions" + ], + "Notes": "This check requires execution from the organization management account or delegated administrator account to access organization-level APIs." +} diff --git a/prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions.py b/prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions.py new file mode 100644 index 0000000000..4828752183 --- /dev/null +++ b/prowler/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions.py @@ -0,0 +1,84 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.securityhub.securityhub_client import ( + securityhub_client, +) + + +class securityhub_delegated_admin_enabled_all_regions(Check): + """Ensure Security Hub has a delegated admin and is enabled in all regions. + + This check verifies that: + 1. A delegated administrator account is configured for Security Hub + 2. Security Hub is active (ACTIVE status) in each region + 3. Organization auto-enable is configured for new member accounts + """ + + def execute(self) -> list[Check_Report_AWS]: + """Execute the check logic. + + Returns: + A list of reports containing the result of the check for each region. + """ + findings = [] + + # Build a set of regions that have an organization admin account configured + regions_with_admin = { + admin.region + for admin in securityhub_client.organization_admin_accounts + if admin.admin_status == "ENABLED" + } + admin_lookup_failed = securityhub_client.organization_admin_lookup_failed + + for securityhub in securityhub_client.securityhubs: + report = Check_Report_AWS(metadata=self.metadata(), resource=securityhub) + + # Check if this region has a delegated admin + has_delegated_admin = securityhub.region in regions_with_admin + + # Check if hub is active + hub_active = securityhub.status == "ACTIVE" + + # Check if auto-enable is configured for organization members + auto_enable_on = securityhub.organization_auto_enable + + # Determine overall status + issues = [] + if admin_lookup_failed: + issues.append("delegated administrator status could not be determined") + elif not has_delegated_admin: + issues.append("no delegated administrator configured") + if not hub_active: + issues.append("Security Hub not enabled") + if ( + hub_active + and securityhub.organization_config_available + and not auto_enable_on + ): + # Only report auto-enable issue if hub is active and org config data + # is available (i.e., we could actually read AutoEnable from the API). + issues.append("organization auto-enable not configured") + + if issues: + report.status = "FAIL" + report.status_extended = ( + f"Security Hub in region {securityhub.region} has issues: " + f"{', '.join(issues)}." + ) + else: + report.status = "PASS" + report.status_extended = ( + f"Security Hub in region {securityhub.region} has delegated " + f"admin configured with hub active and organization auto-enable " + f"enabled." + ) + + # Support muting non-default regions if configured + if report.status == "FAIL" and ( + securityhub_client.audit_config.get("mute_non_default_regions", False) + and securityhub.region != securityhub_client.region + ): + report.muted = True + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/securityhub/securityhub_service.py b/prowler/providers/aws/services/securityhub/securityhub_service.py index 0799c6e048..5e2d445742 100644 --- a/prowler/providers/aws/services/securityhub/securityhub_service.py +++ b/prowler/providers/aws/services/securityhub/securityhub_service.py @@ -13,8 +13,14 @@ class SecurityHub(AWSService): # Call AWSService's __init__ super().__init__(__class__.__name__, provider) self.securityhubs = [] + self.organization_admin_accounts = [] + self.organization_admin_lookup_failed: bool = False self.__threading_call__(self._describe_hub) self.__threading_call__(self._list_tags, self.securityhubs) + self.__threading_call__(self._list_organization_admin_accounts) + self.__threading_call__( + self._describe_organization_configuration, self.securityhubs + ) def _describe_hub(self, regional_client): logger.info("SecurityHub - Describing Hub...") @@ -104,6 +110,95 @@ class SecurityHub(AWSService): f"{resource.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _list_organization_admin_accounts(self, regional_client): + """List Security Hub delegated administrator accounts for the organization. + + This API is only available to the organization management account or + a delegated administrator account. + """ + logger.info("SecurityHub - listing organization admin accounts...") + try: + paginator = regional_client.get_paginator( + "list_organization_admin_accounts" + ) + for page in paginator.paginate(): + for admin in page.get("AdminAccounts", []): + admin_account = OrganizationAdminAccount( + admin_account_id=admin.get("AdminAccountId"), + admin_status=admin.get("AdminStatus"), + region=regional_client.region, + ) + # Avoid duplicates across regions for the same admin account + if not any( + existing.admin_account_id == admin_account.admin_account_id + and existing.region == admin_account.region + for existing in self.organization_admin_accounts + ): + self.organization_admin_accounts.append(admin_account) + except ClientError as error: + self.organization_admin_lookup_failed = True + if error.response["Error"]["Code"] in ( + "AccessDeniedException", + "InvalidAccessException", + "BadRequestException", + ): + logger.warning( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + else: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + self.organization_admin_lookup_failed = True + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def _describe_organization_configuration(self, securityhub): + """Describe the organization configuration for a Security Hub instance. + + This provides information about auto-enable settings for the organization. + Only invoked for hubs in ACTIVE status. + """ + logger.info("SecurityHub - describing organization configuration...") + try: + if securityhub.status != "ACTIVE": + return + regional_client = self.regional_clients[securityhub.region] + org_config = regional_client.describe_organization_configuration() + securityhub.organization_auto_enable = org_config.get("AutoEnable", False) + securityhub.auto_enable_standards = org_config.get( + "AutoEnableStandards", "NONE" + ) + securityhub.organization_config_available = True + except ClientError as error: + if error.response["Error"]["Code"] in ( + "AccessDeniedException", + "InvalidAccessException", + "BadRequestException", + ): + # Expected when not running from management or delegated admin account + logger.warning( + f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + else: + logger.error( + f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{securityhub.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class OrganizationAdminAccount(BaseModel): + """Represents a Security Hub delegated administrator account.""" + + admin_account_id: str + admin_status: str # ENABLED or DISABLE_IN_PROGRESS + region: str + class SecurityHubHub(BaseModel): arn: str @@ -112,4 +207,8 @@ class SecurityHubHub(BaseModel): standards: str integrations: str region: str - tags: Optional[list] + tags: Optional[list] = [] + # Organization configuration fields + organization_auto_enable: bool = False + auto_enable_standards: str = "NONE" + organization_config_available: bool = False diff --git a/tests/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions_test.py b/tests/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions_test.py new file mode 100644 index 0000000000..f2acf555d1 --- /dev/null +++ b/tests/providers/aws/services/config/config_delegated_admin_and_org_aggregator_all_regions/config_delegated_admin_and_org_aggregator_all_regions_test.py @@ -0,0 +1,491 @@ +from unittest.mock import patch + +import botocore +from moto import mock_aws + +from tests.providers.aws.utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, + AWS_REGION_US_EAST_1, + set_mocked_aws_provider, +) + +orig = botocore.client.BaseClient._make_api_call + + +AGG_ARN_TEMPLATE = ( + "arn:aws:config:{region}:" + AWS_ACCOUNT_NUMBER + ":config-aggregator/{name}" +) + + +def _aggregator_payload( + name, region, *, org_aware=True, all_regions=True, aws_regions=None +): + payload = { + "ConfigurationAggregatorName": name, + "ConfigurationAggregatorArn": AGG_ARN_TEMPLATE.format(region=region, name=name), + } + if org_aware: + org_source = { + "RoleArn": f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/AWSConfigRoleForOrganizations", + "AllAwsRegions": all_regions, + } + if not all_regions and aws_regions: + org_source["AwsRegions"] = aws_regions + payload["OrganizationAggregationSource"] = org_source + return payload + + +def make_mock_no_aggregators_no_admin(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + return {"ConfigurationAggregators": []} + if operation_name == "ListDelegatedAdministrators": + return {"DelegatedAdministrators": []} + return orig(self, operation_name, api_params) + + return _mock + + +def make_mock_aggregator_not_org_aware(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + return { + "ConfigurationAggregators": [ + _aggregator_payload( + "legacy-agg", + AWS_REGION_EU_WEST_1, + org_aware=False, + ) + ] + } + if operation_name == "ListDelegatedAdministrators": + return {"DelegatedAdministrators": []} + return orig(self, operation_name, api_params) + + return _mock + + +def make_mock_org_aggregator_not_all_regions_with_admin(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + return { + "ConfigurationAggregators": [ + _aggregator_payload( + "partial-org-agg", + AWS_REGION_EU_WEST_1, + org_aware=True, + all_regions=False, + aws_regions=[AWS_REGION_EU_WEST_1], + ) + ] + } + if operation_name == "ListDelegatedAdministrators": + return { + "DelegatedAdministrators": [ + { + "Id": "123456789012", + "Arn": f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/o-abc123/123456789012", + "Email": "admin@example.com", + "Name": "Security", + "Status": "ACTIVE", + "JoinedMethod": "CREATED", + } + ] + } + return orig(self, operation_name, api_params) + + return _mock + + +def make_mock_full_pass(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + return { + "ConfigurationAggregators": [ + _aggregator_payload( + "org-aggregator", + AWS_REGION_EU_WEST_1, + org_aware=True, + all_regions=True, + ) + ] + } + if operation_name == "ListDelegatedAdministrators": + return { + "DelegatedAdministrators": [ + { + "Id": "123456789012", + "Arn": f"arn:aws:organizations::{AWS_ACCOUNT_NUMBER}:account/o-abc123/123456789012", + "Email": "admin@example.com", + "Name": "Security", + "Status": "ACTIVE", + "JoinedMethod": "CREATED", + } + ] + } + return orig(self, operation_name, api_params) + + return _mock + + +def make_mock_access_denied_on_orgs(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + return { + "ConfigurationAggregators": [ + _aggregator_payload( + "org-aggregator", + AWS_REGION_EU_WEST_1, + org_aware=True, + all_regions=True, + ) + ] + } + if operation_name == "ListDelegatedAdministrators": + raise botocore.exceptions.ClientError( + { + "Error": { + "Code": "AccessDeniedException", + "Message": "User is not authorized to perform: organizations:ListDelegatedAdministrators", + } + }, + operation_name, + ) + return orig(self, operation_name, api_params) + + return _mock + + +def make_mock_aggregators_access_denied(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + raise botocore.exceptions.ClientError( + { + "Error": { + "Code": "AccessDeniedException", + "Message": "denied", + } + }, + operation_name, + ) + if operation_name == "ListDelegatedAdministrators": + return {"DelegatedAdministrators": []} + return orig(self, operation_name, api_params) + + return _mock + + +def make_mock_aggregators_other_client_error(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + raise botocore.exceptions.ClientError( + { + "Error": { + "Code": "InternalServerError", + "Message": "boom", + } + }, + operation_name, + ) + if operation_name == "ListDelegatedAdministrators": + return {"DelegatedAdministrators": []} + return orig(self, operation_name, api_params) + + return _mock + + +def make_mock_aggregators_unexpected_exception(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + raise RuntimeError("simulated transient error") + if operation_name == "ListDelegatedAdministrators": + return {"DelegatedAdministrators": []} + return orig(self, operation_name, api_params) + + return _mock + + +def make_mock_delegated_admins_unexpected_exception(): + def _mock(self, operation_name, api_params): + if operation_name == "DescribeConfigurationAggregators": + return { + "ConfigurationAggregators": [ + _aggregator_payload( + "org-aggregator", + AWS_REGION_EU_WEST_1, + org_aware=True, + all_regions=True, + ) + ] + } + if operation_name == "ListDelegatedAdministrators": + raise RuntimeError("simulated transient error") + return orig(self, operation_name, api_params) + + return _mock + + +class Test_config_delegated_admin_and_org_aggregator_all_regions: + @mock_aws + def test_no_aggregators_no_admin(self): + """Test when no aggregators exist in any region and no delegated admin is set.""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_no_aggregators_no_admin(), + ): + aws_provider = set_mocked_aws_provider( + [AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1] + ) + + from prowler.providers.aws.services.config.config_service import Config + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client", + new=Config(aws_provider), + ), + ): + from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import ( + config_delegated_admin_and_org_aggregator_all_regions, + ) + + check = config_delegated_admin_and_org_aggregator_all_regions() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + "no Organization Aggregator configured in any region" + in result[0].status_extended + ) + assert ( + "no delegated administrator registered for config.amazonaws.com" + in result[0].status_extended + ) + + @mock_aws + def test_aggregator_not_org_aware(self): + """Test when an aggregator exists but is not an organization aggregator.""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_aggregator_not_org_aware(), + ): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.config.config_service import Config + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client", + new=Config(aws_provider), + ), + ): + from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import ( + config_delegated_admin_and_org_aggregator_all_regions, + ) + + check = config_delegated_admin_and_org_aggregator_all_regions() + result = check.execute() + + eu_west_1_result = None + for finding in result: + if finding.region == AWS_REGION_EU_WEST_1: + eu_west_1_result = finding + break + + assert eu_west_1_result is not None + assert eu_west_1_result.status == "FAIL" + assert ( + "is not an organization aggregator" + in eu_west_1_result.status_extended + ) + + @mock_aws + def test_org_aggregator_not_all_regions_with_admin(self): + """Test org aggregator that doesn't cover all AWS regions (delegated admin set).""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_org_aggregator_not_all_regions_with_admin(), + ): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.config.config_service import Config + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client", + new=Config(aws_provider), + ), + ): + from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import ( + config_delegated_admin_and_org_aggregator_all_regions, + ) + + check = config_delegated_admin_and_org_aggregator_all_regions() + result = check.execute() + + eu_west_1_result = None + for finding in result: + if finding.region == AWS_REGION_EU_WEST_1: + eu_west_1_result = finding + break + + assert eu_west_1_result is not None + assert eu_west_1_result.status == "FAIL" + assert ( + "does not cover all AWS regions" in eu_west_1_result.status_extended + ) + + @mock_aws + def test_full_pass(self): + """Test PASS: delegated admin set and org aggregator covering all AWS regions.""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_full_pass(), + ): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.config.config_service import Config + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client", + new=Config(aws_provider), + ), + ): + from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import ( + config_delegated_admin_and_org_aggregator_all_regions, + ) + + check = config_delegated_admin_and_org_aggregator_all_regions() + result = check.execute() + + eu_west_1_result = None + for finding in result: + if finding.region == AWS_REGION_EU_WEST_1: + eu_west_1_result = finding + break + + assert eu_west_1_result is not None + assert eu_west_1_result.status == "PASS" + assert ( + "is an organization aggregator covering all AWS regions" + in eu_west_1_result.status_extended + ) + assert "delegated admin configured" in eu_west_1_result.status_extended + assert eu_west_1_result.resource_arn == AGG_ARN_TEMPLATE.format( + region=AWS_REGION_EU_WEST_1, name="org-aggregator" + ) + + @mock_aws + def test_access_denied_on_organizations(self): + """Test that AccessDenied on Organizations is reported as unknown admin state.""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_access_denied_on_orgs(), + ): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.config.config_service import Config + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions.config_client", + new=Config(aws_provider), + ), + ): + from prowler.providers.aws.services.config.config_delegated_admin_and_org_aggregator_all_regions.config_delegated_admin_and_org_aggregator_all_regions import ( + config_delegated_admin_and_org_aggregator_all_regions, + ) + + check = config_delegated_admin_and_org_aggregator_all_regions() + result = check.execute() + + eu_west_1_result = None + for finding in result: + if finding.region == AWS_REGION_EU_WEST_1: + eu_west_1_result = finding + break + + assert eu_west_1_result is not None + # The check still runs; aggregator coverage is satisfied but the + # delegated-admin status is unknown, so it must FAIL. + assert eu_west_1_result.status == "FAIL" + assert ( + "delegated administrator status for config.amazonaws.com could not be determined" + in eu_west_1_result.status_extended + ) + + @mock_aws + def test_aggregators_access_denied(self): + """AccessDenied on DescribeConfigurationAggregators is swallowed: no aggregators recorded for that region.""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_aggregators_access_denied(), + ): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + from prowler.providers.aws.services.config.config_service import Config + + service = Config(aws_provider) + assert service.aggregators == {} + + @mock_aws + def test_aggregators_other_client_error(self): + """Non-access ClientError on DescribeConfigurationAggregators is logged at error level.""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_aggregators_other_client_error(), + ): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + from prowler.providers.aws.services.config.config_service import Config + + service = Config(aws_provider) + assert service.aggregators == {} + + @mock_aws + def test_aggregators_unexpected_exception(self): + """Non-ClientError on DescribeConfigurationAggregators is caught by bare except.""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_aggregators_unexpected_exception(), + ): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + from prowler.providers.aws.services.config.config_service import Config + + service = Config(aws_provider) + assert service.aggregators == {} + + @mock_aws + def test_delegated_admins_unexpected_exception(self): + """Non-ClientError on ListDelegatedAdministrators must still set lookup_failed.""" + with patch( + "botocore.client.BaseClient._make_api_call", + new=make_mock_delegated_admins_unexpected_exception(), + ): + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + from prowler.providers.aws.services.config.config_service import Config + + service = Config(aws_provider) + assert service.delegated_administrators_lookup_failed is True + assert service.delegated_administrators == [] diff --git a/tests/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions_test.py b/tests/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions_test.py new file mode 100644 index 0000000000..01af147e9a --- /dev/null +++ b/tests/providers/aws/services/securityhub/securityhub_delegated_admin_enabled_all_regions/securityhub_delegated_admin_enabled_all_regions_test.py @@ -0,0 +1,512 @@ +from unittest.mock import patch + +import botocore +from moto import mock_aws + +from tests.providers.aws.utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, + set_mocked_aws_provider, +) + +orig = botocore.client.BaseClient._make_api_call + +HUB_ARN = f"arn:aws:securityhub:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:hub/default" + + +def _active_hub_responses(operation_name): + """Return a moto-friendly response for hub-describing API calls. + + Returns None if the operation is not one of the hub APIs (so the caller + can fall back to the default behavior). + """ + if operation_name == "DescribeHub": + return { + "HubArn": HUB_ARN, + "SubscribedAt": "2024-01-01T00:00:00.000Z", + "AutoEnableControls": True, + } + if operation_name == "GetEnabledStandards": + return {"StandardsSubscriptions": []} + if operation_name == "ListEnabledProductsForImport": + return {"ProductSubscriptions": []} + if operation_name == "ListTagsForResource": + return {"Tags": {}} + return None + + +def mock_make_api_call_org_admin_and_config(self, operation_name, api_params): + """Mock organization admin accounts and configuration APIs - PASS scenario.""" + hub_resp = _active_hub_responses(operation_name) + if hub_resp is not None: + return hub_resp + if operation_name == "ListOrganizationAdminAccounts": + return { + "AdminAccounts": [ + { + "AdminAccountId": "123456789012", + "AdminStatus": "ENABLED", + } + ] + } + if operation_name == "DescribeOrganizationConfiguration": + return { + "AutoEnable": True, + "AutoEnableStandards": "DEFAULT", + } + return orig(self, operation_name, api_params) + + +def mock_make_api_call_org_admin_no_auto_enable(self, operation_name, api_params): + """Mock organization admin configured but auto-enable disabled.""" + hub_resp = _active_hub_responses(operation_name) + if hub_resp is not None: + return hub_resp + if operation_name == "ListOrganizationAdminAccounts": + return { + "AdminAccounts": [ + { + "AdminAccountId": "123456789012", + "AdminStatus": "ENABLED", + } + ] + } + if operation_name == "DescribeOrganizationConfiguration": + return { + "AutoEnable": False, + "AutoEnableStandards": "NONE", + } + return orig(self, operation_name, api_params) + + +def mock_make_api_call_no_org_admin(self, operation_name, api_params): + """Mock no organization admin configured.""" + hub_resp = _active_hub_responses(operation_name) + if hub_resp is not None: + return hub_resp + if operation_name == "ListOrganizationAdminAccounts": + return {"AdminAccounts": []} + if operation_name == "DescribeOrganizationConfiguration": + return { + "AutoEnable": False, + "AutoEnableStandards": "NONE", + } + return orig(self, operation_name, api_params) + + +def mock_make_api_call_securityhub_not_subscribed(self, operation_name, api_params): + """Simulate Security Hub not subscribed in the account (InvalidAccessException).""" + if operation_name == "DescribeHub": + raise botocore.exceptions.ClientError( + { + "Error": { + "Code": "InvalidAccessException", + "Message": "Account is not subscribed to AWS Security Hub", + } + }, + operation_name, + ) + if operation_name == "ListOrganizationAdminAccounts": + return {"AdminAccounts": []} + return orig(self, operation_name, api_params) + + +def mock_make_api_call_admin_lookup_access_denied(self, operation_name, api_params): + """Hub is ACTIVE but ListOrganizationAdminAccounts is denied — lookup-failed path.""" + hub_resp = _active_hub_responses(operation_name) + if hub_resp is not None: + return hub_resp + if operation_name == "ListOrganizationAdminAccounts": + raise botocore.exceptions.ClientError( + { + "Error": { + "Code": "AccessDeniedException", + "Message": "User is not authorized to perform: securityhub:ListOrganizationAdminAccounts", + } + }, + operation_name, + ) + if operation_name == "DescribeOrganizationConfiguration": + return {"AutoEnable": True, "AutoEnableStandards": "DEFAULT"} + return orig(self, operation_name, api_params) + + +def mock_make_api_call_admin_lookup_unexpected(self, operation_name, api_params): + """ListOrganizationAdminAccounts raises a non-ClientError — bare Exception branch.""" + hub_resp = _active_hub_responses(operation_name) + if hub_resp is not None: + return hub_resp + if operation_name == "ListOrganizationAdminAccounts": + raise RuntimeError("simulated transient error") + if operation_name == "DescribeOrganizationConfiguration": + return {"AutoEnable": True, "AutoEnableStandards": "DEFAULT"} + return orig(self, operation_name, api_params) + + +def mock_make_api_call_describe_org_config_other_client_error( + self, operation_name, api_params +): + """DescribeOrganizationConfiguration raises a non-access ClientError — else branch.""" + hub_resp = _active_hub_responses(operation_name) + if hub_resp is not None: + return hub_resp + if operation_name == "ListOrganizationAdminAccounts": + return { + "AdminAccounts": [ + {"AdminAccountId": "123456789012", "AdminStatus": "ENABLED"} + ] + } + if operation_name == "DescribeOrganizationConfiguration": + raise botocore.exceptions.ClientError( + {"Error": {"Code": "InternalServerError", "Message": "boom"}}, + operation_name, + ) + return orig(self, operation_name, api_params) + + +def mock_make_api_call_describe_org_config_unexpected(self, operation_name, api_params): + """DescribeOrganizationConfiguration raises a non-ClientError — bare Exception branch.""" + hub_resp = _active_hub_responses(operation_name) + if hub_resp is not None: + return hub_resp + if operation_name == "ListOrganizationAdminAccounts": + return { + "AdminAccounts": [ + {"AdminAccountId": "123456789012", "AdminStatus": "ENABLED"} + ] + } + if operation_name == "DescribeOrganizationConfiguration": + raise RuntimeError("simulated transient error") + return orig(self, operation_name, api_params) + + +class Test_securityhub_delegated_admin_enabled_all_regions: + def teardown_method(self): + """Evict cached securityhub modules so legacy mock.patch-based tests + in the same session see a fresh import path.""" + import sys + + for mod in ( + "prowler.providers.aws.services.securityhub.securityhub_client", + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions", + ): + sys.modules.pop(mod, None) + + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_make_api_call_securityhub_not_subscribed, + ) + @mock_aws + def test_no_securityhub(self): + """Test when Security Hub is not subscribed in any region.""" + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.securityhub.securityhub_service import ( + SecurityHub, + ) + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client", + new=SecurityHub(aws_provider), + ), + ): + from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import ( + securityhub_delegated_admin_enabled_all_regions, + ) + + check = securityhub_delegated_admin_enabled_all_regions() + result = check.execute() + + # Should have findings for each region (with NOT_AVAILABLE hubs) + assert len(result) > 0 + # All should fail since hub is not enabled + for finding in result: + assert finding.status == "FAIL" + assert "Security Hub not enabled" in finding.status_extended + + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_make_api_call_no_org_admin, + ) + @mock_aws + def test_securityhub_enabled_no_delegated_admin(self): + """Test when Security Hub is enabled but no delegated admin is configured.""" + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.securityhub.securityhub_service import ( + SecurityHub, + ) + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client", + new=SecurityHub(aws_provider), + ), + ): + from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import ( + securityhub_delegated_admin_enabled_all_regions, + ) + + check = securityhub_delegated_admin_enabled_all_regions() + result = check.execute() + + eu_west_1_result = None + for finding in result: + if finding.region == AWS_REGION_EU_WEST_1: + eu_west_1_result = finding + break + + assert eu_west_1_result is not None + assert eu_west_1_result.status == "FAIL" + assert ( + "no delegated administrator configured" + in eu_west_1_result.status_extended + ) + assert eu_west_1_result.resource_arn == HUB_ARN + + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_make_api_call_org_admin_no_auto_enable, + ) + @mock_aws + def test_securityhub_enabled_with_admin_no_auto_enable(self): + """Test when Security Hub is enabled with delegated admin but auto-enable is off.""" + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.securityhub.securityhub_service import ( + SecurityHub, + ) + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client", + new=SecurityHub(aws_provider), + ), + ): + from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import ( + securityhub_delegated_admin_enabled_all_regions, + ) + + check = securityhub_delegated_admin_enabled_all_regions() + result = check.execute() + + eu_west_1_result = None + for finding in result: + if finding.region == AWS_REGION_EU_WEST_1: + eu_west_1_result = finding + break + + assert eu_west_1_result is not None + assert eu_west_1_result.status == "FAIL" + assert ( + "organization auto-enable not configured" + in eu_west_1_result.status_extended + ) + + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_make_api_call_org_admin_and_config, + ) + @mock_aws + def test_securityhub_enabled_with_admin_and_auto_enable(self): + """Test when Security Hub is enabled with delegated admin and auto-enable on (PASS).""" + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.securityhub.securityhub_service import ( + SecurityHub, + ) + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client", + new=SecurityHub(aws_provider), + ), + ): + from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import ( + securityhub_delegated_admin_enabled_all_regions, + ) + + check = securityhub_delegated_admin_enabled_all_regions() + result = check.execute() + + eu_west_1_result = None + for finding in result: + if finding.region == AWS_REGION_EU_WEST_1: + eu_west_1_result = finding + break + + assert eu_west_1_result is not None + assert eu_west_1_result.status == "PASS" + assert "delegated admin configured" in eu_west_1_result.status_extended + assert "auto-enable" in eu_west_1_result.status_extended + assert eu_west_1_result.resource_arn == HUB_ARN + + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_make_api_call_admin_lookup_access_denied, + ) + @mock_aws + def test_admin_lookup_access_denied(self): + """AccessDenied on ListOrganizationAdminAccounts must FAIL with unknown-admin message.""" + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.securityhub.securityhub_service import ( + SecurityHub, + ) + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client", + new=SecurityHub(aws_provider), + ), + ): + from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import ( + securityhub_delegated_admin_enabled_all_regions, + ) + + check = securityhub_delegated_admin_enabled_all_regions() + result = check.execute() + + eu_west_1_result = None + for finding in result: + if finding.region == AWS_REGION_EU_WEST_1: + eu_west_1_result = finding + break + + assert eu_west_1_result is not None + assert eu_west_1_result.status == "FAIL" + assert ( + "delegated administrator status could not be determined" + in eu_west_1_result.status_extended + ) + assert ( + "no delegated administrator configured" + not in eu_west_1_result.status_extended + ) + + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_make_api_call_admin_lookup_unexpected, + ) + @mock_aws + def test_admin_lookup_unexpected_exception(self): + """Non-ClientError raised from ListOrganizationAdminAccounts still sets lookup_failed.""" + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.securityhub.securityhub_service import ( + SecurityHub, + ) + + service = SecurityHub(aws_provider) + assert service.organization_admin_lookup_failed is True + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client", + new=service, + ), + ): + from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import ( + securityhub_delegated_admin_enabled_all_regions, + ) + + result = securityhub_delegated_admin_enabled_all_regions().execute() + assert result and result[0].status == "FAIL" + assert ( + "delegated administrator status could not be determined" + in result[0].status_extended + ) + + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_make_api_call_describe_org_config_other_client_error, + ) + @mock_aws + def test_describe_org_config_other_client_error(self): + """Non-access ClientError on DescribeOrganizationConfiguration is logged at error level.""" + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.securityhub.securityhub_service import ( + SecurityHub, + ) + + service = SecurityHub(aws_provider) + # organization_config_available stays False, so the auto-enable issue is suppressed + assert service.securityhubs[0].organization_config_available is False + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client", + new=service, + ), + ): + from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import ( + securityhub_delegated_admin_enabled_all_regions, + ) + + result = securityhub_delegated_admin_enabled_all_regions().execute() + # Admin is configured and hub is active; with org config unavailable the + # check should PASS because there are no other detectable issues. + assert result and result[0].status == "PASS" + + @patch( + "botocore.client.BaseClient._make_api_call", + new=mock_make_api_call_describe_org_config_unexpected, + ) + @mock_aws + def test_describe_org_config_unexpected_exception(self): + """Non-ClientError on DescribeOrganizationConfiguration is caught by bare except.""" + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.securityhub.securityhub_service import ( + SecurityHub, + ) + + service = SecurityHub(aws_provider) + assert service.securityhubs[0].organization_config_available is False + + with ( + patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), + patch( + "prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions.securityhub_client", + new=service, + ), + ): + from prowler.providers.aws.services.securityhub.securityhub_delegated_admin_enabled_all_regions.securityhub_delegated_admin_enabled_all_regions import ( + securityhub_delegated_admin_enabled_all_regions, + ) + + result = securityhub_delegated_admin_enabled_all_regions().execute() + assert result and result[0].status == "PASS"