feat(ec2_checks): add several checks for ec2 (#1268)

* feat(checks): add extra718

* feat(checks): add extra763

* feat(checks): add extra748, extra749, extra72

* feat(checks): add extra750

* feat(checks): add check45

* feat(checks): add check46, check45, check42, check41

* feat(metadata_sample): add sample of check metadata

* feat(pci-group): add pci group.

* feat(cloud9): environment setup.

* fix(protocol): add protocol conditions

Co-authored-by: sergargar <sergio@verica.io>
This commit is contained in:
Sergio Garcia
2022-07-26 18:21:40 -04:00
committed by GitHub
parent da76f69e51
commit 66d2b7b4d9
36 changed files with 1095 additions and 91 deletions

View File

@@ -0,0 +1,51 @@
{
"Categories": [
"cat1",
"cat2"
],
"CheckID": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22",
"CheckTitle": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to SSH port 22.",
"CheckType": "Data Protection",
"Compliance": [
{
"Control": [
"4.1"
],
"Framework": "CIS-AWS",
"Group": [
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [
"othercheck1",
"othercheck2"
],
"Description": "Extended Description",
"Notes": "additional information",
"Provider": "aws",
"RelatedTo": [
"othercheck3",
"othercheck4"
],
"RelatedUrl": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html",
"Remediation": {
"Code": {
"CLI": "cli command or URL to the cli command location.",
"NativeIaC": "code or URL to the code location.",
"Other": "cli command or URL to the cli command location.",
"Terraform": "code or URL to the code location."
},
"Recommendation": {
"Text": "Use a Zero Trust approach. Narrow ingress traffic as much as possible. Consider north-south as well as east-west traffic.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/vpc-security-best-practices.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2SecurityGroup",
"Risk": "If Security groups are not properly configured the attack surface is increased.",
"ServiceName": "ec2",
"Severity": "low",
"SubServiceName": "securitygroup"
}

View File

@@ -2,17 +2,39 @@
"aws": {
"gdpr": {
"checks": [
"check11",
"check12"
"s3_bucket_server_access_logging_enabled",
"s3_bucket_object_versioning",
"iam_avoid_root_usage",
"iam_user_mfa_enabled_console_access",
"iam_disable_90_days_credentials",
"iam_rotate_access_key_90_days",
"iam_root_mfa_enabled",
"iam_root_hardware_mfa_enabled",
"iam_no_root_access_key",
"iam_administrator_access_with_mfa",
"ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389",
"ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22",
"ec2_ebs_snapshots_encrypted",
"ec2_ebs_public_snapshot"
],
"description": "GDPR Readiness"
},
"iam": {
"pci": {
"checks": [
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials"
"iam_avoid_root_usage",
"iam_user_mfa_enabled_console_access",
"iam_disable_90_days_credentials",
"iam_rotate_access_key_90_days",
"iam_root_mfa_enabled",
"iam_root_hardware_mfa_enabled",
"iam_no_root_access_key",
"ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22",
"ec2_securitygroup_allow_ingress_from_internet_to_any_port",
"ec2_ebs_snapshots_encrypted",
"ec2_ebs_public_snapshot",
"s3_bucket_server_access_logging_enabled"
],
"description": "Identity and Access Management"
"description": "PCI-DSS v3.2.1 Readiness"
}
}
}

View File

@@ -161,19 +161,20 @@ class Test_Check:
},
"expected": {
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials",
},
},
{
"input": {
"excluded_group_list": {"iam"},
"excluded_group_list": {"pci"},
"provider": "aws",
"checks_to_run": {
"iam_disable_30_days_credentials",
"iam_disable_90_days_credentials",
},
},
"expected": set(),
"expected": {
"iam_disable_30_days_credentials",
},
},
]
for test in test_cases:

View File

@@ -1,5 +1,6 @@
from config.config import groups_file
from lib.check.check import (
load_checks_to_execute_from_groups,
parse_checks_from_file,
parse_groups_from_file,
recover_checks_from_provider,
@@ -59,8 +60,9 @@ def load_checks_to_execute(
# Handle if there are groups passed using -g/--groups
elif group_list:
try:
checks_to_execute = parse_groups_from_file(
groups_file, group_list, provider
available_groups = parse_groups_from_file(groups_file)
checks_to_execute = load_checks_to_execute_from_groups(
available_groups, group_list, provider
)
except Exception as e:
logger.error(f"{e.__class__.__name__} -- {e}")

View File

@@ -203,10 +203,9 @@ class Check_Report:
region: str
status_extended: str
check_metadata: dict
status_extended: str
resource_id: str
resource_details: str
resource_tags: str
resource_tags: list
resource_arn: str
def __init__(self, metadata):

View File

@@ -0,0 +1,37 @@
{
"Categories": [],
"CheckAlias": "extra72",
"CheckID": "ec2_ebs_public_snapshot",
"CheckName": "ec2_ebs_public_snapshot",
"CheckTitle": "Ensure there are no EBS Snapshots set as Public.",
"CheckType": "Data Protection",
"Compliance": [],
"DependsOn": [],
"Description": "Ensure there are no EBS Snapshots set as Public.",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure the snapshot should be shared.",
"Url": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-modifying-snapshot-permissions.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2Snapshot",
"Risk": "When you share a snapshot, you are giving others access to all of the data on the snapshot. Share snapshots only with people with whom you want to share all of your snapshot data.",
"ServiceName": "ec2",
"Severity": "critical",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,35 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_ebs_public_snapshot(Check):
def execute(self):
findings = []
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.snapshots:
for snapshot in regional_client.snapshots:
report = Check_Report(self.metadata)
report.region = region
if not snapshot.public:
report.status = "PASS"
report.status_extended = (
f"EBS Snapshot {snapshot.id} is not Public"
)
report.resource_id = snapshot.id
else:
report.status = "FAIL"
report.status_extended = (
f"EBS Snapshot {snapshot.id} is currently Public"
)
report.resource_id = snapshot.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 EBS snapshots"
report.region = region
findings.append(report)
return findings

View File

@@ -1,67 +1,35 @@
{
"Categories": [
"cat1",
"cat2"
],
"Categories": [],
"CheckAlias": "extra740",
"CheckID": "ec2_ebs_snapshots_encrypted",
"CheckName": "ec2_ebs_snapshots_encrypted",
"CheckTitle": "Check if EBS snapshots are encrypted",
"CheckTitle": "Check if EBS snapshots are encrypted.",
"CheckType": "Data Protection",
"Compliance": [
{
"Control": [
"4.4"
],
"Framework": "CIS-AWS",
"Group": [
"level1",
"level2"
],
"Version": "1.4"
},
{
"Control": [
"4.4"
],
"Framework": "PCI-DSS",
"Group": [
"level1",
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [
"othercheck1",
"othercheck2"
],
"Description": "If Security groups are not properly configured the attack surface is increased.",
"Notes": "additional information",
"Compliance": [],
"DependsOn": [],
"Description": "Check if EBS snapshots are encrypted.",
"Notes": "",
"Provider": "aws",
"RelatedTo": [
"othercheck3",
"othercheck4"
],
"RelatedUrl": "https://serviceofficialsiteorpageforthissubject",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "cli command or URL to the cli command location.",
"NativeIaC": "code or URL to the code location.",
"Other": "cli command or URL to the cli command location.",
"Terraform": "code or URL to the code location."
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Run sudo yum update and cross your fingers and toes.",
"Url": "https://myfp.com/recommendations/dangerous_things_and_how_to_fix_them.html"
"Text": "Encrypt all EBS Snapshot and Enable Encryption by default. You can configure your AWS account to enforce the encryption of the new EBS volumes and snapshot copies that you create. For example; Amazon EBS encrypts the EBS volumes created when you launch an instance and the snapshots that you copy from an unencrypted snapshot.",
"Url": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/EBSEncryption.html#encryption-by-default"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsIamAccessAnalyzer",
"Risk": "Risk associated.",
"ResourceType": "AwsEc2Snapshot",
"Risk": "Data encryption at rest prevents data visibility in the event of its unauthorized access or theft.",
"ServiceName": "ec2",
"Severity": "low",
"SubServiceName": "accessanalyzer",
"Severity": "medium",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"

View File

@@ -7,28 +7,28 @@ class ec2_ebs_snapshots_encrypted(Check):
findings = []
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if hasattr(regional_client, "snapshots"):
if regional_client.snapshots:
for snapshot in regional_client.snapshots:
report = Check_Report(self.metadata)
report.region = region
if snapshot["Encrypted"]:
report.status = "PASS"
report.status_extended = (
f"EBS Snapshot {snapshot['SnapshotId']} is encrypted"
)
report.resource_id = snapshot["SnapshotId"]
else:
report.status = "FAIL"
report.status_extended = (
f"EBS Snapshot {snapshot['SnapshotId']} is unencrypted"
)
report.resource_id = snapshot["SnapshotId"]
else:
if regional_client.snapshots:
for snapshot in regional_client.snapshots:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 EBS snapshots"
report.region = region
if snapshot.encrypted:
report.status = "PASS"
report.status_extended = (
f"EBS Snapshot {snapshot.id} is encrypted"
)
report.resource_id = snapshot.id
else:
report.status = "FAIL"
report.status_extended = (
f"EBS Snapshot {snapshot.id} is unencrypted"
)
report.resource_id = snapshot.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 EBS snapshots"
report.region = region
findings.append(report)

View File

@@ -0,0 +1,37 @@
{
"Categories": [],
"CheckAlias": "extra710",
"CheckID": "ec2_instance_public_ip",
"CheckName": "ec2_instance_public_ip",
"CheckTitle": "Check for EC2 Instances with Public IP.",
"CheckType": "Infrastructure Security",
"Compliance": [],
"DependsOn": [],
"Description": "Check for EC2 Instances with Public IP.",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use an ALB and apply WAF ACL.",
"Url": "https://aws.amazon.com/blogs/aws/aws-web-application-firewall-waf-for-application-load-balancers/"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2SecurityGroup",
"Risk": "Exposing an EC2 directly to internet increases the attack surface and therefore the risk of compromise.",
"ServiceName": "ec2",
"Severity": "medium",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,33 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_instance_public_ip(Check):
def execute(self):
findings = []
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.instances:
for instance in regional_client.instances:
report = Check_Report(self.metadata)
report.region = region
if instance.public_ip:
report.status = "FAIL"
report.status_extended = f"EC2 instance {instance.id} has a Public IP: {instance.public_ip} ({instance.public_dns})."
report.resource_id = {instance.id}
else:
report.status = "PASS"
report.status_extended = (
f"EC2 instance {instance.id} has not a Public IP."
)
report.resource_id = {instance.id}
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 instances."
report.region = region
findings.append(report)
return findings

View File

@@ -0,0 +1,48 @@
{
"Categories": [],
"CheckAlias": "check45",
"CheckID": "ec2_networkacl_allow_ingress_tcp_port_22",
"CheckName": "ec2_networkacl_allow_ingress_tcp_port_22",
"CheckTitle": "Ensure no Network ACLs allow ingress from 0.0.0.0/0 to SSH port 22",
"CheckType": "Infrastructure Security",
"Compliance": [
{
"Control": [
"4.5"
],
"Framework": "CIS-AWS",
"Group": [
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [],
"Description": "Ensure no Network ACLs allow ingress from 0.0.0.0/0 to SSH port 22",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Apply Zero Trust approach. Implement a process to scan and remediate unrestricted or overly permissive network acls. Recommended best practices is to narrow the definition for the minimum ports required.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/vpc-network-acls.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2NetworkAcl",
"Risk": "Even having a perimeter firewall, having network acls open allows any user or malware with vpc access to scan for well known and sensitive ports and gain access to instance.",
"ServiceName": "ec2",
"Severity": "high",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,45 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_networkacl_allow_ingress_tcp_port_22(Check):
def execute(self):
findings = []
check_port = 22
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.network_acls:
for network_acl in regional_client.network_acls:
public = False
report = Check_Report(self.metadata)
report.region = region
for entry in network_acl.entries:
if (
entry["CidrBlock"] == "0.0.0.0/0"
and entry["RuleAction"] == "allow"
and not entry["Egress"]
and "PortRange" in entry
and entry["Protocol"] == "6" # 6 relates to tcp protocol
):
if (
entry["PortRange"]["From"] == check_port
and entry["PortRange"]["To"] == check_port
):
public = True
report.status = "FAIL"
report.status_extended = f"Network ACL {network_acl.id} has SSH port 22 open to the Internet."
report.resource_id = network_acl.id
if not public:
report.status = "PASS"
report.status_extended = f"Network ACL {network_acl.id} has not SSH port 22 open to the Internet."
report.resource_id = network_acl.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 network acls."
report.region = region
findings.append(report)
return findings

View File

@@ -0,0 +1,48 @@
{
"Categories": [],
"CheckAlias": "check45",
"CheckID": "ec2_networkacl_allow_ingress_tcp_port_3389",
"CheckName": "ec2_networkacl_allow_ingress_tcp_port_3389",
"CheckTitle": "Ensure no Network ACLs allow ingress from 0.0.0.0/0 to Microsoft RDP port 3389",
"CheckType": "Infrastructure Security",
"Compliance": [
{
"Control": [
"4.6"
],
"Framework": "CIS-AWS",
"Group": [
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [],
"Description": "Ensure no Network ACLs allow ingress from 0.0.0.0/0 to Microsoft RDP port 3389",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Apply Zero Trust approach. Implement a process to scan and remediate unrestricted or overly permissive network acls. Recommended best practices is to narrow the definition for the minimum ports required.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/vpc-network-acls.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2NetworkAcl",
"Risk": "Even having a perimeter firewall, having network acls open allows any user or malware with vpc access to scan for well known and sensitive ports and gain access to instance.",
"ServiceName": "ec2",
"Severity": "high",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,45 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_networkacl_allow_ingress_tcp_port_3389(Check):
def execute(self):
findings = []
check_port = 3389
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.network_acls:
for network_acl in regional_client.network_acls:
public = False
report = Check_Report(self.metadata)
report.region = region
for entry in network_acl.entries:
if (
entry["CidrBlock"] == "0.0.0.0/0"
and entry["RuleAction"] == "allow"
and not entry["Egress"]
and "PortRange" in entry
and entry["Protocol"] == "6" # 6 relates to tcp protocol
):
if (
entry["PortRange"]["From"] == check_port
and entry["PortRange"]["To"] == check_port
):
public = True
report.status = "FAIL"
report.status_extended = f"Network ACL {network_acl.id} has Microsoft RDP port 3389 open to the Internet."
report.resource_id = network_acl.id
if not public:
report.status = "PASS"
report.status_extended = f"Network ACL {network_acl.id} has not Microsoft RDP port 3389 open to the Internet."
report.resource_id = network_acl.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 network acls."
report.region = region
findings.append(report)
return findings

View File

@@ -0,0 +1,37 @@
{
"Categories": [],
"CheckAlias": "extra748",
"CheckID": "ec2_securitygroup_allow_ingress_from_internet_to_any_port",
"CheckName": "ec2_securitygroup_allow_ingress_from_internet_to_any_port",
"CheckTitle": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to any port.",
"CheckType": "Infrastructure Security",
"Compliance": [],
"DependsOn": [],
"Description": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to any port.",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use a Zero Trust approach. Narrow ingress traffic as much as possible. Consider north-south as well as east-west traffic.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2SecurityGroup",
"Risk": "If Security groups are not properly configured the attack surface is increased.",
"ServiceName": "ec2",
"Severity": "high",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,38 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_any_port(Check):
def execute(self):
findings = []
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.security_groups:
for security_group in regional_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = region
for ingress_rule in security_group.ingress_rules:
# Check if the security group is open to the internet to all protocols
if (
"0.0.0.0/0" in str(ingress_rule["IpRanges"])
or "::/0" in str(ingress_rule["Ipv6Ranges"])
) and ingress_rule["IpProtocol"] == "-1":
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has all ports open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not all ports open to the Internet."
report.resource_id = security_group.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 security groups."
report.region = region
findings.append(report)
return findings

View File

@@ -0,0 +1,48 @@
{
"Categories": [],
"CheckAlias": "extra750",
"CheckID": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22",
"CheckName": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22",
"CheckTitle": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to SSH port 22.",
"CheckType": "Infrastructure Security",
"Compliance": [
{
"Control": [
"4.1"
],
"Framework": "CIS-AWS",
"Group": [
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [],
"Description": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to SSH port 22.",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use a Zero Trust approach. Narrow ingress traffic as much as possible. Consider north-south as well as east-west traffic.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2SecurityGroup",
"Risk": "If Security groups are not properly configured the attack surface is increased.",
"ServiceName": "ec2",
"Severity": "high",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,45 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_22(Check):
def execute(self):
findings = []
check_port = 22
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.security_groups:
for security_group in regional_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = region
for ingress_rule in security_group.ingress_rules:
if (
(
"0.0.0.0/0" in str(ingress_rule["IpRanges"])
or "::/0" in str(ingress_rule["Ipv6Ranges"])
)
and (
ingress_rule["FromPort"] == check_port
and ingress_rule["ToPort"] == check_port
)
and ingress_rule["IpProtocol"] == "tcp"
):
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the SSH port 22 open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not SSH port 22 open to the Internet."
report.resource_id = security_group.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 security groups."
report.region = region
findings.append(report)
return findings

View File

@@ -0,0 +1,48 @@
{
"Categories": [],
"CheckAlias": "extra750",
"CheckID": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389",
"CheckName": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389",
"CheckTitle": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to port 3389.",
"CheckType": "Infrastructure Security",
"Compliance": [
{
"Control": [
"4.2"
],
"Framework": "CIS-AWS",
"Group": [
"level2"
],
"Version": "1.4"
}
],
"DependsOn": [],
"Description": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to port 3389.",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use a Zero Trust approach. Narrow ingress traffic as much as possible. Consider north-south as well as east-west traffic.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2SecurityGroup",
"Risk": "If Security groups are not properly configured the attack surface is increased.",
"ServiceName": "ec2",
"Severity": "high",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,45 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_3389(Check):
def execute(self):
findings = []
check_port = 3389
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.security_groups:
for security_group in regional_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = region
for ingress_rule in security_group.ingress_rules:
if (
(
"0.0.0.0/0" in str(ingress_rule["IpRanges"])
or "::/0" in str(ingress_rule["Ipv6Ranges"])
)
and (
ingress_rule["FromPort"] == check_port
and ingress_rule["ToPort"] == check_port
)
and ingress_rule["IpProtocol"] == "tcp"
):
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the Microsoft RDP port 3389 open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Microsoft RDP port 3389 open to the Internet."
report.resource_id = security_group.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 security groups."
report.region = region
findings.append(report)
return findings

View File

@@ -0,0 +1,37 @@
{
"Categories": [],
"CheckAlias": "extra750",
"CheckID": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306",
"CheckName": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306",
"CheckTitle": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to MySQL port 3306.",
"CheckType": "Infrastructure Security",
"Compliance": [],
"DependsOn": [],
"Description": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to MySQL port 3306.",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use a Zero Trust approach. Narrow ingress traffic as much as possible. Consider north-south as well as east-west traffic.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2SecurityGroup",
"Risk": "If Security groups are not properly configured the attack surface is increased.",
"ServiceName": "ec2",
"Severity": "high",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,45 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_mysql_3306(Check):
def execute(self):
findings = []
check_port = 3306
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.security_groups:
for security_group in regional_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = region
for ingress_rule in security_group.ingress_rules:
if (
(
"0.0.0.0/0" in str(ingress_rule["IpRanges"])
or "::/0" in str(ingress_rule["Ipv6Ranges"])
)
and (
ingress_rule["FromPort"] == check_port
and ingress_rule["ToPort"] == check_port
)
and ingress_rule["IpProtocol"] == "tcp"
):
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has the MySQL port open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not MySQL ports open to the Internet."
report.resource_id = security_group.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 security groups."
report.region = region
findings.append(report)
return findings

View File

@@ -0,0 +1,37 @@
{
"Categories": [],
"CheckAlias": "extra749",
"CheckID": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483",
"CheckName": "ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483",
"CheckTitle": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to Oracle ports 1521 or 2483.",
"CheckType": "Infrastructure Security",
"Compliance": [],
"DependsOn": [],
"Description": "Ensure no security groups allow ingress from 0.0.0.0/0 or ::/0 to Oracle ports 1521 or 2483.",
"Notes": "",
"Provider": "aws",
"RelatedTo": [],
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use a Zero Trust approach. Narrow ingress traffic as much as possible. Consider north-south as well as east-west traffic.",
"Url": "https://docs.aws.amazon.com/vpc/latest/userguide/VPC_SecurityGroups.html"
}
},
"ResourceIdTemplate": "arn:partition:service:region:account-id:resource-id",
"ResourceType": "AwsEc2SecurityGroup",
"Risk": "If Security groups are not properly configured the attack surface is increased.",
"ServiceName": "ec2",
"Severity": "high",
"SubServiceName": "",
"Tags": {
"Tag1Key": "value",
"Tag2Key": "value"
}
}

View File

@@ -0,0 +1,52 @@
from lib.check.models import Check, Check_Report
from providers.aws.services.ec2.ec2_service import ec2_client
class ec2_securitygroup_allow_ingress_from_internet_to_tcp_port_oracle_1521_2483(Check):
def execute(self):
findings = []
check_port_1 = 1521
check_port_2 = 2483
for regional_client in ec2_client.regional_clients:
region = regional_client.region
if regional_client.security_groups:
for security_group in regional_client.security_groups:
public = False
report = Check_Report(self.metadata)
report.region = region
for ingress_rule in security_group.ingress_rules:
if (
(
"0.0.0.0/0" in str(ingress_rule["IpRanges"])
or "::/0" in str(ingress_rule["Ipv6Ranges"])
)
and (
(
ingress_rule["FromPort"] == check_port_1
and ingress_rule["ToPort"] == check_port_1
)
or (
ingress_rule["FromPort"] == check_port_2
and ingress_rule["ToPort"] == check_port_2
)
)
and ingress_rule["IpProtocol"] == "tcp"
):
public = True
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has Oracle ports open to the Internet."
report.resource_id = security_group.id
if not public:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) has not Oracle ports open to the Internet."
report.resource_id = security_group.id
findings.append(report)
else:
report = Check_Report(self.metadata)
report.status = "PASS"
report.status_extended = "There are no EC2 security groups."
report.region = region
findings.append(report)
return findings

View File

@@ -1,4 +1,5 @@
import threading
from dataclasses import dataclass
from lib.logger import logger
from providers.aws.aws_provider import current_audit_info, generate_regional_clients
@@ -11,7 +12,11 @@ class EC2:
self.session = audit_info.audit_session
self.audited_account = audit_info.audited_account
self.regional_clients = generate_regional_clients(self.service, audit_info)
self.__threading_call__(self.__describe_instances__)
self.__threading_call__(self.__describe_security_groups__)
self.__threading_call__(self.__describe_network_acls__)
self.__threading_call__(self.__describe_snapshots__)
self.__threading_call__(self.__get_snapshot_public__)
def __get_session__(self):
return self.session
@@ -19,34 +24,211 @@ class EC2:
def __threading_call__(self, call):
threads = []
for regional_client in self.regional_clients:
threads.append(
threading.Thread(
target=call, args=(regional_client, self.audited_account)
)
)
threads.append(threading.Thread(target=call, args=(regional_client,)))
for t in threads:
t.start()
for t in threads:
t.join()
def __describe_snapshots__(self, regional_client, audited_account):
def __describe_instances__(self, regional_client):
logger.info("EC2 - Describing EC2 Instances...")
try:
describe_instances_paginator = regional_client.get_paginator(
"describe_instances"
)
instances = []
for page in describe_instances_paginator.paginate():
for reservation in page["Reservations"]:
for instance in reservation["Instances"]:
if (
"PublicDnsName" in instance
and "PublicIpAddress" in instance
):
instances.append(
Instance(
instance["InstanceId"],
instance["InstanceType"],
instance["ImageId"],
instance["LaunchTime"],
instance["PrivateDnsName"],
instance["PrivateIpAddress"],
instance["PublicDnsName"],
instance["PublicIpAddress"],
)
)
else:
instances.append(
Instance(
instance["InstanceId"],
instance["InstanceType"],
instance["ImageId"],
instance["LaunchTime"],
instance["PrivateDnsName"],
instance["PrivateIpAddress"],
None,
None,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
regional_client.instances = []
else:
regional_client.instances = instances
def __describe_security_groups__(self, regional_client):
logger.info("EC2 - Describing Security Groups...")
try:
describe_security_groups_paginator = regional_client.get_paginator(
"describe_security_groups"
)
security_groups = []
for page in describe_security_groups_paginator.paginate():
for sg in page["SecurityGroups"]:
security_groups.append(
SecurityGroup(
sg["GroupName"],
sg["GroupId"],
sg["IpPermissions"],
sg["IpPermissionsEgress"],
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
regional_client.security_groups = []
else:
regional_client.security_groups = security_groups
def __describe_network_acls__(self, regional_client):
logger.info("EC2 - Describing Security Groups...")
try:
describe_network_acls_paginator = regional_client.get_paginator(
"describe_network_acls"
)
network_acls = []
for page in describe_network_acls_paginator.paginate():
for nacl in page["NetworkAcls"]:
network_acls.append(
NetworkACL(nacl["NetworkAclId"], nacl["Entries"])
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
regional_client.network_acls = []
else:
regional_client.network_acls = network_acls
def __describe_snapshots__(self, regional_client):
logger.info("EC2 - Describing Snapshots...")
try:
describe_snapshots_paginator = regional_client.get_paginator(
"describe_snapshots"
)
snapshots = []
encrypted = False
for page in describe_snapshots_paginator.paginate(
OwnerIds=[audited_account]
OwnerIds=[self.audited_account]
):
for snapshot in page["Snapshots"]:
snapshots.append(snapshot)
if snapshot["Encrypted"]:
encrypted = True
snapshots.append(Snapshot(snapshot["SnapshotId"], encrypted))
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
regional_client.snapshots = []
else:
regional_client.snapshots = snapshots
def __get_snapshot_public__(self, regional_client):
logger.info("EC2 - Get snapshots encryption...")
try:
if hasattr(regional_client, "snapshots"):
for snapshot in regional_client.snapshots:
snapshot_public = regional_client.describe_snapshot_attribute(
Attribute="createVolumePermission", SnapshotId=snapshot.id
)
for permission in snapshot_public["CreateVolumePermissions"]:
if "Group" in permission:
if permission["Group"] == "all":
snapshot.public = True
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}: {error}"
)
@dataclass
class Instance:
id: str
type: str
image_id: str
launch_time: str
private_dns: str
private_ip: str
public_dns: str
public_ip: str
def __init__(
self,
id,
type,
image_id,
launch_time,
private_dns,
private_ip,
public_dns,
public_ip,
):
self.id = id
self.type = type
self.image_id = image_id
self.launch_time = launch_time
self.private_dns = private_dns
self.private_ip = private_ip
self.public_dns = public_dns
self.public_ip = public_ip
@dataclass
class Snapshot:
id: str
encrypted: bool
public: bool
def __init__(self, id, encrypted):
self.id = id
self.encrypted = encrypted
self.public = False
@dataclass
class SecurityGroup:
name: str
id: str
ingress_rules: list[dict]
egress_rules: list[dict]
def __init__(self, name, id, ingress_rules, egress_rules):
self.name = name
self.id = id
self.ingress_rules = ingress_rules
self.egress_rules = egress_rules
@dataclass
class NetworkACL:
id: str
entries: list[dict]
def __init__(self, id, entries):
self.id = id
self.entries = entries
ec2_client = EC2(current_audit_info)

View File

@@ -0,0 +1,19 @@
#!/bin/bash
# Upgrade AWS CLI to v2
sudo yum update -y
sudo yum remove -y awscli
cd /opt || exit
sudo curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
sudo unzip awscliv2.zip
sudo ./aws/install
# shellcheck disable=SC1090
. ~/.profile # to load the new path for AWS CLI v2
sudo rm -fr /opt/aws/
cd ~/environment/ || exit
# Prepare Prowler 3.0
git clone https://github.com/prowler-cloud/prowler
cd prowler || exit
git checkout prowler-3.0-dev
sudo pip3 install pipenv detect-secrets==1.0.3
pipenv install && pipenv shell