feat: add ASD Essential Eight compliance framework for AWS (#10808)

Co-authored-by: Boon <boon@security8.work>
Co-authored-by: pedrooot <pedromarting3@gmail.com>
This commit is contained in:
Boon
2026-04-30 19:49:08 +08:00
committed by GitHub
parent 578186aa40
commit 228fe6d579
12 changed files with 1783 additions and 0 deletions
+3
View File
@@ -9,6 +9,9 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `bedrock_guardrails_configured` check for AWS provider [(#10844)](https://github.com/prowler-cloud/prowler/pull/10844)
- Universal compliance pipeline integrated into the CLI: `--list-compliance` and `--list-compliance-requirements` show universal frameworks, and CSV plus OCSF outputs are generated for any framework declaring a `TableConfig` [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
### 🚀 Added
- ASD Essential Eight Maturity Model compliance framework for AWS provider, mapping 64 checks across all 8 controls [(#10808)](https://github.com/prowler-cloud/prowler/pull/10808)
### 🔄 Changed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
+15
View File
@@ -90,6 +90,9 @@ from prowler.lib.outputs.compliance.csa.csa_oraclecloud import OracleCloudCSA
from prowler.lib.outputs.compliance.ens.ens_aws import AWSENS
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.essential_eight.essential_eight_aws import (
EssentialEightAWS,
)
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.compliance.iso27001.iso27001_aws import AWSISO27001
from prowler.lib.outputs.compliance.iso27001.iso27001_azure import AzureISO27001
@@ -673,6 +676,18 @@ def prowler():
)
generated_outputs["compliance"].append(cis)
cis.batch_write_data_to_file()
elif compliance_name.startswith("essential_eight"):
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
essential_eight = EssentialEightAWS(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
file_path=filename,
)
generated_outputs["compliance"].append(essential_eight)
essential_eight.batch_write_data_to_file()
elif compliance_name == "mitre_attack_aws":
# Generate MITRE ATT&CK Finding Object
filename = (
File diff suppressed because it is too large Load Diff
+43
View File
@@ -102,6 +102,48 @@ class CIS_Requirement_Attribute(BaseModel):
References: str
class EssentialEight_Requirement_Attribute_MaturityLevel(str, Enum):
"""ASD Essential Eight Maturity Level"""
ML1 = "ML1"
ML2 = "ML2"
ML3 = "ML3"
class EssentialEight_Requirement_Attribute_AssessmentStatus(str, Enum):
"""Essential Eight Requirement Attribute Assessment Status"""
Manual = "Manual"
Automated = "Automated"
class EssentialEight_Requirement_Attribute_CloudApplicability(str, Enum):
"""How well the ASD control maps to AWS cloud infrastructure."""
Full = "full"
Partial = "partial"
Limited = "limited"
NonApplicable = "non-applicable"
# Essential Eight Requirement Attribute
class EssentialEight_Requirement_Attribute(BaseModel):
"""ASD Essential Eight Requirement Attribute"""
Section: str
MaturityLevel: EssentialEight_Requirement_Attribute_MaturityLevel
AssessmentStatus: EssentialEight_Requirement_Attribute_AssessmentStatus
CloudApplicability: EssentialEight_Requirement_Attribute_CloudApplicability
MitigatedThreats: list[str]
Description: str
RationaleStatement: str
ImpactStatement: str
RemediationProcedure: str
AuditProcedure: str
AdditionalInformation: str
References: str
# Well Architected Requirement Attribute
class AWS_Well_Architected_Requirement_Attribute(BaseModel):
"""AWS Well Architected Requirement Attribute"""
@@ -250,6 +292,7 @@ class Compliance_Requirement(BaseModel):
Name: Optional[str] = None
Attributes: list[
Union[
EssentialEight_Requirement_Attribute,
CIS_Requirement_Attribute,
ENS_Requirement_Attribute,
ISO27001_2013_Requirement_Attribute,
@@ -9,6 +9,9 @@ from prowler.lib.outputs.compliance.compliance_check import ( # noqa: F401 - re
)
from prowler.lib.outputs.compliance.csa.csa import get_csa_table
from prowler.lib.outputs.compliance.ens.ens import get_ens_table
from prowler.lib.outputs.compliance.essential_eight.essential_eight import (
get_essential_eight_table,
)
from prowler.lib.outputs.compliance.generic.generic_table import (
get_generic_compliance_table,
)
@@ -230,6 +233,15 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "essential_eight" in compliance_framework:
get_essential_eight_table(
findings,
bulk_checks_metadata,
compliance_framework,
output_filename,
output_directory,
compliance_overview,
)
else:
get_generic_compliance_table(
findings,
@@ -0,0 +1,98 @@
from colorama import Fore, Style
from tabulate import tabulate
from prowler.config.config import orange_color
def get_essential_eight_table(
findings: list,
bulk_checks_metadata: dict,
compliance_framework: str,
output_filename: str,
output_directory: str,
compliance_overview: bool,
):
sections = {}
essential_eight_compliance_table = {
"Provider": [],
"Section": [],
"Status": [],
"Muted": [],
}
pass_count = []
fail_count = []
muted_count = []
for index, finding in enumerate(findings):
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "Essential-Eight":
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
if section not in sections:
sections[section] = {
"FAIL": 0,
"PASS": 0,
"Muted": 0,
}
if finding.muted:
if index not in muted_count:
muted_count.append(index)
sections[section]["Muted"] += 1
else:
if finding.status == "FAIL" and index not in fail_count:
fail_count.append(index)
sections[section]["FAIL"] += 1
elif finding.status == "PASS" and index not in pass_count:
pass_count.append(index)
sections[section]["PASS"] += 1
sections = dict(sorted(sections.items()))
for section in sections:
essential_eight_compliance_table["Provider"].append(compliance.Provider)
essential_eight_compliance_table["Section"].append(section)
if sections[section]["FAIL"] > 0:
essential_eight_compliance_table["Status"].append(
f"{Fore.RED}FAIL({sections[section]['FAIL']}){Style.RESET_ALL}"
)
elif sections[section]["PASS"] > 0:
essential_eight_compliance_table["Status"].append(
f"{Fore.GREEN}PASS({sections[section]['PASS']}){Style.RESET_ALL}"
)
else:
essential_eight_compliance_table["Status"].append("-")
essential_eight_compliance_table["Muted"].append(
f"{orange_color}{sections[section]['Muted']}{Style.RESET_ALL}"
)
if len(fail_count) + len(pass_count) + len(muted_count) > 1:
print(
f"\nCompliance Status of {Fore.YELLOW}{compliance_framework.upper()}{Style.RESET_ALL} Framework:"
)
total_findings_count = len(fail_count) + len(pass_count) + len(muted_count)
overview_table = [
[
f"{Fore.RED}{round(len(fail_count) / total_findings_count * 100, 2)}% ({len(fail_count)}) FAIL{Style.RESET_ALL}",
f"{Fore.GREEN}{round(len(pass_count) / total_findings_count * 100, 2)}% ({len(pass_count)}) PASS{Style.RESET_ALL}",
f"{orange_color}{round(len(muted_count) / total_findings_count * 100, 2)}% ({len(muted_count)}) MUTED{Style.RESET_ALL}",
]
]
print(tabulate(overview_table, tablefmt="rounded_grid"))
if not compliance_overview:
print(
f"\nFramework {Fore.YELLOW}{compliance_framework.upper()}{Style.RESET_ALL} Results:"
)
print(
tabulate(
essential_eight_compliance_table,
headers="keys",
tablefmt="rounded_grid",
)
)
print(
f"{Style.BRIGHT}* Only sections containing results appear.{Style.RESET_ALL}"
)
print(f"\nDetailed results of {compliance_framework.upper()} are in:")
print(
f" - CSV: {output_directory}/compliance/{output_filename}_{compliance_framework}.csv\n"
)
@@ -0,0 +1,111 @@
from prowler.config.config import timestamp
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.essential_eight.models import (
EssentialEightAWSModel,
)
from prowler.lib.outputs.finding import Finding
class EssentialEightAWS(ComplianceOutput):
"""
This class represents the AWS ASD Essential Eight compliance output.
Attributes:
- _data (list): A list to store transformed data from findings.
- _file_descriptor (TextIOWrapper): A file descriptor to write data to a file.
Methods:
- transform: Transforms findings into AWS Essential Eight compliance format.
"""
def transform(
self,
findings: list[Finding],
compliance: Compliance,
compliance_name: str,
) -> None:
"""
Transforms a list of findings into AWS Essential Eight compliance format.
Parameters:
- findings (list): A list of findings.
- compliance (Compliance): A compliance model.
- compliance_name (str): The name of the compliance model.
Returns:
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = EssentialEightAWSModel(
Provider=finding.provider,
Description=compliance.Description,
AccountId=finding.account_uid,
Region=finding.region,
AssessmentDate=str(timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_MaturityLevel=attribute.MaturityLevel,
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
Requirements_Attributes_CloudApplicability=attribute.CloudApplicability,
Requirements_Attributes_MitigatedThreats=", ".join(
attribute.MitigatedThreats
),
Requirements_Attributes_Description=attribute.Description,
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_References=attribute.References,
Status=finding.status,
StatusExtended=finding.status_extended,
ResourceId=finding.resource_uid,
ResourceName=finding.resource_name,
CheckId=finding.check_id,
Muted=finding.muted,
Framework=compliance.Framework,
Name=compliance.Name,
)
self._data.append(compliance_row)
# Add manual requirements to the compliance output
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = EssentialEightAWSModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
AccountId="",
Region="",
AssessmentDate=str(timestamp),
Requirements_Id=requirement.Id,
Requirements_Description=requirement.Description,
Requirements_Attributes_Section=attribute.Section,
Requirements_Attributes_MaturityLevel=attribute.MaturityLevel,
Requirements_Attributes_AssessmentStatus=attribute.AssessmentStatus,
Requirements_Attributes_CloudApplicability=attribute.CloudApplicability,
Requirements_Attributes_MitigatedThreats=", ".join(
attribute.MitigatedThreats
),
Requirements_Attributes_Description=attribute.Description,
Requirements_Attributes_RationaleStatement=attribute.RationaleStatement,
Requirements_Attributes_ImpactStatement=attribute.ImpactStatement,
Requirements_Attributes_RemediationProcedure=attribute.RemediationProcedure,
Requirements_Attributes_AuditProcedure=attribute.AuditProcedure,
Requirements_Attributes_AdditionalInformation=attribute.AdditionalInformation,
Requirements_Attributes_References=attribute.References,
Status="MANUAL",
StatusExtended="Manual check",
ResourceId="manual_check",
ResourceName="Manual check",
CheckId="manual",
Muted=False,
Framework=compliance.Framework,
Name=compliance.Name,
)
self._data.append(compliance_row)
@@ -0,0 +1,35 @@
from pydantic.v1 import BaseModel
class EssentialEightAWSModel(BaseModel):
"""
EssentialEightAWSModel generates a finding's output in AWS ASD Essential Eight Compliance format.
"""
Provider: str
Description: str
AccountId: str
Region: str
AssessmentDate: str
Requirements_Id: str
Requirements_Description: str
Requirements_Attributes_Section: str
Requirements_Attributes_MaturityLevel: str
Requirements_Attributes_AssessmentStatus: str
Requirements_Attributes_CloudApplicability: str
Requirements_Attributes_MitigatedThreats: str
Requirements_Attributes_Description: str
Requirements_Attributes_RationaleStatement: str
Requirements_Attributes_ImpactStatement: str
Requirements_Attributes_RemediationProcedure: str
Requirements_Attributes_AuditProcedure: str
Requirements_Attributes_AdditionalInformation: str
Requirements_Attributes_References: str
Status: str
StatusExtended: str
ResourceId: str
ResourceName: str
CheckId: str
Muted: bool
Framework: str
Name: str
@@ -0,0 +1,128 @@
from io import StringIO
from unittest import mock
from freezegun import freeze_time
from mock import patch
from prowler.lib.outputs.compliance.essential_eight.essential_eight_aws import (
EssentialEightAWS,
)
from prowler.lib.outputs.compliance.essential_eight.models import (
EssentialEightAWSModel,
)
from tests.lib.outputs.compliance.fixtures import ESSENTIAL_EIGHT_AWS
from tests.lib.outputs.fixtures.fixtures import generate_finding_output
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
# The fixture's first Requirement maps clause "E8-1.8" (Patch applications,
# clause 8: removal of unsupported online services). The second Requirement is
# E8-6.1 (Restrict Office macros, clause 1) which has no Checks and is therefore
# emitted as a manual row.
COMPLIANCE_NAME = "Essential-Eight-Nov 2023"
class TestEssentialEightAWS:
def test_output_transform(self):
findings = [generate_finding_output(compliance={COMPLIANCE_NAME: "E8-1.8"})]
output = EssentialEightAWS(findings, ESSENTIAL_EIGHT_AWS)
output_data = output.data[0]
assert isinstance(output_data, EssentialEightAWSModel)
assert output_data.Provider == "aws"
assert output_data.Framework == ESSENTIAL_EIGHT_AWS.Framework
assert output_data.Name == ESSENTIAL_EIGHT_AWS.Name
assert output_data.Description == ESSENTIAL_EIGHT_AWS.Description
assert output_data.AccountId == AWS_ACCOUNT_NUMBER
assert output_data.Region == AWS_REGION_EU_WEST_1
assert output_data.Requirements_Id == "E8-1.8"
assert (
output_data.Requirements_Description
== ESSENTIAL_EIGHT_AWS.Requirements[0].Description
)
assert output_data.Requirements_Attributes_Section == "1 Patch applications"
assert output_data.Requirements_Attributes_MaturityLevel == "ML1"
assert output_data.Requirements_Attributes_AssessmentStatus == "Automated"
assert output_data.Requirements_Attributes_CloudApplicability == "full"
assert (
output_data.Requirements_Attributes_MitigatedThreats
== "Use of unsupported software, Long-tail vulnerability accumulation"
)
assert (
output_data.Requirements_Attributes_Description
== ESSENTIAL_EIGHT_AWS.Requirements[0].Attributes[0].Description
)
assert output_data.Status == "PASS"
assert output_data.StatusExtended == ""
assert output_data.ResourceId == ""
assert output_data.ResourceName == ""
assert output_data.CheckId == "service_test_check_id"
assert not output_data.Muted
def test_manual_requirement(self):
findings = [generate_finding_output(compliance={COMPLIANCE_NAME: "E8-1.8"})]
output = EssentialEightAWS(findings, ESSENTIAL_EIGHT_AWS)
# E8-6.1 (macros) has no Checks -> emitted as a manual row, non-applicable
manual_rows = [row for row in output.data if row.Status == "MANUAL"]
assert len(manual_rows) == 1
manual = manual_rows[0]
assert manual.Provider == "aws"
assert manual.AccountId == ""
assert manual.Region == ""
assert manual.Requirements_Id == "E8-6.1"
assert (
manual.Requirements_Attributes_Section
== "6 Restrict Microsoft Office macros"
)
assert manual.Requirements_Attributes_MaturityLevel == "ML1"
assert manual.Requirements_Attributes_AssessmentStatus == "Manual"
assert manual.Requirements_Attributes_CloudApplicability == "non-applicable"
assert (
manual.Requirements_Attributes_MitigatedThreats
== "Macro-based malware delivery"
)
assert manual.StatusExtended == "Manual check"
assert manual.ResourceId == "manual_check"
assert manual.ResourceName == "Manual check"
assert manual.CheckId == "manual"
assert not manual.Muted
@freeze_time("2025-01-01 00:00:00")
@mock.patch(
"prowler.lib.outputs.compliance.essential_eight.essential_eight_aws.timestamp",
"2025-01-01 00:00:00",
)
def test_batch_write_data_to_file(self):
mock_file = StringIO()
findings = [generate_finding_output(compliance={COMPLIANCE_NAME: "E8-1.8"})]
output = EssentialEightAWS(findings, ESSENTIAL_EIGHT_AWS)
output._file_descriptor = mock_file
with patch.object(mock_file, "close", return_value=None):
output.batch_write_data_to_file()
mock_file.seek(0)
content = mock_file.read()
# Validate header carries the E8-specific column names
first_line = content.split("\r\n", 1)[0]
for column in (
"REQUIREMENTS_ATTRIBUTES_MATURITYLEVEL",
"REQUIREMENTS_ATTRIBUTES_ASSESSMENTSTATUS",
"REQUIREMENTS_ATTRIBUTES_CLOUDAPPLICABILITY",
"REQUIREMENTS_ATTRIBUTES_MITIGATEDTHREATS",
"REQUIREMENTS_ATTRIBUTES_RATIONALESTATEMENT",
"REQUIREMENTS_ATTRIBUTES_REMEDIATIONPROCEDURE",
"REQUIREMENTS_ATTRIBUTES_AUDITPROCEDURE",
):
assert column in first_line, f"missing column {column} in CSV header"
# rows: header + matched + manual
rows = [r for r in content.split("\r\n") if r]
assert len(rows) == 3
assert rows[1].split(";")[0] == "aws"
assert "ML1" in rows[1]
assert ";PASS;" in rows[1]
assert ";MANUAL;" in rows[2]
assert ";manual_check;" in rows[2]
+56
View File
@@ -7,6 +7,7 @@ from prowler.lib.check.compliance_models import (
ENS_Requirement_Attribute,
ENS_Requirement_Attribute_Nivel,
ENS_Requirement_Attribute_Tipos,
EssentialEight_Requirement_Attribute,
Generic_Compliance_Requirement_Attribute,
ISO27001_2013_Requirement_Attribute,
KISA_ISMSP_Requirement_Attribute,
@@ -1189,3 +1190,58 @@ CCC_GCP_FIXTURE = Compliance(
),
],
)
ESSENTIAL_EIGHT_AWS = Compliance(
Framework="Essential-Eight",
Name="ASD Essential Eight Maturity Model - Maturity Level One (AWS)",
Version="Nov 2023",
Provider="AWS",
Description="Literal mapping of the Australian Signals Directorate (ASD) Essential Eight Maturity Model ML1 to AWS infrastructure checks.",
Requirements=[
Compliance_Requirement(
Id="E8-1.8",
Description="Online services that are no longer supported by vendors are removed.",
Attributes=[
EssentialEight_Requirement_Attribute(
Section="1 Patch applications",
MaturityLevel="ML1",
AssessmentStatus="Automated",
CloudApplicability="full",
MitigatedThreats=[
"Use of unsupported software",
"Long-tail vulnerability accumulation",
],
Description="Detect and remove unsupported AWS-hosted online services (Lambda runtimes, RDS engines, EKS, Fargate, Kafka, OpenSearch).",
RationaleStatement="Unsupported services no longer receive security patches.",
ImpactStatement="",
RemediationProcedure="Migrate Lambda off deprecated runtimes; remove RDS Extended Support; upgrade EKS.",
AuditProcedure="Run all listed checks.",
AdditionalInformation="ASD Essential Eight ML1 - Patch applications - clause 8.",
References="https://www.cyber.gov.au/resources-business-and-government/essential-cyber-security/essential-eight/essential-eight-maturity-model",
)
],
Checks=["service_test_check_id"],
),
Compliance_Requirement(
Id="E8-6.1",
Description="Microsoft Office macros are disabled for users that do not have a demonstrated business requirement.",
Attributes=[
EssentialEight_Requirement_Attribute(
Section="6 Restrict Microsoft Office macros",
MaturityLevel="ML1",
AssessmentStatus="Manual",
CloudApplicability="non-applicable",
MitigatedThreats=["Macro-based malware delivery"],
Description="Endpoint / Microsoft 365 control. Out of AWS infrastructure scope.",
RationaleStatement="Most users never need Office macros.",
ImpactStatement="",
RemediationProcedure="Disable macros via Group Policy / Intune / M365 admin policies.",
AuditProcedure="Manual review of M365 macro policy.",
AdditionalInformation="ASD Essential Eight ML1 - Restrict Microsoft Office macros - clause 1. Out of AWS infrastructure scope.",
References="https://www.cyber.gov.au/resources-business-and-government/essential-cyber-security/essential-eight/essential-eight-maturity-model",
)
],
Checks=[],
),
],
)