diff --git a/permissions/prowler-additions-policy.json b/permissions/prowler-additions-policy.json index c0da045603..eb140e7c09 100644 --- a/permissions/prowler-additions-policy.json +++ b/permissions/prowler-additions-policy.json @@ -39,6 +39,8 @@ "rolesanywhere:ListTagsForResource", "rolesanywhere:ListTrustAnchors", "s3:GetAccountPublicAccessBlock", + "s3:GetObjectAcl", + "s3:ListBucket", "shield:DescribeProtection", "shield:GetSubscriptionState", "securityhub:BatchImportFindings", diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 9161cee8b8..11579b54ed 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -26,6 +26,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - AWS Bedrock AgentCore privilege escalation paths in the IAM privilege escalation checks, covering Runtime, Harness, Code Interpreter and Custom Browser [(#11726)](https://github.com/prowler-cloud/prowler/pull/11726) - `--scan-secrets-validate` flag and `aws.secrets_validate` configuration option to optionally validate the secrets discovered by the secret-scanning checks against the provider APIs; secrets confirmed to be live are reported as critical [(#11694)](https://github.com/prowler-cloud/prowler/pull/11694) - `apigateway_restapi_no_secrets_in_stage_variables` check for AWS provider, scanning API Gateway REST API stage variables for hardcoded secrets such as passwords, API keys, and tokens [(#11188)](https://github.com/prowler-cloud/prowler/pull/11188) +- `s3_bucket_object_public` check for AWS provider, spot-checking a configurable sample of object ACLs in each bucket and flagging objects granted to the AllUsers or AuthenticatedUsers groups; disabled by default and opted into via the `s3_bucket_object_public_enabled` configuration option [(#9517)](https://github.com/prowler-cloud/prowler/pull/9517) - Azure provider now supports `--azure-resource-group` to scope resource-level checks to specific resource groups across all accessible subscriptions [(#10657)](https://github.com/prowler-cloud/prowler/pull/10657) ### 🔄 Changed @@ -33,6 +34,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - Replaced the `detect-secrets` library with [Kingfisher](https://github.com/mongodb/kingfisher) as the engine for the secret-scanning checks; scans run fully offline by default and obvious placeholder values are no longer reported as findings [(#11694)](https://github.com/prowler-cloud/prowler/pull/11694) - Removed the `detect_secrets_plugins` configuration option, which is no longer used by the new secret-scanning engine [(#11694)](https://github.com/prowler-cloud/prowler/pull/11694) - `awslambda_function_no_secrets_in_code` now supports a `secrets_ignore_files` audit-config option to skip files inside the deployment package by glob pattern (e.g. `*.deps.json`), suppressing .NET dependency-manifest false positives without masking real secrets [(#11222)](https://github.com/prowler-cloud/prowler/pull/11222) +- AWS scans for EBS snapshots, Backup recovery points, CloudWatch log groups, Lambda functions, ECS task definitions, and CodeArtifact packages now support configurable resource analysis limits via `aws.max_scanned_resources_per_service`; limits are disabled by default and only positive values cap analyzed resources [(#11228)](https://github.com/prowler-cloud/prowler/pull/11228) ### 🐞 Fixed @@ -46,10 +48,6 @@ All notable changes to the **Prowler SDK** are documented in this file. - GitHub default branch protection checks now evaluate repository rulesets in addition to classic branch protection, avoiding false positives for repositories that enforce protection through rulesets [(#11723)](https://github.com/prowler-cloud/prowler/pull/11723) - Okta, Alibaba Cloud and OpenStack scan-config sections are now validated against a registered schema instead of being silently accepted, so their configurable thresholds (session/idle timeouts, retention days, image-sharing and secret-scanning settings) log a warning and fall back to the built-in default whenever a value is out of range [(#11725)](https://github.com/prowler-cloud/prowler/pull/11725) -### 🔄 Changed - -- AWS scans for EBS snapshots, Backup recovery points, CloudWatch log groups, Lambda functions, ECS task definitions, and CodeArtifact packages now support configurable resource analysis limits via `aws.max_scanned_resources_per_service`; limits are disabled by default and only positive values cap analyzed resources [(#11228)](https://github.com/prowler-cloud/prowler/pull/11228) - --- ## [5.31.1] (Prowler v5.31.1) diff --git a/prowler/config/config.yaml b/prowler/config/config.yaml index d542da0b75..3a059e11d0 100644 --- a/prowler/config/config.yaml +++ b/prowler/config/config.yaml @@ -483,6 +483,18 @@ aws: # Minimum retention period in hours for Kinesis streams min_kinesis_stream_retention_hours: 168 # 7 days + # AWS S3 Configuration + # aws.s3_bucket_object_public + # This check performs a spot-check by sampling object ACLs within a bucket, so + # it is disabled by default. For complete coverage, rely on s3_bucket_acl_prohibited + # which enforces BucketOwnerEnforced Object Ownership (AWS's recommended approach). + # Set s3_bucket_object_public_enabled to True to opt in. + s3_bucket_object_public_enabled: False + # Maximum number of objects to list per bucket (upper bound for the sampling pool) + s3_bucket_object_public_max_objects: 100 + # Number of objects to randomly sample from the listed pool and inspect ACLs for + s3_bucket_object_public_sample_size: 3 + # AWS CodeBuild Configuration # aws.codebuild_project_uses_allowed_github_organizations codebuild_github_allowed_organizations: diff --git a/prowler/config/schema/aws.py b/prowler/config/schema/aws.py index 4a31458f7d..4e52093029 100644 --- a/prowler/config/schema/aws.py +++ b/prowler/config/schema/aws.py @@ -458,3 +458,31 @@ class AWSProviderConfig(ProviderConfigBase): le=8760, description="Hours of Kinesis stream retention. Range: 24..8760 (1 day .. 1 year).", ) + + # --- S3 -------------------------------------------------------------- + s3_bucket_object_public_enabled: Optional[bool] = Field( + default=None, + description=( + "Enable the s3_bucket_object_public spot-check, which samples object " + "ACLs per bucket. Disabled by default because it lists and reads object " + "ACLs, which is expensive on large buckets." + ), + ) + s3_bucket_object_public_max_objects: Optional[int] = Field( + default=None, + ge=1, + le=1000, + description=( + "Max objects to list per bucket as the sampling pool. Range: 1..1000 " + "(ListObjectsV2 returns at most 1000 keys per page)." + ), + ) + s3_bucket_object_public_sample_size: Optional[int] = Field( + default=None, + ge=1, + le=1000, + description=( + "Number of objects sampled from the listed pool for ACL inspection. " + "Range: 1..1000. Must be positive to avoid a no-op or invalid sample." + ), + ) diff --git a/prowler/providers/aws/services/s3/s3_bucket_object_public/__init__.py b/prowler/providers/aws/services/s3/s3_bucket_object_public/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.metadata.json b/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.metadata.json new file mode 100644 index 0000000000..197a5aa205 --- /dev/null +++ b/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "aws", + "CheckID": "s3_bucket_object_public", + "CheckTitle": "Spot-check S3 bucket objects for public ACLs", + "CheckType": [ + "Software and Configuration Checks/AWS Security Best Practices", + "Effects/Data Exposure" + ], + "ServiceName": "s3", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:s3:::resource", + "Severity": "low", + "ResourceType": "AwsS3Bucket", + "Description": "Spot-checks a configurable sample of objects in each S3 bucket and flags any whose ACL grants access to the AllUsers or AuthenticatedUsers groups. This is a sampling-based check, not a comprehensive audit, so public objects outside the sample can be missed. It is disabled by default and must be enabled via the s3_bucket_object_public_enabled configuration flag.", + "Risk": "Public objects can be accessed by anyone on the internet, potentially leaking sensitive data. A bucket can appear private at the bucket-policy level while still containing individual objects with public ACL grants.", + "RelatedUrl": "", + "Remediation": { + "Code": { + "CLI": "aws s3api put-object-acl --bucket --key --acl private", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "For complete coverage, enable the s3_bucket_acl_prohibited check, which enforces the BucketOwnerEnforced Object Ownership setting (AWS's recommended approach since April 2023) and prevents public object ACLs entirely. Use this spot-check as a supplementary tool for manual assessments.", + "Url": "https://hub.prowler.com/check/s3_bucket_object_public" + } + }, + "Categories": [ + "internet-exposed" + ], + "DependsOn": [], + "RelatedTo": [ + "s3_bucket_acl_prohibited" + ], + "Notes": "Disabled by default. Configure s3_bucket_object_public_enabled, s3_bucket_object_public_max_objects, and s3_bucket_object_public_sample_size in the Prowler configuration. Because only a sample of objects is inspected, a PASS does not guarantee the bucket is free of public objects; use s3_bucket_acl_prohibited for full assurance." +} diff --git a/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.py b/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.py new file mode 100644 index 0000000000..9912da561b --- /dev/null +++ b/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.py @@ -0,0 +1,82 @@ +from typing import List + +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.s3.s3_client import s3_client + +# ACL grantee groups that make an object effectively public. AllUsers is anyone on +# the internet; AuthenticatedUsers is any authenticated AWS principal (any account). +PUBLIC_ACL_URIS = { + "http://acs.amazonaws.com/groups/global/AllUsers", + "http://acs.amazonaws.com/groups/global/AuthenticatedUsers", +} + + +class s3_bucket_object_public(Check): + """Spot-check a sample of S3 bucket objects for public ACL grants.""" + + def execute(self) -> List[Check_Report_AWS]: + """Evaluate sampled object ACLs for AllUsers/AuthenticatedUsers grants. + + Returns: + List[Check_Report_AWS]: One report per sampled bucket (empty when the + check is disabled via configuration). + """ + findings = [] + + if not s3_client.audit_config.get("s3_bucket_object_public_enabled", False): + return findings + + for bucket in s3_client.buckets.values(): + sampling = bucket.object_sampling + # Sampling is populated by the service layer only when the check is + # enabled; skip any bucket that was not sampled. + if sampling is None or not sampling.performed: + continue + + report = Check_Report_AWS(metadata=self.metadata(), resource=bucket) + + if sampling.error_code is not None: + report.status = "MANUAL" + if sampling.error_code == "AccessDenied": + report.status_extended = ( + f"Access Denied when spot-checking objects in bucket " + f"{bucket.name}." + ) + else: + report.status_extended = ( + f"Could not spot-check objects in bucket {bucket.name}: " + f"{sampling.error_message}." + ) + elif sampling.is_empty: + report.status = "PASS" + report.status_extended = f"S3 Bucket {bucket.name} is empty." + else: + public_objects = [ + obj.key + for obj in sampling.objects + if any( + grantee.type == "Group" and grantee.URI in PUBLIC_ACL_URIS + for grantee in obj.grantees + ) + ] + sampled = len(sampling.objects) + + if public_objects: + report.status = "FAIL" + report.status_extended = ( + f"S3 Bucket {bucket.name} has public objects detected in " + f"spot-check sample of {sampled} objects: " + f"{', '.join(public_objects)}." + ) + else: + report.status = "PASS" + report.status_extended = ( + f"No public objects detected in spot-check sample of " + f"{sampled} objects in bucket {bucket.name}. For complete " + f"assurance, ensure ACLs are disabled via Object Ownership " + f"settings." + ) + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/s3/s3_service.py b/prowler/providers/aws/services/s3/s3_service.py index 00605fbe6e..94768138df 100644 --- a/prowler/providers/aws/services/s3/s3_service.py +++ b/prowler/providers/aws/services/s3/s3_service.py @@ -36,6 +36,10 @@ class S3(AWSService): self.__threading_call__( self._get_bucket_notification_configuration, self.buckets.values() ) + # Object-level ACL sampling is expensive and opt-in, so only run it when + # the s3_bucket_object_public check is explicitly enabled in the config. + if self.audit_config.get("s3_bucket_object_public_enabled", False): + self.__threading_call__(self._get_public_objects, self.buckets.values()) def _list_buckets(self, provider): logger.info("S3 - Listing buckets...") @@ -487,6 +491,69 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _get_public_objects(self, bucket): + logger.info("S3 - Spot-checking bucket objects for public ACLs...") + max_objects = self.audit_config.get("s3_bucket_object_public_max_objects", 100) + sample_size = self.audit_config.get("s3_bucket_object_public_sample_size", 3) + # Guard against misconfigured non-positive values: a zero sample size would + # raise ZeroDivisionError and a negative one would silently sample nothing. + if not isinstance(max_objects, int) or max_objects <= 0: + max_objects = 100 + if not isinstance(sample_size, int) or sample_size <= 0: + sample_size = 3 + sampling = BucketObjectSampling(performed=True) + regional_client = None + try: + regional_client = self.regional_clients[bucket.region] + contents = regional_client.list_objects_v2( + Bucket=bucket.name, MaxKeys=max_objects + ).get("Contents", []) + + if not contents: + sampling.is_empty = True + bucket.object_sampling = sampling + return + + all_keys = [obj["Key"] for obj in contents] + # Deterministic, evenly-spaced sampling so findings are reproducible + # across scans instead of flipping between PASS/FAIL with a random sample. + if len(all_keys) <= sample_size: + sample_keys = all_keys + else: + step = len(all_keys) // sample_size + sample_keys = [all_keys[i * step] for i in range(sample_size)] + + for key in sample_keys: + acl = regional_client.get_object_acl(Bucket=bucket.name, Key=key) + grantees = [] + for grant in acl.get("Grants", []): + grant_grantee = grant.get("Grantee", {}) + grantee = ACL_Grantee(type=grant_grantee.get("Type", "")) + grantee.display_name = grant_grantee.get("DisplayName") + grantee.ID = grant_grantee.get("ID") + grantee.URI = grant_grantee.get("URI") + grantee.permission = grant.get("Permission") + grantees.append(grantee) + sampling.objects.append(ObjectACL(key=key, grantees=grantees)) + + bucket.object_sampling = sampling + except ClientError as error: + sampling.error_code = error.response["Error"]["Code"] + sampling.error_message = str(error) + bucket.object_sampling = sampling + region = regional_client.region if regional_client else bucket.region + logger.warning( + f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + sampling.error_code = error.__class__.__name__ + sampling.error_message = str(error) + bucket.object_sampling = sampling + region = regional_client.region if regional_client else bucket.region + logger.error( + f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + def _head_bucket(self, bucket_name): logger.info("S3 - Checking if bucket exists...") try: @@ -654,6 +721,19 @@ class PublicAccessBlock(BaseModel): restrict_public_buckets: bool +class ObjectACL(BaseModel): + key: str + grantees: List[ACL_Grantee] = Field(default_factory=list) + + +class BucketObjectSampling(BaseModel): + performed: bool = False + is_empty: bool = False + objects: List[ObjectACL] = Field(default_factory=list) + error_code: Optional[str] = None + error_message: Optional[str] = None + + class AccessPoint(BaseModel): arn: str account_id: str @@ -703,3 +783,4 @@ class Bucket(BaseModel): lifecycle: List[LifeCycleRule] = Field(default_factory=list) replication_rules: List[ReplicationRule] = Field(default_factory=list) notification_config: Dict = Field(default_factory=dict) + object_sampling: Optional[BucketObjectSampling] = None diff --git a/tests/providers/aws/services/s3/s3_bucket_object_public/__init__.py b/tests/providers/aws/services/s3/s3_bucket_object_public/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public_test.py b/tests/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public_test.py new file mode 100644 index 0000000000..2ac143ff47 --- /dev/null +++ b/tests/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public_test.py @@ -0,0 +1,302 @@ +from unittest import mock + +from boto3 import client +from botocore.exceptions import ClientError +from moto import mock_aws + +from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider + +CHECK_MODULE = ( + "prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public" +) + +ENABLED_CONFIG = { + "s3_bucket_object_public_enabled": True, + "s3_bucket_object_public_max_objects": 100, + "s3_bucket_object_public_sample_size": 3, +} + + +class Test_s3_bucket_object_public: + @mock_aws + def test_check_disabled_by_default_returns_no_findings(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + s3_client_us_east_1.create_bucket(Bucket="bucket-disabled") + + from prowler.providers.aws.services.s3.s3_service import S3 + + # No audit_config -> check disabled by default + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert result == [] + + @mock_aws + def test_no_buckets_returns_no_findings(self): + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert result == [] + + @mock_aws + def test_bucket_empty_passes(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "bucket-empty" + s3_client_us_east_1.create_bucket(Bucket=bucket_name) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].status_extended == ( + f"S3 Bucket {bucket_name} is empty." + ) + assert result[0].resource_id == bucket_name + assert result[0].region == AWS_REGION_US_EAST_1 + + @mock_aws + def test_bucket_with_only_private_objects_passes(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "bucket-private-objects" + s3_client_us_east_1.create_bucket(Bucket=bucket_name) + s3_client_us_east_1.put_object( + Bucket=bucket_name, Key="private-1.txt", Body=b"x" + ) + s3_client_us_east_1.put_object( + Bucket=bucket_name, Key="private-2.txt", Body=b"x" + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert "No public objects detected in spot-check sample of" in ( + result[0].status_extended + ) + assert bucket_name in result[0].status_extended + assert ( + "For complete assurance, ensure ACLs are disabled via " + "Object Ownership settings." + ) in result[0].status_extended + + @mock_aws + def test_bucket_with_public_object_fails(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "bucket-public-object" + public_key = "public.txt" + s3_client_us_east_1.create_bucket(Bucket=bucket_name) + s3_client_us_east_1.put_object(Bucket=bucket_name, Key=public_key, Body=b"x") + s3_client_us_east_1.put_object_acl( + Bucket=bucket_name, Key=public_key, ACL="public-read" + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert public_key in result[0].status_extended + assert ( + f"S3 Bucket {bucket_name} has public objects detected in " + "spot-check sample of" + ) in result[0].status_extended + + @mock_aws + def test_bucket_with_authenticated_users_object_fails(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "bucket-authenticated-object" + public_key = "authenticated.txt" + s3_client_us_east_1.create_bucket(Bucket=bucket_name) + s3_client_us_east_1.put_object(Bucket=bucket_name, Key=public_key, Body=b"x") + s3_client_us_east_1.put_object_acl( + Bucket=bucket_name, Key=public_key, ACL="authenticated-read" + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert public_key in result[0].status_extended + + @mock_aws + def test_access_denied_on_list_objects_reports_manual(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "bucket-access-denied" + s3_client_us_east_1.create_bucket(Bucket=bucket_name) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + + # Simulate AccessDenied when sampling and re-run sampling for the bucket + regional_client = mock.MagicMock() + regional_client.region = AWS_REGION_US_EAST_1 + regional_client.list_objects_v2.side_effect = ClientError( + {"Error": {"Code": "AccessDenied", "Message": "denied"}}, + "ListObjectsV2", + ) + s3_service.regional_clients[AWS_REGION_US_EAST_1] = regional_client + bucket = next(iter(s3_service.buckets.values())) + bucket.object_sampling = None + s3_service._get_public_objects(bucket) + + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "MANUAL" + assert result[0].status_extended == ( + f"Access Denied when spot-checking objects in bucket " + f"{bucket_name}." + ) + + @mock_aws + def test_other_client_error_reports_manual(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "bucket-other-error" + s3_client_us_east_1.create_bucket(Bucket=bucket_name) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + + regional_client = mock.MagicMock() + regional_client.region = AWS_REGION_US_EAST_1 + regional_client.list_objects_v2.side_effect = ClientError( + {"Error": {"Code": "InternalError", "Message": "boom"}}, + "ListObjectsV2", + ) + s3_service.regional_clients[AWS_REGION_US_EAST_1] = regional_client + bucket = next(iter(s3_service.buckets.values())) + bucket.object_sampling = None + s3_service._get_public_objects(bucket) + + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "MANUAL" + assert ( + f"Could not spot-check objects in bucket {bucket_name}" + ) in result[0].status_extended diff --git a/tests/providers/aws/services/s3/s3_service_test.py b/tests/providers/aws/services/s3/s3_service_test.py index 7dec192f2e..d6d9575a11 100644 --- a/tests/providers/aws/services/s3/s3_service_test.py +++ b/tests/providers/aws/services/s3/s3_service_test.py @@ -642,3 +642,47 @@ class Test_S3_Service: assert s3control.access_points[arn].public_access_block.ignore_public_acls assert s3control.access_points[arn].public_access_block.block_public_policy assert s3control.access_points[arn].public_access_block.restrict_public_buckets + + # Test S3 object ACL sampling is skipped unless the check is enabled + @mock_aws + def test_get_public_objects_disabled_by_default(self): + s3_client = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "test-bucket" + bucket_arn = f"arn:aws:s3:::{bucket_name}" + s3_client.create_bucket(Bucket=bucket_name) + s3_client.put_object(Bucket=bucket_name, Key="a.txt", Body=b"x") + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + s3 = S3(aws_provider) + + assert s3.buckets[bucket_arn].object_sampling is None + + # Test S3 object ACL sampling detects a public object when enabled + @mock_aws + def test_get_public_objects_detects_public_acl(self): + s3_client = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "test-bucket" + bucket_arn = f"arn:aws:s3:::{bucket_name}" + s3_client.create_bucket(Bucket=bucket_name) + s3_client.put_object(Bucket=bucket_name, Key="public.txt", Body=b"x") + s3_client.put_object_acl( + Bucket=bucket_name, Key="public.txt", ACL="public-read" + ) + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], + audit_config={"s3_bucket_object_public_enabled": True}, + ) + s3 = S3(aws_provider) + + sampling = s3.buckets[bucket_arn].object_sampling + assert sampling is not None + assert sampling.performed is True + assert sampling.is_empty is False + assert len(sampling.objects) == 1 + assert sampling.objects[0].key == "public.txt" + assert any( + grantee.type == "Group" + and grantee.URI == "http://acs.amazonaws.com/groups/global/AllUsers" + for grantee in sampling.objects[0].grantees + )