feat(aws): include resource metadata in services from r* to s* (#6536)

Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
This commit is contained in:
Hugo Pereira Brito
2025-01-16 00:10:53 +01:00
committed by GitHub
parent 95189b574a
commit b1f02098ff
105 changed files with 419 additions and 595 deletions
@@ -6,11 +6,11 @@ class rds_cluster_backtrack_enabled(Check):
def execute(self):
findings = []
for db_cluster in rds_client.db_clusters:
report = Check_Report_AWS(self.metadata())
report.region = rds_client.db_clusters[db_cluster].region
report.resource_id = rds_client.db_clusters[db_cluster].id
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=rds_client.db_clusters[db_cluster],
)
report.resource_arn = db_cluster
report.resource_tags = rds_client.db_clusters[db_cluster].tags
report.status = "FAIL"
report.status_extended = f"RDS Cluster {rds_client.db_clusters[db_cluster].id} does not have backtrack enabled."
# Only RDS Aurora MySQL clusters support backtrack.
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_cluster_copy_tags_to_snapshots(Check):
def execute(self):
findings = []
for db_cluster_arn, db_cluster in rds_client.db_clusters.items():
report = Check_Report_AWS(self.metadata())
report.region = db_cluster.region
report.resource_id = db_cluster.id
report.resource_arn = db_cluster_arn
report.resource_tags = db_cluster.tags
for db_cluster in rds_client.db_clusters.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_cluster
)
if db_cluster.copy_tags_to_snapshot:
report.status = "PASS"
report.status_extended = (
@@ -7,37 +7,31 @@ class rds_cluster_critical_event_subscription(Check):
findings = []
if rds_client.provider.scan_unused_services or rds_client.db_clusters:
for db_event in rds_client.db_event_subscriptions:
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_event
)
report.status = "FAIL"
report.status_extended = "RDS cluster event categories of maintenance and failure are not subscribed."
report.resource_id = rds_client.audited_account
report.resource_arn = rds_client._get_rds_arn_template(db_event.region)
report.region = db_event.region
report.resource_tags = db_event.tags
if db_event.source_type == "db-cluster" and db_event.enabled:
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_event
)
if db_event.event_list == [] or set(db_event.event_list) == {
"maintenance",
"failure",
}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "PASS"
report.status_extended = "RDS cluster events are subscribed."
elif db_event.event_list == ["maintenance"]:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = (
"RDS cluster event category of failure is not subscribed."
)
elif db_event.event_list == ["failure"]:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = "RDS cluster event category of maintenance is not subscribed."
@@ -6,11 +6,10 @@ class rds_cluster_default_admin(Check):
def execute(self):
findings = []
for db_cluster in rds_client.db_clusters:
report = Check_Report_AWS(self.metadata())
report.region = rds_client.db_clusters[db_cluster].region
report.resource_id = rds_client.db_clusters[db_cluster].id
report.resource_arn = db_cluster
report.resource_tags = rds_client.db_clusters[db_cluster].tags
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=rds_client.db_clusters[db_cluster],
)
report.status = "FAIL"
report.status_extended = f"RDS Cluster {rds_client.db_clusters[db_cluster].id} is using the default master username."
if rds_client.db_clusters[db_cluster].username not in [
@@ -6,11 +6,10 @@ class rds_cluster_deletion_protection(Check):
def execute(self):
findings = []
for db_cluster in rds_client.db_clusters:
report = Check_Report_AWS(self.metadata())
report.region = rds_client.db_clusters[db_cluster].region
report.resource_id = rds_client.db_clusters[db_cluster].id
report.resource_arn = db_cluster
report.resource_tags = rds_client.db_clusters[db_cluster].tags
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=rds_client.db_clusters[db_cluster],
)
report.status = "FAIL"
report.status_extended = f"RDS Cluster {rds_client.db_clusters[db_cluster].id} does not have deletion protection enabled."
if rds_client.db_clusters[db_cluster].deletion_protection:
@@ -18,11 +18,10 @@ class rds_cluster_iam_authentication_enabled(Check):
engine in rds_client.db_clusters[db_cluster].engine
for engine in supported_engines
):
report = Check_Report_AWS(self.metadata())
report.region = rds_client.db_clusters[db_cluster].region
report.resource_id = rds_client.db_clusters[db_cluster].id
report.resource_arn = db_cluster
report.resource_tags = rds_client.db_clusters[db_cluster].tags
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=rds_client.db_clusters[db_cluster],
)
if rds_client.db_clusters[db_cluster].iam_auth:
report.status = "PASS"
@@ -6,13 +6,11 @@ class rds_cluster_integration_cloudwatch_logs(Check):
def execute(self):
findings = []
valid_engines = ["aurora-mysql", "aurora-postgresql", "mysql", "postgres"]
for db_cluster_arn, db_cluster in rds_client.db_clusters.items():
for db_cluster in rds_client.db_clusters.values():
if db_cluster.engine in valid_engines:
report = Check_Report_AWS(self.metadata())
report.region = db_cluster.region
report.resource_id = db_cluster.id
report.resource_arn = db_cluster_arn
report.resource_tags = db_cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_cluster
)
if db_cluster.cloudwatch_logs:
report.status = "PASS"
report.status_extended = f"RDS Cluster {db_cluster.id} is shipping {', '.join(db_cluster.cloudwatch_logs)} logs to CloudWatch Logs."
@@ -8,11 +8,10 @@ class rds_cluster_minor_version_upgrade_enabled(Check):
for db_cluster in rds_client.db_clusters:
# Auto minor version upgrade is only available for non-Aurora Multi-AZ DB clusters
if rds_client.db_clusters[db_cluster].multi_az:
report = Check_Report_AWS(self.metadata())
report.region = rds_client.db_clusters[db_cluster].region
report.resource_id = rds_client.db_clusters[db_cluster].id
report.resource_arn = rds_client.db_clusters[db_cluster].arn
report.resource_tags = rds_client.db_clusters[db_cluster].tags
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=rds_client.db_clusters[db_cluster],
)
if rds_client.db_clusters[db_cluster].auto_minor_version_upgrade:
report.status = "PASS"
report.status_extended = f"RDS Cluster {rds_client.db_clusters[db_cluster].id} has minor version upgrade enabled."
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_cluster_multi_az(Check):
def execute(self):
findings = []
for db_cluster_arn, db_cluster in rds_client.db_clusters.items():
report = Check_Report_AWS(self.metadata())
report.region = db_cluster.region
report.resource_id = db_cluster.id
report.resource_arn = db_cluster_arn
report.resource_tags = db_cluster.tags
for db_cluster in rds_client.db_clusters.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_cluster
)
report.status = "FAIL"
report.status_extended = (
f"RDS Cluster {db_cluster.id} does not have multi-AZ enabled."
@@ -12,12 +12,10 @@ class rds_cluster_non_default_port(Check):
1433: ["sqlserver"],
50000: ["db2"],
}
for db_cluster_arn, db_cluster in rds_client.db_clusters.items():
report = Check_Report_AWS(self.metadata())
report.region = db_cluster.region
report.resource_id = db_cluster.id
report.resource_arn = db_cluster_arn
report.resource_tags = db_cluster.tags
for db_cluster in rds_client.db_clusters.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_cluster
)
report.status = "PASS"
report.status_extended = (
f"RDS Cluster {db_cluster.id} is not using the default port "
@@ -6,19 +6,17 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_cluster_protected_by_backup_plan(Check):
def execute(self):
findings = []
for db_cluster_arn, db_cluster in rds_client.db_clusters.items():
report = Check_Report_AWS(self.metadata())
report.region = db_cluster.region
report.resource_id = db_cluster.id
report.resource_arn = db_cluster_arn
report.resource_tags = db_cluster.tags
for db_cluster in rds_client.db_clusters.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_cluster
)
report.status = "FAIL"
report.status_extended = (
f"RDS Cluster {db_cluster.id} is not protected by a backup plan."
)
if (
db_cluster_arn in backup_client.protected_resources
db_cluster.arn in backup_client.protected_resources
or f"arn:{rds_client.audited_partition}:rds:*:*:cluster:*"
in backup_client.protected_resources
or "*" in backup_client.protected_resources
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_cluster_storage_encrypted(Check):
def execute(self):
findings = []
for db_cluster_arn, db_cluster in rds_client.db_clusters.items():
report = Check_Report_AWS(self.metadata())
report.region = db_cluster.region
report.resource_id = db_cluster.id
report.resource_arn = db_cluster_arn
report.resource_tags = db_cluster.tags
for db_cluster in rds_client.db_clusters.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_cluster
)
if db_cluster.encrypted:
report.status = "PASS"
report.status_extended = f"RDS cluster {db_cluster.id} is encrypted."
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_backup_enabled(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
if db_instance.backup_retention_period > 0:
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} has backup enabled with retention period {db_instance.backup_retention_period} days."
@@ -14,12 +14,10 @@ class rds_instance_certificate_expiration(Check):
# RDS Certificates that are expired the check will FAIL with a severity of critical.
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
report.status = "FAIL"
report.check_metadata.Severity = Severity.critical
report.status_extended = (
@@ -5,17 +5,15 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_copy_tags_to_snapshots(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
for db_instance in rds_client.db_instances.values():
if db_instance.engine not in [
"aurora",
"aurora-mysql",
"aurora-postgresql",
]:
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
if db_instance.copy_tags_to_snapshot:
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} has copy tags to snapshots enabled."
@@ -7,7 +7,9 @@ class rds_instance_critical_event_subscription(Check):
findings = []
if rds_client.provider.scan_unused_services or rds_client.db_instances:
for db_event in rds_client.db_event_subscriptions:
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_event
)
report.status = "FAIL"
report.status_extended = "RDS instance event categories of maintenance, configuration change, and failure are not subscribed."
report.resource_id = rds_client.audited_account
@@ -15,41 +17,29 @@ class rds_instance_critical_event_subscription(Check):
report.region = db_event.region
report.resource_tags = db_event.tags
if db_event.source_type == "db-instance" and db_event.enabled:
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_event
)
if db_event.event_list == [] or set(db_event.event_list) == {
"maintenance",
"configuration change",
"failure",
}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "PASS"
report.status_extended = "RDS instance events are subscribed."
elif set(db_event.event_list) == {"maintenance"}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = "RDS instance event categories of configuration change and failure are not subscribed."
elif set(db_event.event_list) == {"configuration change"}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = "RDS instance event categories of maintenance and failure are not subscribed."
elif set(db_event.event_list) == {"failure"}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = "RDS instance event categories of maintenance and configuration change are not subscribed."
elif set(db_event.event_list) == {
"maintenance",
"configuration change",
}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = (
"RDS instance event category of failure is not subscribed."
@@ -58,18 +48,12 @@ class rds_instance_critical_event_subscription(Check):
"maintenance",
"failure",
}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = "RDS instance event category of configuration change is not subscribed."
elif set(db_event.event_list) == {
"configuration change",
"failure",
}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = "RDS instance event category of maintenance is not subscribed."
findings.append(report)
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_default_admin(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
# Check if is member of a cluster
if db_instance.cluster_id:
if (
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_deletion_protection(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
# Check if is member of a cluster
if db_instance.cluster_id:
if (
@@ -5,13 +5,11 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_deprecated_engine_version(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
report.status = "FAIL"
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
report.status_extended = f"RDS instance {db_instance.id} is using a deprecated engine {db_instance.engine} with version {db_instance.engine_version}."
if (
hasattr(
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_enhanced_monitoring_enabled(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
if db_instance.enhanced_monitoring_arn:
report.status = "PASS"
report.status_extended = (
@@ -7,7 +7,9 @@ class rds_instance_event_subscription_parameter_groups(Check):
findings = []
if rds_client.provider.scan_unused_services or rds_client.db_instances:
for db_event in rds_client.db_event_subscriptions:
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_event
)
report.status = "FAIL"
report.status_extended = "RDS parameter group event categories of configuration change is not subscribed."
report.resource_id = rds_client.audited_account
@@ -18,9 +20,9 @@ class rds_instance_event_subscription_parameter_groups(Check):
if db_event.event_list == [] or db_event.event_list == [
"configuration change",
]:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_event
)
report.status = "PASS"
report.status_extended = (
"RDS parameter group events are subscribed."
@@ -7,37 +7,32 @@ class rds_instance_event_subscription_security_groups(Check):
findings = []
if rds_client.provider.scan_unused_services or rds_client.db_instances:
for db_event in rds_client.db_event_subscriptions:
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_event
)
report.status = "FAIL"
report.status_extended = "RDS security group event categories of configuration change and failure are not subscribed."
report.resource_id = rds_client.audited_account
report.resource_arn = rds_client._get_rds_arn_template(db_event.region)
report.region = db_event.region
report.resource_tags = []
if db_event.source_type == "db-security-group" and db_event.enabled:
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_event
)
if db_event.event_list == [] or set(db_event.event_list) == {
"failure",
"configuration change",
}:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "PASS"
report.status_extended = (
"RDS security group events are subscribed."
)
elif db_event.event_list == ["configuration change"]:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = "RDS security group event category of failure is not subscribed."
elif db_event.event_list == ["failure"]:
report.resource_id = db_event.id
report.resource_arn = db_event.arn
report.resource_tags = db_event.tags
report.status = "FAIL"
report.status_extended = "RDS security group event category of configuration change is not subscribed."
@@ -13,13 +13,11 @@ class rds_instance_iam_authentication_enabled(Check):
"aurora",
]
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
for db_instance in rds_client.db_instances.values():
if any(engine in db_instance.engine for engine in supported_engines):
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
# Check if is member of a cluster
if db_instance.cluster_id:
if db_instance.iam_auth:
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_inside_vpc(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
if db_instance.vpc_id:
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} is deployed in a VPC {db_instance.vpc_id}."
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_integration_cloudwatch_logs(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
if db_instance.cloudwatch_logs:
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} is shipping {', '.join(db_instance.cloudwatch_logs)} logs to CloudWatch Logs."
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_minor_version_upgrade_enabled(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
if db_instance.auto_minor_version_upgrade:
report.status = "PASS"
report.status_extended = (
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_multi_az(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
# Check if is member of a cluster
if db_instance.cluster_id:
if (
@@ -8,12 +8,10 @@ from prowler.providers.aws.services.vpc.vpc_client import vpc_client
class rds_instance_no_public_access(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
report.status = "PASS"
report.status_extended = (
f"RDS Instance {db_instance.id} is not publicly accessible."
@@ -12,12 +12,10 @@ class rds_instance_non_default_port(Check):
1433: ["sqlserver"],
50000: ["db2"],
}
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
report.status = "PASS"
report.status_extended = (
f"RDS Instance {db_instance.id} is not using the default port "
@@ -6,7 +6,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_protected_by_backup_plan(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
# Makes sure the instance is not running with an Aurora engine
# Aurora backup plans require enabling it separately from RDS
if db_instance.engine not in [
@@ -14,18 +17,13 @@ class rds_instance_protected_by_backup_plan(Check):
"aurora",
"aurora-postgresql",
]:
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
report.status = "FAIL"
report.status_extended = (
f"RDS Instance {db_instance.id} is not protected by a backup plan."
)
if (
db_instance_arn in backup_client.protected_resources
db_instance.arn in backup_client.protected_resources
or f"arn:{rds_client.audited_partition}:rds:*:*:instance:*"
in backup_client.protected_resources
or "*" in backup_client.protected_resources
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.rds.rds_client import rds_client
class rds_instance_storage_encrypted(Check):
def execute(self):
findings = []
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
if db_instance.encrypted:
report.status = "PASS"
report.status_extended = f"RDS Instance {db_instance.id} is encrypted."
@@ -16,12 +16,10 @@ class rds_instance_transport_encrypted(Check):
"mariadb",
"aurora-mysql",
]
for db_instance_arn, db_instance in rds_client.db_instances.items():
report = Check_Report_AWS(self.metadata())
report.region = db_instance.region
report.resource_id = db_instance.id
report.resource_arn = db_instance_arn
report.resource_tags = db_instance.tags
for db_instance in rds_client.db_instances.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_instance
)
report.status = "FAIL"
report.status_extended = (
f"RDS Instance {db_instance.id} connections are not encrypted."
@@ -58,11 +56,10 @@ class rds_instance_transport_encrypted(Check):
findings.append(report)
for db_cluster in rds_client.db_clusters:
report = Check_Report_AWS(self.metadata())
report.region = rds_client.db_clusters[db_cluster].region
report.resource_id = rds_client.db_clusters[db_cluster].id
report.resource_arn = db_cluster
report.resource_tags = rds_client.db_clusters[db_cluster].tags
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=rds_client.db_clusters[db_cluster],
)
report.status = "FAIL"
report.status_extended = f"RDS Cluster {rds_client.db_clusters[db_cluster].id} connections are not encrypted."
# Check RDS Clusters that support TLS encryption
@@ -6,11 +6,9 @@ class rds_snapshots_encrypted(Check):
def execute(self):
findings = []
for db_snap in rds_client.db_snapshots:
report = Check_Report_AWS(self.metadata())
report.region = db_snap.region
report.resource_id = db_snap.id
report.resource_arn = db_snap.arn
report.resource_tags = db_snap.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_snap
)
if db_snap.encrypted:
report.status = "PASS"
report.status_extended = (
@@ -25,11 +23,9 @@ class rds_snapshots_encrypted(Check):
findings.append(report)
for db_snap in rds_client.db_cluster_snapshots:
report = Check_Report_AWS(self.metadata())
report.region = db_snap.region
report.resource_id = db_snap.id
report.resource_arn = db_snap.arn
report.resource_tags = db_snap.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_snap
)
if db_snap.encrypted:
report.status = "PASS"
report.status_extended = (
@@ -6,11 +6,9 @@ class rds_snapshots_public_access(Check):
def execute(self):
findings = []
for db_snap in rds_client.db_snapshots:
report = Check_Report_AWS(self.metadata())
report.region = db_snap.region
report.resource_id = db_snap.id
report.resource_arn = db_snap.arn
report.resource_tags = db_snap.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_snap
)
if db_snap.public:
report.status = "FAIL"
report.status_extended = (
@@ -25,11 +23,9 @@ class rds_snapshots_public_access(Check):
findings.append(report)
for db_snap in rds_client.db_cluster_snapshots:
report = Check_Report_AWS(self.metadata())
report.region = db_snap.region
report.resource_id = db_snap.id
report.resource_arn = db_snap.arn
report.resource_tags = db_snap.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=db_snap
)
if db_snap.public:
report.status = "FAIL"
report.status_extended = f"RDS Cluster Snapshot {db_snap.id} is public."
@@ -6,11 +6,9 @@ class redshift_cluster_audit_logging(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "PASS"
report.status_extended = (
f"Redshift Cluster {cluster.id} has audit logging enabled."
@@ -6,11 +6,9 @@ class redshift_cluster_automated_snapshot(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "PASS"
report.status_extended = (
f"Redshift Cluster {cluster.id} has automated snapshots enabled."
@@ -6,11 +6,9 @@ class redshift_cluster_automatic_upgrades(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "PASS"
report.status_extended = (
f"Redshift Cluster {cluster.id} has AllowVersionUpgrade enabled."
@@ -6,11 +6,9 @@ class redshift_cluster_encrypted_at_rest(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "FAIL"
report.status_extended = (
f"Redshift Cluster {cluster.id} is not encrypted at rest."
@@ -6,11 +6,9 @@ class redshift_cluster_enhanced_vpc_routing(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "FAIL"
report.status_extended = f"Redshift Cluster {cluster.id} does not have Enhanced VPC Routing security feature enabled."
if cluster.enhanced_vpc_routing:
@@ -6,11 +6,9 @@ class redshift_cluster_in_transit_encryption_enabled(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "FAIL"
report.status_extended = (
f"Redshift Cluster {cluster.id} is not encrypted in transit."
@@ -6,11 +6,9 @@ class redshift_cluster_multi_az_enabled(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "FAIL"
report.status_extended = (
f"Redshift Cluster {cluster.id} does not have Multi-AZ enabled."
@@ -6,11 +6,9 @@ class redshift_cluster_non_default_database_name(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "PASS"
report.status_extended = f"Redshift Cluster {cluster.id} does not have the default database name."
if cluster.database_name == "dev":
@@ -6,11 +6,9 @@ class redshift_cluster_non_default_username(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "PASS"
report.status_extended = f"Redshift Cluster {cluster.id} does not have the default Admin username."
if cluster.master_username == "awsuser":
@@ -9,11 +9,9 @@ class redshift_cluster_public_access(Check):
def execute(self):
findings = []
for cluster in redshift_client.clusters:
report = Check_Report_AWS(self.metadata())
report.region = cluster.region
report.resource_id = cluster.id
report.resource_arn = cluster.arn
report.resource_tags = cluster.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=cluster
)
report.status = "PASS"
report.status_extended = (
f"Redshift Cluster {cluster.id} is not publicly accessible."
@@ -8,7 +8,10 @@ class resourceexplorer2_indexes_found(Check):
def execute(self):
findings = []
if resource_explorer_2_client.indexes is not None:
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=resource_explorer_2_client.indexes,
)
report.status = "FAIL"
report.status_extended = "No Resource Explorer Indexes found."
report.region = resource_explorer_2_client.region
@@ -9,11 +9,9 @@ class route53_domains_privacy_protection_enabled(Check):
findings = []
for domain in route53domains_client.domains.values():
report = Check_Report_AWS(self.metadata())
report.resource_id = domain.name
report.resource_arn = domain.arn
report.region = domain.region
report.resource_tags = domain.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=domain
)
if domain.admin_privacy:
report.status = "PASS"
report.status_extended = (
@@ -9,11 +9,9 @@ class route53_domains_transferlock_enabled(Check):
findings = []
for domain in route53domains_client.domains.values():
report = Check_Report_AWS(self.metadata())
report.resource_id = domain.name
report.resource_arn = domain.arn
report.region = domain.region
report.resource_tags = domain.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=domain
)
if domain.status_list and "clientTransferProhibited" in domain.status_list:
report.status = "PASS"
report.status_extended = (
@@ -8,11 +8,9 @@ class route53_public_hosted_zones_cloudwatch_logging_enabled(Check):
for hosted_zone in route53_client.hosted_zones.values():
if not hosted_zone.private_zone:
report = Check_Report_AWS(self.metadata())
report.resource_id = hosted_zone.id
report.resource_arn = hosted_zone.arn
report.resource_tags = hosted_zone.tags
report.region = hosted_zone.region
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=hosted_zone
)
if (
hosted_zone.logging_config
and hosted_zone.logging_config.cloudwatch_log_group_arn
@@ -5,11 +5,10 @@ from prowler.providers.aws.services.s3.s3control_client import s3control_client
class s3_access_point_public_access_block(Check):
def execute(self):
findings = []
for arn, access_point in s3control_client.access_points.items():
report = Check_Report_AWS(self.metadata())
report.region = access_point.region
report.resource_id = access_point.name
report.resource_arn = arn
for access_point in s3control_client.access_points.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=access_point
)
report.status = "PASS"
report.status_extended = f"Access Point {access_point.name} of bucket {access_point.bucket} does have Public Access Block enabled."
@@ -6,7 +6,10 @@ from prowler.providers.aws.services.s3.s3control_client import s3control_client
class s3_account_level_public_access_blocks(Check):
def execute(self):
findings = []
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=s3control_client.account_public_access_block,
)
if (
s3control_client.account_public_access_block
and s3control_client.account_public_access_block.ignore_public_acls
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_acl_prohibited(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} has bucket ACLs enabled."
if bucket.ownership:
@@ -6,12 +6,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_cross_account_access(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} has a bucket policy but it does not allow cross account access."
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_cross_region_replication(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} does not have correct cross region replication configuration."
if bucket.replication_rules:
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_default_encryption(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
if bucket.encryption:
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} has Server Side Encryption with {bucket.encryption}."
@@ -17,12 +17,10 @@ class s3_bucket_event_notifications_enabled(Check):
list[Check_Report_AWS]: List of Check_Report_AWS objects
"""
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "FAIL"
report.status_extended = (
f"S3 Bucket {bucket.name} does not have event notifications enabled."
@@ -5,13 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_kms_encryption(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
if bucket.encryption == "aws:kms" or bucket.encryption == "aws:kms:dsse":
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} has Server Side Encryption with {bucket.encryption}."
@@ -6,13 +6,11 @@ from prowler.providers.aws.services.s3.s3control_client import s3control_client
class s3_bucket_level_public_access_block(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
for bucket in s3_client.buckets.values():
if bucket.public_access_block:
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "PASS"
report.status_extended = f"Block Public Access is configured for the S3 Bucket {bucket.name}."
if not (
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_lifecycle_enabled(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} does not have a lifecycle configuration enabled."
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_no_mfa_delete(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
if bucket.mfa_delete:
report.status = "PASS"
report.status_extended = (
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_object_lock(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
if bucket.object_lock:
report.status = "PASS"
report.status_extended = (
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_object_versioning(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
if bucket.versioning:
report.status = "PASS"
report.status_extended = (
@@ -7,12 +7,10 @@ from prowler.providers.aws.services.s3.s3control_client import s3control_client
class s3_bucket_policy_public_write_access(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
# Check if bucket policy allow public write access
if not bucket.policy:
report.status = "PASS"
@@ -13,7 +13,10 @@ class s3_bucket_public_access(Check):
and s3control_client.account_public_access_block.ignore_public_acls
and s3control_client.account_public_access_block.restrict_public_buckets
):
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=s3control_client.account_public_access_block,
)
report.status = "PASS"
report.status_extended = "All S3 public access blocked at account level."
report.region = s3control_client.region
@@ -22,13 +25,11 @@ class s3_bucket_public_access(Check):
findings.append(report)
else:
# 2. If public access is not blocked at account level, check it at each bucket level
for arn, bucket in s3_client.buckets.items():
for bucket in s3_client.buckets.values():
if bucket.public_access_block:
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} is not public."
if not (
@@ -12,7 +12,10 @@ class s3_bucket_public_list_acl(Check):
and s3control_client.account_public_access_block.ignore_public_acls
and s3control_client.account_public_access_block.restrict_public_buckets
):
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=s3control_client.account_public_access_block,
)
report.status = "PASS"
report.status_extended = "All S3 public access blocked at account level."
report.region = s3control_client.region
@@ -21,13 +24,11 @@ class s3_bucket_public_list_acl(Check):
findings.append(report)
else:
# 2. If public access is not blocked at account level, check it at each bucket level
for arn, bucket in s3_client.buckets.items():
for bucket in s3_client.buckets.values():
if bucket.public_access_block:
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "PASS"
report.status_extended = (
f"S3 Bucket {bucket.name} is not publicly listable."
@@ -12,7 +12,10 @@ class s3_bucket_public_write_acl(Check):
and s3control_client.account_public_access_block.ignore_public_acls
and s3control_client.account_public_access_block.restrict_public_buckets
):
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=s3control_client.account_public_access_block,
)
report.status = "PASS"
report.status_extended = "All S3 public access blocked at account level."
report.region = s3control_client.region
@@ -21,13 +24,11 @@ class s3_bucket_public_write_acl(Check):
findings.append(report)
else:
# 2. If public access is not blocked at account level, check it at each bucket level
for arn, bucket in s3_client.buckets.items():
for bucket in s3_client.buckets.values():
if bucket.public_access_block:
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
report.status = "PASS"
report.status_extended = (
f"S3 Bucket {bucket.name} is not publicly writable."
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_secure_transport_policy(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
# Check if bucket policy enforces SSL
if not bucket.policy:
report.status = "FAIL"
@@ -5,12 +5,10 @@ from prowler.providers.aws.services.s3.s3_client import s3_client
class s3_bucket_server_access_logging_enabled(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
for bucket in s3_client.buckets.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=bucket
)
if bucket.logging:
report.status = "PASS"
report.status_extended = (
@@ -18,10 +18,9 @@ class s3_multi_region_access_point_public_access_block(Check):
"""
findings = []
for mr_access_point in s3control_client.multi_region_access_points.values():
report = Check_Report_AWS(self.metadata())
report.region = mr_access_point.region
report.resource_id = mr_access_point.name
report.resource_arn = mr_access_point.arn
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=mr_access_point
)
report.status = "PASS"
report.status_extended = f"S3 Multi Region Access Point {mr_access_point.name} of buckets {', '.join(mr_access_point.buckets)} does have Public Access Block enabled."
@@ -60,11 +60,13 @@ class S3(AWSService):
if provider.identity.audited_regions:
if bucket_region in provider.identity.audited_regions:
self.buckets[arn] = Bucket(
arn=arn,
name=bucket["Name"],
region=bucket_region,
)
else:
self.buckets[arn] = Bucket(
arn=arn,
name=bucket["Name"],
region=bucket_region,
)
@@ -673,6 +675,7 @@ class ReplicationRule(BaseModel):
class Bucket(BaseModel):
arn: str
name: str
versioning: bool = False
logging: bool = False
@@ -6,11 +6,9 @@ class sagemaker_endpoint_config_prod_variant_instances(Check):
def execute(self):
findings = []
for endpoint_config in sagemaker_client.endpoint_configs.values():
report = Check_Report_AWS(self.metadata())
report.region = endpoint_config.region
report.resource_id = endpoint_config.name
report.resource_arn = endpoint_config.arn
report.resource_tags = endpoint_config.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=endpoint_config
)
report.status = "PASS"
report.status_extended = f"Sagemaker Endpoint Config {endpoint_config.name} has all production variants with more than one initial instance."
non_compliant_production_variants = []
@@ -6,11 +6,7 @@ class sagemaker_models_network_isolation_enabled(Check):
def execute(self):
findings = []
for model in sagemaker_client.sagemaker_models:
report = Check_Report_AWS(self.metadata())
report.region = model.region
report.resource_id = model.name
report.resource_arn = model.arn
report.resource_tags = model.tags
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=model)
report.status = "PASS"
report.status_extended = f"Sagemaker notebook instance {model.name} has network isolation enabled."
if not model.network_isolation:
@@ -6,11 +6,9 @@ class sagemaker_notebook_instance_encryption_enabled(Check):
def execute(self):
findings = []
for notebook_instance in sagemaker_client.sagemaker_notebook_instances:
report = Check_Report_AWS(self.metadata())
report.region = notebook_instance.region
report.resource_id = notebook_instance.name
report.resource_arn = notebook_instance.arn
report.resource_tags = notebook_instance.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=notebook_instance
)
report.status = "PASS"
report.status_extended = f"Sagemaker notebook instance {notebook_instance.name} has data encryption enabled."
if not notebook_instance.kms_key_id:
@@ -6,11 +6,9 @@ class sagemaker_notebook_instance_root_access_disabled(Check):
def execute(self):
findings = []
for notebook_instance in sagemaker_client.sagemaker_notebook_instances:
report = Check_Report_AWS(self.metadata())
report.region = notebook_instance.region
report.resource_id = notebook_instance.name
report.resource_arn = notebook_instance.arn
report.resource_tags = notebook_instance.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=notebook_instance
)
report.status = "PASS"
report.status_extended = f"Sagemaker notebook instance {notebook_instance.name} has root access disabled."
if notebook_instance.root_access:
@@ -6,11 +6,9 @@ class sagemaker_notebook_instance_vpc_settings_configured(Check):
def execute(self):
findings = []
for notebook_instance in sagemaker_client.sagemaker_notebook_instances:
report = Check_Report_AWS(self.metadata())
report.region = notebook_instance.region
report.resource_id = notebook_instance.name
report.resource_arn = notebook_instance.arn
report.resource_tags = notebook_instance.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=notebook_instance
)
report.status = "PASS"
report.status_extended = (
f"Sagemaker notebook instance {notebook_instance.name} is in a VPC."
@@ -6,11 +6,9 @@ class sagemaker_notebook_instance_without_direct_internet_access_configured(Chec
def execute(self):
findings = []
for notebook_instance in sagemaker_client.sagemaker_notebook_instances:
report = Check_Report_AWS(self.metadata())
report.region = notebook_instance.region
report.resource_id = notebook_instance.name
report.resource_arn = notebook_instance.arn
report.resource_tags = notebook_instance.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=notebook_instance
)
report.status = "PASS"
report.status_extended = f"Sagemaker notebook instance {notebook_instance.name} has direct internet access disabled."
if notebook_instance.direct_internet_access:
@@ -6,11 +6,9 @@ class sagemaker_training_jobs_intercontainer_encryption_enabled(Check):
def execute(self):
findings = []
for training_job in sagemaker_client.sagemaker_training_jobs:
report = Check_Report_AWS(self.metadata())
report.region = training_job.region
report.resource_id = training_job.name
report.resource_arn = training_job.arn
report.resource_tags = training_job.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=training_job
)
report.status = "PASS"
report.status_extended = f"Sagemaker training job {training_job.name} has intercontainer encryption enabled."
if not training_job.container_traffic_encryption:
@@ -6,11 +6,9 @@ class sagemaker_training_jobs_network_isolation_enabled(Check):
def execute(self):
findings = []
for training_job in sagemaker_client.sagemaker_training_jobs:
report = Check_Report_AWS(self.metadata())
report.region = training_job.region
report.resource_id = training_job.name
report.resource_arn = training_job.arn
report.resource_tags = training_job.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=training_job
)
report.status = "PASS"
report.status_extended = f"Sagemaker training job {training_job.name} has network isolation enabled."
if not training_job.network_isolation:
@@ -6,11 +6,9 @@ class sagemaker_training_jobs_volume_and_output_encryption_enabled(Check):
def execute(self):
findings = []
for training_job in sagemaker_client.sagemaker_training_jobs:
report = Check_Report_AWS(self.metadata())
report.region = training_job.region
report.resource_id = training_job.name
report.resource_arn = training_job.arn
report.resource_tags = training_job.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=training_job
)
report.status = "PASS"
report.status_extended = f"Sagemaker training job {training_job.name} has KMS encryption enabled."
if not training_job.volume_kms_key_id:
@@ -6,11 +6,9 @@ class sagemaker_training_jobs_vpc_settings_configured(Check):
def execute(self):
findings = []
for training_job in sagemaker_client.sagemaker_training_jobs:
report = Check_Report_AWS(self.metadata())
report.region = training_job.region
report.resource_id = training_job.name
report.resource_arn = training_job.arn
report.resource_tags = training_job.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=training_job
)
report.status = "PASS"
report.status_extended = f"Sagemaker training job {training_job.name} has VPC settings for the training job volume and output enabled."
if not training_job.vpc_config_subnets:
@@ -8,11 +8,9 @@ class secretsmanager_automatic_rotation_enabled(Check):
def execute(self):
findings = []
for secret in secretsmanager_client.secrets.values():
report = Check_Report_AWS(self.metadata())
report.region = secret.region
report.resource_id = secret.name
report.resource_arn = secret.arn
report.resource_tags = secret.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=secret
)
if secret.rotation_enabled:
report.status = "PASS"
report.status_extended = (
@@ -9,11 +9,9 @@ class secretsmanager_not_publicly_accessible(Check):
def execute(self):
findings = []
for secret in secretsmanager_client.secrets.values():
report = Check_Report_AWS(self.metadata())
report.region = secret.region
report.resource_id = secret.name
report.resource_arn = secret.arn
report.resource_tags = secret.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=secret
)
report.status = "PASS"
report.status_extended = (
f"SecretsManager secret {secret.name} is not publicly accessible."
@@ -25,11 +25,9 @@ class secretsmanager_secret_rotated_periodically(Check):
"""
findings = []
for secret in secretsmanager_client.secrets.values():
report = Check_Report_AWS(self.metadata())
report.resource_id = secret.name
report.resource_arn = secret.arn
report.region = secret.region
report.resource_tags = secret.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=secret
)
report.status = "PASS"
report.status_extended = f"Secret {secret.name} was last rotated on {secret.last_rotated_date.strftime('%B %d, %Y')}."
@@ -10,11 +10,9 @@ class secretsmanager_secret_unused(Check):
def execute(self):
findings = []
for secret in secretsmanager_client.secrets.values():
report = Check_Report_AWS(self.metadata())
report.resource_id = secret.name
report.resource_arn = secret.arn
report.region = secret.region
report.resource_tags = secret.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=secret
)
report.status = "PASS"
report.status_extended = f"Secret {secret.name} has been accessed recently, last accessed on {secret.last_accessed_date.strftime('%B %d, %Y')}."
@@ -8,11 +8,9 @@ class securityhub_enabled(Check):
def execute(self):
findings = []
for securityhub in securityhub_client.securityhubs:
report = Check_Report_AWS(self.metadata())
report.region = securityhub.region
report.resource_id = securityhub.id
report.resource_arn = securityhub.arn
report.resource_tags = securityhub.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=securityhub
)
if securityhub.status == "ACTIVE":
report.status = "PASS"
if securityhub.standards:
@@ -16,11 +16,9 @@ class servicecatalog_portfolio_shared_within_organization_only(Check):
):
for portfolio in servicecatalog_client.portfolios.values():
if portfolio.shares is not None:
report = Check_Report_AWS(self.metadata())
report.region = portfolio.region
report.resource_id = portfolio.id
report.resource_arn = portfolio.arn
report.resource_tags = portfolio.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=portfolio
)
report.status = "PASS"
report.status_extended = f"ServiceCatalog Portfolio {portfolio.name} is shared within your AWS Organization."
for portfolio_share in portfolio.shares:
@@ -7,11 +7,9 @@ class ses_identity_not_publicly_accessible(Check):
def execute(self):
findings = []
for identity in ses_client.email_identities.values():
report = Check_Report_AWS(self.metadata())
report.region = identity.region
report.resource_id = identity.name
report.resource_arn = identity.arn
report.resource_tags = identity.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=identity
)
report.status = "PASS"
report.status_extended = (
f"SES identity {identity.name} is not publicly accessible."
@@ -8,11 +8,11 @@ class shield_advanced_protection_in_associated_elastic_ips(Check):
findings = []
if shield_client.enabled:
for elastic_ip in ec2_client.elastic_ips:
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=elastic_ip
)
report.region = shield_client.region
report.resource_id = elastic_ip.allocation_id
report.resource_arn = elastic_ip.arn
report.resource_tags = elastic_ip.tags
report.status = "FAIL"
report.status_extended = f"Elastic IP {elastic_ip.allocation_id} is not protected by AWS Shield Advanced."
@@ -7,22 +7,21 @@ class shield_advanced_protection_in_classic_load_balancers(Check):
def execute(self):
findings = []
if shield_client.enabled:
for elb_arn, elb in elb_client.loadbalancers.items():
report = Check_Report_AWS(self.metadata())
for lb in elb_client.loadbalancers.values():
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=lb
)
report.region = shield_client.region
report.resource_id = elb.name
report.resource_arn = elb_arn
report.resource_tags = elb.tags
report.status = "FAIL"
report.status_extended = (
f"ELB {elb.name} is not protected by AWS Shield Advanced."
f"ELB {lb.name} is not protected by AWS Shield Advanced."
)
for protection in shield_client.protections.values():
if elb_arn == protection.resource_arn:
if lb.arn == protection.resource_arn:
report.status = "PASS"
report.status_extended = (
f"ELB {elb.name} is protected by AWS Shield Advanced."
f"ELB {lb.name} is protected by AWS Shield Advanced."
)
break
@@ -10,11 +10,10 @@ class shield_advanced_protection_in_cloudfront_distributions(Check):
findings = []
if shield_client.enabled:
for distribution in cloudfront_client.distributions.values():
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=distribution
)
report.region = shield_client.region
report.resource_id = distribution.id
report.resource_arn = distribution.arn
report.resource_tags = distribution.tags
report.status = "FAIL"
report.status_extended = f"CloudFront distribution {distribution.id} is not protected by AWS Shield Advanced."
@@ -10,11 +10,10 @@ class shield_advanced_protection_in_global_accelerators(Check):
findings = []
if shield_client.enabled:
for accelerator in globalaccelerator_client.accelerators.values():
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=accelerator
)
report.region = shield_client.region
report.resource_id = accelerator.name
report.resource_arn = accelerator.arn
report.resource_tags = accelerator.tags
report.status = "FAIL"
report.status_extended = f"Global Accelerator {accelerator.name} is not protected by AWS Shield Advanced."
@@ -8,11 +8,10 @@ class shield_advanced_protection_in_route53_hosted_zones(Check):
findings = []
if shield_client.enabled:
for hosted_zone in route53_client.hosted_zones.values():
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=hosted_zone
)
report.region = shield_client.region
report.resource_id = hosted_zone.id
report.resource_arn = hosted_zone.arn
report.resource_tags = hosted_zone.tags
report.status = "FAIL"
report.status_extended = f"Route53 Hosted Zone {hosted_zone.id} is not protected by AWS Shield Advanced."
@@ -10,10 +10,8 @@ class sns_subscription_not_using_http_endpoints(Check):
if subscription.arn == "PendingConfirmation":
continue
report = Check_Report_AWS(self.metadata())
report.region = topic.region
report.resource_id = subscription.id
report.resource_arn = subscription.arn
report.resource_tags = topic.tags
report.resource_details = topic.arn
report.status = "PASS"
report.status_extended = (
@@ -6,11 +6,7 @@ class sns_topics_kms_encryption_at_rest_enabled(Check):
def execute(self):
findings = []
for topic in sns_client.topics:
report = Check_Report_AWS(self.metadata())
report.region = topic.region
report.resource_id = topic.name
report.resource_arn = topic.arn
report.resource_tags = topic.tags
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=topic)
report.status = "PASS"
report.status_extended = f"SNS topic {topic.name} is encrypted."
if not topic.kms_master_key_id:
@@ -10,11 +10,7 @@ class sns_topics_not_publicly_accessible(Check):
def execute(self):
findings = []
for topic in sns_client.topics:
report = Check_Report_AWS(self.metadata())
report.region = topic.region
report.resource_id = topic.name
report.resource_arn = topic.arn
report.resource_tags = topic.tags
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=topic)
report.status = "PASS"
report.status_extended = (
f"SNS topic {topic.name} is not publicly accessible."
@@ -7,11 +7,7 @@ class sqs_queues_not_publicly_accessible(Check):
def execute(self):
findings = []
for queue in sqs_client.queues:
report = Check_Report_AWS(self.metadata())
report.region = queue.region
report.resource_id = queue.id
report.resource_arn = queue.arn
report.resource_tags = queue.tags
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=queue)
report.status = "PASS"
report.status_extended = f"SQS queue {queue.id} is not public."
if queue.policy:
@@ -6,11 +6,7 @@ class sqs_queues_server_side_encryption_enabled(Check):
def execute(self):
findings = []
for queue in sqs_client.queues:
report = Check_Report_AWS(self.metadata())
report.region = queue.region
report.resource_id = queue.id
report.resource_arn = queue.arn
report.resource_tags = queue.tags
report = Check_Report_AWS(metadata=self.metadata(), resource_metadata=queue)
report.status = "PASS"
report.status_extended = (
f"SQS queue {queue.id} is using Server Side Encryption."
@@ -6,11 +6,9 @@ class ssm_documents_set_as_public(Check):
def execute(self):
findings = []
for document in ssm_client.documents.values():
report = Check_Report_AWS(self.metadata())
report.region = document.region
report.resource_arn = document.arn
report.resource_id = document.name
report.resource_tags = document.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=document
)
trusted_account_ids = ssm_client.audit_config.get("trusted_account_ids", [])
if ssm_client.audited_account not in trusted_account_ids:
trusted_account_ids.append(ssm_client.audited_account)
@@ -8,10 +8,9 @@ class ssm_managed_compliant_patching(Check):
def execute(self):
findings = []
for resource in ssm_client.compliance_resources.values():
report = Check_Report_AWS(self.metadata())
report.region = resource.region
report.resource_id = resource.id
report.resource_arn = resource.arn
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=resource
)
# Find tags of the instance in ec2_client
for instance in ec2_client.instances:
if instance.id == resource.id:
@@ -8,7 +8,10 @@ class ssmincidents_enabled_with_plans(Check):
def execute(self):
findings = []
if ssmincidents_client.replication_set is not None:
report = Check_Report_AWS(self.metadata())
report = Check_Report_AWS(
metadata=self.metadata(),
resource_metadata=ssmincidents_client.replication_set,
)
report.status = "FAIL"
report.status_extended = "No SSM Incidents replication set exists."
report.resource_arn = ssmincidents_client.replication_set_arn_template
@@ -29,11 +29,9 @@ class stepfunctions_statemachine_logging_enabled(Check):
"""
findings = []
for state_machine in stepfunctions_client.state_machines.values():
report = Check_Report_AWS(self.metadata())
report.region = state_machine.region
report.resource_id = state_machine.id
report.resource_arn = state_machine.arn
report.resource_tags = state_machine.tags
report = Check_Report_AWS(
metadata=self.metadata(), resource_metadata=state_machine
)
report.status = "PASS"
report.status_extended = f"Step Functions state machine {state_machine.name} has logging enabled."

Some files were not shown because too many files have changed in this diff Show More