From 5d42ae6e6f99f5fffbfd04507af193802ebacdb8 Mon Sep 17 00:00:00 2001 From: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com> Date: Mon, 19 Aug 2024 18:42:42 +0200 Subject: [PATCH] feat(s3): add `s3_bucket_cross_region_replication` check (#4761) Co-authored-by: Sergio --- .../__init__.py | 0 ...ket_cross_region_replication.metadata.json | 34 + .../s3_bucket_cross_region_replication.py | 37 ++ .../providers/aws/services/s3/s3_service.py | 48 ++ ...s3_bucket_cross_region_replication_test.py | 611 ++++++++++++++++++ .../aws/services/s3/s3_service_test.py | 43 ++ 6 files changed, 773 insertions(+) create mode 100644 prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/__init__.py create mode 100644 prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication.metadata.json create mode 100644 prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication.py create mode 100644 tests/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication_test.py diff --git a/prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/__init__.py b/prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication.metadata.json b/prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication.metadata.json new file mode 100644 index 0000000000..8d78df7fb2 --- /dev/null +++ b/prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "s3_bucket_cross_region_replication", + "CheckTitle": "Check if S3 buckets use cross region replication.", + "CheckType": [ + "Secure access management" + ], + "ServiceName": "s3", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:s3:::bucket_name", + "Severity": "low", + "ResourceType": "AwsS3Bucket", + "Description": "Verifying whether S3 buckets have cross-region replication enabled, ensuring data redundancy and availability across multiple AWS regions", + "Risk": "Without cross-region replication in S3 buckets, data is at risk of being lost or inaccessible if an entire region goes down, leading to potential service disruptions and data unavailability.", + "RelatedUrl": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/replication.html", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/s3-controls.html#s3-7", + "Terraform": "https://docs.prowler.com/checks/aws/general-policies/ensure-that-s3-bucket-has-cross-region-replication-enabled#terraform" + }, + "Recommendation": { + "Text": "Ensure that S3 buckets have corss region replication.", + "Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/replication-walkthrough1.html" + } + }, + "Categories": [ + "redundancy" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication.py b/prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication.py new file mode 100644 index 0000000000..71c2d0f28d --- /dev/null +++ b/prowler/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication.py @@ -0,0 +1,37 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.s3.s3_client import s3_client + + +class s3_bucket_cross_region_replication(Check): + def execute(self): + findings = [] + for arn, bucket in s3_client.buckets.items(): + report = Check_Report_AWS(self.metadata()) + report.region = bucket.region + report.resource_id = bucket.name + report.resource_arn = arn + report.resource_tags = bucket.tags + report.status = "FAIL" + report.status_extended = f"S3 Bucket {bucket.name} does not have correct cross region replication configuration." + if bucket.replication_rules: + for rule in bucket.replication_rules: + if ( + bucket.versioning + and rule.status == "Enabled" + and rule.destination + ): + if rule.destination not in s3_client.buckets: + report.status = "FAIL" + report.status_extended = f"S3 Bucket {bucket.name} has cross region replication rule {rule.id} in bucket {rule.destination.split(':')[-1]} which is out of Prowler's scope." + else: + destination_bucket = s3_client.buckets[rule.destination] + if destination_bucket.region != bucket.region: + report.status = "PASS" + report.status_extended = f"S3 Bucket {bucket.name} has cross region replication rule {rule.id} in bucket {destination_bucket.name} located in region {destination_bucket.region}." + break + else: + report.status = "FAIL" + report.status_extended = f"S3 Bucket {bucket.name} has cross region replication rule {rule.id} in bucket {destination_bucket.name} located in the same region." + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/s3/s3_service.py b/prowler/providers/aws/services/s3/s3_service.py index 950c93f96f..d9c5ae986f 100644 --- a/prowler/providers/aws/services/s3/s3_service.py +++ b/prowler/providers/aws/services/s3/s3_service.py @@ -30,6 +30,7 @@ class S3(AWSService): self._get_object_lock_configuration, self.buckets.values() ) self.__threading_call__(self._get_bucket_tagging, self.buckets.values()) + self.__threading_call__(self._get_bucket_replication, self.buckets.values()) def _list_buckets(self, provider): logger.info("S3 - Listing buckets...") @@ -378,6 +379,46 @@ class S3(AWSService): f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" ) + def _get_bucket_replication(self, bucket): + logger.info("S3 - Get buckets replication...") + try: + regional_client = self.regional_clients[bucket.region] + replication_config = regional_client.get_bucket_replication( + Bucket=bucket.name + )["ReplicationConfiguration"]["Rules"] + if replication_config: + for rule in replication_config: + bucket.replication_rules.append( + ReplicationRule( + id=rule["ID"], + status=rule["Status"], + destination=rule["Destination"]["Bucket"], + ) + ) + except ClientError as error: + if error.response["Error"]["Code"] == "NoSuchBucket": + logger.warning( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + elif ( + error.response["Error"]["Code"] + == "ReplicationConfigurationNotFoundError" + ): + bucket.replication = None + else: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + if regional_client: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + else: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + class S3Control(AWSService): def __init__(self, provider): @@ -489,6 +530,12 @@ class AccessPoint(BaseModel): region: str +class ReplicationRule(BaseModel): + id: str + status: str + destination: str + + class Bucket(BaseModel): name: str versioning: bool = False @@ -503,3 +550,4 @@ class Bucket(BaseModel): object_lock: bool = False mfa_delete: bool = False tags: Optional[list] = [] + replication_rules: Optional[list[ReplicationRule]] = [] diff --git a/tests/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication_test.py b/tests/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication_test.py new file mode 100644 index 0000000000..2cc4ffd5e5 --- /dev/null +++ b/tests/providers/aws/services/s3/s3_bucket_cross_region_replication/s3_bucket_cross_region_replication_test.py @@ -0,0 +1,611 @@ +from unittest import mock + +from boto3 import client +from moto import mock_aws + +from tests.providers.aws.utils import ( + AWS_REGION_EU_WEST_1, + AWS_REGION_US_EAST_1, + set_mocked_aws_provider, +) + + +class Test_s3_bucket_cross_region_replication: + # No Buckets + @mock_aws + def test_no_buckets(self): + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client", + new=S3(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import ( + s3_bucket_cross_region_replication, + ) + + check = s3_bucket_cross_region_replication() + result = check.execute() + + assert len(result) == 0 + + @mock_aws + def test_bucket_no_versioning(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket(Bucket=bucket_name_us) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client", + new=S3(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import ( + s3_bucket_cross_region_replication, + ) + + check = s3_bucket_cross_region_replication() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_us} does not have correct cross region replication configuration." + ) + assert result[0].resource_id == bucket_name_us + assert ( + result[0].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}" + ) + assert result[0].region == AWS_REGION_US_EAST_1 + + @mock_aws + def test_bucket_no_replication(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced" + ) + s3_client_us_east_1.put_bucket_versioning( + Bucket=bucket_name_us, + VersioningConfiguration={"Status": "Enabled"}, + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client", + new=S3(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import ( + s3_bucket_cross_region_replication, + ) + + check = s3_bucket_cross_region_replication() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_us} does not have correct cross region replication configuration." + ) + assert result[0].resource_id == bucket_name_us + assert ( + result[0].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}" + ) + assert result[0].region == AWS_REGION_US_EAST_1 + + @mock_aws + def test_bucket_versioning_enabled_replication_disabled(self): + # EU-WEST-1 Destination Bucket + s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1) + bucket_name_eu = "bucket_test_eu" + s3_client_eu_west_1.create_bucket( + Bucket=bucket_name_eu, + ObjectOwnership="BucketOwnerEnforced", + CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1}, + ) + s3_client_eu_west_1.put_bucket_versioning( + Bucket=bucket_name_eu, + VersioningConfiguration={"Status": "Enabled"}, + ) + # US-EAST-1 Source Bucket + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced" + ) + s3_client_us_east_1.put_bucket_versioning( + Bucket=bucket_name_us, + VersioningConfiguration={"Status": "Enabled"}, + ) + s3_client_us_east_1.put_bucket_replication( + Bucket=bucket_name_us, + ReplicationConfiguration={ + "Role": "arn:aws:iam", + "Rules": [ + { + "ID": "rule1", + "Status": "Disabled", + "Prefix": "", + "Destination": { + "Bucket": "arn:aws:s3:::bucket_test_eu", + "Account": "", + }, + } + ], + }, + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client", + new=S3(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import ( + s3_bucket_cross_region_replication, + ) + + check = s3_bucket_cross_region_replication() + result = check.execute() + + assert len(result) == 2 + + # EU-WEST-1 Destination Bucket + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_eu} does not have correct cross region replication configuration." + ) + assert result[0].resource_id == bucket_name_eu + assert ( + result[0].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_eu}" + ) + assert result[0].region == AWS_REGION_EU_WEST_1 + + # US-EAST-1 Source Bucket + assert result[1].status == "FAIL" + assert ( + result[1].status_extended + == f"S3 Bucket {bucket_name_us} does not have correct cross region replication configuration." + ) + assert result[1].resource_id == bucket_name_us + assert ( + result[1].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}" + ) + assert result[1].region == AWS_REGION_US_EAST_1 + + @mock_aws + def test_bucket_versioning_enabled_replication_enabled(self): + # EU-WEST-1 Destination Bucket + s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1) + bucket_name_eu = "bucket_test_eu" + arn_bucket_eu = f"arn:aws:s3:::{bucket_name_eu}" + s3_client_eu_west_1.create_bucket( + Bucket=bucket_name_eu, + ObjectOwnership="BucketOwnerEnforced", + CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1}, + ) + s3_client_eu_west_1.put_bucket_versioning( + Bucket=bucket_name_eu, + VersioningConfiguration={"Status": "Enabled"}, + ) + # US-EAST-1 Source Bucket + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name_us = "bucket_test_us" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced" + ) + s3_client_us_east_1.put_bucket_versioning( + Bucket=bucket_name_us, + VersioningConfiguration={"Status": "Enabled"}, + ) + repl_rule_id = "rule1" + s3_client_us_east_1.put_bucket_replication( + Bucket=bucket_name_us, + ReplicationConfiguration={ + "Role": "arn:aws:iam", + "Rules": [ + { + "ID": repl_rule_id, + "Status": "Enabled", + "Prefix": "", + "Destination": { + "Bucket": arn_bucket_eu, + "Account": "", + }, + } + ], + }, + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client", + new=S3(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import ( + s3_bucket_cross_region_replication, + ) + + check = s3_bucket_cross_region_replication() + result = check.execute() + + assert len(result) == 2 + + # EU-WEST-1 Destination Bucket + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_eu} does not have correct cross region replication configuration." + ) + assert result[0].resource_id == bucket_name_eu + assert ( + result[0].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_eu}" + ) + assert result[0].region == AWS_REGION_EU_WEST_1 + + # US-EAST-1 Source Bucket + assert result[1].status == "PASS" + assert ( + result[1].status_extended + == f"S3 Bucket {bucket_name_us} has cross region replication rule {repl_rule_id} in bucket {bucket_name_eu} located in region {AWS_REGION_EU_WEST_1}." + ) + assert result[1].resource_id == bucket_name_us + assert ( + result[1].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}" + ) + assert result[1].region == AWS_REGION_US_EAST_1 + + @mock_aws + def test_buckets_in_same_region(self): + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + # US-EAST-1 Destination Bucket + bucket_name_destination = "bucket_test_destination" + bucket_arn_destination = f"arn:aws:s3:::{bucket_name_destination}" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_destination, ObjectOwnership="BucketOwnerEnforced" + ) + s3_client_us_east_1.put_bucket_versioning( + Bucket=bucket_name_destination, + VersioningConfiguration={"Status": "Enabled"}, + ) + # US-EAST-1 Source Bucket + bucket_name_source = "bucket_test_source" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_source, ObjectOwnership="BucketOwnerEnforced" + ) + s3_client_us_east_1.put_bucket_versioning( + Bucket=bucket_name_source, + VersioningConfiguration={"Status": "Enabled"}, + ) + repl_rule_id = "rule1" + s3_client_us_east_1.put_bucket_replication( + Bucket=bucket_name_source, + ReplicationConfiguration={ + "Role": "arn:aws:iam", + "Rules": [ + { + "ID": repl_rule_id, + "Status": "Enabled", + "Prefix": "", + "Destination": { + "Bucket": bucket_arn_destination, + "Account": "", + }, + } + ], + }, + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client", + new=S3(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import ( + s3_bucket_cross_region_replication, + ) + + check = s3_bucket_cross_region_replication() + result = check.execute() + + assert len(result) == 2 + + # EU-WEST-1 Destination Bucket + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_destination} does not have correct cross region replication configuration." + ) + assert result[0].resource_id == bucket_name_destination + assert ( + result[0].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_destination}" + ) + assert result[0].region == AWS_REGION_US_EAST_1 + + # US-EAST-1 Source Bucket + assert result[1].status == "FAIL" + assert ( + result[1].status_extended + == f"S3 Bucket {bucket_name_source} has cross region replication rule {repl_rule_id} in bucket {bucket_name_destination} located in the same region." + ) + assert result[1].resource_id == bucket_name_source + assert ( + result[1].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_source}" + ) + assert result[1].region == AWS_REGION_US_EAST_1 + + @mock_aws + def test_source_bucket_several_replcation_rules(self): + # EU-WEST-1 Destination Bucket + s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1) + bucket_name_eu = "bucket_test_eu" + arn_bucket_eu = f"arn:aws:s3:::{bucket_name_eu}" + s3_client_eu_west_1.create_bucket( + Bucket=bucket_name_eu, + ObjectOwnership="BucketOwnerEnforced", + CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1}, + ) + s3_client_eu_west_1.put_bucket_versioning( + Bucket=bucket_name_eu, + VersioningConfiguration={"Status": "Enabled"}, + ) + + # US-EAST-1 Destination Bucket + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name_us_destination = "bucket_test_us_destination" + arn_bucket_us_destination = f"arn:aws:s3:::{bucket_name_us_destination}" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us_destination, ObjectOwnership="BucketOwnerEnforced" + ) + s3_client_us_east_1.put_bucket_versioning( + Bucket=bucket_name_us_destination, + VersioningConfiguration={"Status": "Enabled"}, + ) + + # US-EAST-1 Source Bucket + bucket_name_us_source = "bucket_test_us_source" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us_source, ObjectOwnership="BucketOwnerEnforced" + ) + s3_client_us_east_1.put_bucket_versioning( + Bucket=bucket_name_us_source, + VersioningConfiguration={"Status": "Enabled"}, + ) + repl_rule_id_1 = "rule1" + repl_rule_id_2 = "rule2" + s3_client_us_east_1.put_bucket_replication( + Bucket=bucket_name_us_source, + ReplicationConfiguration={ + "Role": "arn:aws:iam", + "Rules": [ + { + "ID": repl_rule_id_1, + "Status": "Enabled", + "Prefix": "", + "Destination": { + "Bucket": arn_bucket_eu, + "Account": "", + }, + }, + { + "ID": repl_rule_id_2, + "Status": "Enabled", + "Prefix": "", + "Destination": { + "Bucket": arn_bucket_us_destination, + "Account": "", + }, + }, + ], + }, + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client", + new=S3(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import ( + s3_bucket_cross_region_replication, + ) + + check = s3_bucket_cross_region_replication() + result = check.execute() + + assert len(result) == 3 + + # EU-WEST-1 Destination Bucket + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_eu} does not have correct cross region replication configuration." + ) + assert result[0].resource_id == bucket_name_eu + assert ( + result[0].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_eu}" + ) + assert result[0].region == AWS_REGION_EU_WEST_1 + + # US-EAST-1 Destination Bucket + assert result[1].status == "FAIL" + assert ( + result[1].status_extended + == f"S3 Bucket {bucket_name_us_destination} does not have correct cross region replication configuration." + ) + assert result[1].resource_id == bucket_name_us_destination + assert ( + result[1].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us_destination}" + ) + assert result[1].region == AWS_REGION_US_EAST_1 + + # US-EAST-1 Source Bucket + assert result[2].status == "PASS" + assert ( + result[2].status_extended + == f"S3 Bucket {bucket_name_us_source} has cross region replication rule {repl_rule_id_1} in bucket {bucket_name_eu} located in region {AWS_REGION_EU_WEST_1}." + ) + assert result[2].resource_id == bucket_name_us_source + assert ( + result[2].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us_source}" + ) + assert result[2].region == AWS_REGION_US_EAST_1 + + @mock_aws + def test_destination_bucket_out_of_scope(self): + # EU-WEST-1 Destination Bucket + s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1) + bucket_name_eu = "bucket_test_eu" + arn_bucket_eu = f"arn:aws:s3:::{bucket_name_eu}" + s3_client_eu_west_1.create_bucket( + Bucket=bucket_name_eu, + ObjectOwnership="BucketOwnerEnforced", + CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1}, + ) + s3_client_eu_west_1.put_bucket_versioning( + Bucket=bucket_name_eu, + VersioningConfiguration={"Status": "Enabled"}, + ) + + # US-EAST-1 Source Bucket + s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1) + bucket_name_us = "bucket_test_us_source" + s3_client_us_east_1.create_bucket( + Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced" + ) + s3_client_us_east_1.put_bucket_versioning( + Bucket=bucket_name_us, + VersioningConfiguration={"Status": "Enabled"}, + ) + repl_rule_id = "rule1" + s3_client_us_east_1.put_bucket_replication( + Bucket=bucket_name_us, + ReplicationConfiguration={ + "Role": "arn:aws:iam", + "Rules": [ + { + "ID": repl_rule_id, + "Status": "Enabled", + "Prefix": "", + "Destination": { + "Bucket": arn_bucket_eu, + "Account": "", + }, + }, + ], + }, + ) + + from prowler.providers.aws.services.s3.s3_service import S3 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ): + with mock.patch( + "prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client", + new=S3(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import ( + s3_bucket_cross_region_replication, + ) + + check = s3_bucket_cross_region_replication() + result = check.execute() + + assert len(result) == 1 + + # US-EAST-1 Source Bucket + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == f"S3 Bucket {bucket_name_us} has cross region replication rule {repl_rule_id} in bucket {arn_bucket_eu.split(':')[-1]} which is out of Prowler's scope." + ) + assert result[0].resource_id == bucket_name_us + assert ( + result[0].resource_arn + == f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}" + ) + assert result[0].region == AWS_REGION_US_EAST_1 diff --git a/tests/providers/aws/services/s3/s3_service_test.py b/tests/providers/aws/services/s3/s3_service_test.py index 4365b190c6..038f9a207c 100644 --- a/tests/providers/aws/services/s3/s3_service_test.py +++ b/tests/providers/aws/services/s3/s3_service_test.py @@ -386,6 +386,49 @@ class Test_S3_Service: assert s3.buckets[bucket_arn].region == AWS_REGION_US_EAST_1 assert s3.buckets[bucket_arn].object_lock + # Test S3 Get Bucket Replication + @mock_aws + def test_get_bucket_replication(self): + # Generate S3 Client + s3_client = client("s3") + # Create S3 Bucket + bucket_name = "test-bucket" + bucket_arn = f"arn:aws:s3:::{bucket_name}" + s3_client.create_bucket( + Bucket=bucket_name, + ObjectOwnership="BucketOwnerEnforced", + ) + s3_client.put_bucket_versioning( + Bucket=bucket_name, + VersioningConfiguration={"Status": "Enabled"}, + ) + s3_client.put_bucket_replication( + Bucket=bucket_name, + ReplicationConfiguration={ + "Role": "arn:aws:iam::123456789012:role/replication-role", + "Rules": [ + { + "ID": "rule1", + "Status": "Enabled", + "Prefix": "", + "Destination": { + "Bucket": bucket_arn, + "StorageClass": "STANDARD", + }, + } + ], + }, + ) + + # S3 client for this test class + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + s3 = S3(aws_provider) + assert len(s3.buckets) == 1 + assert s3.buckets[bucket_arn].name == bucket_name + assert s3.buckets[bucket_arn].region == AWS_REGION_US_EAST_1 + assert s3.buckets[bucket_arn].replication_rules[0].status == "Enabled" + assert s3.buckets[bucket_arn].replication_rules[0].destination == bucket_arn + # Test S3 List Access Points @patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) @mock_aws