feat(s3): add new check s3_bucket_event_notifications_enabled (#5562)

Co-authored-by: Sergio <sergio@prowler.com>
This commit is contained in:
Hugo Pereira Brito
2024-10-29 21:38:38 +01:00
committed by GitHub
parent 82ec3e8779
commit 4bee4d482a
7 changed files with 297 additions and 10 deletions
@@ -21,7 +21,7 @@
"Terraform": "https://docs.prowler.com/checks/aws/general-policies/ensure-that-s3-bucket-has-cross-region-replication-enabled#terraform"
},
"Recommendation": {
"Text": "Ensure that S3 buckets have corss region replication.",
"Text": "Ensure that S3 buckets have cross region replication.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/replication-walkthrough1.html"
}
},
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_event_notifications_enabled",
"CheckTitle": "Check if S3 buckets have event notifications enabled.",
"CheckType": [
"Software and Configuration Checks/Industry and Regulatory Standards/NIST 800-53 Controls"
],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:s3:::bucket_name",
"Severity": "medium",
"ResourceType": "AwsS3Bucket",
"Description": "Ensure whether S3 buckets have event notifications enabled.",
"Risk": "Without event notifications, important actions on S3 buckets may go unnoticed, leading to missed opportunities for timely response to critical changes, such as object creation, deletion, or updates that could impact data security and availability.",
"RelatedUrl": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-how-to-event-types-and-destinations.html#supported-notification-event-types",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/s3-controls.html#s3-11",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable event notifications for all S3 general-purpose buckets to monitor important events such as object creation, deletion, tagging, and lifecycle events, ensuring visibility and quick action on relevant changes.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/EventNotifications.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,39 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_event_notifications_enabled(Check):
"""Ensure S3 Buckets have event notifications enabled
This check will return a FAIL if the S3 Bucket does not have event notifications enabled.
"""
def execute(self) -> list[Check_Report_AWS]:
"""Execute the s3_bucket_event_notifications_enabled check
Iterates over all S3 Buckets and checks if they have event notifications enabled.
Returns:
list[Check_Report_AWS]: List of Check_Report_AWS objects
"""
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
report.status = "FAIL"
report.status_extended = (
f"S3 Bucket {bucket.name} does not have event notifications enabled."
)
if bucket.notification_config:
report.status = "PASS"
report.status_extended = (
f"S3 Bucket {bucket.name} does have event notifications enabled."
)
findings.append(report)
return findings
@@ -1,8 +1,8 @@
import json
from typing import Optional
from typing import Dict, List, Optional
from botocore.client import ClientError
from pydantic import BaseModel
from pydantic import BaseModel, Field
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
@@ -32,6 +32,9 @@ class S3(AWSService):
self.__threading_call__(self._get_bucket_tagging, self.buckets.values())
self.__threading_call__(self._get_bucket_replication, self.buckets.values())
self.__threading_call__(self._get_bucket_lifecycle, self.buckets.values())
self.__threading_call__(
self._get_bucket_notification_configuration, self.buckets.values()
)
def _list_buckets(self, provider):
logger.info("S3 - Listing buckets...")
@@ -442,6 +445,43 @@ class S3(AWSService):
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_bucket_notification_configuration(self, bucket):
logger.info("S3 - Get bucket's notification configuration...")
try:
regional_client = self.regional_clients[bucket.region]
bucket_notification_config = (
regional_client.get_bucket_notification_configuration(
Bucket=bucket.name
)
)
if any(
key in bucket_notification_config
for key in (
"TopicConfigurations",
"QueueConfigurations",
"LambdaFunctionConfigurations",
"EventBridgeConfiguration",
)
):
bucket.notification_config = bucket_notification_config
else:
bucket.notification_config = {}
except ClientError as error:
if error.response["Error"]["Code"] == "NoSuchBucket":
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
else:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _head_bucket(self, bucket_name):
logger.info("S3 - Checking if bucket exists...")
try:
@@ -637,14 +677,15 @@ class Bucket(BaseModel):
versioning: bool = False
logging: bool = False
public_access_block: Optional[PublicAccessBlock]
acl_grantees: list[ACL_Grantee] = []
policy: dict = {}
acl_grantees: List[ACL_Grantee] = Field(default_factory=list)
policy: Dict = Field(default_factory=dict)
encryption: Optional[str]
region: str
logging_target_bucket: Optional[str]
ownership: Optional[str]
object_lock: bool = False
mfa_delete: bool = False
tags: Optional[list] = []
lifecycle: Optional[list[LifeCycleRule]] = []
replication_rules: Optional[list[ReplicationRule]] = []
tags: List[Dict[str, str]] = Field(default_factory=list)
lifecycle: List[LifeCycleRule] = Field(default_factory=list)
replication_rules: List[ReplicationRule] = Field(default_factory=list)
notification_config: Dict = Field(default_factory=dict)
@@ -0,0 +1,143 @@
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
class Test_s3_bucket_event_notifications_enabled:
# No Buckets
@mock_aws
def test_no_buckets(self):
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_event_notifications_enabled.s3_bucket_event_notifications_enabled.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_event_notifications_enabled.s3_bucket_event_notifications_enabled import (
s3_bucket_event_notifications_enabled,
)
check = s3_bucket_event_notifications_enabled()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_bucket_event_notifications_disabled(self):
s3_client = client("s3", region_name=AWS_REGION_US_EAST_1)
bucket_name = "test-bucket"
s3_client.create_bucket(
Bucket=bucket_name, ObjectOwnership="BucketOwnerEnforced"
)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_event_notifications_enabled.s3_bucket_event_notifications_enabled.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_event_notifications_enabled.s3_bucket_event_notifications_enabled import (
s3_bucket_event_notifications_enabled,
)
check = s3_bucket_event_notifications_enabled()
result = check.execute()
assert len(result) == 1
# US-EAST-1 Source Bucket
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name} does not have event notifications enabled."
)
assert result[0].resource_id == bucket_name
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name}"
)
assert result[0].region == AWS_REGION_US_EAST_1
@mock_aws
def test_bucket_event_notifications_enabled(self):
s3_client = client("s3", region_name=AWS_REGION_US_EAST_1)
bucket_name = "test-bucket"
s3_client.create_bucket(
Bucket=bucket_name, ObjectOwnership="BucketOwnerEnforced"
)
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={
"LambdaFunctionConfigurations": [
{
"LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function",
"Events": ["s3:ObjectCreated:*"],
}
],
"QueueConfigurations": [
{
"QueueArn": "arn:aws:sqs:us-east-1:123456789012:my-queue",
"Events": ["s3:ObjectCreated:*"],
}
],
"TopicConfigurations": [
{
"TopicArn": "arn:aws:sns:us-east-1:123456789012:my-topic",
"Events": ["s3:ObjectCreated:*"],
}
],
},
)
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_event_notifications_enabled.s3_bucket_event_notifications_enabled.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_event_notifications_enabled.s3_bucket_event_notifications_enabled import (
s3_bucket_event_notifications_enabled,
)
check = s3_bucket_event_notifications_enabled()
result = check.execute()
assert len(result) == 1
# US-EAST-1 Source Bucket
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name} does have event notifications enabled."
)
assert result[0].resource_id == bucket_name
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:s3:::{bucket_name}"
)
assert result[0].region == AWS_REGION_US_EAST_1
@@ -479,6 +479,38 @@ class Test_S3_Service:
assert s3.buckets[bucket_arn].lifecycle[0].id == "test"
assert s3.buckets[bucket_arn].lifecycle[0].status == "Enabled"
# Test S3 Get Bucket Notification Configuration
@mock_aws
def test_get_bucket_notification_configuration(self):
# Generate S3 Client
s3_client = client("s3", region_name=AWS_REGION_US_EAST_1)
# Create S3 Bucket
bucket_name = "test-bucket"
bucket_arn = f"arn:aws:s3:::{bucket_name}"
s3_client.create_bucket(
Bucket=bucket_name,
ObjectOwnership="BucketOwnerEnforced",
ObjectLockEnabledForBucket=True,
)
s3_client.put_bucket_notification_configuration(
Bucket=bucket_name,
NotificationConfiguration={
"LambdaFunctionConfigurations": [
{
"LambdaFunctionArn": f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:123456789012:function:Test",
"Events": ["s3:ObjectCreated:*"],
}
]
},
)
# S3 client for this test class
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
s3 = S3(aws_provider)
assert len(s3.buckets) == 1
assert s3.buckets[bucket_arn].name == bucket_name
assert s3.buckets[bucket_arn].region == AWS_REGION_US_EAST_1
assert s3.buckets[bucket_arn].notification_config
# Test S3 Head Bucket
@mock_aws
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@@ -505,7 +537,7 @@ class Test_S3_Service:
)
assert s3.buckets[bucket_arn].region == AWS_REGION_US_EAST_1
# Test S3 List Access Points
# Test S3Control List Access Points
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@mock_aws
def test_list_access_points(self):
@@ -555,7 +587,7 @@ class Test_S3_Service:
assert s3control.access_points[arn].bucket == "test-bucket"
assert s3control.access_points[arn].region == AWS_REGION_US_EAST_1
# Test S3 Get Access Point
# Test S3Control Get Access Point
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@mock_aws
def test_get_access_point(self):