feat(aws): add s3_bucket_shadow_resource_vulnerability check (#8398)

Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
This commit is contained in:
Paul Negedu
2025-08-01 05:26:03 -05:00
committed by GitHub
parent 90c6c6b98d
commit 2170fbb1ab
6 changed files with 348 additions and 2 deletions

View File

@@ -12,6 +12,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `vm_desired_sku_size` check for Azure provider [(#8191)](https://github.com/prowler-cloud/prowler/pull/8191)
- `vm_scaleset_not_empty` check for Azure provider [(#8192)](https://github.com/prowler-cloud/prowler/pull/8192)
- GitHub repository and organization scoping support with `--repository/--repositories` and `--organization/--organizations` flags [(#8329)](https://github.com/prowler-cloud/prowler/pull/8329)
- `s3_bucket_shadow_resource_vulnerability` check for AWS provider [(#8398)](https://github.com/prowler-cloud/prowler/pull/8398)
### Changed
- Handle some AWS errors as warnings instead of errors [(#8347)](https://github.com/prowler-cloud/prowler/pull/8347)

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_shadow_resource_vulnerability",
"CheckTitle": "Check for S3 buckets vulnerable to Shadow Resource Hijacking (Bucket Monopoly)",
"CheckType": [
""
],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:s3:::bucket_name",
"Severity": "high",
"ResourceType": "AwsS3Bucket",
"Description": "Checks for S3 buckets with predictable names that could be hijacked by an attacker before legitimate use, leading to data leakage or other security breaches.",
"Risk": "An attacker can pre-create S3 buckets with predictable names used by various AWS services. When a legitimate user's service attempts to use that bucket, it may inadvertently write sensitive data to the attacker-controlled bucket, leading to information disclosure, denial of service, or even remote code execution.",
"RelatedUrl": "https://www.aquasec.com/blog/bucket-monopoly-breaching-aws-accounts-through-shadow-resources/",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "Manually verify the ownership of any flagged S3 buckets. If a bucket is not owned by your account, investigate its origin and purpose. If it is not a legitimate resource, you should avoid using services that may interact with it.",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that all S3 buckets associated with your AWS account are owned by your account. Be cautious of services that create buckets with predictable names. Whenever possible, pre-create these buckets in all regions to prevent hijacking.",
"Url": "https://www.aquasec.com/blog/bucket-monopoly-breaching-aws-accounts-through-shadow-resources/"
}
},
"Categories": [
"trustboundaries"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check is based on the 'Bucket Monopoly' vulnerability disclosed by Aqua Security."
}

View File

@@ -0,0 +1,96 @@
import re
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_shadow_resource_vulnerability(Check):
    """Detect S3 buckets vulnerable to shadow-resource hijacking ("Bucket Monopoly").

    Several AWS services create S3 buckets with predictable names derived from
    the account ID and region. An attacker who pre-creates such a bucket in
    their own account can receive data the victim's service writes to it. This
    check:
      1. Flags buckets in the audited account matching a known predictable
         pattern but owned by a different canonical ID (FAIL).
      2. Probes each audited region for predictable bucket names that already
         exist in another account (FAIL).

    Returns:
        list[Check_Report_AWS]: one finding per examined/discovered bucket.
    """

    def execute(self):
        findings = []
        # Predictable bucket name patterns from the research article
        # (https://www.aquasec.com/blog/bucket-monopoly-breaching-aws-accounts-through-shadow-resources/).
        # "<region>" is a placeholder substituted per region below.
        predictable_patterns = {
            "Glue": f"aws-glue-assets-{s3_client.provider.identity.account}-<region>",
            "SageMaker": f"sagemaker-<region>-{s3_client.provider.identity.account}",
            # "CloudFormation": "cf-templates-.*-<region>",
            "EMR": f"aws-emr-studio-{s3_client.provider.identity.account}-<region>",
            "CodeStar": f"aws-codestar-<region>-{s3_client.provider.identity.account}",
            # Add other patterns here as they are discovered
        }
        # Track buckets we've already reported to avoid duplicate findings
        # between the account scan and the cross-account probe below.
        reported_buckets = set()

        # 1) Check buckets visible in the audited account.
        for bucket in s3_client.buckets.values():
            report = Check_Report_AWS(self.metadata(), resource=bucket)
            report.region = bucket.region
            report.resource_id = bucket.name
            report.resource_arn = bucket.arn
            report.resource_tags = bucket.tags
            report.status = "PASS"
            report.status_extended = (
                f"S3 bucket {bucket.name} is not a known shadow resource."
            )
            # Check if this bucket matches any predictable pattern.
            for service, pattern_format in predictable_patterns.items():
                pattern = pattern_format.replace("<region>", bucket.region)
                # fullmatch, not match: re.match only anchors the start of the
                # string, so it would also flag unrelated buckets that merely
                # share the predictable prefix (e.g.
                # "sagemaker-<region>-<account>-backup").
                if re.fullmatch(pattern, bucket.name):
                    if bucket.owner_id != s3_client.audited_canonical_id:
                        report.status = "FAIL"
                        report.status_extended = f"S3 bucket {bucket.name} for service {service} is a known shadow resource and it is owned by another account ({bucket.owner_id})."
                    else:
                        report.status = "PASS"
                        report.status_extended = f"S3 bucket {bucket.name} for service {service} is a known shadow resource but it is correctly owned by the audited account."
                    break
            findings.append(report)
            reported_buckets.add(bucket.name)

        # 2) Probe predictable names per region for buckets that already exist
        # in other accounts (the actual hijack scenario).
        regions_to_test = (
            s3_client.provider.identity.audited_regions
            or s3_client.regional_clients.keys()
        )
        for region in regions_to_test:
            for service, pattern_format in predictable_patterns.items():
                # Generate the concrete bucket name for this region.
                bucket_name = pattern_format.replace("<region>", region)
                # Skip buckets already reported in the account scan above.
                if bucket_name in reported_buckets:
                    continue
                logger.info(
                    f"Checking if shadow resource bucket {bucket_name} exists in other accounts"
                )
                # HeadBucket succeeding for a bucket we don't own means another
                # account has already claimed the predictable name.
                if s3_client._head_bucket(bucket_name):
                    # Build a minimal stand-in object for reporting, since the
                    # bucket does not exist in s3_client.buckets.
                    virtual_bucket = type(
                        "obj",
                        (object,),
                        {
                            "name": bucket_name,
                            "region": region,
                            "arn": f"arn:{s3_client.audited_partition}:s3:::{bucket_name}",
                            "tags": [],
                        },
                    )()
                    report = Check_Report_AWS(self.metadata(), resource=virtual_bucket)
                    report.region = region
                    report.resource_id = bucket_name
                    report.resource_arn = virtual_bucket.arn
                    report.resource_tags = []
                    report.status = "FAIL"
                    report.status_extended = f"S3 bucket {bucket_name} for service {service} is a known shadow resource that exists and is owned by another account."
                    findings.append(report)
                    reported_buckets.add(bucket_name)
        return findings

View File

@@ -16,6 +16,7 @@ class S3(AWSService):
self.account_arn_template = f"arn:{self.audited_partition}:s3:{self.region}:{self.audited_account}:account"
self.regions_with_buckets = []
self.buckets = {}
self.audited_canonical_id = ""
self._list_buckets(provider)
self.__threading_call__(self._get_bucket_versioning, self.buckets.values())
self.__threading_call__(self._get_bucket_logging, self.buckets.values())
@@ -40,6 +41,7 @@ class S3(AWSService):
logger.info("S3 - Listing buckets...")
try:
list_buckets = self.client.list_buckets()
self.audited_canonical_id = list_buckets["Owner"]["ID"]
for bucket in list_buckets["Buckets"]:
try:
bucket_region = self.client.get_bucket_location(
@@ -237,9 +239,10 @@ class S3(AWSService):
logger.info("S3 - Get buckets acl...")
try:
regional_client = self.regional_clients[bucket.region]
acl = regional_client.get_bucket_acl(Bucket=bucket.name)
bucket.owner_id = acl["Owner"]["ID"]
grantees = []
for grant in acl["Grants"]:
grantee = ACL_Grantee(type=grant["Grantee"]["Type"])
if "DisplayName" in grant["Grantee"]:
grantee.display_name = grant["Grantee"]["DisplayName"]
@@ -683,6 +686,8 @@ class ReplicationRule(BaseModel):
class Bucket(BaseModel):
arn: str
name: str
owner_id: Optional[str]
owner: Optional[str]
versioning: bool = False
logging: bool = False
public_access_block: Optional[PublicAccessBlock]

View File

@@ -0,0 +1,210 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.s3.s3_service import Bucket
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_s3_bucket_shadow_resource_vulnerability:
    """Tests for the s3_bucket_shadow_resource_vulnerability check.

    NOTE: the mocked s3_client is always instantiated with mock.MagicMock()
    (an instance), never assigned the MagicMock class itself — assigning the
    class would set attributes on the shared class object and leak state
    between tests.
    """

    @mock_aws
    def test_no_buckets(self):
        """No buckets in the account and no shadow resources anywhere -> no findings."""
        aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
        aws_provider.identity.identity_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
        s3_client = mock.MagicMock()
        s3_client.buckets = {}
        s3_client.provider = aws_provider
        # No predictable bucket exists in any other account.
        s3_client._head_bucket = mock.MagicMock(return_value=False)
        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=aws_provider,
            ),
            mock.patch(
                "prowler.providers.aws.services.s3.s3_bucket_shadow_resource_vulnerability.s3_bucket_shadow_resource_vulnerability.s3_client",
                new=s3_client,
            ),
        ):
            from prowler.providers.aws.services.s3.s3_bucket_shadow_resource_vulnerability.s3_bucket_shadow_resource_vulnerability import (
                s3_bucket_shadow_resource_vulnerability,
            )

            check = s3_bucket_shadow_resource_vulnerability()
            result = check.execute()
            assert len(result) == 0

    @mock_aws
    def test_bucket_owned_by_account(self):
        """A predictable bucket owned by the audited account -> PASS."""
        aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
        aws_provider.identity.identity_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
        # The check builds its patterns from provider.identity.account.
        aws_provider.identity.account = AWS_ACCOUNT_NUMBER
        bucket_name = f"sagemaker-{AWS_REGION_US_EAST_1}-{AWS_ACCOUNT_NUMBER}"
        s3_client = mock.MagicMock()
        s3_client.provider = aws_provider
        s3_client.audited_account_id = AWS_ACCOUNT_NUMBER
        s3_client.audited_identity_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
        s3_client.audited_canonical_id = AWS_ACCOUNT_NUMBER
        s3_client.buckets = {
            bucket_name: Bucket(
                name=bucket_name,
                arn=f"arn:aws:s3:::{bucket_name}",
                region=AWS_REGION_US_EAST_1,
                owner_id=AWS_ACCOUNT_NUMBER,
            )
        }
        s3_client._head_bucket = mock.MagicMock(return_value=False)
        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=aws_provider,
            ),
            mock.patch(
                "prowler.providers.aws.services.s3.s3_bucket_shadow_resource_vulnerability.s3_bucket_shadow_resource_vulnerability.s3_client",
                new=s3_client,
            ),
        ):
            from prowler.providers.aws.services.s3.s3_bucket_shadow_resource_vulnerability.s3_bucket_shadow_resource_vulnerability import (
                s3_bucket_shadow_resource_vulnerability,
            )

            check = s3_bucket_shadow_resource_vulnerability()
            result = check.execute()
            assert len(result) == 1
            assert result[0].status == "PASS"
            assert (
                "is correctly owned by the audited account" in result[0].status_extended
            )

    @mock_aws
    def test_bucket_not_predictable(self):
        """A bucket whose name matches no predictable pattern -> PASS (not a shadow resource)."""
        aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
        aws_provider.identity.identity_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
        aws_provider.identity.account = AWS_ACCOUNT_NUMBER
        bucket_name = "my-non-predictable-bucket"
        s3_client = mock.MagicMock()
        s3_client.provider = aws_provider
        s3_client.audited_account_id = AWS_ACCOUNT_NUMBER
        s3_client.audited_identity_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
        s3_client.audited_canonical_id = AWS_ACCOUNT_NUMBER
        s3_client.buckets = {
            bucket_name: Bucket(
                name=bucket_name,
                arn=f"arn:aws:s3:::{bucket_name}",
                region=AWS_REGION_US_EAST_1,
                owner_id=AWS_ACCOUNT_NUMBER,
            )
        }
        s3_client._head_bucket = mock.MagicMock(return_value=False)
        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=aws_provider,
            ),
            mock.patch(
                "prowler.providers.aws.services.s3.s3_bucket_shadow_resource_vulnerability.s3_bucket_shadow_resource_vulnerability.s3_client",
                new=s3_client,
            ),
        ):
            from prowler.providers.aws.services.s3.s3_bucket_shadow_resource_vulnerability.s3_bucket_shadow_resource_vulnerability import (
                s3_bucket_shadow_resource_vulnerability,
            )

            check = s3_bucket_shadow_resource_vulnerability()
            result = check.execute()
            assert len(result) == 1
            assert result[0].status == "PASS"
            assert "is not a known shadow resource" in result[0].status_extended

    @mock_aws
    def test_shadow_resource_in_other_account(self):
        """Predictable buckets already claimed in other accounts -> FAIL per service/region."""
        # Mock S3 client with no buckets in current account
        s3_client = mock.MagicMock()
        s3_client.buckets = {}
        s3_client.audited_account_id = AWS_ACCOUNT_NUMBER
        s3_client.audited_identity_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
        s3_client.audited_canonical_id = AWS_ACCOUNT_NUMBER
        s3_client.audited_partition = "aws"
        # Mock regional clients - this is what the check uses to determine regions to test
        s3_client.regional_clients = {
            "us-east-1": mock.MagicMock(),
            "us-west-2": mock.MagicMock(),
            "eu-west-1": mock.MagicMock(),
        }
        # Define the shadow resources we want to simulate
        shadow_resources = [
            f"aws-glue-assets-{AWS_ACCOUNT_NUMBER}-us-west-2",
            f"sagemaker-us-east-1-{AWS_ACCOUNT_NUMBER}",
            f"aws-emr-studio-{AWS_ACCOUNT_NUMBER}-eu-west-1",
        ]

        # Mock the _head_bucket method to simulate finding shadow resources
        def mock_head_bucket(bucket_name):
            return bucket_name in shadow_resources

        s3_client._head_bucket = mock_head_bucket
        # Mock provider with multiple regions to test
        aws_provider = set_mocked_aws_provider(["us-east-1", "us-west-2", "eu-west-1"])
        aws_provider.identity.identity_arn = f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:root"
        aws_provider.identity.account = AWS_ACCOUNT_NUMBER
        s3_client.provider = aws_provider
        with (
            mock.patch(
                "prowler.providers.common.provider.Provider.get_global_provider",
                return_value=aws_provider,
            ),
            mock.patch(
                "prowler.providers.aws.services.s3.s3_bucket_shadow_resource_vulnerability.s3_bucket_shadow_resource_vulnerability.s3_client",
                new=s3_client,
            ),
        ):
            from prowler.providers.aws.services.s3.s3_bucket_shadow_resource_vulnerability.s3_bucket_shadow_resource_vulnerability import (
                s3_bucket_shadow_resource_vulnerability,
            )

            check = s3_bucket_shadow_resource_vulnerability()
            result = check.execute()
            # Should find shadow resources
            assert len(result) >= 3
            # Check if we found all expected shadow resources
            found_services = set()
            for finding in result:
                if (
                    finding.status == "FAIL"
                    and "shadow resource" in finding.status_extended
                ):
                    if (
                        "aws-glue-assets" in finding.status_extended
                        and "Glue" in finding.status_extended
                    ):
                        found_services.add("Glue")
                        assert "us-west-2" in finding.status_extended
                    elif (
                        "sagemaker" in finding.status_extended
                        and "SageMaker" in finding.status_extended
                    ):
                        found_services.add("SageMaker")
                        assert "us-east-1" in finding.status_extended
                    elif (
                        "aws-emr-studio" in finding.status_extended
                        and "EMR" in finding.status_extended
                    ):
                        found_services.add("EMR")
                        assert "eu-west-1" in finding.status_extended
                    # Verify common attributes
                    assert "owned by another account" in finding.status_extended
            # Verify we found all expected services
            expected_services = {"Glue", "SageMaker", "EMR"}
            assert found_services == expected_services