feat(redshift): add new check redshift_cluster_enhanced_vpc_routing (#5281)

Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com>
This commit is contained in:
Daniel Barranquero
2024-10-09 18:40:36 +02:00
committed by GitHub
parent 16191a7b15
commit d45750b042
6 changed files with 196 additions and 0 deletions

View File

@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "redshift_cluster_enhanced_vpc_routing",
"CheckTitle": "Check if Redshift clusters are using enhanced VPC routing.",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "redshift",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:redshift:region:account-id:cluster/cluster-name",
"Severity": "medium",
"ResourceType": "AwsRedshiftCluster",
"Description": "This control checks whether an Amazon Redshift cluster has EnhancedVpcRouting enabled. Enhanced VPC routing forces all COPY and UNLOAD traffic between the cluster and data repositories to go through your VPC, allowing you to use VPC security features such as security groups and network access control lists.",
"Risk": "Without enhanced VPC routing, network traffic between the Redshift cluster and data repositories might bypass VPC-level security controls, increasing the risk of unauthorized access or data exfiltration.",
"RelatedUrl": "https://docs.aws.amazon.com/redshift/latest/mgmt/enhanced-vpc-enabling-cluster.html",
"Remediation": {
"Code": {
"CLI": "aws redshift modify-cluster --cluster-identifier <cluster-id> --enhanced-vpc-routing",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/redshift-controls.html#redshift-7",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable enhanced VPC routing for your Redshift clusters to enforce network traffic through your VPC and apply additional security controls.",
"Url": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/Redshift/enable-enhanced-vpc-routing.html"
}
},
"Categories": [
"trustboundaries"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.redshift.redshift_client import redshift_client
class redshift_cluster_enhanced_vpc_routing(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report.status = "FAIL"
report.status_extended = f"Redshift Cluster {cluster.id} does not have Enhanced VPC Routing security feature enabled."
if cluster.enhanced_vpc_routing:
report.status = "PASS"
report.status_extended = f"Redshift Cluster {cluster.id} has Enhanced VPC Routing security feature enabled."
findings.append(report)
return findings

View File

@@ -41,6 +41,9 @@ class Redshift(AWSService):
region=regional_client.region,
tags=cluster.get("Tags"),
master_username=cluster.get("MasterUsername", ""),
enhanced_vpc_routing=cluster.get(
"EnhancedVpcRouting", False
),
database_name=cluster.get("DBName", ""),
parameter_group_name=cluster.get(
"ClusterParameterGroups", [{}]
@@ -119,5 +122,6 @@ class Cluster(BaseModel):
bucket: str = None
cluster_snapshots: bool = False
tags: Optional[list] = []
enhanced_vpc_routing: bool = False
parameter_group_name: str = None
require_ssl: bool = False

View File

@@ -0,0 +1,134 @@
from unittest import mock
from uuid import uuid4
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
CLUSTER_ID = str(uuid4())
CLUSTER_ARN = (
f"arn:aws:redshift:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:cluster:{CLUSTER_ID}"
)
class Test_redshift_cluster_enhanced_vpc_routing:
def test_no_clusters(self):
from prowler.providers.aws.services.redshift.redshift_service import Redshift
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.redshift.redshift_cluster_enhanced_vpc_routing.redshift_cluster_enhanced_vpc_routing.redshift_client",
new=Redshift(aws_provider),
):
from prowler.providers.aws.services.redshift.redshift_cluster_enhanced_vpc_routing.redshift_cluster_enhanced_vpc_routing import (
redshift_cluster_enhanced_vpc_routing,
)
check = redshift_cluster_enhanced_vpc_routing()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_cluster_not_enhanced_vpc_routing_enabled(self):
redshift_client = client("redshift", region_name=AWS_REGION_EU_WEST_1)
redshift_client.create_cluster(
DBName="test",
ClusterIdentifier=CLUSTER_ID,
ClusterType="single-node",
NodeType="ds2.xlarge",
MasterUsername="user",
MasterUserPassword="password",
PubliclyAccessible=True,
Tags=[
{"Key": "test", "Value": "test"},
],
Port=9439,
Encrypted=False,
EnhancedVpcRouting=False,
)
from prowler.providers.aws.services.redshift.redshift_service import Redshift
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.redshift.redshift_cluster_enhanced_vpc_routing.redshift_cluster_enhanced_vpc_routing.redshift_client",
new=Redshift(aws_provider),
):
from prowler.providers.aws.services.redshift.redshift_cluster_enhanced_vpc_routing.redshift_cluster_enhanced_vpc_routing import (
redshift_cluster_enhanced_vpc_routing,
)
check = redshift_cluster_enhanced_vpc_routing()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].status_extended == (
f"Redshift Cluster {CLUSTER_ID} does not have Enhanced VPC Routing security feature enabled."
)
assert result[0].resource_id == CLUSTER_ID
assert result[0].resource_arn == CLUSTER_ARN
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == [{"Key": "test", "Value": "test"}]
@mock_aws
def test_cluster_enhanced_vpc_routing_enabled(self):
redshift_client = client("redshift", region_name=AWS_REGION_EU_WEST_1)
redshift_client.create_cluster(
DBName="test",
ClusterIdentifier=CLUSTER_ID,
ClusterType="single-node",
NodeType="ds2.xlarge",
MasterUsername="user",
MasterUserPassword="password",
PubliclyAccessible=True,
Tags=[
{"Key": "test", "Value": "test"},
],
Port=9439,
Encrypted=True,
EnhancedVpcRouting=True,
)
from prowler.providers.aws.services.redshift.redshift_service import Redshift
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.redshift.redshift_cluster_enhanced_vpc_routing.redshift_cluster_enhanced_vpc_routing.redshift_client",
new=Redshift(aws_provider),
):
from prowler.providers.aws.services.redshift.redshift_cluster_enhanced_vpc_routing.redshift_cluster_enhanced_vpc_routing import (
redshift_cluster_enhanced_vpc_routing,
)
check = redshift_cluster_enhanced_vpc_routing()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == (
f"Redshift Cluster {CLUSTER_ID} has Enhanced VPC Routing security feature enabled."
)
assert result[0].resource_id == CLUSTER_ID
assert result[0].resource_arn == CLUSTER_ARN
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == [{"Key": "test", "Value": "test"}]

View File

@@ -110,6 +110,7 @@ class Test_Redshift_Service:
Tags=[
{"Key": "test", "Value": "test"},
],
EnhancedVpcRouting=True,
ClusterParameterGroupName="default.redshift-1.0",
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
@@ -133,6 +134,7 @@ class Test_Redshift_Service:
assert redshift.clusters[0].parameter_group_name == "default.redshift-1.0"
assert redshift.clusters[0].encrypted
assert redshift.clusters[0].master_username == "user"
assert redshift.clusters[0].enhanced_vpc_routing
assert redshift.clusters[0].database_name == "test"
@mock_aws