diff --git a/permissions/prowler-additions-policy.json b/permissions/prowler-additions-policy.json index c0da045603..eb140e7c09 100644 --- a/permissions/prowler-additions-policy.json +++ b/permissions/prowler-additions-policy.json @@ -39,6 +39,8 @@ "rolesanywhere:ListTagsForResource", "rolesanywhere:ListTrustAnchors", "s3:GetAccountPublicAccessBlock", + "s3:GetObjectAcl", + "s3:ListBucket", "shield:DescribeProtection", "shield:GetSubscriptionState", "securityhub:BatchImportFindings", diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index 4abc4f1384..b80f83a2ca 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -25,6 +25,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - AWS Bedrock AgentCore privilege escalation paths in the IAM privilege escalation checks, covering Runtime, Harness, Code Interpreter and Custom Browser [(#11726)](https://github.com/prowler-cloud/prowler/pull/11726) - `--scan-secrets-validate` flag and `aws.secrets_validate` configuration option to optionally validate the secrets discovered by the secret-scanning checks against the provider APIs; secrets confirmed to be live are reported as critical [(#11694)](https://github.com/prowler-cloud/prowler/pull/11694) - `apigateway_restapi_no_secrets_in_stage_variables` check for AWS provider, scanning API Gateway REST API stage variables for hardcoded secrets such as passwords, API keys, and tokens [(#11188)](https://github.com/prowler-cloud/prowler/pull/11188) +- `s3_bucket_object_public` check for AWS provider, spot-checking a configurable sample of object ACLs in each bucket and flagging objects granted to the AllUsers or AuthenticatedUsers groups; disabled by default and opted into via the `s3_bucket_object_public_enabled` configuration option [(#9517)](https://github.com/prowler-cloud/prowler/pull/9517) ### 🔄 Changed diff --git a/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.metadata.json b/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.metadata.json index 199183f46d..197a5aa205 100644 --- a/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.metadata.json +++ b/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.metadata.json @@ -3,16 +3,17 @@ "CheckID": "s3_bucket_object_public", "CheckTitle": "Spot-check S3 bucket objects for public ACLs", "CheckType": [ - "Data Protection" + "Software and Configuration Checks/AWS Security Best Practices", + "Effects/Data Exposure" ], "ServiceName": "s3", "SubServiceName": "", "ResourceIdTemplate": "arn:partition:s3:::resource", "Severity": "low", "ResourceType": "AwsS3Bucket", - "Description": "Spot-checks a configurable random sample of objects in each S3 bucket and flags any whose ACL grants read access to the AllUsers group. This is a sampling-based check, not a comprehensive audit: ListObjectsV2 returns keys in lexicographical order and only a small sample is inspected, so public objects outside the sample can be missed. The check is disabled by default and must be opted into via the s3_bucket_object_public_enabled configuration flag.", + "Description": "Spot-checks a configurable sample of objects in each S3 bucket and flags any whose ACL grants access to the AllUsers or AuthenticatedUsers groups. This is a sampling-based check, not a comprehensive audit, so public objects outside the sample can be missed. It is disabled by default and must be enabled via the s3_bucket_object_public_enabled configuration flag.", "Risk": "Public objects can be accessed by anyone on the internet, potentially leaking sensitive data. A bucket can appear private at the bucket-policy level while still containing individual objects with public ACL grants.", - "RelatedUrl": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html", + "RelatedUrl": "", "Remediation": { "Code": { "CLI": "aws s3api put-object-acl --bucket --key --acl private", @@ -22,15 +23,15 @@ }, "Recommendation": { "Text": "For complete coverage, enable the s3_bucket_acl_prohibited check, which enforces the BucketOwnerEnforced Object Ownership setting (AWS's recommended approach since April 2023) and prevents public object ACLs entirely. Use this spot-check as a supplementary tool for manual assessments.", - "Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html" + "Url": "https://hub.prowler.com/check/s3_bucket_object_public" } }, "Categories": [ - "public-access" + "internet-exposed" ], "DependsOn": [], "RelatedTo": [ "s3_bucket_acl_prohibited" ], - "Notes": "Disabled by default. Configure s3_bucket_object_public_enabled, s3_bucket_object_public_max_objects, and s3_bucket_object_public_sample_size in the Prowler configuration. Because ListObjectsV2 returns keys lexicographically and only a sample is inspected, a PASS does not guarantee the bucket is free of public objects; use s3_bucket_acl_prohibited for full assurance." + "Notes": "Disabled by default. Configure s3_bucket_object_public_enabled, s3_bucket_object_public_max_objects, and s3_bucket_object_public_sample_size in the Prowler configuration. Because only a sample of objects is inspected, a PASS does not guarantee the bucket is free of public objects; use s3_bucket_acl_prohibited for full assurance." } diff --git a/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.py b/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.py index b0d2603230..fb9152b85c 100644 --- a/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.py +++ b/prowler/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public.py @@ -1,11 +1,12 @@ -import random - -from botocore.exceptions import ClientError - from prowler.lib.check.models import Check, Check_Report_AWS from prowler.providers.aws.services.s3.s3_client import s3_client -ALL_USERS_URI = "http://acs.amazonaws.com/groups/global/AllUsers" +# ACL grantee groups that make an object effectively public. AllUsers is anyone on +# the internet; AuthenticatedUsers is any authenticated AWS principal (any account). +PUBLIC_ACL_URIS = { + "http://acs.amazonaws.com/groups/global/AllUsers", + "http://acs.amazonaws.com/groups/global/AuthenticatedUsers", +} class s3_bucket_object_public(Check): @@ -15,51 +16,47 @@ class s3_bucket_object_public(Check): if not s3_client.audit_config.get("s3_bucket_object_public_enabled", False): return findings - max_objects = s3_client.audit_config.get( - "s3_bucket_object_public_max_objects", 100 - ) - sample_size = s3_client.audit_config.get( - "s3_bucket_object_public_sample_size", 3 - ) - for bucket in s3_client.buckets.values(): + sampling = bucket.object_sampling + # Sampling is populated by the service layer only when the check is + # enabled; skip any bucket that was not sampled. + if sampling is None or not sampling.performed: + continue + report = Check_Report_AWS(metadata=self.metadata(), resource=bucket) - try: - regional_client = s3_client.regional_clients[bucket.region] - objects = regional_client.list_objects_v2( - Bucket=bucket.name, MaxKeys=max_objects - ) + if sampling.error_code is not None: + report.status = "MANUAL" + if sampling.error_code == "AccessDenied": + report.status_extended = ( + f"Access Denied when spot-checking objects in bucket " + f"{bucket.name}." + ) + else: + report.status_extended = ( + f"Could not spot-check objects in bucket {bucket.name}: " + f"{sampling.error_message}." + ) + elif sampling.is_empty: + report.status = "PASS" + report.status_extended = f"S3 Bucket {bucket.name} is empty." + else: + public_objects = [ + obj.key + for obj in sampling.objects + if any( + grantee.type == "Group" and grantee.URI in PUBLIC_ACL_URIS + for grantee in obj.grantees + ) + ] + sampled = len(sampling.objects) - contents = objects.get("Contents", []) - if not contents: - report.status = "PASS" - report.status_extended = f"S3 Bucket {bucket.name} is empty." - findings.append(report) - continue - - all_keys = [obj["Key"] for obj in contents] - sample_keys = random.sample(all_keys, min(len(all_keys), sample_size)) - sampled = len(sample_keys) - - public_objects_found = [] - for key in sample_keys: - acl = regional_client.get_object_acl(Bucket=bucket.name, Key=key) - for grant in acl.get("Grants", []): - grantee = grant.get("Grantee", {}) - if ( - grantee.get("Type") == "Group" - and grantee.get("URI") == ALL_USERS_URI - ): - public_objects_found.append(key) - break - - if public_objects_found: + if public_objects: report.status = "FAIL" report.status_extended = ( f"S3 Bucket {bucket.name} has public objects detected in " f"spot-check sample of {sampled} objects: " - f"{', '.join(public_objects_found)}." + f"{', '.join(public_objects)}." ) else: report.status = "PASS" @@ -70,19 +67,6 @@ class s3_bucket_object_public(Check): f"settings." ) - except ClientError as error: - report.status = "MANUAL" - if error.response["Error"]["Code"] == "AccessDenied": - report.status_extended = ( - f"Access Denied when spot-checking objects in bucket " - f"{bucket.name}." - ) - else: - report.status_extended = ( - f"Could not spot-check objects in bucket {bucket.name}: " - f"{error}." - ) - findings.append(report) return findings diff --git a/prowler/providers/aws/services/s3/s3_service.py b/prowler/providers/aws/services/s3/s3_service.py index 00605fbe6e..fb3abf1f6e 100644 --- a/prowler/providers/aws/services/s3/s3_service.py +++ b/prowler/providers/aws/services/s3/s3_service.py @@ -36,6 +36,10 @@ class S3(AWSService): self.__threading_call__( self._get_bucket_notification_configuration, self.buckets.values() ) + # Object-level ACL sampling is expensive and opt-in, so only run it when + # the s3_bucket_object_public check is explicitly enabled in the config. + if self.audit_config.get("s3_bucket_object_public_enabled", False): + self.__threading_call__(self._get_public_objects, self.buckets.values()) def _list_buckets(self, provider): logger.info("S3 - Listing buckets...") @@ -487,6 +491,63 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _get_public_objects(self, bucket): + logger.info("S3 - Spot-checking bucket objects for public ACLs...") + max_objects = self.audit_config.get("s3_bucket_object_public_max_objects", 100) + sample_size = self.audit_config.get("s3_bucket_object_public_sample_size", 3) + sampling = BucketObjectSampling(performed=True) + regional_client = None + try: + regional_client = self.regional_clients[bucket.region] + contents = regional_client.list_objects_v2( + Bucket=bucket.name, MaxKeys=max_objects + ).get("Contents", []) + + if not contents: + sampling.is_empty = True + bucket.object_sampling = sampling + return + + all_keys = [obj["Key"] for obj in contents] + # Deterministic, evenly-spaced sampling so findings are reproducible + # across scans instead of flipping between PASS/FAIL with a random sample. + if len(all_keys) <= sample_size: + sample_keys = all_keys + else: + step = len(all_keys) // sample_size + sample_keys = [all_keys[i * step] for i in range(sample_size)] + + for key in sample_keys: + acl = regional_client.get_object_acl(Bucket=bucket.name, Key=key) + grantees = [] + for grant in acl.get("Grants", []): + grant_grantee = grant.get("Grantee", {}) + grantee = ACL_Grantee(type=grant_grantee.get("Type", "")) + grantee.display_name = grant_grantee.get("DisplayName") + grantee.ID = grant_grantee.get("ID") + grantee.URI = grant_grantee.get("URI") + grantee.permission = grant.get("Permission") + grantees.append(grantee) + sampling.objects.append(ObjectACL(key=key, grantees=grantees)) + + bucket.object_sampling = sampling + except ClientError as error: + sampling.error_code = error.response["Error"]["Code"] + sampling.error_message = str(error) + bucket.object_sampling = sampling + region = regional_client.region if regional_client else bucket.region + logger.warning( + f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + sampling.error_code = error.__class__.__name__ + sampling.error_message = str(error) + bucket.object_sampling = sampling + region = regional_client.region if regional_client else bucket.region + logger.error( + f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + def _head_bucket(self, bucket_name): logger.info("S3 - Checking if bucket exists...") try: @@ -654,6 +715,19 @@ class PublicAccessBlock(BaseModel): restrict_public_buckets: bool +class ObjectACL(BaseModel): + key: str + grantees: List[ACL_Grantee] = Field(default_factory=list) + + +class BucketObjectSampling(BaseModel): + performed: bool = False + is_empty: bool = False + objects: List[ObjectACL] = Field(default_factory=list) + error_code: Optional[str] = None + error_message: Optional[str] = None + + class AccessPoint(BaseModel): arn: str account_id: str @@ -703,3 +777,4 @@ class Bucket(BaseModel): lifecycle: List[LifeCycleRule] = Field(default_factory=list) replication_rules: List[ReplicationRule] = Field(default_factory=list) notification_config: Dict = Field(default_factory=dict) + object_sampling: Optional[BucketObjectSampling] = None diff --git a/tests/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public_test.py b/tests/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public_test.py index b3aa2756f0..aaeb9f4108 100644 --- a/tests/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public_test.py +++ b/tests/providers/aws/services/s3/s3_bucket_object_public/s3_bucket_object_public_test.py @@ -10,6 +10,12 @@ CHECK_MODULE = ( "prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public" ) +ENABLED_CONFIG = { + "s3_bucket_object_public_enabled": True, + "s3_bucket_object_public_max_objects": 100, + "s3_bucket_object_public_sample_size": 3, +} + class Test_s3_bucket_object_public: @mock_aws @@ -19,6 +25,7 @@ class Test_s3_bucket_object_public: from prowler.providers.aws.services.s3.s3_service import S3 + # No audit_config -> check disabled by default aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) with mock.patch( @@ -37,10 +44,11 @@ class Test_s3_bucket_object_public: assert result == [] @mock_aws - def test_bucket_empty_passes(self): + def test_service_does_not_sample_when_disabled(self): s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) - bucket_name = "bucket-empty" + bucket_name = "bucket-not-sampled" s3_client_us_east_1.create_bucket(Bucket=bucket_name) + s3_client_us_east_1.put_object(Bucket=bucket_name, Key="a.txt", Body=b"x") from prowler.providers.aws.services.s3.s3_service import S3 @@ -51,11 +59,26 @@ class Test_s3_bucket_object_public: return_value=aws_provider, ): s3_service = S3(aws_provider) - s3_service.audit_config = { - "s3_bucket_object_public_enabled": True, - "s3_bucket_object_public_max_objects": 100, - "s3_bucket_object_public_sample_size": 3, - } + bucket = next(iter(s3_service.buckets.values())) + assert bucket.object_sampling is None + + @mock_aws + def test_bucket_empty_passes(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "bucket-empty" + s3_client_us_east_1.create_bucket(Bucket=bucket_name) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( s3_bucket_object_public, @@ -86,18 +109,15 @@ class Test_s3_bucket_object_public: from prowler.providers.aws.services.s3.s3_service import S3 - aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) with mock.patch( "prowler.providers.common.provider.Provider.get_global_provider", return_value=aws_provider, ): s3_service = S3(aws_provider) - s3_service.audit_config = { - "s3_bucket_object_public_enabled": True, - "s3_bucket_object_public_max_objects": 100, - "s3_bucket_object_public_sample_size": 3, - } with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( s3_bucket_object_public, @@ -130,18 +150,15 @@ class Test_s3_bucket_object_public: from prowler.providers.aws.services.s3.s3_service import S3 - aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) with mock.patch( "prowler.providers.common.provider.Provider.get_global_provider", return_value=aws_provider, ): s3_service = S3(aws_provider) - s3_service.audit_config = { - "s3_bucket_object_public_enabled": True, - "s3_bucket_object_public_max_objects": 100, - "s3_bucket_object_public_sample_size": 3, - } with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( s3_bucket_object_public, @@ -158,6 +175,40 @@ class Test_s3_bucket_object_public: "spot-check sample of" ) in result[0].status_extended + @mock_aws + def test_bucket_with_authenticated_users_object_fails(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name = "bucket-authenticated-object" + public_key = "authenticated.txt" + s3_client_us_east_1.create_bucket(Bucket=bucket_name) + s3_client_us_east_1.put_object(Bucket=bucket_name, Key=public_key, Body=b"x") + s3_client_us_east_1.put_object_acl( + Bucket=bucket_name, Key=public_key, ACL="authenticated-read" + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + s3_service = S3(aws_provider) + with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): + from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( + s3_bucket_object_public, + ) + + check = s3_bucket_object_public() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert public_key in result[0].status_extended + @mock_aws def test_access_denied_on_list_objects_reports_manual(self): s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) @@ -166,25 +217,27 @@ class Test_s3_bucket_object_public: from prowler.providers.aws.services.s3.s3_service import S3 - aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) with mock.patch( "prowler.providers.common.provider.Provider.get_global_provider", return_value=aws_provider, ): s3_service = S3(aws_provider) - s3_service.audit_config = { - "s3_bucket_object_public_enabled": True, - "s3_bucket_object_public_max_objects": 100, - "s3_bucket_object_public_sample_size": 3, - } + # Simulate AccessDenied when sampling and re-run sampling for the bucket regional_client = mock.MagicMock() + regional_client.region = AWS_REGION_US_EAST_1 regional_client.list_objects_v2.side_effect = ClientError( {"Error": {"Code": "AccessDenied", "Message": "denied"}}, "ListObjectsV2", ) s3_service.regional_clients[AWS_REGION_US_EAST_1] = regional_client + bucket = next(iter(s3_service.buckets.values())) + bucket.object_sampling = None + s3_service._get_public_objects(bucket) with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import ( @@ -209,25 +262,26 @@ class Test_s3_bucket_object_public: from prowler.providers.aws.services.s3.s3_service import S3 - aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG + ) with mock.patch( "prowler.providers.common.provider.Provider.get_global_provider", return_value=aws_provider, ): s3_service = S3(aws_provider) - s3_service.audit_config = { - "s3_bucket_object_public_enabled": True, - "s3_bucket_object_public_max_objects": 100, - "s3_bucket_object_public_sample_size": 3, - } regional_client = mock.MagicMock() + regional_client.region = AWS_REGION_US_EAST_1 regional_client.list_objects_v2.side_effect = ClientError( {"Error": {"Code": "InternalError", "Message": "boom"}}, "ListObjectsV2", ) s3_service.regional_clients[AWS_REGION_US_EAST_1] = regional_client + bucket = next(iter(s3_service.buckets.values())) + bucket.object_sampling = None + s3_service._get_public_objects(bucket) with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service): from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import (