diff --git a/prowler/providers/aws/services/s3/s3_access_point_public_access_block/__init__.py b/prowler/providers/aws/services/s3/s3_access_point_public_access_block/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block.metadata.json b/prowler/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block.metadata.json new file mode 100644 index 0000000000..024aa4e041 --- /dev/null +++ b/prowler/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block.metadata.json @@ -0,0 +1,32 @@ +{ + "Provider": "aws", + "CheckID": "s3_access_point_public_access_block", + "CheckTitle": "Block Public Access Settings enabled on Access Points.", + "CheckType": [ + "Data Protection" + ], + "ServiceName": "s3", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:s3:::bucket_name", + "Severity": "critical", + "ResourceType": "AwsS3AccessPoint", + "Description": "Ensures that public access is blocked on S3 Access Points.", + "Risk": "Leaving S3 access points open to the public in AWS can lead to data exposure, breaches, compliance violations, unauthorized access, and data integrity issues.", + "RelatedUrl": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-points.html#access-points-policies", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/s3-controls.html#s3-19", + "Terraform": "" + }, + "Recommendation": { + "Text": "Ensure S3 access points are private by default, applying strict access controls, and regularly auditing permissions to prevent unauthorized public access.", + "Url": "https://docs.aws.amazon.com/config/latest/developerguide/s3-access-point-public-access-blocks.html" + } + }, + "Categories": [], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block.py b/prowler/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block.py new file mode 100644 index 0000000000..b4746fe81f --- /dev/null +++ b/prowler/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block.py @@ -0,0 +1,27 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.s3.s3control_client import s3control_client + + +class s3_access_point_public_access_block(Check): + def execute(self): + findings = [] + for arn, access_point in s3control_client.access_points.items(): + report = Check_Report_AWS(self.metadata()) + report.region = access_point.region + report.resource_id = access_point.name + report.resource_arn = arn + report.status = "PASS" + report.status_extended = f"Access Point {access_point.name} of bucket {access_point.bucket} does have Public Access Block enabled." + + if not ( + access_point.public_access_block.block_public_acls + and access_point.public_access_block.ignore_public_acls + and access_point.public_access_block.block_public_policy + and access_point.public_access_block.restrict_public_buckets + ): + report.status = "FAIL" + report.status_extended = f"Access Point {access_point.name} of bucket {access_point.bucket} does not have Public Access Block enabled." + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/s3/s3_service.py b/prowler/providers/aws/services/s3/s3_service.py index 80b8fce49c..f525ecf814 100644 --- a/prowler/providers/aws/services/s3/s3_service.py +++ b/prowler/providers/aws/services/s3/s3_service.py @@ -9,25 +9,24 @@ from prowler.lib.scan_filters.scan_filters import is_resource_filtered from prowler.providers.aws.lib.service.service import AWSService -################## S3 class S3(AWSService): def __init__(self, provider): # Call AWSService's __init__ super().__init__(__class__.__name__, provider) self.account_arn_template = f"arn:{self.audited_partition}:s3:{self.region}:{self.audited_account}:account" self.regions_with_buckets = [] - self.buckets = self.__list_buckets__(provider) - self.__threading_call__(self.__get_bucket_versioning__, self.buckets) - self.__threading_call__(self.__get_bucket_logging__, self.buckets) - self.__threading_call__(self.__get_bucket_policy__, self.buckets) - self.__threading_call__(self.__get_bucket_acl__, self.buckets) - self.__threading_call__(self.__get_public_access_block__, self.buckets) - self.__threading_call__(self.__get_bucket_encryption__, self.buckets) - self.__threading_call__(self.__get_bucket_ownership_controls__, self.buckets) - self.__threading_call__(self.__get_object_lock_configuration__, self.buckets) - self.__threading_call__(self.__get_bucket_tagging__, self.buckets) + self.buckets = self._list_buckets(provider) + self.__threading_call__(self._get_bucket_versioning, self.buckets) + self.__threading_call__(self._get_bucket_logging, self.buckets) + self.__threading_call__(self._get_bucket_policy, self.buckets) + self.__threading_call__(self._get_bucket_acl, self.buckets) + self.__threading_call__(self._get_public_access_block, self.buckets) + self.__threading_call__(self._get_bucket_encryption, self.buckets) + self.__threading_call__(self._get_bucket_ownership_controls, self.buckets) + self.__threading_call__(self._get_object_lock_configuration, self.buckets) + self.__threading_call__(self._get_bucket_tagging, self.buckets) - def __list_buckets__(self, provider): + def _list_buckets(self, provider): logger.info("S3 - Listing buckets...") buckets = [] try: @@ -92,7 +91,7 @@ class S3(AWSService): ) return buckets - def __get_bucket_versioning__(self, bucket): + def _get_bucket_versioning(self, bucket): logger.info("S3 - Get buckets versioning...") try: regional_client = self.regional_clients[bucket.region] @@ -124,7 +123,7 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_bucket_encryption__(self, bucket): + def _get_bucket_encryption(self, bucket): logger.info("S3 - Get buckets encryption...") try: regional_client = self.regional_clients[bucket.region] @@ -156,7 +155,7 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_bucket_logging__(self, bucket): + def _get_bucket_logging(self, bucket): logger.info("S3 - Get buckets logging...") try: regional_client = self.regional_clients[bucket.region] @@ -185,7 +184,7 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_public_access_block__(self, bucket): + def _get_public_access_block(self, bucket): logger.info("S3 - Get buckets public access block...") try: regional_client = self.regional_clients[bucket.region] @@ -228,7 +227,7 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_bucket_acl__(self, bucket): + def _get_bucket_acl(self, bucket): logger.info("S3 - Get buckets acl...") try: regional_client = self.regional_clients[bucket.region] @@ -265,7 +264,7 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_bucket_policy__(self, bucket): + def _get_bucket_policy(self, bucket): logger.info("S3 - Get buckets policy...") try: regional_client = self.regional_clients[bucket.region] @@ -293,7 +292,7 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_bucket_ownership_controls__(self, bucket): + def _get_bucket_ownership_controls(self, bucket): logger.info("S3 - Get buckets ownership controls...") try: regional_client = self.regional_clients[bucket.region] @@ -321,7 +320,7 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_object_lock_configuration__(self, bucket): + def _get_object_lock_configuration(self, bucket): logger.info("S3 - Get buckets ownership controls...") try: regional_client = self.regional_clients[bucket.region] @@ -351,7 +350,7 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) - def __get_bucket_tagging__(self, bucket): + def _get_bucket_tagging(self, bucket): logger.info("S3 - Get buckets logging...") try: regional_client = self.regional_clients[bucket.region] @@ -381,14 +380,16 @@ class S3(AWSService): ) -################## S3Control class S3Control(AWSService): def __init__(self, provider): # Call AWSService's __init__ - super().__init__(__class__.__name__, provider, global_service=True) - self.account_public_access_block = self.__get_public_access_block__() + super().__init__(__class__.__name__, provider) + self.account_public_access_block = self._get_public_access_block() + self.access_points = {} + self.__threading_call__(self._list_access_points) + self.__threading_call__(self._get_access_point, self.access_points.values()) - def __get_public_access_block__(self): + def _get_public_access_block(self): logger.info("S3 - Get account public access block...") try: public_access_block = self.client.get_public_access_block( @@ -413,6 +414,58 @@ class S3Control(AWSService): f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _list_access_points(self, regional_client): + logger.info("S3 - Listing account access points...") + try: + list_access_points = regional_client.list_access_points( + AccountId=self.audited_account + )["AccessPointList"] + for ap in list_access_points: + self.access_points[ap["AccessPointArn"]] = AccessPoint( + account_id=self.audited_account, + name=ap["Name"], + bucket=ap["Bucket"], + region=regional_client.region, + ) + except ClientError as error: + if error.response["Error"]["Code"] == "NoSuchMultiRegionAccessPoint": + logger.warning( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + else: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def _get_access_point(self, ap): + logger.info("S3 - Get account access point...") + try: + access_point = self.regional_clients[ap.region].get_access_point( + AccountId=ap.account_id, Name=ap.name + ) + ap.public_access_block = PublicAccessBlock( + block_public_acls=access_point.get( + "PublicAccessBlockConfiguration", {} + ).get("BlockPublicAcls", False), + ignore_public_acls=access_point.get( + "PublicAccessBlockConfiguration", {} + ).get("IgnorePublicAcls", False), + block_public_policy=access_point.get( + "PublicAccessBlockConfiguration", {} + ).get("BlockPublicPolicy", False), + restrict_public_buckets=access_point.get( + "PublicAccessBlockConfiguration", {} + ).get("RestrictPublicBuckets", False), + ) + except Exception as error: + logger.error( + f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + class ACL_Grantee(BaseModel): display_name: Optional[str] @@ -429,6 +482,14 @@ class PublicAccessBlock(BaseModel): restrict_public_buckets: bool +class AccessPoint(BaseModel): + account_id: str + name: str + bucket: str + public_access_block: Optional[PublicAccessBlock] + region: str + + class Bucket(BaseModel): name: str arn: str diff --git a/tests/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block_test.py b/tests/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block_test.py new file mode 100644 index 0000000000..d4fb68ee67 --- /dev/null +++ b/tests/providers/aws/services/s3/s3_access_point_public_access_block/s3_access_point_public_access_block_test.py @@ -0,0 +1,318 @@ +from unittest import mock +from unittest.mock import patch + +from prowler.providers.aws.services.s3.s3_service import ( + AccessPoint, + PublicAccessBlock, + S3Control, +) +from tests.providers.aws.utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_EU_WEST_1, + AWS_REGION_US_EAST_1, + set_mocked_aws_provider, +) + + +class Test_s3_access_point_public_access_block: + def test_no_access_points(self): + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block.s3control_client", + new=S3Control(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block import ( + s3_access_point_public_access_block, + ) + + s3control_client = mock.MagicMock() + s3control_client.access_points = {} + + with patch( + "prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block.s3control_client", + s3control_client, + ): + check = s3_access_point_public_access_block() + result = check.execute() + + assert len(result) == 0 + + def test_access_points_with_public_access_block(self): + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + # US-EAST-1 Access Point + ap_name_us = "test-access-point-us-east-1" + bucket_name_us = "test-bucket-us-east-1" + arn_us = f"arn:aws:s3:us-east-1:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_us}" + + # EU-WEST-1 Access Point + ap_name_eu = "test-access-point-eu-west-1" + bucket_name_eu = "test-bucket-eu-west-1" + arn_eu = f"arn:aws:s3:eu-west-1:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_eu}" + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block.s3control_client", + new=S3Control(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block import ( + s3_access_point_public_access_block, + ) + + s3control_client = mock.MagicMock() + s3control_client.access_points = { + arn_us: AccessPoint( + account_id=AWS_ACCOUNT_NUMBER, + name=ap_name_us, + bucket=bucket_name_us, + region="us-east-1", + public_access_block=PublicAccessBlock( + block_public_acls=True, + ignore_public_acls=True, + block_public_policy=True, + restrict_public_buckets=True, + ), + ), + arn_eu: AccessPoint( + account_id=AWS_ACCOUNT_NUMBER, + name=ap_name_eu, + bucket=bucket_name_eu, + region="eu-west-1", + public_access_block=PublicAccessBlock( + block_public_acls=True, + ignore_public_acls=True, + block_public_policy=True, + restrict_public_buckets=True, + ), + ), + } + + with patch( + "prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block.s3control_client", + s3control_client, + ): + check = s3_access_point_public_access_block() + result = check.execute() + + # ALL REGIONS + assert len(result) == 2 + + # AWS_REGION_US_EAST_1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Access Point {ap_name_us} of bucket {bucket_name_us} does have Public Access Block enabled." + ) + assert result[0].resource_id == ap_name_us + assert ( + result[0].resource_arn + == f"arn:aws:s3:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_us}" + ) + assert result[0].region == AWS_REGION_US_EAST_1 + + # AWS_REGION_EU_WEST_1 + assert result[1].status == "PASS" + assert ( + result[1].status_extended + == f"Access Point {ap_name_eu} of bucket {bucket_name_eu} does have Public Access Block enabled." + ) + assert result[1].resource_id == ap_name_eu + assert ( + result[1].resource_arn + == f"arn:aws:s3:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_eu}" + ) + assert result[1].region == AWS_REGION_EU_WEST_1 + + def test_access_points_without_public_access_block(self): + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + # US-EAST-1 Access Point + ap_name_us = "test-access-point-us-east-1" + bucket_name_us = "test-bucket-us-east-1" + arn_us = f"arn:aws:s3:us-east-1:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_us}" + + # EU-WEST-1 Access Point + ap_name_eu = "test-access-point-eu-west-1" + bucket_name_eu = "test-bucket-eu-west-1" + arn_eu = f"arn:aws:s3:eu-west-1:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_eu}" + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block.s3control_client", + new=S3Control(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block import ( + s3_access_point_public_access_block, + ) + + s3control_client = mock.MagicMock() + s3control_client.access_points = { + arn_us: AccessPoint( + account_id=AWS_ACCOUNT_NUMBER, + name=ap_name_us, + bucket=bucket_name_us, + region="us-east-1", + public_access_block=PublicAccessBlock( + block_public_acls=False, + ignore_public_acls=False, + block_public_policy=False, + restrict_public_buckets=False, + ), + ), + arn_eu: AccessPoint( + account_id=AWS_ACCOUNT_NUMBER, + name=ap_name_eu, + bucket=bucket_name_eu, + region="eu-west-1", + public_access_block=PublicAccessBlock( + block_public_acls=False, + ignore_public_acls=False, + block_public_policy=False, + restrict_public_buckets=False, + ), + ), + } + + with patch( + "prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block.s3control_client", + s3control_client, + ): + check = s3_access_point_public_access_block() + result = check.execute() + + # ALL REGIONS + assert len(result) == 2 + + # AWS_REGION_US_EAST_1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"Access Point {ap_name_us} of bucket {bucket_name_us} does not have Public Access Block enabled." + ) + assert result[0].resource_id == ap_name_us + assert ( + result[0].resource_arn + == f"arn:aws:s3:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_us}" + ) + assert result[0].region == AWS_REGION_US_EAST_1 + + # AWS_REGION_EU_WEST_1 + assert result[1].status == "FAIL" + assert ( + result[1].status_extended + == f"Access Point {ap_name_eu} of bucket {bucket_name_eu} does not have Public Access Block enabled." + ) + assert result[1].resource_id == ap_name_eu + assert ( + result[1].resource_arn + == f"arn:aws:s3:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_eu}" + ) + assert result[1].region == AWS_REGION_EU_WEST_1 + + def test_access_points_without_one_public_access_block(self): + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + # US-EAST-1 Access Point + ap_name_us = "test-access-point-us-east-1" + bucket_name_us = "test-bucket-us-east-1" + arn_us = f"arn:aws:s3:us-east-1:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_us}" + + # EU-WEST-1 Access Point + ap_name_eu = "test-access-point-eu-west-1" + bucket_name_eu = "test-bucket-eu-west-1" + arn_eu = f"arn:aws:s3:eu-west-1:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_eu}" + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block.s3control_client", + new=S3Control(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block import ( + s3_access_point_public_access_block, + ) + + s3control_client = mock.MagicMock() + s3control_client.access_points = { + arn_us: AccessPoint( + account_id=AWS_ACCOUNT_NUMBER, + name=ap_name_us, + bucket=bucket_name_us, + region="us-east-1", + public_access_block=PublicAccessBlock( + block_public_acls=True, + ignore_public_acls=True, + block_public_policy=True, + restrict_public_buckets=True, + ), + ), + arn_eu: AccessPoint( + account_id=AWS_ACCOUNT_NUMBER, + name=ap_name_eu, + bucket=bucket_name_eu, + region="eu-west-1", + public_access_block=PublicAccessBlock( + block_public_acls=True, + ignore_public_acls=False, + block_public_policy=True, + restrict_public_buckets=True, + ), + ), + } + + with patch( + "prowler.providers.aws.services.s3.s3_access_point_public_access_block.s3_access_point_public_access_block.s3control_client", + s3control_client, + ): + check = s3_access_point_public_access_block() + result = check.execute() + + # ALL REGIONS + assert len(result) == 2 + + # AWS_REGION_US_EAST_1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"Access Point {ap_name_us} of bucket {bucket_name_us} does have Public Access Block enabled." + ) + assert result[0].resource_id == ap_name_us + assert ( + result[0].resource_arn + == f"arn:aws:s3:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_us}" + ) + assert result[0].region == AWS_REGION_US_EAST_1 + + # AWS_REGION_EU_WEST_1 + assert result[1].status == "FAIL" + assert ( + result[1].status_extended + == f"Access Point {ap_name_eu} of bucket {bucket_name_eu} does not have Public Access Block enabled." + ) + assert result[1].resource_id == ap_name_eu + assert ( + result[1].resource_arn + == f"arn:aws:s3:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/{ap_name_eu}" + ) + assert result[1].region == AWS_REGION_EU_WEST_1 diff --git a/tests/providers/aws/services/s3/s3_service_test.py b/tests/providers/aws/services/s3/s3_service_test.py index c5cc0ac450..696b719be2 100644 --- a/tests/providers/aws/services/s3/s3_service_test.py +++ b/tests/providers/aws/services/s3/s3_service_test.py @@ -1,5 +1,7 @@ import json +from unittest.mock import patch +import botocore from boto3 import client from moto import mock_aws @@ -10,9 +12,27 @@ from tests.providers.aws.utils import ( set_mocked_aws_provider, ) +# Original botocore _make_api_call function +orig = botocore.client.BaseClient._make_api_call + + +# Mocked botocore _make_api_call function +def mock_make_api_call(self, operation_name, kwarg): + if operation_name == "ListAccessPoints": + return { + "AccessPointList": [ + { + "Name": "test-access-point", + "Bucket": "test-bucket", + "AccessPointArn": f"arn:aws:s3:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/test-access-point", + } + ] + } + + return orig(self, operation_name, kwarg) + class Test_S3_Service: - # Test S3 Service @mock_aws def test_service(self): @@ -47,7 +67,7 @@ class Test_S3_Service: # Test S3 List Buckets @mock_aws - def test__list_buckets__(self): + def test_list_buckets(self): # Generate S3 Client s3_client = client("s3") # Create S3 Bucket @@ -68,7 +88,7 @@ class Test_S3_Service: # Test S3 Get Bucket Versioning @mock_aws - def test__get_bucket_versioning__(self): + def test_get_bucket_versioning(self): # Generate S3 Client s3_client = client("s3") # Create S3 Bucket @@ -92,7 +112,7 @@ class Test_S3_Service: # Test S3 Get Bucket ACL @mock_aws - def test__get_bucket_acl__(self): + def test_get_bucket_acl(self): s3_client = client("s3") bucket_name = "test-bucket" s3_client.create_bucket(Bucket=bucket_name) @@ -131,7 +151,7 @@ class Test_S3_Service: # Test S3 Get Bucket Logging @mock_aws - def test__get_bucket_logging__(self): + def test_get_bucket_logging(self): # Generate S3 Client s3_client = client("s3") # Create S3 Bucket @@ -205,7 +225,7 @@ class Test_S3_Service: # Test S3 Get Bucket Policy @mock_aws - def test__get_bucket_policy__(self): + def test_get_bucket_policy(self): s3_client = client("s3") bucket_name = "test-bucket" s3_client.create_bucket(Bucket=bucket_name) @@ -226,7 +246,7 @@ class Test_S3_Service: # Test S3 Get Bucket Encryption @mock_aws - def test__get_bucket_encryption__(self): + def test_get_bucket_encryption(self): # Generate S3 Client s3_client = client("s3") # Create S3 Bucket @@ -259,7 +279,7 @@ class Test_S3_Service: # Test S3 Get Bucket Ownership Controls @mock_aws - def test__get_bucket_ownership_controls__(self): + def test_get_bucket_ownership_controls(self): # Generate S3 Client s3_client = client("s3") # Create S3 Bucket @@ -281,7 +301,7 @@ class Test_S3_Service: # Test S3 Get Public Access Block @mock_aws - def test__get_public_access_block__(self): + def test_get_public_access_block(self): # Generate S3 Client s3_client = client("s3") # Create S3 Bucket @@ -314,7 +334,7 @@ class Test_S3_Service: # Test S3 Get Bucket Tagging @mock_aws - def test__get_bucket_tagging__(self): + def test_get_bucket_tagging(self): # Generate S3 Client s3_client = client("s3") # Create S3 Bucket @@ -339,7 +359,7 @@ class Test_S3_Service: # Test S3 Control Account Get Public Access Block @mock_aws - def test__get_public_access_block__s3_control(self): + def test_get_public_access_blocks3_control(self): # Generate S3Control Client s3control_client = client("s3control", region_name=AWS_REGION_US_EAST_1) s3control_client.put_public_access_block( @@ -361,7 +381,7 @@ class Test_S3_Service: # Test S3 Get Bucket Object Lock @mock_aws - def test__get_object_lock_configuration__(self): + def test_get_object_lock_configuration(self): # Generate S3 Client s3_client = client("s3") # Create S3 Bucket @@ -382,3 +402,108 @@ class Test_S3_Service: == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name}" ) assert s3.buckets[0].object_lock + + # Test S3 List Access Points + @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) + @mock_aws + def test_list_access_points(self): + arn = f"arn:aws:s3:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/test-access-point" + + # Generate S3 Client + s3_client = client("s3", region_name=AWS_REGION_US_EAST_1) + + # Generate Bucket + s3_client.create_bucket( + Bucket="test-bucket", ObjectOwnership="BucketOwnerEnforced" + ) + sse_config = { + "Rules": [ + { + "ApplyServerSideEncryptionByDefault": { + "SSEAlgorithm": "AES256", + } + } + ] + } + s3_client.put_bucket_encryption( + Bucket="test-bucket", ServerSideEncryptionConfiguration=sse_config + ) + + # Generate S3Control Client + s3control_client = client("s3control", region_name=AWS_REGION_US_EAST_1) + + s3control_client.create_access_point( + AccountId=AWS_ACCOUNT_NUMBER, + Name="test-access-point", + Bucket="test-bucket", + PublicAccessBlockConfiguration={ + "BlockPublicAcls": True, + "IgnorePublicAcls": True, + "BlockPublicPolicy": True, + "RestrictPublicBuckets": True, + }, + ) + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + s3control = S3Control(aws_provider) + + assert len(s3control.access_points) == 1 + assert s3control.access_points[arn].account_id == AWS_ACCOUNT_NUMBER + assert s3control.access_points[arn].name == "test-access-point" + assert s3control.access_points[arn].bucket == "test-bucket" + assert s3control.access_points[arn].region == AWS_REGION_US_EAST_1 + + # Test S3 Get Access Point + @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) + @mock_aws + def test_get_access_point(self): + arn = f"arn:aws:s3:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:accesspoint/test-access-point" + + # Generate S3 Client + s3_client = client("s3", region_name=AWS_REGION_US_EAST_1) + + # Generate Bucket + s3_client.create_bucket( + Bucket="test-bucket", ObjectOwnership="BucketOwnerEnforced" + ) + sse_config = { + "Rules": [ + { + "ApplyServerSideEncryptionByDefault": { + "SSEAlgorithm": "AES256", + } + } + ] + } + s3_client.put_bucket_encryption( + Bucket="test-bucket", ServerSideEncryptionConfiguration=sse_config + ) + + # Generate S3Control Client + s3control_client = client("s3control", region_name=AWS_REGION_US_EAST_1) + + s3control_client.create_access_point( + AccountId=AWS_ACCOUNT_NUMBER, + Name="test-access-point", + Bucket="test-bucket", + PublicAccessBlockConfiguration={ + "BlockPublicAcls": True, + "IgnorePublicAcls": True, + "BlockPublicPolicy": True, + "RestrictPublicBuckets": True, + }, + ) + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + s3control = S3Control(aws_provider) + + assert len(s3control.access_points) == 1 + assert s3control.access_points[arn].account_id == AWS_ACCOUNT_NUMBER + assert s3control.access_points[arn].name == "test-access-point" + assert s3control.access_points[arn].bucket == "test-bucket" + assert s3control.access_points[arn].region == AWS_REGION_US_EAST_1 + assert s3control.access_points[arn].public_access_block + assert s3control.access_points[arn].public_access_block.block_public_acls + assert s3control.access_points[arn].public_access_block.ignore_public_acls + assert s3control.access_points[arn].public_access_block.block_public_policy + assert s3control.access_points[arn].public_access_block.restrict_public_buckets