mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
fix(s3): move object sampling to service layer and fix metadata
- Move list_objects_v2/get_object_acl into s3_service (opt-in, threaded) - Deterministic sampling; also flag AuthenticatedUsers group - Fix metadata validation (CheckType, Description, RelatedUrl, Url, Category) - Add changelog entry (#9517) and required IAM permissions
This commit is contained in:
@@ -39,6 +39,8 @@
|
||||
"rolesanywhere:ListTagsForResource",
|
||||
"rolesanywhere:ListTrustAnchors",
|
||||
"s3:GetAccountPublicAccessBlock",
|
||||
"s3:GetObjectAcl",
|
||||
"s3:ListBucket",
|
||||
"shield:DescribeProtection",
|
||||
"shield:GetSubscriptionState",
|
||||
"securityhub:BatchImportFindings",
|
||||
|
||||
@@ -25,6 +25,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- AWS Bedrock AgentCore privilege escalation paths in the IAM privilege escalation checks, covering Runtime, Harness, Code Interpreter and Custom Browser [(#11726)](https://github.com/prowler-cloud/prowler/pull/11726)
|
||||
- `--scan-secrets-validate` flag and `aws.secrets_validate` configuration option to optionally validate the secrets discovered by the secret-scanning checks against the provider APIs; secrets confirmed to be live are reported as critical [(#11694)](https://github.com/prowler-cloud/prowler/pull/11694)
|
||||
- `apigateway_restapi_no_secrets_in_stage_variables` check for AWS provider, scanning API Gateway REST API stage variables for hardcoded secrets such as passwords, API keys, and tokens [(#11188)](https://github.com/prowler-cloud/prowler/pull/11188)
|
||||
- `s3_bucket_object_public` check for AWS provider, spot-checking a configurable sample of object ACLs in each bucket and flagging objects granted to the AllUsers or AuthenticatedUsers groups; disabled by default and opted into via the `s3_bucket_object_public_enabled` configuration option [(#9517)](https://github.com/prowler-cloud/prowler/pull/9517)
|
||||
|
||||
### 🔄 Changed
|
||||
|
||||
|
||||
+7
-6
@@ -3,16 +3,17 @@
|
||||
"CheckID": "s3_bucket_object_public",
|
||||
"CheckTitle": "Spot-check S3 bucket objects for public ACLs",
|
||||
"CheckType": [
|
||||
"Data Protection"
|
||||
"Software and Configuration Checks/AWS Security Best Practices",
|
||||
"Effects/Data Exposure"
|
||||
],
|
||||
"ServiceName": "s3",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn:partition:s3:::resource",
|
||||
"Severity": "low",
|
||||
"ResourceType": "AwsS3Bucket",
|
||||
"Description": "Spot-checks a configurable random sample of objects in each S3 bucket and flags any whose ACL grants read access to the AllUsers group. This is a sampling-based check, not a comprehensive audit: ListObjectsV2 returns keys in lexicographical order and only a small sample is inspected, so public objects outside the sample can be missed. The check is disabled by default and must be opted into via the s3_bucket_object_public_enabled configuration flag.",
|
||||
"Description": "Spot-checks a configurable sample of objects in each S3 bucket and flags any whose ACL grants access to the AllUsers or AuthenticatedUsers groups. This is a sampling-based check, not a comprehensive audit, so public objects outside the sample can be missed. It is disabled by default and must be enabled via the s3_bucket_object_public_enabled configuration flag.",
|
||||
"Risk": "Public objects can be accessed by anyone on the internet, potentially leaking sensitive data. A bucket can appear private at the bucket-policy level while still containing individual objects with public ACL grants.",
|
||||
"RelatedUrl": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/access-control-block-public-access.html",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aws s3api put-object-acl --bucket <bucket_name> --key <object_key> --acl private",
|
||||
@@ -22,15 +23,15 @@
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "For complete coverage, enable the s3_bucket_acl_prohibited check, which enforces the BucketOwnerEnforced Object Ownership setting (AWS's recommended approach since April 2023) and prevents public object ACLs entirely. Use this spot-check as a supplementary tool for manual assessments.",
|
||||
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/about-object-ownership.html"
|
||||
"Url": "https://hub.prowler.com/check/s3_bucket_object_public"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"public-access"
|
||||
"internet-exposed"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [
|
||||
"s3_bucket_acl_prohibited"
|
||||
],
|
||||
"Notes": "Disabled by default. Configure s3_bucket_object_public_enabled, s3_bucket_object_public_max_objects, and s3_bucket_object_public_sample_size in the Prowler configuration. Because ListObjectsV2 returns keys lexicographically and only a sample is inspected, a PASS does not guarantee the bucket is free of public objects; use s3_bucket_acl_prohibited for full assurance."
|
||||
"Notes": "Disabled by default. Configure s3_bucket_object_public_enabled, s3_bucket_object_public_max_objects, and s3_bucket_object_public_sample_size in the Prowler configuration. Because only a sample of objects is inspected, a PASS does not guarantee the bucket is free of public objects; use s3_bucket_acl_prohibited for full assurance."
|
||||
}
|
||||
|
||||
+38
-54
@@ -1,11 +1,12 @@
|
||||
import random
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
from prowler.lib.check.models import Check, Check_Report_AWS
|
||||
from prowler.providers.aws.services.s3.s3_client import s3_client
|
||||
|
||||
ALL_USERS_URI = "http://acs.amazonaws.com/groups/global/AllUsers"
|
||||
# ACL grantee groups that make an object effectively public. AllUsers is anyone on
|
||||
# the internet; AuthenticatedUsers is any authenticated AWS principal (any account).
|
||||
PUBLIC_ACL_URIS = {
|
||||
"http://acs.amazonaws.com/groups/global/AllUsers",
|
||||
"http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
|
||||
}
|
||||
|
||||
|
||||
class s3_bucket_object_public(Check):
|
||||
@@ -15,51 +16,47 @@ class s3_bucket_object_public(Check):
|
||||
if not s3_client.audit_config.get("s3_bucket_object_public_enabled", False):
|
||||
return findings
|
||||
|
||||
max_objects = s3_client.audit_config.get(
|
||||
"s3_bucket_object_public_max_objects", 100
|
||||
)
|
||||
sample_size = s3_client.audit_config.get(
|
||||
"s3_bucket_object_public_sample_size", 3
|
||||
)
|
||||
|
||||
for bucket in s3_client.buckets.values():
|
||||
report = Check_Report_AWS(metadata=self.metadata(), resource=bucket)
|
||||
|
||||
try:
|
||||
regional_client = s3_client.regional_clients[bucket.region]
|
||||
objects = regional_client.list_objects_v2(
|
||||
Bucket=bucket.name, MaxKeys=max_objects
|
||||
)
|
||||
|
||||
contents = objects.get("Contents", [])
|
||||
if not contents:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"S3 Bucket {bucket.name} is empty."
|
||||
findings.append(report)
|
||||
sampling = bucket.object_sampling
|
||||
# Sampling is populated by the service layer only when the check is
|
||||
# enabled; skip any bucket that was not sampled.
|
||||
if sampling is None or not sampling.performed:
|
||||
continue
|
||||
|
||||
all_keys = [obj["Key"] for obj in contents]
|
||||
sample_keys = random.sample(all_keys, min(len(all_keys), sample_size))
|
||||
sampled = len(sample_keys)
|
||||
report = Check_Report_AWS(metadata=self.metadata(), resource=bucket)
|
||||
|
||||
public_objects_found = []
|
||||
for key in sample_keys:
|
||||
acl = regional_client.get_object_acl(Bucket=bucket.name, Key=key)
|
||||
for grant in acl.get("Grants", []):
|
||||
grantee = grant.get("Grantee", {})
|
||||
if (
|
||||
grantee.get("Type") == "Group"
|
||||
and grantee.get("URI") == ALL_USERS_URI
|
||||
):
|
||||
public_objects_found.append(key)
|
||||
break
|
||||
if sampling.error_code is not None:
|
||||
report.status = "MANUAL"
|
||||
if sampling.error_code == "AccessDenied":
|
||||
report.status_extended = (
|
||||
f"Access Denied when spot-checking objects in bucket "
|
||||
f"{bucket.name}."
|
||||
)
|
||||
else:
|
||||
report.status_extended = (
|
||||
f"Could not spot-check objects in bucket {bucket.name}: "
|
||||
f"{sampling.error_message}."
|
||||
)
|
||||
elif sampling.is_empty:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"S3 Bucket {bucket.name} is empty."
|
||||
else:
|
||||
public_objects = [
|
||||
obj.key
|
||||
for obj in sampling.objects
|
||||
if any(
|
||||
grantee.type == "Group" and grantee.URI in PUBLIC_ACL_URIS
|
||||
for grantee in obj.grantees
|
||||
)
|
||||
]
|
||||
sampled = len(sampling.objects)
|
||||
|
||||
if public_objects_found:
|
||||
if public_objects:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"S3 Bucket {bucket.name} has public objects detected in "
|
||||
f"spot-check sample of {sampled} objects: "
|
||||
f"{', '.join(public_objects_found)}."
|
||||
f"{', '.join(public_objects)}."
|
||||
)
|
||||
else:
|
||||
report.status = "PASS"
|
||||
@@ -70,19 +67,6 @@ class s3_bucket_object_public(Check):
|
||||
f"settings."
|
||||
)
|
||||
|
||||
except ClientError as error:
|
||||
report.status = "MANUAL"
|
||||
if error.response["Error"]["Code"] == "AccessDenied":
|
||||
report.status_extended = (
|
||||
f"Access Denied when spot-checking objects in bucket "
|
||||
f"{bucket.name}."
|
||||
)
|
||||
else:
|
||||
report.status_extended = (
|
||||
f"Could not spot-check objects in bucket {bucket.name}: "
|
||||
f"{error}."
|
||||
)
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
|
||||
@@ -36,6 +36,10 @@ class S3(AWSService):
|
||||
self.__threading_call__(
|
||||
self._get_bucket_notification_configuration, self.buckets.values()
|
||||
)
|
||||
# Object-level ACL sampling is expensive and opt-in, so only run it when
|
||||
# the s3_bucket_object_public check is explicitly enabled in the config.
|
||||
if self.audit_config.get("s3_bucket_object_public_enabled", False):
|
||||
self.__threading_call__(self._get_public_objects, self.buckets.values())
|
||||
|
||||
def _list_buckets(self, provider):
|
||||
logger.info("S3 - Listing buckets...")
|
||||
@@ -487,6 +491,63 @@ class S3(AWSService):
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _get_public_objects(self, bucket):
|
||||
logger.info("S3 - Spot-checking bucket objects for public ACLs...")
|
||||
max_objects = self.audit_config.get("s3_bucket_object_public_max_objects", 100)
|
||||
sample_size = self.audit_config.get("s3_bucket_object_public_sample_size", 3)
|
||||
sampling = BucketObjectSampling(performed=True)
|
||||
regional_client = None
|
||||
try:
|
||||
regional_client = self.regional_clients[bucket.region]
|
||||
contents = regional_client.list_objects_v2(
|
||||
Bucket=bucket.name, MaxKeys=max_objects
|
||||
).get("Contents", [])
|
||||
|
||||
if not contents:
|
||||
sampling.is_empty = True
|
||||
bucket.object_sampling = sampling
|
||||
return
|
||||
|
||||
all_keys = [obj["Key"] for obj in contents]
|
||||
# Deterministic, evenly-spaced sampling so findings are reproducible
|
||||
# across scans instead of flipping between PASS/FAIL with a random sample.
|
||||
if len(all_keys) <= sample_size:
|
||||
sample_keys = all_keys
|
||||
else:
|
||||
step = len(all_keys) // sample_size
|
||||
sample_keys = [all_keys[i * step] for i in range(sample_size)]
|
||||
|
||||
for key in sample_keys:
|
||||
acl = regional_client.get_object_acl(Bucket=bucket.name, Key=key)
|
||||
grantees = []
|
||||
for grant in acl.get("Grants", []):
|
||||
grant_grantee = grant.get("Grantee", {})
|
||||
grantee = ACL_Grantee(type=grant_grantee.get("Type", ""))
|
||||
grantee.display_name = grant_grantee.get("DisplayName")
|
||||
grantee.ID = grant_grantee.get("ID")
|
||||
grantee.URI = grant_grantee.get("URI")
|
||||
grantee.permission = grant.get("Permission")
|
||||
grantees.append(grantee)
|
||||
sampling.objects.append(ObjectACL(key=key, grantees=grantees))
|
||||
|
||||
bucket.object_sampling = sampling
|
||||
except ClientError as error:
|
||||
sampling.error_code = error.response["Error"]["Code"]
|
||||
sampling.error_message = str(error)
|
||||
bucket.object_sampling = sampling
|
||||
region = regional_client.region if regional_client else bucket.region
|
||||
logger.warning(
|
||||
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
except Exception as error:
|
||||
sampling.error_code = error.__class__.__name__
|
||||
sampling.error_message = str(error)
|
||||
bucket.object_sampling = sampling
|
||||
region = regional_client.region if regional_client else bucket.region
|
||||
logger.error(
|
||||
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _head_bucket(self, bucket_name):
|
||||
logger.info("S3 - Checking if bucket exists...")
|
||||
try:
|
||||
@@ -654,6 +715,19 @@ class PublicAccessBlock(BaseModel):
|
||||
restrict_public_buckets: bool
|
||||
|
||||
|
||||
class ObjectACL(BaseModel):
|
||||
key: str
|
||||
grantees: List[ACL_Grantee] = Field(default_factory=list)
|
||||
|
||||
|
||||
class BucketObjectSampling(BaseModel):
|
||||
performed: bool = False
|
||||
is_empty: bool = False
|
||||
objects: List[ObjectACL] = Field(default_factory=list)
|
||||
error_code: Optional[str] = None
|
||||
error_message: Optional[str] = None
|
||||
|
||||
|
||||
class AccessPoint(BaseModel):
|
||||
arn: str
|
||||
account_id: str
|
||||
@@ -703,3 +777,4 @@ class Bucket(BaseModel):
|
||||
lifecycle: List[LifeCycleRule] = Field(default_factory=list)
|
||||
replication_rules: List[ReplicationRule] = Field(default_factory=list)
|
||||
notification_config: Dict = Field(default_factory=dict)
|
||||
object_sampling: Optional[BucketObjectSampling] = None
|
||||
|
||||
+85
-31
@@ -10,6 +10,12 @@ CHECK_MODULE = (
|
||||
"prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public"
|
||||
)
|
||||
|
||||
ENABLED_CONFIG = {
|
||||
"s3_bucket_object_public_enabled": True,
|
||||
"s3_bucket_object_public_max_objects": 100,
|
||||
"s3_bucket_object_public_sample_size": 3,
|
||||
}
|
||||
|
||||
|
||||
class Test_s3_bucket_object_public:
|
||||
@mock_aws
|
||||
@@ -19,6 +25,7 @@ class Test_s3_bucket_object_public:
|
||||
|
||||
from prowler.providers.aws.services.s3.s3_service import S3
|
||||
|
||||
# No audit_config -> check disabled by default
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
|
||||
with mock.patch(
|
||||
@@ -37,10 +44,11 @@ class Test_s3_bucket_object_public:
|
||||
assert result == []
|
||||
|
||||
@mock_aws
|
||||
def test_bucket_empty_passes(self):
|
||||
def test_service_does_not_sample_when_disabled(self):
|
||||
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
|
||||
bucket_name = "bucket-empty"
|
||||
bucket_name = "bucket-not-sampled"
|
||||
s3_client_us_east_1.create_bucket(Bucket=bucket_name)
|
||||
s3_client_us_east_1.put_object(Bucket=bucket_name, Key="a.txt", Body=b"x")
|
||||
|
||||
from prowler.providers.aws.services.s3.s3_service import S3
|
||||
|
||||
@@ -51,11 +59,26 @@ class Test_s3_bucket_object_public:
|
||||
return_value=aws_provider,
|
||||
):
|
||||
s3_service = S3(aws_provider)
|
||||
s3_service.audit_config = {
|
||||
"s3_bucket_object_public_enabled": True,
|
||||
"s3_bucket_object_public_max_objects": 100,
|
||||
"s3_bucket_object_public_sample_size": 3,
|
||||
}
|
||||
bucket = next(iter(s3_service.buckets.values()))
|
||||
assert bucket.object_sampling is None
|
||||
|
||||
@mock_aws
|
||||
def test_bucket_empty_passes(self):
|
||||
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
|
||||
bucket_name = "bucket-empty"
|
||||
s3_client_us_east_1.create_bucket(Bucket=bucket_name)
|
||||
|
||||
from prowler.providers.aws.services.s3.s3_service import S3
|
||||
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
[AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG
|
||||
)
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
):
|
||||
s3_service = S3(aws_provider)
|
||||
with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service):
|
||||
from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import (
|
||||
s3_bucket_object_public,
|
||||
@@ -86,18 +109,15 @@ class Test_s3_bucket_object_public:
|
||||
|
||||
from prowler.providers.aws.services.s3.s3_service import S3
|
||||
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
[AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG
|
||||
)
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
):
|
||||
s3_service = S3(aws_provider)
|
||||
s3_service.audit_config = {
|
||||
"s3_bucket_object_public_enabled": True,
|
||||
"s3_bucket_object_public_max_objects": 100,
|
||||
"s3_bucket_object_public_sample_size": 3,
|
||||
}
|
||||
with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service):
|
||||
from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import (
|
||||
s3_bucket_object_public,
|
||||
@@ -130,18 +150,15 @@ class Test_s3_bucket_object_public:
|
||||
|
||||
from prowler.providers.aws.services.s3.s3_service import S3
|
||||
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
[AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG
|
||||
)
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
):
|
||||
s3_service = S3(aws_provider)
|
||||
s3_service.audit_config = {
|
||||
"s3_bucket_object_public_enabled": True,
|
||||
"s3_bucket_object_public_max_objects": 100,
|
||||
"s3_bucket_object_public_sample_size": 3,
|
||||
}
|
||||
with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service):
|
||||
from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import (
|
||||
s3_bucket_object_public,
|
||||
@@ -158,6 +175,40 @@ class Test_s3_bucket_object_public:
|
||||
"spot-check sample of"
|
||||
) in result[0].status_extended
|
||||
|
||||
@mock_aws
|
||||
def test_bucket_with_authenticated_users_object_fails(self):
|
||||
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
|
||||
bucket_name = "bucket-authenticated-object"
|
||||
public_key = "authenticated.txt"
|
||||
s3_client_us_east_1.create_bucket(Bucket=bucket_name)
|
||||
s3_client_us_east_1.put_object(Bucket=bucket_name, Key=public_key, Body=b"x")
|
||||
s3_client_us_east_1.put_object_acl(
|
||||
Bucket=bucket_name, Key=public_key, ACL="authenticated-read"
|
||||
)
|
||||
|
||||
from prowler.providers.aws.services.s3.s3_service import S3
|
||||
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
[AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG
|
||||
)
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
):
|
||||
s3_service = S3(aws_provider)
|
||||
with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service):
|
||||
from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import (
|
||||
s3_bucket_object_public,
|
||||
)
|
||||
|
||||
check = s3_bucket_object_public()
|
||||
result = check.execute()
|
||||
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert public_key in result[0].status_extended
|
||||
|
||||
@mock_aws
|
||||
def test_access_denied_on_list_objects_reports_manual(self):
|
||||
s3_client_us_east_1 = client("s3", region_name=AWS_REGION_US_EAST_1)
|
||||
@@ -166,25 +217,27 @@ class Test_s3_bucket_object_public:
|
||||
|
||||
from prowler.providers.aws.services.s3.s3_service import S3
|
||||
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
[AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG
|
||||
)
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
):
|
||||
s3_service = S3(aws_provider)
|
||||
s3_service.audit_config = {
|
||||
"s3_bucket_object_public_enabled": True,
|
||||
"s3_bucket_object_public_max_objects": 100,
|
||||
"s3_bucket_object_public_sample_size": 3,
|
||||
}
|
||||
|
||||
# Simulate AccessDenied when sampling and re-run sampling for the bucket
|
||||
regional_client = mock.MagicMock()
|
||||
regional_client.region = AWS_REGION_US_EAST_1
|
||||
regional_client.list_objects_v2.side_effect = ClientError(
|
||||
{"Error": {"Code": "AccessDenied", "Message": "denied"}},
|
||||
"ListObjectsV2",
|
||||
)
|
||||
s3_service.regional_clients[AWS_REGION_US_EAST_1] = regional_client
|
||||
bucket = next(iter(s3_service.buckets.values()))
|
||||
bucket.object_sampling = None
|
||||
s3_service._get_public_objects(bucket)
|
||||
|
||||
with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service):
|
||||
from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import (
|
||||
@@ -209,25 +262,26 @@ class Test_s3_bucket_object_public:
|
||||
|
||||
from prowler.providers.aws.services.s3.s3_service import S3
|
||||
|
||||
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
|
||||
aws_provider = set_mocked_aws_provider(
|
||||
[AWS_REGION_US_EAST_1], audit_config=ENABLED_CONFIG
|
||||
)
|
||||
|
||||
with mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=aws_provider,
|
||||
):
|
||||
s3_service = S3(aws_provider)
|
||||
s3_service.audit_config = {
|
||||
"s3_bucket_object_public_enabled": True,
|
||||
"s3_bucket_object_public_max_objects": 100,
|
||||
"s3_bucket_object_public_sample_size": 3,
|
||||
}
|
||||
|
||||
regional_client = mock.MagicMock()
|
||||
regional_client.region = AWS_REGION_US_EAST_1
|
||||
regional_client.list_objects_v2.side_effect = ClientError(
|
||||
{"Error": {"Code": "InternalError", "Message": "boom"}},
|
||||
"ListObjectsV2",
|
||||
)
|
||||
s3_service.regional_clients[AWS_REGION_US_EAST_1] = regional_client
|
||||
bucket = next(iter(s3_service.buckets.values()))
|
||||
bucket.object_sampling = None
|
||||
s3_service._get_public_objects(bucket)
|
||||
|
||||
with mock.patch(f"{CHECK_MODULE}.s3_client", new=s3_service):
|
||||
from prowler.providers.aws.services.s3.s3_bucket_object_public.s3_bucket_object_public import (
|
||||
|
||||
Reference in New Issue
Block a user