diff --git a/prowler/providers/aws/services/directconnect/__init__.py b/prowler/providers/aws/services/directconnect/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/directconnect/directconnect_client.py b/prowler/providers/aws/services/directconnect/directconnect_client.py new file mode 100644 index 0000000000..341d879990 --- /dev/null +++ b/prowler/providers/aws/services/directconnect/directconnect_client.py @@ -0,0 +1,6 @@ +from prowler.providers.aws.services.directconnect.directconnect_service import ( + DirectConnect, +) +from prowler.providers.common.provider import Provider + +directconnect_client = DirectConnect(Provider.get_global_provider()) diff --git a/prowler/providers/aws/services/directconnect/directconnect_connection_redundancy/__init__.py b/prowler/providers/aws/services/directconnect/directconnect_connection_redundancy/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy.metadata.json b/prowler/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy.metadata.json new file mode 100644 index 0000000000..fc13f982e4 --- /dev/null +++ b/prowler/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "directconnect_connection_redundancy", + "CheckTitle": "Ensure Direct Connect connections are redundant", + "CheckType": [ + "Resilience" + ], + "ServiceName": "directconnect", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:directconnect:region:account-id:directconnect/resource-id", + "Severity": "medium", + "ResourceType": "", + "Description": "Checks the resilience of the AWS Direct Connect used to connect your on-premises.", + "Risk": "This check alerts you if any Direct Connect connections are not redundant and the connections are coming from two distinct Direct Connect locations. Lack of location resiliency can result in unexpected downtime during maintenance, a fiber cut, a device failure, or a complete location failure.", + "RelatedUrl": "https://docs.aws.amazon.com/awssupport/latest/user/fault-tolerance-checks.html#amazon-direct-connect-location-resiliency", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "To build Direct Connect location resiliency, you should have at least two connections from at least two distinct Direct Connect locations.", + "Url": "https://aws.amazon.com/directconnect/resiliency-recommendation/" + } + }, + "Categories": [ + "redundancy" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy.py b/prowler/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy.py new file mode 100644 index 0000000000..79ac46334d --- /dev/null +++ b/prowler/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy.py @@ -0,0 +1,42 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.directconnect.directconnect_client import ( + directconnect_client, +) + + +class directconnect_connection_redundancy(Check): + def execute(self): + findings = [] + if len(directconnect_client.connections): + regions = {} + for conn in directconnect_client.connections.values(): + if conn.region not in regions: + regions[conn.region] = {} + regions[conn.region]["Connections"] = 0 + regions[conn.region]["Locations"] = set() + regions[conn.region]["Connections"] += 1 + regions[conn.region]["Locations"].add(conn.location) + + for region, connections in regions.items(): + report = Check_Report_AWS(self.metadata()) + report.region = region + report.resource_arn = directconnect_client.audited_account_arn + report.resource_id = directconnect_client.audited_account + if connections["Connections"] == 1: + report.status = "FAIL" + report.status_extended = ( + "There is only one Direct Connect connection." + ) + else: # Connection Redundancy is met. + if ( + len(connections["Locations"]) == 1 + ): # All connections use the same location + report.status = "FAIL" + report.status_extended = f"There is only one location {next(iter(connections['Locations']))} used by all the Direct Connect connections." + else: # Connection Redundancy and Location Redundancy is also met + report.status = "PASS" + report.status_extended = f"There are {connections['Connections']} Direct Connect connections across {len(connections['Locations'])} locations." + + findings.append(report) + + return findings diff --git a/prowler/providers/aws/services/directconnect/directconnect_service.py b/prowler/providers/aws/services/directconnect/directconnect_service.py new file mode 100644 index 0000000000..aa652f9963 --- /dev/null +++ b/prowler/providers/aws/services/directconnect/directconnect_service.py @@ -0,0 +1,149 @@ +from typing import Optional + +from botocore.exceptions import ClientError +from pydantic import BaseModel + +from prowler.lib.logger import logger +from prowler.lib.scan_filters.scan_filters import is_resource_filtered +from prowler.providers.aws.lib.service.service import AWSService + + +class DirectConnect(AWSService): + def __init__(self, provider): + super().__init__(__class__.__name__, provider) + self.connections = {} + self.vifs = {} + self.vgws = {} + self.dxgws = {} + self.__threading_call__(self._describe_connections) + self.__threading_call__(self._describe_vifs) + + def _describe_connections(self, regional_client): + """List DirectConnect(s) in the given region. + + Args: + regional_client: The regional AWS client. + """ + + try: + logger.info("DirectConnect - Listing Connections...") + dx_connect = regional_client.describe_connections() + + for connection in dx_connect["connections"]: + connection_arn = f"arn:{self.audited_partition}:directconnect:{regional_client.region}:{self.audited_account}:dxcon/{connection['connectionId']}" + if not self.audit_resources or ( + is_resource_filtered(connection_arn, self.audit_resources) + ): + self.connections[connection_arn] = Connection( + arn=connection_arn, + id=connection["connectionId"], + name=connection["connectionName"], + location=connection["location"], + region=regional_client.region, + ) + + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + def _describe_vifs(self, regional_client): + """Describe each DirectConnect VIFs.""" + + logger.info("DirectConnect - Describing VIFs...") + try: + describe_vifs = regional_client.describe_virtual_interfaces() + for vif in describe_vifs["virtualInterfaces"]: + vif_id = vif["virtualInterfaceId"] + vif_arn = f"arn:{self.audited_partition}:directconnect:{regional_client.region}:{self.audited_account}:dxvif/{vif_id}" + if not self.audit_resources or ( + is_resource_filtered(vif_arn, self.audit_resources) + ): + vgw_id = vif.get("virtualGatewayId") + connection_id = vif.get("connectionId") + dxgw_id = vif.get("directConnectGatewayId") + self.vifs[vif_arn] = VirtualInterface( + arn=vif_arn, + id=vif_id, + name=vif["virtualInterfaceName"], + connection_id=connection_id, + vgw_gateway_id=vif["virtualGatewayId"], + dx_gateway_id=dxgw_id, + location=vif["location"], + region=regional_client.region, + ) + if vgw_id: + vgw_arn = f"arn:{self.audited_partition}:directconnect:{regional_client.region}:{self.audited_account}:virtual-gateway/{vgw_id}" + if vgw_arn in self.vgws: + self.vgws[vgw_arn].vifs.append(vif_id) + self.vgws[vgw_arn].connections.append(connection_id) + else: + self.vgws[vgw_arn] = VirtualGateway( + arn=vgw_arn, + id=vgw_id, + vifs=[vif_id], + connections=[connection_id], + region=regional_client.region, + ) + + if dxgw_id: + dxgw_arn = f"arn:{self.audited_partition}:directconnect:{regional_client.region}:{self.audited_account}:dx-gateway/{dxgw_id}" + if dxgw_arn in self.dxgws: + self.dxgws[dxgw_arn].vifs.append(vif_id) + self.dxgws[dxgw_arn].connections.append(connection_id) + else: + self.dxgws[dxgw_arn] = DXGateway( + arn=dxgw_arn, + id=dxgw_id, + vifs=[vif_id], + connections=[connection_id], + region=regional_client.region, + ) + except ClientError as error: + if error.response["Error"]["Code"] == "ResourceNotFoundException": + logger.warning( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + else: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + except Exception as error: + logger.error( + f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + + +class Connection(BaseModel): + arn = str + id: str + name: Optional[str] = None + location: str + region: str + + +class VirtualInterface(BaseModel): + arn: str + id: str + name: str + connection_id: Optional[str] = None + vgw_gateway_id: str + dx_gateway_id: str + location: str + region: str + + +class VirtualGateway(BaseModel): + arn: str + id: str + vifs: list + connections: list + region: str + + +class DXGateway(BaseModel): + arn: str + id: str + vifs: list + connections: list + region: str diff --git a/prowler/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/__init__.py b/prowler/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy.metadata.json b/prowler/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy.metadata.json new file mode 100644 index 0000000000..cf16a0664e --- /dev/null +++ b/prowler/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy.metadata.json @@ -0,0 +1,34 @@ +{ + "Provider": "aws", + "CheckID": "directconnect_virtual_interface_redundancy", + "CheckTitle": "Ensure Direct Connect virtual interface(s) are providing redundant connections", + "CheckType": [ + "Resilience" + ], + "ServiceName": "directconnect", + "SubServiceName": "", + "ResourceIdTemplate": "arn:partition:directconnect:region:account-id:directconnect/resource-id", + "Severity": "medium", + "ResourceType": "", + "Description": "Checks the resilience of the AWS Direct Connect used to connect your on-premises to each Direct Connect gateway or virtual private gateway.", + "Risk": "This check alerts you if any Direct Connect gateway or virtual private gateway isn't configured with virtual interfaces across at least two distinct Direct Connect locations. Lack of location resiliency can result in unexpected downtime during maintenance, a fiber cut, a device failure, or a complete location failure.", + "RelatedUrl": "https://docs.aws.amazon.com/awssupport/latest/user/fault-tolerance-checks.html#amazon-direct-connect-location-resiliency", + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "", + "Terraform": "" + }, + "Recommendation": { + "Text": "To build Direct Connect location resiliency, you can configure the Direct Connect gateway or virtual private gateway to connect to at least two distinct Direct Connect locations.", + "Url": "https://aws.amazon.com/directconnect/resiliency-recommendation/" + } + }, + "Categories": [ + "redundancy" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "" +} diff --git a/prowler/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy.py b/prowler/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy.py new file mode 100644 index 0000000000..5ffd55a19d --- /dev/null +++ b/prowler/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy.py @@ -0,0 +1,48 @@ +from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.providers.aws.services.directconnect.directconnect_client import ( + directconnect_client, +) + + +class directconnect_virtual_interface_redundancy(Check): + def execute(self): + findings = [] + for vgw in directconnect_client.vgws.values(): + report = Check_Report_AWS(self.metadata()) + report.resource_arn = vgw.arn + report.region = vgw.region + report.resource_id = vgw.id + if len(vgw.vifs) < 2: + report.status = "FAIL" + report.status_extended = ( + f"Virtual private gateway {vgw.id} only has one VIF." + ) + elif len(vgw.connections) < 2: + report.status = "FAIL" + report.status_extended = f"Virtual private gateway {vgw.id} has more than 1 VIFs, but all the VIFs are on the same DX Connection." + else: + report.status = "PASS" + report.status_extended = f"Virtual private gateway {vgw.id} has more than 1 VIFs and the VIFs are on more than one DX connection." + + findings.append(report) + + for dxgw in directconnect_client.dxgws.values(): + report = Check_Report_AWS(self.metadata()) + report.region = dxgw.region + report.resource_arn = dxgw.arn + report.resource_id = dxgw.id + if len(dxgw.vifs) < 2: + report.status = "FAIL" + report.status_extended = ( + f"Direct Connect gateway {dxgw.id} only has one VIF." + ) + elif len(dxgw.connections) < 2: + report.status = "FAIL" + report.status_extended = f"Direct Connect gateway {dxgw.id} has more than 1 VIFs, but all the VIFs are on the same DX Connection." + else: + report.status = "PASS" + report.status_extended = f"Direct Connect gateway {dxgw.id} has more than 1 VIFs and the VIFs are on more than one DX connection." + + findings.append(report) + + return findings diff --git a/tests/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy_test.py b/tests/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy_test.py new file mode 100644 index 0000000000..557bf08b42 --- /dev/null +++ b/tests/providers/aws/services/directconnect/directconnect_connection_redundancy/directconnect_connection_redundancy_test.py @@ -0,0 +1,160 @@ +from unittest import mock + +from prowler.providers.aws.services.directconnect.directconnect_service import ( + Connection, +) +from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1 + + +class Test_directconnect_connection_redundancy: + def test_no_conn(self): + dx_client = mock.MagicMock + dx_client.connections = {} + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_connection_redundancy.directconnect_connection_redundancy import ( + directconnect_connection_redundancy, + ) + + check = directconnect_connection_redundancy() + result = check.execute() + + assert len(result) == 0 + + def test_single_connection(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.audited_account_arn = ( + f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}" + ) + dx_client.region = AWS_REGION_EU_WEST_1 + dx_client.connections = {} + dx_client.connections = { + "conn-test": Connection( + id="conn-test", + name="vif-id", + location="Ashburn", + region="eu-west-1", + ) + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_connection_redundancy.directconnect_connection_redundancy import ( + directconnect_connection_redundancy, + ) + + check = directconnect_connection_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "There is only one Direct Connect connection." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert ( + result[0].resource_arn + == f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}" + ) + assert result[0].region == AWS_REGION_EU_WEST_1 + + def test_multiple_connections_single_location(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.audited_account_arn = ( + f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}" + ) + dx_client.region = AWS_REGION_EU_WEST_1 + dx_client.connections = {} + dx_client.connections = { + "conn-test": Connection( + id="conn-test", + name="vif-id", + location="Ashburn", + region="eu-west-1", + ), + "conn-2": Connection( + id="conn-2", + name="vif-ids", + location="Ashburn", + region="eu-west-1", + ), + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_connection_redundancy.directconnect_connection_redundancy import ( + directconnect_connection_redundancy, + ) + + check = directconnect_connection_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "There is only one location Ashburn used by all the Direct Connect connections." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert ( + result[0].resource_arn + == f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}" + ) + assert result[0].region == AWS_REGION_EU_WEST_1 + + def test_multiple_connections_multiple_locations(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.audited_account_arn = ( + f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}" + ) + dx_client.region = AWS_REGION_EU_WEST_1 + dx_client.connections = {} + dx_client.connections = { + "conn-test": Connection( + id="conn-test", + name="vif-id", + location="Ashburn", + region="eu-west-1", + ), + "conn-2": Connection( + id="conn-2", + name="vif-ids", + location="Loudon", + region="eu-west-1", + ), + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_connection_redundancy.directconnect_connection_redundancy import ( + directconnect_connection_redundancy, + ) + + check = directconnect_connection_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "There are 2 Direct Connect connections across 2 locations." + ) + assert result[0].resource_id == AWS_ACCOUNT_NUMBER + assert ( + result[0].resource_arn + == f"arn:aws:directconnect:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}" + ) + assert result[0].region == AWS_REGION_EU_WEST_1 diff --git a/tests/providers/aws/services/directconnect/directconnect_service_test.py b/tests/providers/aws/services/directconnect/directconnect_service_test.py new file mode 100644 index 0000000000..08ef35c4c5 --- /dev/null +++ b/tests/providers/aws/services/directconnect/directconnect_service_test.py @@ -0,0 +1,200 @@ +import botocore +from mock import patch +from moto import mock_aws + +from prowler.providers.aws.services.directconnect.directconnect_service import ( + DirectConnect, +) +from tests.providers.aws.utils import ( + AWS_ACCOUNT_NUMBER, + AWS_REGION_US_EAST_1, + set_mocked_aws_provider, +) + +# Mocking DX Calls - Moto does not allow describe connection across all DXs +make_api_call = botocore.client.BaseClient._make_api_call + + +def mock_make_api_call(self, operation_name, kwargs): + """ + As you can see the operation_name has the list_analyzers snake_case form but + we are using the ListAnalyzers form. + Rationale -> https://github.com/boto/botocore/blob/develop/botocore/client.py#L810:L816 + + We have to mock every AWS API call using Boto3 + """ + if operation_name == "DescribeConnections": + return { + "connections": [ + { + "ownerAccount": AWS_ACCOUNT_NUMBER, + "connectionId": "dx-moto-test-conn-20241022005109", + "connectionName": "test-conn", + "connectionState": "available", + "region": AWS_REGION_US_EAST_1, + "location": "Ashburn", + "bandwidth": "5000", + "vlan": 123, + "tags": [ + {"key": "string", "value": "string"}, + ], + }, + ] + } + + if operation_name == "DescribeVirtualInterfaces": + return { + "virtualInterfaces": [ + { + "ownerAccount": AWS_ACCOUNT_NUMBER, + "virtualInterfaceId": "vif-moto-test-conn", + "location": "Ashburn", + "connectionId": "dx-moto-test-conn-20241022005109", + "virtualInterfaceType": "public", + "virtualInterfaceName": "test-viff", + "vlan": 123, + "asn": 123, + "amazonSideAsn": 123, + "addressFamily": "ipv4", + "virtualInterfaceState": "available", + "customerRouterConfig": "test", + "mtu": 123, + "jumboFrameCapable": True, + "virtualGatewayId": "vgw-moto-test-conn", + "directConnectGatewayId": "dxgw-moto-test-conn", + "region": AWS_REGION_US_EAST_1, + "tags": [ + {"key": "string", "value": "string"}, + ], + "siteLinkEnabled": True, + }, + { + "ownerAccount": AWS_ACCOUNT_NUMBER, + "virtualInterfaceId": "vif-moto-test-conn-2", + "location": "Ashburn", + "connectionId": "dx-moto-test-conn-202410220051092", + "virtualInterfaceType": "public", + "virtualInterfaceName": "test-viff-2", + "vlan": 123, + "asn": 123, + "amazonSideAsn": 123, + "addressFamily": "ipv4", + "virtualInterfaceState": "available", + "customerRouterConfig": "test", + "mtu": 123, + "jumboFrameCapable": True, + "virtualGatewayId": "vgw-moto-test-conn", + "directConnectGatewayId": "dxgw-moto-test-conn", + "region": AWS_REGION_US_EAST_1, + "tags": [ + {"key": "string", "value": "string"}, + ], + "siteLinkEnabled": True, + }, + ] + } + return make_api_call(self, operation_name, kwargs) + + +def mock_generate_regional_clients(provider, service): + regional_client = provider._session.current_session.client( + service, region_name=AWS_REGION_US_EAST_1 + ) + regional_client.region = AWS_REGION_US_EAST_1 + return {AWS_REGION_US_EAST_1: regional_client} + + +@patch( + "prowler.providers.aws.aws_provider.AwsProvider.generate_regional_clients", + new=mock_generate_regional_clients, +) +# Patch every AWS call using Boto3 +@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call) +class Test_DirectConnect_Service: + # Test DirectConnect Service + @mock_aws + def test_service(self): + aws_provider = set_mocked_aws_provider(AWS_REGION_US_EAST_1) + directconnect = DirectConnect(aws_provider) + assert directconnect.service == "directconnect" + + # Test DirectConnect client + @mock_aws + def test_client(self): + aws_provider = set_mocked_aws_provider(AWS_REGION_US_EAST_1) + directconnect = DirectConnect(aws_provider) + for regional_client in directconnect.regional_clients.values(): + assert regional_client.__class__.__name__ == "DirectConnect" + + # Test DirectConnect session + @mock_aws + def test__get_session__(self): + aws_provider = set_mocked_aws_provider(AWS_REGION_US_EAST_1) + directconnect = DirectConnect(aws_provider) + assert directconnect.session.__class__.__name__ == "Session" + + # Test DirectConnect Session + @mock_aws + def test_audited_account(self): + aws_provider = set_mocked_aws_provider(AWS_REGION_US_EAST_1) + directconnect = DirectConnect(aws_provider) + assert directconnect.audited_account == AWS_ACCOUNT_NUMBER + + @mock_aws + def test_describe_connect(self): + arn = f"arn:aws:directconnect:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:dxcon/dx-moto-test-conn-20241022005109" + + aws_provider = set_mocked_aws_provider(AWS_REGION_US_EAST_1) + directconnect = DirectConnect(aws_provider) + assert len(directconnect.connections) == 1 + assert directconnect.connections[arn].region == AWS_REGION_US_EAST_1 + assert directconnect.connections[arn].location == "Ashburn" + assert directconnect.connections[arn].name == "test-conn" + assert directconnect.connections[arn].id == "dx-moto-test-conn-20241022005109" + + @mock_aws + def test_describe_vif(self): + aws_provider = set_mocked_aws_provider(AWS_REGION_US_EAST_1) + directconnect = DirectConnect(aws_provider) + vif_arn = f"arn:aws:directconnect:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:dxvif/vif-moto-test-conn" + assert len(directconnect.vifs) == 2 + assert directconnect.vifs[vif_arn].arn == vif_arn + assert directconnect.vifs[vif_arn].region == AWS_REGION_US_EAST_1 + assert directconnect.vifs[vif_arn].location == "Ashburn" + assert ( + directconnect.vifs[vif_arn].connection_id + == "dx-moto-test-conn-20241022005109" + ) + assert directconnect.vifs[vif_arn].vgw_gateway_id == "vgw-moto-test-conn" + assert directconnect.vifs[vif_arn].dx_gateway_id == "dxgw-moto-test-conn" + assert directconnect.vifs[vif_arn].name == "test-viff" + + @mock_aws + def test_describe_vgws(self): + aws_provider = set_mocked_aws_provider(AWS_REGION_US_EAST_1) + directconnect = DirectConnect(aws_provider) + vgw_arn = f"arn:aws:directconnect:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:virtual-gateway/vgw-moto-test-conn" + dxgw_arn = f"arn:aws:directconnect:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:dx-gateway/dxgw-moto-test-conn" + assert len(directconnect.vifs) == 2 + assert len(directconnect.vgws) == 1 + assert len(directconnect.dxgws) == 1 + assert directconnect.vgws[vgw_arn].region == AWS_REGION_US_EAST_1 + assert directconnect.vgws[vgw_arn].id == "vgw-moto-test-conn" + assert directconnect.vgws[vgw_arn].connections == [ + "dx-moto-test-conn-20241022005109", + "dx-moto-test-conn-202410220051092", + ] + assert directconnect.vgws[vgw_arn].vifs == [ + "vif-moto-test-conn", + "vif-moto-test-conn-2", + ] + assert directconnect.dxgws[dxgw_arn].region == AWS_REGION_US_EAST_1 + assert directconnect.dxgws[dxgw_arn].id == "dxgw-moto-test-conn" + assert directconnect.dxgws[dxgw_arn].connections == [ + "dx-moto-test-conn-20241022005109", + "dx-moto-test-conn-202410220051092", + ] + assert directconnect.dxgws[dxgw_arn].vifs == [ + "vif-moto-test-conn", + "vif-moto-test-conn-2", + ] diff --git a/tests/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy_test.py b/tests/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy_test.py new file mode 100644 index 0000000000..416d2060b0 --- /dev/null +++ b/tests/providers/aws/services/directconnect/directconnect_virtual_interface_redundancy/directconnect_virtual_interface_redundancy_test.py @@ -0,0 +1,251 @@ +from unittest import mock + +from prowler.providers.aws.services.directconnect.directconnect_service import ( + DXGateway, + VirtualGateway, +) + +AWS_REGION = "eu-west-1" +AWS_ACCOUNT_NUMBER = "123456789012" + + +class Test_directconnect_virtual_interface_redundancy: + def test_no_vif(self): + dx_client = mock.MagicMock + dx_client.vgws = {} + dx_client.dxgws = {} + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_virtual_interface_redundancy.directconnect_virtual_interface_redundancy import ( + directconnect_virtual_interface_redundancy, + ) + + check = directconnect_virtual_interface_redundancy() + result = check.execute() + + assert len(result) == 0 + + def test_single_vif_single_connection_vgw(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.region = AWS_REGION + dx_client.vgws = {} + dx_client.dxgws = {} + dx_client.vgws = { + "vgw-test": VirtualGateway( + arn="vgw-test", + id="vgw-test", + vifs=["vif-id"], + connections=["dx-conn"], + region="eu-west-1", + ) + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_virtual_interface_redundancy.directconnect_virtual_interface_redundancy import ( + directconnect_virtual_interface_redundancy, + ) + + check = directconnect_virtual_interface_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Virtual private gateway vgw-test only has one VIF." + ) + assert result[0].resource_id == "vgw-test" + assert result[0].resource_arn == "vgw-test" + assert result[0].region == AWS_REGION + + def test_multiple_vifs_single_connection_vgw(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.region = AWS_REGION + dx_client.vgws = {} + dx_client.dxgws = {} + dx_client.vgws = { + "vgw-test": VirtualGateway( + arn="vgw-test", + id="vgw-test", + vifs=["vif-id", "vif-id2"], + connections=["dx-conn"], + region="eu-west-1", + ) + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_virtual_interface_redundancy.directconnect_virtual_interface_redundancy import ( + directconnect_virtual_interface_redundancy, + ) + + check = directconnect_virtual_interface_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Virtual private gateway vgw-test has more than 1 VIFs, but all the VIFs are on the same DX Connection." + ) + assert result[0].resource_id == "vgw-test" + assert result[0].resource_arn == "vgw-test" + assert result[0].region == AWS_REGION + + def test_multiple_vifs_multiple_connections_vgw(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.region = AWS_REGION + dx_client.vgws = {} + dx_client.dxgws = {} + dx_client.vgws = { + "vgw-test": VirtualGateway( + arn="vgw-test", + id="vgw-test", + vifs=["vif-id", "vif-id2"], + connections=["dx-conn", "dx-conn2"], + region="eu-west-1", + ) + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_virtual_interface_redundancy.directconnect_virtual_interface_redundancy import ( + directconnect_virtual_interface_redundancy, + ) + + check = directconnect_virtual_interface_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Virtual private gateway vgw-test has more than 1 VIFs and the VIFs are on more than one DX connection." + ) + assert result[0].resource_id == "vgw-test" + assert result[0].resource_arn == "vgw-test" + assert result[0].region == AWS_REGION + + def test_single_vif_single_connection_dxgw(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.region = AWS_REGION + dx_client.vgws = {} + dx_client.dxgws = {} + dx_client.dxgws = { + "dx-test": DXGateway( + arn="dx-test", + id="dx-test", + vifs=["vif-id"], + connections=["dx-conn"], + region="eu-west-1", + ) + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_virtual_interface_redundancy.directconnect_virtual_interface_redundancy import ( + directconnect_virtual_interface_redundancy, + ) + + check = directconnect_virtual_interface_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Direct Connect gateway dx-test only has one VIF." + ) + assert result[0].resource_id == "dx-test" + assert result[0].resource_arn == "dx-test" + assert result[0].region == AWS_REGION + + def test_multiple_vifs_single_connection_dxgw(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.region = AWS_REGION + dx_client.vgws = {} + dx_client.dxgws = {} + dx_client.dxgws = { + "dx-test": DXGateway( + arn="dx-test", + id="dx-test", + vifs=["vif-id", "vif-id2"], + connections=["dx-conn"], + region="eu-west-1", + ) + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_virtual_interface_redundancy.directconnect_virtual_interface_redundancy import ( + directconnect_virtual_interface_redundancy, + ) + + check = directconnect_virtual_interface_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Direct Connect gateway dx-test has more than 1 VIFs, but all the VIFs are on the same DX Connection." + ) + assert result[0].resource_id == "dx-test" + assert result[0].resource_arn == "dx-test" + assert result[0].region == AWS_REGION + + def test_multiple_vifs_multiple_connections_dxgw(self): + dx_client = mock.MagicMock + dx_client.audited_account = AWS_ACCOUNT_NUMBER + dx_client.region = AWS_REGION + dx_client.vgws = {} + dx_client.dxgws = {} + dx_client.dxgws = { + "dx-test": DXGateway( + arn="dx-test", + id="dx-test", + vifs=["vif-id", "vif-id2"], + connections=["dx-conn", "dx-conn2"], + region="eu-west-1", + ) + } + with mock.patch( + "prowler.providers.aws.services.directconnect.directconnect_service.DirectConnect", + new=dx_client, + ): + # Test Check + from prowler.providers.aws.services.directconnect.directconnect_virtual_interface_redundancy.directconnect_virtual_interface_redundancy import ( + directconnect_virtual_interface_redundancy, + ) + + check = directconnect_virtual_interface_redundancy() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Direct Connect gateway dx-test has more than 1 VIFs and the VIFs are on more than one DX connection." + ) + assert result[0].resource_id == "dx-test" + assert result[0].resource_arn == "dx-test" + assert result[0].region == AWS_REGION