Compare commits

...

3 Commits

Author SHA1 Message Date
pedrooot
ded0fd9b36 feat(cloudwatch): revert changes from report creation 2024-12-18 17:11:03 +01:00
pedrooot
a11bef6622 feat(check_cloudwatch_log_metric_filter): add comprobations for Nonetypes 2024-12-18 16:52:51 +01:00
pedrooot
9575d22c72 fix(checks): add getattr to fix NoneType errors 2024-12-18 16:00:17 +01:00
10 changed files with 103 additions and 59 deletions

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.cloudwatch.cloudwatch_client import (
class cloudwatch_alarm_actions_alarm_state_configured(Check):
def execute(self):
findings = []
for metric_alarm in cloudwatch_client.metric_alarms:
for metric_alarm in getattr(cloudwatch_client, "metric_alarms", []):
report = Check_Report_AWS(self.metadata())
report.region = metric_alarm.region
report.resource_id = metric_alarm.name

View File

@@ -7,7 +7,7 @@ from prowler.providers.aws.services.cloudwatch.cloudwatch_client import (
class cloudwatch_alarm_actions_enabled(Check):
def execute(self):
findings = []
for metric_alarm in cloudwatch_client.metric_alarms:
for metric_alarm in getattr(cloudwatch_client, "metric_alarms", []):
report = Check_Report_AWS(self.metadata())
report.region = metric_alarm.region
report.resource_id = metric_alarm.name

View File

@@ -7,14 +7,14 @@ class cloudwatch_log_group_not_publicly_accessible(Check):
def execute(self):
findings = []
public_log_groups = []
if (
logs_client.resource_policies is not None
and logs_client.log_groups is not None
if getattr(logs_client, "resource_policies", None) and getattr(
logs_client, "log_groups", None
):
for resource_policies in logs_client.resource_policies.values():
for resource_policy in resource_policies:
if is_policy_public(
resource_policy.policy, logs_client.audited_account
getattr(resource_policy, "policy", None),
getattr(logs_client, "audited_account", None),
):
for statement in resource_policy.policy.get("Statement", []):
public_resources = statement.get("Resource", [])
@@ -34,7 +34,7 @@ class cloudwatch_log_group_not_publicly_accessible(Check):
report.status_extended = (
f"Log Group {log_group.name} is not publicly accessible."
)
if log_group.arn in public_log_groups:
if getattr(log_group, "arn", None) in public_log_groups:
report.status = "FAIL"
report.status_extended = (
f"Log Group {log_group.name} is publicly accessible."

View File

@@ -19,23 +19,25 @@ def check_cloudwatch_log_metric_filter(
if trail.log_group_arn:
log_groups.append(trail.log_group_arn.split(":")[6])
# 2. Describe metric filters for previous log groups
for metric_filter in metric_filters:
if metric_filter.log_group.name in log_groups and re.search(
metric_filter_pattern, metric_filter.pattern, flags=re.DOTALL
):
report.resource_id = metric_filter.log_group.name
report.resource_arn = metric_filter.log_group.arn
report.region = metric_filter.log_group.region
report.resource_tags = getattr(metric_filter.log_group, "tags", [])
report.status = "FAIL"
report.status_extended = f"CloudWatch log group {metric_filter.log_group.name} found with metric filter {metric_filter.name} but no alarms associated."
# 3. Check if there is an alarm for the metric
for alarm in metric_alarms:
if alarm.metric == metric_filter.metric:
report.status = "PASS"
report.status_extended = f"CloudWatch log group {metric_filter.log_group.name} found with metric filter {metric_filter.name} and alarms set."
break
if report.status == "PASS":
break
if metric_filters is not None:
for metric_filter in metric_filters:
if metric_filter.log_group.name in log_groups and re.search(
metric_filter_pattern, metric_filter.pattern, flags=re.DOTALL
):
report.resource_id = metric_filter.log_group.name
report.resource_arn = metric_filter.log_group.arn
report.region = metric_filter.log_group.region
report.resource_tags = getattr(metric_filter.log_group, "tags", [])
report.status = "FAIL"
report.status_extended = f"CloudWatch log group {metric_filter.log_group.name} found with metric filter {metric_filter.name} but no alarms associated."
# 3. Check if there is an alarm for the metric
if metric_alarms is not None:
for alarm in metric_alarms:
if alarm.metric == metric_filter.metric:
report.status = "PASS"
report.status_extended = f"CloudWatch log group {metric_filter.log_group.name} found with metric filter {metric_filter.name} and alarms set."
break
if report.status == "PASS":
break
return report

View File

@@ -13,11 +13,13 @@ class codebuild_project_logging_enabled(Check):
report.resource_tags = project.tags
report.status = "PASS"
if project.cloudwatch_logs.enabled and project.s3_logs.enabled:
if getattr(
getattr(project, "cloudwatch_logs", None), "enabled", False
) and getattr(getattr(project, "s3_logs", None), "enabled", False):
report.status_extended = f"CodeBuild project {project.name} has enabled CloudWartch logs in log group {project.cloudwatch_logs.group_name} and S3 logs in bucket {project.s3_logs.bucket_location}."
elif project.cloudwatch_logs.enabled:
elif getattr(getattr(project, "cloudwatch_logs", None), "enabled", False):
report.status_extended = f"CodeBuild project {project.name} has CloudWatch logging enabled in log group {project.cloudwatch_logs.group_name}."
elif project.s3_logs.enabled:
elif getattr(getattr(project, "s3_logs", None), "enabled", False):
report.status_extended = f"CodeBuild project {project.name} has S3 logging enabled in bucket {project.s3_logs.bucket_location}."
else:
report.status = "FAIL"

View File

@@ -6,7 +6,7 @@ class codebuild_project_s3_logs_encrypted(Check):
def execute(self):
findings = []
for project in codebuild_client.projects.values():
if project.s3_logs.enabled:
if getattr(getattr(project, "s3_logs", None), "enabled", False):
report = Check_Report_AWS(self.metadata())
report.resource_id = project.name
report.resource_arn = project.arn

View File

@@ -16,18 +16,29 @@ class network_http_internet_access_restricted(Check):
report.status_extended = f"Security Group {security_group.name} from subscription {subscription} has HTTP internet access restricted."
rule_fail_condition = any(
(
rule.destination_port_range == "80"
getattr(rule, "destination_port_range", "") == "80"
or (
"-" in rule.destination_port_range
and int(rule.destination_port_range.split("-")[0]) <= 80
and int(rule.destination_port_range.split("-")[1]) >= 80
"-" in getattr(rule, "destination_port_range", "")
and int(
getattr(rule, "destination_port_range", "0-0").split(
"-"
)[0]
)
<= 80
and int(
getattr(rule, "destination_port_range", "0-0").split(
"-"
)[1]
)
>= 80
)
)
and rule.protocol in ["TCP", "Tcp", "*"]
and rule.source_address_prefix in ["Internet", "*", "0.0.0.0/0"]
and rule.access == "Allow"
and rule.direction == "Inbound"
for rule in security_group.security_rules
and getattr(rule, "protocol", "").lower() in ["tcp", "*"]
and getattr(rule, "source_address_prefix", "")
in ["Internet", "*", "0.0.0.0/0"]
and getattr(rule, "access", "") == "Allow"
and getattr(rule, "direction", "") == "Inbound"
for rule in getattr(security_group, "security_rules", []) or []
)
if rule_fail_condition:
report.status = "FAIL"

View File

@@ -16,18 +16,29 @@ class network_rdp_internet_access_restricted(Check):
report.status_extended = f"Security Group {security_group.name} from subscription {subscription} has RDP internet access restricted."
rule_fail_condition = any(
(
rule.destination_port_range == "3389"
getattr(rule, "destination_port_range", "") == "3389"
or (
"-" in rule.destination_port_range
and int(rule.destination_port_range.split("-")[0]) <= 3389
and int(rule.destination_port_range.split("-")[1]) >= 3389
"-" in getattr(rule, "destination_port_range", "")
and int(
getattr(rule, "destination_port_range", "0-0").split(
"-"
)[0]
)
<= 3389
and int(
getattr(rule, "destination_port_range", "0-0").split(
"-"
)[1]
)
>= 3389
)
)
and rule.protocol in ["TCP", "Tcp", "*"]
and rule.source_address_prefix in ["Internet", "*", "0.0.0.0/0"]
and rule.access == "Allow"
and rule.direction == "Inbound"
for rule in security_group.security_rules
and getattr(rule, "protocol", "").lower() in ["tcp", "*"]
and getattr(rule, "source_address_prefix", "")
in ["Internet", "*", "0.0.0.0/0"]
and getattr(rule, "access", "") == "Allow"
and getattr(rule, "direction", "") == "Inbound"
for rule in getattr(security_group, "security_rules", []) or []
)
if rule_fail_condition:
report.status = "FAIL"

View File

@@ -16,18 +16,29 @@ class network_ssh_internet_access_restricted(Check):
report.status_extended = f"Security Group {security_group.name} from subscription {subscription} has SSH internet access restricted."
rule_fail_condition = any(
(
rule.destination_port_range == "22"
getattr(rule, "destination_port_range", "") == "22"
or (
"-" in rule.destination_port_range
and int(rule.destination_port_range.split("-")[0]) <= 22
and int(rule.destination_port_range.split("-")[1]) >= 22
"-" in getattr(rule, "destination_port_range", "")
and int(
getattr(rule, "destination_port_range", "0-0").split(
"-"
)[0]
)
<= 22
and int(
getattr(rule, "destination_port_range", "0-0").split(
"-"
)[1]
)
>= 22
)
)
and rule.protocol in ["TCP", "Tcp", "*"]
and rule.source_address_prefix in ["Internet", "*", "0.0.0.0/0"]
and rule.access == "Allow"
and rule.direction == "Inbound"
for rule in security_group.security_rules
and getattr(rule, "protocol", "").lower() in ["tcp", "*"]
and getattr(rule, "source_address_prefix", "")
in ["Internet", "*", "0.0.0.0/0"]
and getattr(rule, "access", "") == "Allow"
and getattr(rule, "direction", "") == "Inbound"
for rule in getattr(security_group, "security_rules", []) or []
)
if rule_fail_condition:
report.status = "FAIL"

View File

@@ -16,9 +16,16 @@ class vm_trusted_launch_enabled(Check):
report.status_extended = f"VM {vm.resource_name} has trusted launch disabled in subscription {subscription_name}"
if (
vm.security_profile.security_type == "TrustedLaunch"
and vm.security_profile.uefi_settings.secure_boot_enabled
and vm.security_profile.uefi_settings.v_tpm_enabled
getattr(vm, "security_profile", None)
and getattr(vm.security_profile, "security_type", None)
== "TrustedLaunch"
and getattr(vm.security_profile, "uefi_settings", None)
and getattr(
vm.security_profile.uefi_settings, "secure_boot_enabled", False
)
and getattr(
vm.security_profile.uefi_settings, "v_tpm_enabled", False
)
):
report.status = "PASS"
report.status_extended = f"VM {vm.resource_name} has trusted launch enabled in subscription {subscription_name}"