feat(ec2): Add 2 new checks + fixers related with EC2 service (#3827)

Co-authored-by: Sergio <sergio@prowler.com>
This commit is contained in:
Pedro Martín
2024-04-24 11:43:19 +02:00
committed by GitHub
parent 022b7ef756
commit acac3fc693
17 changed files with 805 additions and 15 deletions
+7
View File
@@ -142,4 +142,11 @@ aws:
# kms_cmk_rotation_enabled
# No configuration needed for this check
# ec2_ebs_snapshot_account_block_public_access
ec2_ebs_snapshot_account_block_public_access:
State: "block-all-sharing"
# ec2_instance_account_imdsv2_enabled
# No configuration needed for this check
```
+7
View File
@@ -42,3 +42,10 @@ aws:
# kms_cmk_rotation_enabled
# No configuration needed for this check
#ec2_ebs_snapshot_account_block_public_access
ec2_ebs_snapshot_account_block_public_access:
State: "block-all-sharing"
#ec2_instance_account_imdsv2_enabled
# No configuration needed for this check
@@ -6,19 +6,18 @@ class ec2_ebs_default_encryption(Check):
def execute(self):
findings = []
for ebs_encryption in ec2_client.ebs_encryption_by_default:
report = Check_Report_AWS(self.metadata())
report.region = ebs_encryption.region
report.resource_arn = ec2_client.__get_volume_arn_template__(
ebs_encryption.region
)
report.resource_id = ec2_client.audited_account
if ebs_encryption.status:
report.status = "PASS"
report.status_extended = "EBS Default Encryption is activated."
findings.append(report)
elif ec2_client.provider.scan_unused_services or ebs_encryption.volumes:
if ebs_encryption.volumes or ec2_client.provider.scan_unused_services:
report = Check_Report_AWS(self.metadata())
report.region = ebs_encryption.region
report.resource_arn = ec2_client.__get_volume_arn_template__(
ebs_encryption.region
)
report.resource_id = ec2_client.audited_account
report.status = "FAIL"
report.status_extended = "EBS Default Encryption is not activated."
if ebs_encryption.status:
report.status = "PASS"
report.status_extended = "EBS Default Encryption is activated."
findings.append(report)
return findings
@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "ec2_ebs_snapshot_account_block_public_access",
"CheckTitle": "Ensure that public access to EBS snapshots is disabled",
"CheckType": [
"Data Protection"
],
"ServiceName": "ec2",
"SubServiceName": "snapshot",
"ResourceIdTemplate": "arn:partition:service:region:account-id",
"Severity": "high",
"ResourceType": "AwsEc2Snapshot",
"Description": "EBS snapshots can be shared with other AWS accounts or made public. By default, EBS snapshots are private and only the AWS account that created the snapshot can access it. If an EBS snapshot is shared with another AWS account or made public, the data in the snapshot can be accessed by the other account or by anyone on the internet. Ensure that public access to EBS snapshots is disabled.",
"Risk": "If public access to EBS snapshots is enabled, the data in the snapshot can be accessed by anyone on the internet.",
"RelatedUrl": "https://docs.aws.amazon.com/ebs/latest/userguide/block-public-access-snapshots-work.html#block-public-access-snapshots-enable",
"Remediation": {
"Code": {
"CLI": "aws ec2 enable-snapshot-block-public-access --state block-all-sharing",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Use the following procedures to configure and monitor block public access for snapshots.",
"Url": "https://docs.aws.amazon.com/ebs/latest/userguide/block-public-access-snapshots-work.html#block-public-access-snapshots-enable"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,36 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
class ec2_ebs_snapshot_account_block_public_access(Check):
def execute(self):
findings = []
for (
ebs_snapshot_block_status
) in ec2_client.ebs_block_public_access_snapshots_states:
if (
ebs_snapshot_block_status.snapshots
or ec2_client.provider.scan_unused_services
):
report = Check_Report_AWS(self.metadata())
report.region = ebs_snapshot_block_status.region
report.resource_arn = ec2_client.account_arn_template
report.resource_id = ec2_client.audited_account
if ebs_snapshot_block_status.status == "block-all-sharing":
report.status = "PASS"
report.status_extended = (
"Public access is blocked for all EBS Snapshots."
)
elif ebs_snapshot_block_status.status == "block-new-sharing":
report.status = "FAIL"
report.status_extended = (
"Public access is blocked only for new EBS Snapshots."
)
else:
report.status = "FAIL"
report.status_extended = (
"Public access is not blocked for EBS Snapshots."
)
findings.append(report)
return findings
@@ -0,0 +1,37 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
def fixer(region):
"""
Enable EBS snapshot block public access in a region.
Requires the ec2:EnableSnapshotBlockPublicAccess permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:EnableSnapshotBlockPublicAccess",
"Resource": "*"
}
]
}
Args:
region (str): AWS region
Returns:
bool: True if EBS snapshot block public access is enabled, False otherwise
"""
try:
regional_client = ec2_client.regional_clients[region]
state = ec2_client.fixer_config.get(
"ec2_ebs_snapshot_account_block_public_access", {}
).get("State", "block-all-sharing")
return (
regional_client.enable_snapshot_block_public_access(State=state)["State"]
== state
)
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "ec2_instance_account_imdsv2_enabled",
"CheckTitle": "Ensure Instance Metadata Service Version 2 (IMDSv2) is enforced for EC2 instances at the account level to protect against SSRF vulnerabilities.",
"CheckType": [
"Data Protection"
],
"ServiceName": "ec2",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service:region:account-id",
"Severity": "medium",
"ResourceType": "AwsEc2Instance",
"Description": "Ensure Instance Metadata Service Version 2 (IMDSv2) is enforced for EC2 instances at the account level to protect against SSRF vulnerabilities.",
"Risk": "EC2 instances that use IMDSv1 are vulnerable to SSRF attacks.",
"RelatedUrl": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-IMDS-new-instances.html#set-imdsv2-account-defaults",
"Remediation": {
"Code": {
"CLI": "aws ec2 modify-instance-metadata-defaults --region <region> --http-tokens required --http-put-response-hop-limit 2",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable Instance Metadata Service Version 2 (IMDSv2) on the EC2 instances. Apply this configuration at the account level for each AWS Region to set the default instance metadata version.",
"Url": "https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/configuring-IMDS-new-instances.html#set-imdsv2-account-defaults"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,29 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
class ec2_instance_account_imdsv2_enabled(Check):
def execute(self):
findings = []
for instance_metadata_default in ec2_client.instance_metadata_defaults:
if (
instance_metadata_default.instances
or ec2_client.provider.scan_unused_services
):
report = Check_Report_AWS(self.metadata())
report.region = instance_metadata_default.region
report.resource_arn = ec2_client.account_arn_template
report.resource_id = ec2_client.audited_account
if instance_metadata_default.http_tokens == "required":
report.status = "PASS"
report.status_extended = (
"IMDSv2 is enabled by default for EC2 instances."
)
else:
report.status = "FAIL"
report.status_extended = (
"IMDSv2 is not enabled by default for EC2 instances."
)
findings.append(report)
return findings
@@ -0,0 +1,34 @@
from prowler.lib.logger import logger
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
def fixer(region):
"""
Enable IMDSv2 for EC2 instances in the specified region.
Requires the ec2:ModifyInstanceMetadataDefaults permission:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "ec2:ModifyInstanceMetadataDefaults",
"Resource": "*"
}
]
}
Args:
region (str): AWS region
Returns:
bool: True if IMDSv2 is enabled, False otherwise
"""
try:
regional_client = ec2_client.regional_clients[region]
return regional_client.modify_instance_metadata_defaults(HttpTokens="required")[
"Return"
]
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False
@@ -15,6 +15,7 @@ class EC2(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.account_arn_template = f"arn:{self.audited_partition}:ec2:{self.region}:{self.audited_account}:account"
self.instances = []
self.__threading_call__(self.__describe_instances__)
self.__threading_call__(self.__get_instance_user_data__, self.instances)
@@ -34,10 +35,16 @@ class EC2(AWSService):
self.__threading_call__(self.__describe_images__)
self.volumes = []
self.__threading_call__(self.__describe_volumes__)
self.attributes_for_regions = {}
self.__threading_call__(self.__get_resources_for_regions__)
self.ebs_encryption_by_default = []
self.__threading_call__(self.__get_ebs_encryption_settings__)
self.elastic_ips = []
self.__threading_call__(self.__describe_ec2_addresses__)
self.ebs_block_public_access_snapshots_states = []
self.__threading_call__(self.__get_snapshot_block_public_access_state__)
self.instance_metadata_defaults = []
self.__threading_call__(self.__get_instance_metadata_defaults__)
def __get_volume_arn_template__(self, region):
return (
@@ -389,10 +396,10 @@ class EC2(AWSService):
def __get_ebs_encryption_settings__(self, regional_client):
try:
volumes_in_region = False
for volume in self.volumes:
if volume.region == regional_client.region:
volumes_in_region = True
volumes_in_region = self.attributes_for_regions.get(
regional_client.region, []
)
volumes_in_region = volumes_in_region.get("has_volumes", False)
self.ebs_encryption_by_default.append(
EbsEncryptionByDefault(
status=regional_client.get_ebs_encryption_by_default()[
@@ -407,6 +414,73 @@ class EC2(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_snapshot_block_public_access_state__(self, regional_client):
try:
snapshots_in_region = self.attributes_for_regions.get(
regional_client.region, []
)
snapshots_in_region = snapshots_in_region.get("has_snapshots", False)
self.ebs_block_public_access_snapshots_states.append(
EbsSnapshotBlockPublicAccess(
status=regional_client.get_snapshot_block_public_access_state()[
"State"
],
snapshots=snapshots_in_region,
region=regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_instance_metadata_defaults__(self, regional_client):
try:
instances_in_region = self.attributes_for_regions.get(
regional_client.region, []
)
instances_in_region = instances_in_region.get("has_instances", False)
metadata_defaults = regional_client.get_instance_metadata_defaults()
account_level = metadata_defaults.get("AccountLevel", {})
self.instance_metadata_defaults.append(
InstanceMetadataDefaults(
http_tokens=account_level.get("HttpTokens", None),
instances=instances_in_region,
region=regional_client.region,
)
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def __get_resources_for_regions__(self, regional_client):
try:
has_instances = False
for instance in self.instances:
if instance.region == regional_client.region:
has_instances = True
break
has_snapshots = False
for snapshot in self.snapshots:
if snapshot.region == regional_client.region:
has_snapshots = True
break
has_volumes = False
for volume in self.volumes:
if volume.region == regional_client.region:
has_volumes = True
break
self.attributes_for_regions[regional_client.region] = {
"has_instances": has_instances,
"has_snapshots": has_snapshots,
"has_volumes": has_volumes,
}
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Instance(BaseModel):
id: str
@@ -503,3 +577,15 @@ class EbsEncryptionByDefault(BaseModel):
status: bool
volumes: bool
region: str
class EbsSnapshotBlockPublicAccess(BaseModel):
status: str
snapshots: bool
region: str
class InstanceMetadataDefaults(BaseModel):
http_tokens: Optional[str]
instances: bool
region: str
@@ -0,0 +1,67 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.ec2.ec2_service import (
EbsSnapshotBlockPublicAccess,
InstanceMetadataDefaults,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
# Since moto does not support the ec2 metadata service, we need to mock the response for some functions
def mock_get_instance_metadata_defaults(http_tokens, instances, region):
return InstanceMetadataDefaults(
http_tokens=http_tokens, instances=instances, region=region
)
def mock_get_snapshot_block_public_access_state(status, snapshots, region):
return EbsSnapshotBlockPublicAccess(
status=status, snapshots=snapshots, region=region
)
def mock_enable_snapshot_block_public_access(State):
return {"State": State}
class Test_ec2_ebs_snapshot_account_block_public_access_fixer:
@mock_aws
def test_ec2_ebs_snapshot_account_block_public_access_fixer(self):
ec2_service = mock.MagicMock()
ec2_client = mock.MagicMock()
ec2_service.regional_clients = {AWS_REGION_US_EAST_1: ec2_client}
ec2_client.instance_metadata_defaults = [
mock_get_instance_metadata_defaults(
http_tokens="required", instances=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.ebs_block_public_access_snapshots_states = [
mock_get_snapshot_block_public_access_state(
status="block-all-sharing", snapshots=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.enable_snapshot_block_public_access = (
mock_enable_snapshot_block_public_access
)
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_snapshot_account_block_public_access.ec2_ebs_snapshot_account_block_public_access_fixer.ec2_client",
ec2_service,
):
from prowler.providers.aws.services.ec2.ec2_ebs_snapshot_account_block_public_access.ec2_ebs_snapshot_account_block_public_access_fixer import (
fixer,
)
# By default, the account has not public access blocked
assert fixer(region=AWS_REGION_US_EAST_1)
@@ -0,0 +1,146 @@
from unittest import mock
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_ec2_ebs_snapshot_account_block_public_access:
@mock_aws
def test_ec2_ebs_block_public_access_state_unblocked(self):
from prowler.providers.aws.services.ec2.ec2_service import (
EbsSnapshotBlockPublicAccess,
)
ec2_client = mock.MagicMock()
ec2_client.ebs_block_public_access_snapshots_states = [
EbsSnapshotBlockPublicAccess(
status="unblocked", snapshots=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.audited_account = AWS_ACCOUNT_NUMBER
ec2_client.region = AWS_REGION_US_EAST_1
ec2_client.account_arn_template = (
f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=set_mocked_aws_provider(),
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_snapshot_account_block_public_access.ec2_ebs_snapshot_account_block_public_access.ec2_client",
new=ec2_client,
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_snapshot_account_block_public_access.ec2_ebs_snapshot_account_block_public_access import (
ec2_ebs_snapshot_account_block_public_access,
)
check = ec2_ebs_snapshot_account_block_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Public access is not blocked for EBS Snapshots."
)
assert (
result[0].resource_arn
== f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
@mock_aws
def test_ec2_ebs_block_public_access_state_block_new_sharing(self):
from prowler.providers.aws.services.ec2.ec2_service import (
EbsSnapshotBlockPublicAccess,
)
ec2_client = mock.MagicMock()
ec2_client.ebs_block_public_access_snapshots_states = [
EbsSnapshotBlockPublicAccess(
status="block-new-sharing", snapshots=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.audited_account = AWS_ACCOUNT_NUMBER
ec2_client.region = AWS_REGION_US_EAST_1
ec2_client.account_arn_template = (
f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=set_mocked_aws_provider(),
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_snapshot_account_block_public_access.ec2_ebs_snapshot_account_block_public_access.ec2_client",
new=ec2_client,
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_snapshot_account_block_public_access.ec2_ebs_snapshot_account_block_public_access import (
ec2_ebs_snapshot_account_block_public_access,
)
check = ec2_ebs_snapshot_account_block_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Public access is blocked only for new EBS Snapshots."
)
assert (
result[0].resource_arn
== f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
@mock_aws
def test_ec2_ebs_block_public_access_state_block_all_sharing(self):
from prowler.providers.aws.services.ec2.ec2_service import (
EbsSnapshotBlockPublicAccess,
)
ec2_client = mock.MagicMock()
ec2_client.ebs_block_public_access_snapshots_states = [
EbsSnapshotBlockPublicAccess(
status="block-all-sharing", snapshots=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.audited_account = AWS_ACCOUNT_NUMBER
ec2_client.region = AWS_REGION_US_EAST_1
ec2_client.account_arn_template = (
f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=set_mocked_aws_provider(),
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_ebs_snapshot_account_block_public_access.ec2_ebs_snapshot_account_block_public_access.ec2_client",
new=ec2_client,
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_ebs_snapshot_account_block_public_access.ec2_ebs_snapshot_account_block_public_access import (
ec2_ebs_snapshot_account_block_public_access,
)
check = ec2_ebs_snapshot_account_block_public_access()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Public access is blocked for all EBS Snapshots."
)
assert (
result[0].resource_arn
== f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
@@ -0,0 +1,68 @@
from unittest import mock
from moto import mock_aws
from prowler.providers.aws.services.ec2.ec2_service import (
EbsSnapshotBlockPublicAccess,
InstanceMetadataDefaults,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
# Since moto does not support the ec2 metadata service, we need to mock the response for some functions
def mock_get_instance_metadata_defaults(http_tokens, instances, region):
return InstanceMetadataDefaults(
http_tokens=http_tokens, instances=instances, region=region
)
def mock_get_snapshot_block_public_access_state(status, snapshots, region):
return EbsSnapshotBlockPublicAccess(
status=status, snapshots=snapshots, region=region
)
def mock_modify_instance_metadata_defaults(HttpTokens):
if HttpTokens == "required":
return {"Return": True}
class Test_ec2_instance_account_imdsv2_enabled_fixer:
@mock_aws
def test_ec2_instance_account_imdsv2_enabled_fixer(self):
ec2_service = mock.MagicMock()
ec2_client = mock.MagicMock()
ec2_service.regional_clients = {AWS_REGION_US_EAST_1: ec2_client}
ec2_client.instance_metadata_defaults = [
mock_get_instance_metadata_defaults(
http_tokens="required", instances=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.ebs_block_public_access_snapshots_states = [
mock_get_snapshot_block_public_access_state(
status="block-all-sharing", snapshots=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.modify_instance_metadata_defaults = (
mock_modify_instance_metadata_defaults
)
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_instance_account_imdsv2_enabled.ec2_instance_account_imdsv2_enabled_fixer.ec2_client",
ec2_service,
):
from prowler.providers.aws.services.ec2.ec2_instance_account_imdsv2_enabled.ec2_instance_account_imdsv2_enabled_fixer import (
fixer,
)
# By default, the account has not public access blocked
assert fixer(region=AWS_REGION_US_EAST_1)
@@ -0,0 +1,102 @@
from unittest import mock
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
class Test_ec2_instance_account_imdsv2_enabled:
@mock_aws
def test_ec2_imdsv2_required(self):
from prowler.providers.aws.services.ec2.ec2_service import (
InstanceMetadataDefaults,
)
ec2_client = mock.MagicMock()
ec2_client.instance_metadata_defaults = [
InstanceMetadataDefaults(
http_tokens="required", instances=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.audited_account = AWS_ACCOUNT_NUMBER
ec2_client.region = AWS_REGION_US_EAST_1
ec2_client.account_arn_template = (
f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=set_mocked_aws_provider(),
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_instance_account_imdsv2_enabled.ec2_instance_account_imdsv2_enabled.ec2_client",
new=ec2_client,
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_instance_account_imdsv2_enabled.ec2_instance_account_imdsv2_enabled import (
ec2_instance_account_imdsv2_enabled,
)
check = ec2_instance_account_imdsv2_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "IMDSv2 is enabled by default for EC2 instances."
)
assert (
result[0].resource_arn
== f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
@mock_aws
def test_ec2_imdsv2_none(self):
from prowler.providers.aws.services.ec2.ec2_service import (
InstanceMetadataDefaults,
)
ec2_client = mock.MagicMock()
ec2_client.instance_metadata_defaults = [
InstanceMetadataDefaults(
http_tokens=None, instances=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.audited_account = AWS_ACCOUNT_NUMBER
ec2_client.region = AWS_REGION_US_EAST_1
ec2_client.account_arn_template = (
f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=set_mocked_aws_provider(),
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_instance_account_imdsv2_enabled.ec2_instance_account_imdsv2_enabled.ec2_client",
new=ec2_client,
):
# Test Check
from prowler.providers.aws.services.ec2.ec2_instance_account_imdsv2_enabled.ec2_instance_account_imdsv2_enabled import (
ec2_instance_account_imdsv2_enabled,
)
check = ec2_instance_account_imdsv2_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "IMDSv2 is not enabled by default for EC2 instances."
)
assert (
result[0].resource_arn
== f"arn:aws:ec2:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:account"
)
assert result[0].resource_id == AWS_ACCOUNT_NUMBER
@@ -3,6 +3,8 @@ import re
from base64 import b64decode
from datetime import datetime
import botocore
import mock
from boto3 import client, resource
from dateutil.tz import tzutc
from freezegun import freeze_time
@@ -19,6 +21,8 @@ from tests.providers.aws.utils import (
EXAMPLE_AMI_ID = "ami-12c6146b"
MOCK_DATETIME = datetime(2023, 1, 4, 7, 27, 30, tzinfo=tzutc())
orig = botocore.client.BaseClient._make_api_call
class Test_EC2_Service:
# Test EC2 Service
@@ -336,6 +340,106 @@ class Test_EC2_Service:
if result.region == AWS_REGION_US_EAST_1:
assert result.status
# Test EC2 get_snapshot_block_public_access_state
def test__get_snapshot_block_public_access_state__(self):
from prowler.providers.aws.services.ec2.ec2_service import (
EbsSnapshotBlockPublicAccess,
)
ec2_client = mock.MagicMock()
ec2_client.ebs_block_public_access_snapshots_states = [
EbsSnapshotBlockPublicAccess(
status="block-all-sharing", snapshots=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.audited_account = AWS_ACCOUNT_NUMBER
ec2_client.region = AWS_REGION_US_EAST_1
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=set_mocked_aws_provider(),
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_client.ec2_client",
new=ec2_client,
):
assert (
ec2_client.ebs_block_public_access_snapshots_states[0].status
== "block-all-sharing"
)
# Test EC2 __get_resources_for_regions__
@mock_aws
def test__get_resources_for_regions__(self):
# Generate EC2 Client
ec2_resource = resource("ec2", region_name=AWS_REGION_US_EAST_1)
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
# Get AMI image
image_response = ec2_client.describe_images()
image_id = image_response["Images"][0]["ImageId"]
# Create EC2 Instances running
ec2_resource.create_instances(
MinCount=1,
MaxCount=1,
ImageId=image_id,
)
# Create Volume
volume_id = ec2_client.create_volume(
AvailabilityZone=AWS_REGION_US_EAST_1,
Encrypted=False,
Size=40,
TagSpecifications=[
{
"ResourceType": "volume",
"Tags": [
{"Key": "test", "Value": "test"},
],
},
],
)["VolumeId"]
ec2_client.create_snapshot(
VolumeId=volume_id,
TagSpecifications=[
{
"ResourceType": "snapshot",
"Tags": [
{"Key": "test", "Value": "test"},
],
},
],
)
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
ec2 = EC2(aws_provider)
assert ec2.attributes_for_regions[AWS_REGION_US_EAST_1]["has_snapshots"]
assert ec2.attributes_for_regions[AWS_REGION_US_EAST_1]["has_instances"]
assert ec2.attributes_for_regions[AWS_REGION_US_EAST_1]["has_volumes"]
# Test __get_instance_metadata_defaults__
@mock_aws
def test__get_instance_metadata_defaults__(self):
from prowler.providers.aws.services.ec2.ec2_service import (
InstanceMetadataDefaults,
)
ec2_client = mock.MagicMock()
ec2_client.instance_metadata_defaults = [
InstanceMetadataDefaults(
http_tokens="required", instances=True, region=AWS_REGION_US_EAST_1
)
]
ec2_client.audited_account = AWS_ACCOUNT_NUMBER
ec2_client.region = AWS_REGION_US_EAST_1
with mock.patch(
"prowler.providers.common.common.get_global_provider",
return_value=set_mocked_aws_provider(),
), mock.patch(
"prowler.providers.aws.services.ec2.ec2_client.ec2_client",
new=ec2_client,
):
assert ec2_client.instance_metadata_defaults[0].http_tokens == "required"
# Test EC2 Describe Addresses
@mock_aws
def test__describe_addresses__(self):