feat(s3): add s3_bucket_cross_region_replication check (#4761)

Co-authored-by: Sergio <sergio@prowler.com>
This commit is contained in:
Hugo Pereira Brito
2024-08-19 18:42:42 +02:00
committed by GitHub
parent 38b73fb0c0
commit 5d42ae6e6f
6 changed files with 773 additions and 0 deletions
@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_cross_region_replication",
"CheckTitle": "Check if S3 buckets use cross region replication.",
"CheckType": [
"Secure access management"
],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:s3:::bucket_name",
"Severity": "low",
"ResourceType": "AwsS3Bucket",
"Description": "Verifying whether S3 buckets have cross-region replication enabled, ensuring data redundancy and availability across multiple AWS regions",
"Risk": "Without cross-region replication in S3 buckets, data is at risk of being lost or inaccessible if an entire region goes down, leading to potential service disruptions and data unavailability.",
"RelatedUrl": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/replication.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/s3-controls.html#s3-7",
"Terraform": "https://docs.prowler.com/checks/aws/general-policies/ensure-that-s3-bucket-has-cross-region-replication-enabled#terraform"
},
"Recommendation": {
"Text": "Ensure that S3 buckets have corss region replication.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/replication-walkthrough1.html"
}
},
"Categories": [
"redundancy"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,37 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_cross_region_replication(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} does not have correct cross region replication configuration."
if bucket.replication_rules:
for rule in bucket.replication_rules:
if (
bucket.versioning
and rule.status == "Enabled"
and rule.destination
):
if rule.destination not in s3_client.buckets:
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has cross region replication rule {rule.id} in bucket {rule.destination.split(':')[-1]} which is out of Prowler's scope."
else:
destination_bucket = s3_client.buckets[rule.destination]
if destination_bucket.region != bucket.region:
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} has cross region replication rule {rule.id} in bucket {destination_bucket.name} located in region {destination_bucket.region}."
break
else:
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has cross region replication rule {rule.id} in bucket {destination_bucket.name} located in the same region."
findings.append(report)
return findings
@@ -30,6 +30,7 @@ class S3(AWSService):
self._get_object_lock_configuration, self.buckets.values()
)
self.__threading_call__(self._get_bucket_tagging, self.buckets.values())
self.__threading_call__(self._get_bucket_replication, self.buckets.values())
def _list_buckets(self, provider):
logger.info("S3 - Listing buckets...")
@@ -378,6 +379,46 @@ class S3(AWSService):
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_bucket_replication(self, bucket):
logger.info("S3 - Get buckets replication...")
try:
regional_client = self.regional_clients[bucket.region]
replication_config = regional_client.get_bucket_replication(
Bucket=bucket.name
)["ReplicationConfiguration"]["Rules"]
if replication_config:
for rule in replication_config:
bucket.replication_rules.append(
ReplicationRule(
id=rule["ID"],
status=rule["Status"],
destination=rule["Destination"]["Bucket"],
)
)
except ClientError as error:
if error.response["Error"]["Code"] == "NoSuchBucket":
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
elif (
error.response["Error"]["Code"]
== "ReplicationConfigurationNotFoundError"
):
bucket.replication = None
else:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
if regional_client:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class S3Control(AWSService):
def __init__(self, provider):
@@ -489,6 +530,12 @@ class AccessPoint(BaseModel):
region: str
class ReplicationRule(BaseModel):
id: str
status: str
destination: str
class Bucket(BaseModel):
name: str
versioning: bool = False
@@ -503,3 +550,4 @@ class Bucket(BaseModel):
object_lock: bool = False
mfa_delete: bool = False
tags: Optional[list] = []
replication_rules: Optional[list[ReplicationRule]] = []
@@ -0,0 +1,611 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_s3_bucket_cross_region_replication:
# No Buckets
@mock_aws
def test_no_buckets(self):
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import (
s3_bucket_cross_region_replication,
)
check = s3_bucket_cross_region_replication()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_bucket_no_versioning(self):
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(Bucket=bucket_name_us)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import (
s3_bucket_cross_region_replication,
)
check = s3_bucket_cross_region_replication()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name_us} does not have correct cross region replication configuration."
)
assert result[0].resource_id == bucket_name_us
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}"
)
assert result[0].region == AWS_REGION_US_EAST_1
@mock_aws
def test_bucket_no_replication(self):
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_us,
VersioningConfiguration={"Status": "Enabled"},
)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import (
s3_bucket_cross_region_replication,
)
check = s3_bucket_cross_region_replication()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name_us} does not have correct cross region replication configuration."
)
assert result[0].resource_id == bucket_name_us
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}"
)
assert result[0].region == AWS_REGION_US_EAST_1
@mock_aws
def test_bucket_versioning_enabled_replication_disabled(self):
# EU-WEST-1 Destination Bucket
s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1)
bucket_name_eu = "bucket_test_eu"
s3_client_eu_west_1.create_bucket(
Bucket=bucket_name_eu,
ObjectOwnership="BucketOwnerEnforced",
CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
)
s3_client_eu_west_1.put_bucket_versioning(
Bucket=bucket_name_eu,
VersioningConfiguration={"Status": "Enabled"},
)
# US-EAST-1 Source Bucket
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_us,
VersioningConfiguration={"Status": "Enabled"},
)
s3_client_us_east_1.put_bucket_replication(
Bucket=bucket_name_us,
ReplicationConfiguration={
"Role": "arn:aws:iam",
"Rules": [
{
"ID": "rule1",
"Status": "Disabled",
"Prefix": "",
"Destination": {
"Bucket": "arn:aws:s3:::bucket_test_eu",
"Account": "",
},
}
],
},
)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import (
s3_bucket_cross_region_replication,
)
check = s3_bucket_cross_region_replication()
result = check.execute()
assert len(result) == 2
# EU-WEST-1 Destination Bucket
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name_eu} does not have correct cross region replication configuration."
)
assert result[0].resource_id == bucket_name_eu
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_eu}"
)
assert result[0].region == AWS_REGION_EU_WEST_1
# US-EAST-1 Source Bucket
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"S3 Bucket {bucket_name_us} does not have correct cross region replication configuration."
)
assert result[1].resource_id == bucket_name_us
assert (
result[1].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}"
)
assert result[1].region == AWS_REGION_US_EAST_1
@mock_aws
def test_bucket_versioning_enabled_replication_enabled(self):
# EU-WEST-1 Destination Bucket
s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1)
bucket_name_eu = "bucket_test_eu"
arn_bucket_eu = f"arn:aws:s3:::{bucket_name_eu}"
s3_client_eu_west_1.create_bucket(
Bucket=bucket_name_eu,
ObjectOwnership="BucketOwnerEnforced",
CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
)
s3_client_eu_west_1.put_bucket_versioning(
Bucket=bucket_name_eu,
VersioningConfiguration={"Status": "Enabled"},
)
# US-EAST-1 Source Bucket
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
bucket_name_us = "bucket_test_us"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_us,
VersioningConfiguration={"Status": "Enabled"},
)
repl_rule_id = "rule1"
s3_client_us_east_1.put_bucket_replication(
Bucket=bucket_name_us,
ReplicationConfiguration={
"Role": "arn:aws:iam",
"Rules": [
{
"ID": repl_rule_id,
"Status": "Enabled",
"Prefix": "",
"Destination": {
"Bucket": arn_bucket_eu,
"Account": "",
},
}
],
},
)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import (
s3_bucket_cross_region_replication,
)
check = s3_bucket_cross_region_replication()
result = check.execute()
assert len(result) == 2
# EU-WEST-1 Destination Bucket
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name_eu} does not have correct cross region replication configuration."
)
assert result[0].resource_id == bucket_name_eu
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_eu}"
)
assert result[0].region == AWS_REGION_EU_WEST_1
# US-EAST-1 Source Bucket
assert result[1].status == "PASS"
assert (
result[1].status_extended
== f"S3 Bucket {bucket_name_us} has cross region replication rule {repl_rule_id} in bucket {bucket_name_eu} located in region {AWS_REGION_EU_WEST_1}."
)
assert result[1].resource_id == bucket_name_us
assert (
result[1].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}"
)
assert result[1].region == AWS_REGION_US_EAST_1
@mock_aws
def test_buckets_in_same_region(self):
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
# US-EAST-1 Destination Bucket
bucket_name_destination = "bucket_test_destination"
bucket_arn_destination = f"arn:aws:s3:::{bucket_name_destination}"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_destination, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_destination,
VersioningConfiguration={"Status": "Enabled"},
)
# US-EAST-1 Source Bucket
bucket_name_source = "bucket_test_source"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_source, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_source,
VersioningConfiguration={"Status": "Enabled"},
)
repl_rule_id = "rule1"
s3_client_us_east_1.put_bucket_replication(
Bucket=bucket_name_source,
ReplicationConfiguration={
"Role": "arn:aws:iam",
"Rules": [
{
"ID": repl_rule_id,
"Status": "Enabled",
"Prefix": "",
"Destination": {
"Bucket": bucket_arn_destination,
"Account": "",
},
}
],
},
)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import (
s3_bucket_cross_region_replication,
)
check = s3_bucket_cross_region_replication()
result = check.execute()
assert len(result) == 2
# EU-WEST-1 Destination Bucket
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name_destination} does not have correct cross region replication configuration."
)
assert result[0].resource_id == bucket_name_destination
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_destination}"
)
assert result[0].region == AWS_REGION_US_EAST_1
# US-EAST-1 Source Bucket
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"S3 Bucket {bucket_name_source} has cross region replication rule {repl_rule_id} in bucket {bucket_name_destination} located in the same region."
)
assert result[1].resource_id == bucket_name_source
assert (
result[1].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_source}"
)
assert result[1].region == AWS_REGION_US_EAST_1
@mock_aws
def test_source_bucket_several_replcation_rules(self):
# EU-WEST-1 Destination Bucket
s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1)
bucket_name_eu = "bucket_test_eu"
arn_bucket_eu = f"arn:aws:s3:::{bucket_name_eu}"
s3_client_eu_west_1.create_bucket(
Bucket=bucket_name_eu,
ObjectOwnership="BucketOwnerEnforced",
CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
)
s3_client_eu_west_1.put_bucket_versioning(
Bucket=bucket_name_eu,
VersioningConfiguration={"Status": "Enabled"},
)
# US-EAST-1 Destination Bucket
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
bucket_name_us_destination = "bucket_test_us_destination"
arn_bucket_us_destination = f"arn:aws:s3:::{bucket_name_us_destination}"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us_destination, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_us_destination,
VersioningConfiguration={"Status": "Enabled"},
)
# US-EAST-1 Source Bucket
bucket_name_us_source = "bucket_test_us_source"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us_source, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_us_source,
VersioningConfiguration={"Status": "Enabled"},
)
repl_rule_id_1 = "rule1"
repl_rule_id_2 = "rule2"
s3_client_us_east_1.put_bucket_replication(
Bucket=bucket_name_us_source,
ReplicationConfiguration={
"Role": "arn:aws:iam",
"Rules": [
{
"ID": repl_rule_id_1,
"Status": "Enabled",
"Prefix": "",
"Destination": {
"Bucket": arn_bucket_eu,
"Account": "",
},
},
{
"ID": repl_rule_id_2,
"Status": "Enabled",
"Prefix": "",
"Destination": {
"Bucket": arn_bucket_us_destination,
"Account": "",
},
},
],
},
)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import (
s3_bucket_cross_region_replication,
)
check = s3_bucket_cross_region_replication()
result = check.execute()
assert len(result) == 3
# EU-WEST-1 Destination Bucket
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name_eu} does not have correct cross region replication configuration."
)
assert result[0].resource_id == bucket_name_eu
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_eu}"
)
assert result[0].region == AWS_REGION_EU_WEST_1
# US-EAST-1 Destination Bucket
assert result[1].status == "FAIL"
assert (
result[1].status_extended
== f"S3 Bucket {bucket_name_us_destination} does not have correct cross region replication configuration."
)
assert result[1].resource_id == bucket_name_us_destination
assert (
result[1].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us_destination}"
)
assert result[1].region == AWS_REGION_US_EAST_1
# US-EAST-1 Source Bucket
assert result[2].status == "PASS"
assert (
result[2].status_extended
== f"S3 Bucket {bucket_name_us_source} has cross region replication rule {repl_rule_id_1} in bucket {bucket_name_eu} located in region {AWS_REGION_EU_WEST_1}."
)
assert result[2].resource_id == bucket_name_us_source
assert (
result[2].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us_source}"
)
assert result[2].region == AWS_REGION_US_EAST_1
@mock_aws
def test_destination_bucket_out_of_scope(self):
# EU-WEST-1 Destination Bucket
s3_client_eu_west_1 = client("s3", region_name=AWS_REGION_EU_WEST_1)
bucket_name_eu = "bucket_test_eu"
arn_bucket_eu = f"arn:aws:s3:::{bucket_name_eu}"
s3_client_eu_west_1.create_bucket(
Bucket=bucket_name_eu,
ObjectOwnership="BucketOwnerEnforced",
CreateBucketConfiguration={"LocationConstraint": AWS_REGION_EU_WEST_1},
)
s3_client_eu_west_1.put_bucket_versioning(
Bucket=bucket_name_eu,
VersioningConfiguration={"Status": "Enabled"},
)
# US-EAST-1 Source Bucket
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
bucket_name_us = "bucket_test_us_source"
s3_client_us_east_1.create_bucket(
Bucket=bucket_name_us, ObjectOwnership="BucketOwnerEnforced"
)
s3_client_us_east_1.put_bucket_versioning(
Bucket=bucket_name_us,
VersioningConfiguration={"Status": "Enabled"},
)
repl_rule_id = "rule1"
s3_client_us_east_1.put_bucket_replication(
Bucket=bucket_name_us,
ReplicationConfiguration={
"Role": "arn:aws:iam",
"Rules": [
{
"ID": repl_rule_id,
"Status": "Enabled",
"Prefix": "",
"Destination": {
"Bucket": arn_bucket_eu,
"Account": "",
},
},
],
},
)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_cross_region_replication.s3_bucket_cross_region_replication import (
s3_bucket_cross_region_replication,
)
check = s3_bucket_cross_region_replication()
result = check.execute()
assert len(result) == 1
# US-EAST-1 Source Bucket
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name_us} has cross region replication rule {repl_rule_id} in bucket {arn_bucket_eu.split(':')[-1]} which is out of Prowler's scope."
)
assert result[0].resource_id == bucket_name_us
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name_us}"
)
assert result[0].region == AWS_REGION_US_EAST_1
@@ -386,6 +386,49 @@ class Test_S3_Service:
assert s3.buckets[bucket_arn].region == AWS_REGION_US_EAST_1
assert s3.buckets[bucket_arn].object_lock
# Test S3 Get Bucket Replication
@mock_aws
def test_get_bucket_replication(self):
# Generate S3 Client
s3_client = client("s3")
# Create S3 Bucket
bucket_name = "test-bucket"
bucket_arn = f"arn:aws:s3:::{bucket_name}"
s3_client.create_bucket(
Bucket=bucket_name,
ObjectOwnership="BucketOwnerEnforced",
)
s3_client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={"Status": "Enabled"},
)
s3_client.put_bucket_replication(
Bucket=bucket_name,
ReplicationConfiguration={
"Role": "arn:aws:iam::123456789012:role/replication-role",
"Rules": [
{
"ID": "rule1",
"Status": "Enabled",
"Prefix": "",
"Destination": {
"Bucket": bucket_arn,
"StorageClass": "STANDARD",
},
}
],
},
)
# S3 client for this test class
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
s3 = S3(aws_provider)
assert len(s3.buckets) == 1
assert s3.buckets[bucket_arn].name == bucket_name
assert s3.buckets[bucket_arn].region == AWS_REGION_US_EAST_1
assert s3.buckets[bucket_arn].replication_rules[0].status == "Enabled"
assert s3.buckets[bucket_arn].replication_rules[0].destination == bucket_arn
# Test S3 List Access Points
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@mock_aws