chore(ec2): change all ec2 fixers to new structure

This commit is contained in:
Daniel Barranquero
2025-06-12 11:40:00 +02:00
parent 07c5ae547f
commit 4fcb6b34e7
38 changed files with 1638 additions and 1282 deletions
@@ -1,35 +1,60 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
def fixer(region):
class Ec2InstanceAccountImdsv2EnabledFixer(AWSFixer):
"""
Enable IMDSv2 for EC2 instances in the specified region.
Requires the ec2:ModifyInstanceMetadataDefaults permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:ModifyInstanceMetadataDefaults",
"Resource": "*"
}
]
}
Args:
region (str): AWS region
Returns:
bool: True if IMDSv2 is enabled, False otherwise
Fixer to enable IMDSv2 for EC2 instances in a region.
"""
try:
regional_client = ec2_client.regional_clients[region]
return regional_client.modify_instance_metadata_defaults(HttpTokens="required")[
"Return"
]
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Enable IMDSv2 for EC2 instances in a region.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:ModifyInstanceMetadataDefaults",
"Resource": "*",
}
],
},
)
return False
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Enable IMDSv2 for EC2 instances in a region.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: region (if finding is not provided)
Returns:
bool: True if IMDSv2 is enabled, False otherwise
"""
try:
if finding:
region = finding.region
else:
region = kwargs.get("region")
if not region:
raise ValueError("Region is required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
return regional_client.modify_instance_metadata_defaults(
HttpTokens="required"
)["Return"]
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortCassandraExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing Cassandra ports (7000, 7001, 7199, 9042, 9160) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies Cassandra ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing Cassandra ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [7000, 7001, 7199, 9042, 9160]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing Cassandra ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing Cassandra ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [7000, 7001, 7199, 9042, 9160]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortCifsExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing CIFS ports (139, 445) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies CIFS ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing CIFS ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [139, 445]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing CIFS ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing CIFS ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [139, 445]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortElasticsearchKibanaExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing Elasticsearch and Kibana ports (9200, 9300, 5601)
from any address (0.0.0.0/0) for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies those ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing Elasticsearch and Kibana ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [9200, 9300, 5601]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing Elasticsearch and Kibana ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing Elasticsearch and Kibana ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [9200, 9300, 5601]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortFtpExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing FTP ports (20, 21) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies FTP ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing FTP ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [20, 21]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing FTP ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing FTP ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [20, 21]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortKafkaExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing Kafka ports (9092) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies Kafka ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing Kafka ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [9092]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing Kafka ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing Kafka ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [9092]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortKerberosExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing Kerberos ports (88, 464, 749, 750) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies Kerberos ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing Kerberos ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [88, 464, 749, 750]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing Kerberos ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing Kerberos ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [88, 464, 749, 750]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortLdapExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing LDAP ports (389, 636) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies LDAP ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing LDAP ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [389, 636]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing LDAP ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing LDAP ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [389, 636]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortMemcachedExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing Memcached ports (11211) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies Memcached ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing Memcached ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [11211]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing Memcached ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing Memcached ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [11211]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortMongodbExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing MongoDB ports (27017, 27018) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies MongoDB ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing MongoDB ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [27017, 27018]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing MongoDB ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing MongoDB ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [27017, 27018]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortMysqlExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing MySQL ports (3306) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies MySQL ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing MySQL ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [3306]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing MySQL ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing MySQL ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [3306]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortOracleExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing Oracle ports (1521, 2483, 2484) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies Oracle ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing Oracle ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [1521, 2483, 2484]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing Oracle ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing Oracle ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [1521, 2483, 2484]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortPostgresqlExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing PostgreSQL ports (5432) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies PostgreSQL ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing PostgreSQL ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [5432]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing PostgreSQL ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing PostgreSQL ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [5432]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortRdpExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing RDP ports (3389) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies RDP ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing RDP ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [3389]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing RDP ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing RDP ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [3389]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortRedisExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing Redis ports (6379) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies Redis ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing Redis ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [6379]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing Redis ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing Redis ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [6379]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortSqlserverExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing SQLServer ports (1433, 1434) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies SQLServer ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing SQLServer ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [1433, 1434]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing SQLServer ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing SQLServer ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [1433, 1434]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortSshExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing SSH ports (22) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies SSH ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing SSH ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [22]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing SSH ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing SSH ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [22]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,51 +1,75 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2InstancePortTelnetExposedToInternetFixer(AWSFixer):
"""
Revokes any ingress rule allowing Telnet ports (23) from any address (0.0.0.0/0)
for the EC2 instance's security groups.
This fixer will only be triggered if the check identifies Telnet ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The EC2 instance ID.
region (str): The AWS region where the EC2 instance exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing Telnet ports from any address for EC2 instances' security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = [23]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing Telnet ports from any address for EC2 instances' security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing Telnet ports from any address for EC2 instances' security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = [23]
for instance in ec2_client.instances:
if instance.id == resource_id:
for sg in ec2_client.security_groups.values():
if sg.id in instance.security_groups:
for ingress_rule in sg.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=sg.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -1,52 +1,76 @@
from typing import Optional
from prowler.lib.check.models import Check_Report_AWS
from prowler.lib.logger import logger
from prowler.providers.aws.lib.fix.fixer import AWSFixer
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.ec2.lib.security_groups import check_security_group
def fixer(resource_id: str, region: str) -> bool:
class Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer(AWSFixer):
"""
Revokes any ingress rule allowing high risk ports (25, 110, 135, 143, 445, 3000, 4333, 5000, 5500, 8080, 8088)
from any address (0.0.0.0/0) for the security groups.
This fixer will only be triggered if the check identifies high risk ports open to the Internet.
Requires the ec2:RevokeSecurityGroupIngress permission.
Permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*"
}
]
}
Args:
resource_id (str): The Security Group ID.
region (str): The AWS region where the Security Group exists.
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
Fixer to revoke ingress rules allowing high risk ports from any address for security groups.
"""
try:
regional_client = ec2_client.regional_clients[region]
check_ports = ec2_client.audit_config.get(
"ec2_high_risk_ports",
[25, 110, 135, 143, 445, 3000, 4333, 5000, 5500, 8080, 8088],
)
for security_group in ec2_client.security_groups.values():
if security_group.id == resource_id:
for ingress_rule in security_group.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=security_group.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
def __init__(self):
super().__init__(
description="Revoke ingress rules allowing high risk ports from any address for security groups.",
cost_impact=False,
cost_description=None,
service="ec2",
iam_policy_required={
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:RevokeSecurityGroupIngress",
"Resource": "*",
}
],
},
)
return False
else:
return True
def fix(self, finding: Optional[Check_Report_AWS] = None, **kwargs) -> bool:
"""
Revoke ingress rules allowing high risk ports from any address for security groups.
Args:
finding (Optional[Check_Report_AWS]): Finding to fix
**kwargs: resource_id, region (if finding is not provided)
Returns:
bool: True if the operation is successful (ingress rule revoked), False otherwise.
"""
try:
if finding:
resource_id = finding.resource_id
region = finding.region
else:
resource_id = kwargs.get("resource_id")
region = kwargs.get("region")
if not resource_id or not region:
raise ValueError("resource_id and region are required")
super().fix(region=region)
regional_client = ec2_client.regional_clients[region]
check_ports = ec2_client.audit_config.get(
"ec2_high_risk_ports",
[25, 110, 135, 143, 445, 3000, 4333, 5000, 5500, 8080, 8088],
)
for security_group in ec2_client.security_groups.values():
if security_group.id == resource_id:
for ingress_rule in security_group.ingress_rules:
if check_security_group(
ingress_rule, "tcp", check_ports, any_address=True
):
regional_client.revoke_security_group_ingress(
GroupId=security_group.id,
IpPermissions=[ingress_rule],
)
except Exception as error:
logger.error(
f"{region if 'region' in locals() else 'unknown'} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
else:
return True
@@ -27,7 +27,7 @@ def mock_modify_instance_metadata_defaults(HttpTokens):
return {"Return": True}
class Test_ec2_instance_account_imdsv2_enabled_fixer:
class TestEc2InstanceAccountImdsv2EnabledFixer:
@mock_aws
def test_ec2_instance_account_imdsv2_enabled_fixer(self):
ec2_service = mock.MagicMock()
@@ -64,8 +64,9 @@ class Test_ec2_instance_account_imdsv2_enabled_fixer:
):
from prowler.providers.aws.services.ec2.ec2_instance_account_imdsv2_enabled.ec2_instance_account_imdsv2_enabled_fixer import (
fixer,
Ec2InstanceAccountImdsv2EnabledFixer,
)
# By default, the account has not public access blocked
assert fixer(region=AWS_REGION_US_EAST_1)
assert Ec2InstanceAccountImdsv2EnabledFixer().fix(
region=AWS_REGION_US_EAST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_cassandra_exposed_to_internet_fixer:
class TestEc2InstancePortCassandraExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_cassandra_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cassandra_exposed_to_internet.ec2_instance_port_cassandra_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCassandraExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCassandraExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_cassandra_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cassandra_exposed_to_internet.ec2_instance_port_cassandra_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCassandraExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortCassandraExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -223,12 +225,13 @@ class Test_ec2_instance_port_cassandra_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cassandra_exposed_to_internet.ec2_instance_port_cassandra_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCassandraExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCassandraExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -280,12 +283,13 @@ class Test_ec2_instance_port_cassandra_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cassandra_exposed_to_internet.ec2_instance_port_cassandra_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCassandraExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCassandraExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_all_ports(self):
@@ -343,9 +347,10 @@ class Test_ec2_instance_port_cassandra_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cassandra_exposed_to_internet.ec2_instance_port_cassandra_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCassandraExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCassandraExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_cifs_exposed_to_internet_fixer:
class TestEc2InstancePortCifsExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_cifs_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cifs_exposed_to_internet.ec2_instance_port_cifs_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCifsExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCifsExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_cifs_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cifs_exposed_to_internet.ec2_instance_port_cifs_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCifsExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortCifsExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -229,12 +231,13 @@ class Test_ec2_instance_port_cifs_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cifs_exposed_to_internet.ec2_instance_port_cifs_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCifsExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCifsExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -292,12 +295,13 @@ class Test_ec2_instance_port_cifs_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cifs_exposed_to_internet.ec2_instance_port_cifs_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCifsExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCifsExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_139_port(self):
@@ -355,12 +359,13 @@ class Test_ec2_instance_port_cifs_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cifs_exposed_to_internet.ec2_instance_port_cifs_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCifsExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCifsExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_with_public_ip_in_public_subnet_only_445_port(
@@ -440,9 +445,10 @@ class Test_ec2_instance_port_cifs_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_cifs_exposed_to_internet.ec2_instance_port_cifs_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortCifsExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortCifsExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer:
class TestEc2InstancePortElasticsearchKibanaExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_elasticsearch_kibana_exposed_to_internet.ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortElasticsearchKibanaExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortElasticsearchKibanaExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,15 @@ class Test_ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_elasticsearch_kibana_exposed_to_internet.ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortElasticsearchKibanaExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert (
not Ec2InstancePortElasticsearchKibanaExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -229,12 +233,13 @@ class Test_ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_elasticsearch_kibana_exposed_to_internet.ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortElasticsearchKibanaExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortElasticsearchKibanaExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -292,12 +297,13 @@ class Test_ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_elasticsearch_kibana_exposed_to_internet.ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortElasticsearchKibanaExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortElasticsearchKibanaExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_9200_9300_port(self):
@@ -355,12 +361,13 @@ class Test_ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_elasticsearch_kibana_exposed_to_internet.ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortElasticsearchKibanaExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortElasticsearchKibanaExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_with_public_ip_in_public_subnet_only_5601_port(
@@ -440,9 +447,10 @@ class Test_ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_elasticsearch_kibana_exposed_to_internet.ec2_instance_port_elasticsearch_kibana_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortElasticsearchKibanaExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortElasticsearchKibanaExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_ftp_exposed_to_internet_fixer:
class TestEc2InstancePortFtpExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_ftp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ftp_exposed_to_internet.ec2_instance_port_ftp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortFtpExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortFtpExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_ftp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ftp_exposed_to_internet.ec2_instance_port_ftp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortFtpExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortFtpExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -229,12 +231,13 @@ class Test_ec2_instance_port_ftp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ftp_exposed_to_internet.ec2_instance_port_ftp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortFtpExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortFtpExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -292,12 +295,13 @@ class Test_ec2_instance_port_ftp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ftp_exposed_to_internet.ec2_instance_port_ftp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortFtpExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortFtpExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_both_ports(self):
@@ -355,9 +359,10 @@ class Test_ec2_instance_port_ftp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ftp_exposed_to_internet.ec2_instance_port_ftp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortFtpExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortFtpExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_kafka_exposed_to_internet_fixer:
class TestEc2InstancePortKafkaExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_kafka_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kafka_exposed_to_internet.ec2_instance_port_kafka_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKafkaExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortKafkaExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -153,12 +154,13 @@ class Test_ec2_instance_port_kafka_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kafka_exposed_to_internet.ec2_instance_port_kafka_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKafkaExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortKafkaExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -210,12 +212,13 @@ class Test_ec2_instance_port_kafka_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kafka_exposed_to_internet.ec2_instance_port_kafka_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKafkaExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortKafkaExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -267,12 +270,13 @@ class Test_ec2_instance_port_kafka_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kafka_exposed_to_internet.ec2_instance_port_kafka_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKafkaExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortKafkaExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_one_port(self):
@@ -330,9 +334,10 @@ class Test_ec2_instance_port_kafka_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kafka_exposed_to_internet.ec2_instance_port_kafka_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKafkaExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortKafkaExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_kerberos_exposed_to_internet_fixer:
class TestEc2InstancePortKerberosExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_kerberos_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kerberos_exposed_to_internet.ec2_instance_port_kerberos_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKerberosExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortKerberosExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_kerberos_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kerberos_exposed_to_internet.ec2_instance_port_kerberos_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKerberosExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortKerberosExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -229,12 +231,13 @@ class Test_ec2_instance_port_kerberos_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kerberos_exposed_to_internet.ec2_instance_port_kerberos_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKerberosExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortKerberosExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -292,12 +295,13 @@ class Test_ec2_instance_port_kerberos_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kerberos_exposed_to_internet.ec2_instance_port_kerberos_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKerberosExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortKerberosExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_both_ports(self):
@@ -355,9 +359,10 @@ class Test_ec2_instance_port_kerberos_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_kerberos_exposed_to_internet.ec2_instance_port_kerberos_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortKerberosExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortKerberosExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_ldap_exposed_to_internet_fixer:
class TestEc2InstancePortLdapExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -47,8 +47,8 @@ class Test_ec2_instance_port_ldap_exposed_to_internet_fixer:
},
{
"IpProtocol": "tcp",
"FromPort": 445,
"ToPort": 445,
"FromPort": 389,
"ToPort": 636,
"IpRanges": [{"CidrIp": "0.0.0.0/0"}, {"CidrIp": "10.0.0.0/24"}],
"Ipv6Ranges": [{"CidrIpv6": "::/0"}, {"CidrIpv6": "2001:db8::/32"}],
},
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_ldap_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ldap_exposed_to_internet.ec2_instance_port_ldap_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortLdapExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortLdapExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_ldap_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ldap_exposed_to_internet.ec2_instance_port_ldap_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortLdapExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortLdapExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -229,12 +231,13 @@ class Test_ec2_instance_port_ldap_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ldap_exposed_to_internet.ec2_instance_port_ldap_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortLdapExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortLdapExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -292,12 +295,13 @@ class Test_ec2_instance_port_ldap_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ldap_exposed_to_internet.ec2_instance_port_ldap_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortLdapExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortLdapExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_bot_ports(self):
@@ -355,9 +359,10 @@ class Test_ec2_instance_port_ldap_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ldap_exposed_to_internet.ec2_instance_port_ldap_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortLdapExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortLdapExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_memcached_exposed_to_internet_fixer:
class TestEc2InstancePortMemcachedExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_memcached_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_memcached_exposed_to_internet.ec2_instance_port_memcached_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMemcachedExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMemcachedExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_memcached_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_memcached_exposed_to_internet.ec2_instance_port_memcached_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMemcachedExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortMemcachedExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -223,12 +225,13 @@ class Test_ec2_instance_port_memcached_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_memcached_exposed_to_internet.ec2_instance_port_memcached_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMemcachedExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMemcachedExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -280,12 +283,13 @@ class Test_ec2_instance_port_memcached_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_memcached_exposed_to_internet.ec2_instance_port_memcached_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMemcachedExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMemcachedExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_11211_port(self):
@@ -343,9 +347,10 @@ class Test_ec2_instance_port_memcached_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_memcached_exposed_to_internet.ec2_instance_port_memcached_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMemcachedExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMemcachedExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_mongodb_exposed_to_internet_fixer:
class TestEc2InstancePortMongodbExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_mongodb_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mongodb_exposed_to_internet.ec2_instance_port_mongodb_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMongodbExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMongodbExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_mongodb_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mongodb_exposed_to_internet.ec2_instance_port_mongodb_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMongodbExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortMongodbExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -229,12 +231,13 @@ class Test_ec2_instance_port_mongodb_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mongodb_exposed_to_internet.ec2_instance_port_mongodb_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMongodbExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMongodbExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -292,12 +295,13 @@ class Test_ec2_instance_port_mongodb_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mongodb_exposed_to_internet.ec2_instance_port_mongodb_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMongodbExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMongodbExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_27017_port(self):
@@ -355,12 +359,13 @@ class Test_ec2_instance_port_mongodb_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mongodb_exposed_to_internet.ec2_instance_port_mongodb_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMongodbExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMongodbExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_with_public_ip_in_public_subnet_only_27018_port(
@@ -440,9 +445,10 @@ class Test_ec2_instance_port_mongodb_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mongodb_exposed_to_internet.ec2_instance_port_mongodb_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMongodbExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMongodbExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_mysql_exposed_to_internet_fixer:
class TestEc2InstancePortMysqlExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_mysql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mysql_exposed_to_internet.ec2_instance_port_mysql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMysqlExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMysqlExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_mysql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mysql_exposed_to_internet.ec2_instance_port_mysql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMysqlExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortMysqlExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -223,12 +225,13 @@ class Test_ec2_instance_port_mysql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mysql_exposed_to_internet.ec2_instance_port_mysql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMysqlExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMysqlExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -280,12 +283,13 @@ class Test_ec2_instance_port_mysql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mysql_exposed_to_internet.ec2_instance_port_mysql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMysqlExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMysqlExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_3306_port(self):
@@ -343,9 +347,10 @@ class Test_ec2_instance_port_mysql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_mysql_exposed_to_internet.ec2_instance_port_mysql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortMysqlExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortMysqlExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_oracle_exposed_to_internet_fixer:
class TestEc2InstancePortOracleExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -48,7 +48,7 @@ class Test_ec2_instance_port_oracle_exposed_to_internet_fixer:
{
"IpProtocol": "tcp",
"FromPort": 1521,
"ToPort": 1521,
"ToPort": 2484,
"IpRanges": [{"CidrIp": "0.0.0.0/0"}, {"CidrIp": "10.0.0.0/24"}],
"Ipv6Ranges": [{"CidrIpv6": "::/0"}, {"CidrIpv6": "2001:db8::/32"}],
},
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_oracle_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_oracle_exposed_to_internet.ec2_instance_port_oracle_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortOracleExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortOracleExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -106,23 +107,10 @@ class Test_ec2_instance_port_oracle_exposed_to_internet_fixer:
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 2483,
"ToPort": 2484,
"IpRanges": [
{"CidrIp": "0.0.0.0/0"},
{"CidrIp": "10.0.0.0/24"},
],
"Ipv6Ranges": [
{"CidrIpv6": "::/0"},
{"CidrIpv6": "2001:db8::/32"},
],
},
{
"IpProtocol": "tcp",
"FromPort": 1521,
"ToPort": 1521,
"ToPort": 2484,
"IpRanges": [
{"CidrIp": "0.0.0.0/0"},
{"CidrIp": "10.0.0.0/24"},
@@ -166,198 +154,10 @@ class Test_ec2_instance_port_oracle_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_oracle_exposed_to_internet.ec2_instance_port_oracle_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortOracleExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION_EU_WEST_1)
ec2_resource = resource("ec2", region_name=AWS_REGION_EU_WEST_1)
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 1521,
"ToPort": 1521,
"IpRanges": [{"CidrIp": "0.0.0.0/0"}, {"CidrIp": "10.0.0.0/24"}],
},
{
"IpProtocol": "tcp",
"FromPort": 2483,
"ToPort": 2484,
"IpRanges": [{"CidrIp": "0.0.0.0/0"}, {"CidrIp": "10.0.0.0/24"}],
},
],
)
subnet_id = ec2_client.create_subnet(VpcId=vpc_id, CidrBlock="10.0.0.0/16")[
"Subnet"
]["SubnetId"]
instance_id = ec2_resource.create_instances(
ImageId="ami-12345678",
MinCount=1,
MaxCount=1,
InstanceType="t2.micro",
SecurityGroupIds=[default_sg_id],
SubnetId=subnet_id,
TagSpecifications=[
{"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": "test"}]}
],
)[0].id
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.ec2.ec2_instance_port_oracle_exposed_to_internet.ec2_instance_port_oracle_exposed_to_internet_fixer.ec2_client",
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_oracle_exposed_to_internet.ec2_instance_port_oracle_exposed_to_internet_fixer import (
fixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION_EU_WEST_1)
ec2_resource = resource("ec2", region_name=AWS_REGION_EU_WEST_1)
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 1521,
"ToPort": 1521,
"Ipv6Ranges": [{"CidrIpv6": "::/0"}, {"CidrIpv6": "2001:db8::/32"}],
},
{
"IpProtocol": "tcp",
"FromPort": 2483,
"ToPort": 2484,
"Ipv6Ranges": [{"CidrIpv6": "::/0"}, {"CidrIpv6": "2001:db8::/32"}],
},
],
)
subnet_id = ec2_client.create_subnet(VpcId=vpc_id, CidrBlock="10.0.0.0/16")[
"Subnet"
]["SubnetId"]
instance_id = ec2_resource.create_instances(
ImageId="ami-12345678",
MinCount=1,
MaxCount=1,
InstanceType="t2.micro",
SecurityGroupIds=[default_sg_id],
SubnetId=subnet_id,
TagSpecifications=[
{"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": "test"}]}
],
)[0].id
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.ec2.ec2_instance_port_oracle_exposed_to_internet.ec2_instance_port_oracle_exposed_to_internet_fixer.ec2_client",
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_oracle_exposed_to_internet.ec2_instance_port_oracle_exposed_to_internet_fixer import (
fixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_bots_ports(self):
# Create EC2 Mocked Resources
ec2_client = client("ec2", region_name=AWS_REGION_EU_WEST_1)
ec2_resource = resource("ec2", region_name=AWS_REGION_EU_WEST_1)
vpc_id = ec2_client.create_vpc(CidrBlock="10.0.0.0/16")["Vpc"]["VpcId"]
default_sg = ec2_client.describe_security_groups(GroupNames=["default"])[
"SecurityGroups"
][0]
default_sg_id = default_sg["GroupId"]
ec2_client.authorize_security_group_ingress(
GroupId=default_sg_id,
IpPermissions=[
{
"IpProtocol": "tcp",
"FromPort": 1521,
"ToPort": 2484,
"IpRanges": [{"CidrIp": "0.0.0.0/0"}],
}
],
)
subnet_id = ec2_client.create_subnet(VpcId=vpc_id, CidrBlock="10.0.0.0/16")[
"Subnet"
]["SubnetId"]
instance = ec2_resource.create_instances(
ImageId="ami-12345678",
MinCount=1,
MaxCount=1,
InstanceType="t2.micro",
SecurityGroupIds=[default_sg_id],
NetworkInterfaces=[
{
"DeviceIndex": 0,
"SubnetId": subnet_id,
"AssociatePublicIpAddress": True,
}
],
TagSpecifications=[
{"ResourceType": "instance", "Tags": [{"Key": "Name", "Value": "test"}]}
],
)[0]
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.ec2.ec2_instance_port_oracle_exposed_to_internet.ec2_instance_port_oracle_exposed_to_internet_fixer.ec2_client",
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_oracle_exposed_to_internet.ec2_instance_port_oracle_exposed_to_internet_fixer import (
fixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortOracleExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_postgresql_exposed_to_internet_fixer:
class TestEc2InstancePortPostgresqlExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_postgresql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_postgresql_exposed_to_internet.ec2_instance_port_postgresql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortPostgresqlExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortPostgresqlExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_postgresql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_postgresql_exposed_to_internet.ec2_instance_port_postgresql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortPostgresqlExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortPostgresqlExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -223,12 +225,13 @@ class Test_ec2_instance_port_postgresql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_postgresql_exposed_to_internet.ec2_instance_port_postgresql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortPostgresqlExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortPostgresqlExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -280,12 +283,13 @@ class Test_ec2_instance_port_postgresql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_postgresql_exposed_to_internet.ec2_instance_port_postgresql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortPostgresqlExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortPostgresqlExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_5432_port(self):
@@ -343,9 +347,10 @@ class Test_ec2_instance_port_postgresql_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_postgresql_exposed_to_internet.ec2_instance_port_postgresql_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortPostgresqlExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortPostgresqlExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_rdp_exposed_to_internet_fixer:
class TestEc2InstancePortRdpExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_rdp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_rdp_exposed_to_internet.ec2_instance_port_rdp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRdpExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortRdpExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_rdp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_rdp_exposed_to_internet.ec2_instance_port_rdp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRdpExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortRdpExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -223,12 +225,13 @@ class Test_ec2_instance_port_rdp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_rdp_exposed_to_internet.ec2_instance_port_rdp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRdpExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortRdpExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -280,12 +283,13 @@ class Test_ec2_instance_port_rdp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_rdp_exposed_to_internet.ec2_instance_port_rdp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRdpExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortRdpExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_3389_port(self):
@@ -343,9 +347,10 @@ class Test_ec2_instance_port_rdp_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_rdp_exposed_to_internet.ec2_instance_port_rdp_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRdpExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortRdpExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_redis_exposed_to_internet_fixer:
class TestEc2InstancePortRedisExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_redis_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_redis_exposed_to_internet.ec2_instance_port_redis_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRedisExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortRedisExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_redis_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_redis_exposed_to_internet.ec2_instance_port_redis_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRedisExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortRedisExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -223,12 +225,13 @@ class Test_ec2_instance_port_redis_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_redis_exposed_to_internet.ec2_instance_port_redis_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRedisExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortRedisExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -280,12 +283,13 @@ class Test_ec2_instance_port_redis_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_redis_exposed_to_internet.ec2_instance_port_redis_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRedisExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortRedisExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_6379_port(self):
@@ -343,9 +347,10 @@ class Test_ec2_instance_port_redis_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_redis_exposed_to_internet.ec2_instance_port_redis_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortRedisExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortRedisExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_sqlserver_exposed_to_internet_fixer:
class TestEc2InstancePortSqlserverExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_sqlserver_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_sqlserver_exposed_to_internet.ec2_instance_port_sqlserver_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSqlserverExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortSqlserverExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_sqlserver_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_sqlserver_exposed_to_internet.ec2_instance_port_sqlserver_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSqlserverExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortSqlserverExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -229,12 +231,13 @@ class Test_ec2_instance_port_sqlserver_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_sqlserver_exposed_to_internet.ec2_instance_port_sqlserver_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSqlserverExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortSqlserverExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -292,12 +295,13 @@ class Test_ec2_instance_port_sqlserver_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_sqlserver_exposed_to_internet.ec2_instance_port_sqlserver_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSqlserverExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortSqlserverExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_both_ports(self):
@@ -355,9 +359,10 @@ class Test_ec2_instance_port_sqlserver_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_sqlserver_exposed_to_internet.ec2_instance_port_sqlserver_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSqlserverExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortSqlserverExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_ssh_exposed_to_internet_fixer:
class TestEc2InstancePortSshExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_ssh_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSshExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortSshExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_ssh_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSshExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortSshExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -223,12 +225,13 @@ class Test_ec2_instance_port_ssh_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSshExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortSshExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -280,12 +283,13 @@ class Test_ec2_instance_port_ssh_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSshExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortSshExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_22_port(self):
@@ -343,9 +347,10 @@ class Test_ec2_instance_port_ssh_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_ssh_exposed_to_internet.ec2_instance_port_ssh_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortSshExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortSshExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_instance_port_telnet_exposed_to_internet_fixer:
class TestEc2InstancePortTelnetExposedToInternetFixer:
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -83,12 +83,13 @@ class Test_ec2_instance_port_telnet_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_telnet_exposed_to_internet.ec2_instance_port_telnet_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortTelnetExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortTelnetExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_error(self):
@@ -166,12 +167,13 @@ class Test_ec2_instance_port_telnet_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_telnet_exposed_to_internet.ec2_instance_port_telnet_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortTelnetExposedToInternetFixer,
)
assert not fixer(instance_id, AWS_REGION_EU_WEST_1)
assert not Ec2InstancePortTelnetExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -223,12 +225,13 @@ class Test_ec2_instance_port_telnet_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_telnet_exposed_to_internet.ec2_instance_port_telnet_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortTelnetExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortTelnetExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -280,12 +283,13 @@ class Test_ec2_instance_port_telnet_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_telnet_exposed_to_internet.ec2_instance_port_telnet_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortTelnetExposedToInternetFixer,
)
assert fixer(instance_id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortTelnetExposedToInternetFixer().fix(
resource_id=instance_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_instance_exposed_port_in_public_subnet_only_23_port(self):
@@ -343,9 +347,10 @@ class Test_ec2_instance_port_telnet_exposed_to_internet_fixer:
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_instance_port_telnet_exposed_to_internet.ec2_instance_port_telnet_exposed_to_internet_fixer import (
fixer,
Ec2InstancePortTelnetExposedToInternetFixer,
)
assert fixer(instance.id, AWS_REGION_EU_WEST_1)
assert Ec2InstancePortTelnetExposedToInternetFixer().fix(
resource_id=instance.id, region=AWS_REGION_EU_WEST_1
)
@@ -24,7 +24,7 @@ def mock_make_api_call_error(self, operation_name, kwarg):
return mock_make_api_call(self, operation_name, kwarg)
class Test_ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_fixer:
class TestEc2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer:
@mock_aws
def test_ec2_sg_exposed_port_in_private_subnet_with_ip4_and_ip6(self):
# Create EC2 Mocked Resources
@@ -76,12 +76,15 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_fixer import (
fixer,
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer,
)
assert fixer(default_sg_id, AWS_REGION_EU_WEST_1)
assert (
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer().fix(
resource_id=default_sg_id, region=AWS_REGION_EU_WEST_1
)
)
@mock_aws
def test_ec2_sg_exposed_port_error(self):
@@ -143,12 +146,13 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_fixer import (
fixer,
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer,
)
assert not fixer(default_sg_id, AWS_REGION_EU_WEST_1)
assert not Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer().fix(
resource_id=default_sg_id, region=AWS_REGION_EU_WEST_1
)
@mock_aws
def test_ec2_sg_exposed_port_in_private_subnet_only_with_ip4(self):
@@ -200,12 +204,15 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_fixer import (
fixer,
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer,
)
assert fixer(default_sg_id, AWS_REGION_EU_WEST_1)
assert (
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer().fix(
resource_id=default_sg_id, region=AWS_REGION_EU_WEST_1
)
)
@mock_aws
def test_ec2_sg_exposed_port_in_private_subnet_only_with_ip6(self):
@@ -257,12 +264,15 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_fixer import (
fixer,
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer,
)
assert fixer(default_sg_id, AWS_REGION_EU_WEST_1)
assert (
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer().fix(
resource_id=default_sg_id, region=AWS_REGION_EU_WEST_1
)
)
@mock_aws
def test_ec2_sg_exposed_port_in_public_subnet_all_ports(self):
@@ -314,9 +324,12 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_
new=EC2(aws_provider),
),
):
# Test Fixer
from prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports.ec2_securitygroup_allow_ingress_from_internet_to_high_risk_tcp_ports_fixer import (
fixer,
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer,
)
assert fixer(default_sg_id, AWS_REGION_EU_WEST_1)
assert (
Ec2SecuritygroupAllowIngressFromInternetToHighRiskTcpPortsFixer().fix(
resource_id=default_sg_id, region=AWS_REGION_EU_WEST_1
)
)