From d3ba93f0c0eb4a9d500fde96e8bf0b410611fd4c Mon Sep 17 00:00:00 2001 From: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com> Date: Mon, 2 Mar 2026 11:55:37 +0100 Subject: [PATCH] feat(openstack): add networking service with 6 checks (#9970) Co-authored-by: Andoni Alonso <14891798+andoniaf@users.noreply.github.com> --- prowler/CHANGELOG.md | 1 + .../openstack/lib/security_groups.py | 131 +++++ .../openstack/services/networking/__init__.py | 0 .../networking_admin_state_down/__init__.py | 0 .../networking_admin_state_down.metadata.json | 36 ++ .../networking_admin_state_down.py | 40 ++ .../services/networking/networking_client.py | 6 + .../__init__.py | 0 ...rking_port_security_disabled.metadata.json | 39 ++ .../networking_port_security_disabled.py | 59 ++ .../__init__.py | 0 ...ws_all_ingress_from_internet.metadata.json | 43 ++ ..._group_allows_all_ingress_from_internet.py | 56 ++ .../__init__.py | 0 ...oup_allows_rdp_from_internet.metadata.json | 43 ++ ...security_group_allows_rdp_from_internet.py | 50 ++ .../__init__.py | 0 ...oup_allows_ssh_from_internet.metadata.json | 42 ++ ...security_group_allows_ssh_from_internet.py | 50 ++ .../services/networking/networking_service.py | 278 +++++++++ .../__init__.py | 0 ...working_subnet_dhcp_disabled.metadata.json | 37 ++ .../networking_subnet_dhcp_disabled.py | 42 ++ .../networking_admin_state_down_test.py | 128 +++++ .../networking_port_security_disabled_test.py | 183 ++++++ ...p_allows_all_ingress_from_internet_test.py | 495 ++++++++++++++++ ...ity_group_allows_rdp_from_internet_test.py | 430 ++++++++++++++ ...ity_group_allows_ssh_from_internet_test.py | 540 +++++++++++++++++ .../networking_subnet_dhcp_disabled_test.py | 242 ++++++++ .../openstack_networking_service_test.py | 542 ++++++++++++++++++ 30 files changed, 3513 insertions(+) create mode 100644 prowler/providers/openstack/lib/security_groups.py create mode 100644 prowler/providers/openstack/services/networking/__init__.py create mode 100644 prowler/providers/openstack/services/networking/networking_admin_state_down/__init__.py create mode 100644 prowler/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down.metadata.json create mode 100644 prowler/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down.py create mode 100644 prowler/providers/openstack/services/networking/networking_client.py create mode 100644 prowler/providers/openstack/services/networking/networking_port_security_disabled/__init__.py create mode 100644 prowler/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled.metadata.json create mode 100644 prowler/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled.py create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/__init__.py create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet.metadata.json create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet.py create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/__init__.py create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet.metadata.json create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet.py create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/__init__.py create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet.metadata.json create mode 100644 prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet.py create mode 100644 prowler/providers/openstack/services/networking/networking_service.py create mode 100644 prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/__init__.py create mode 100644 prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled.metadata.json create mode 100644 prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled.py create mode 100644 tests/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down_test.py create mode 100644 tests/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled_test.py create mode 100644 tests/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet_test.py create mode 100644 tests/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet_test.py create mode 100644 tests/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet_test.py create mode 100644 tests/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled_test.py create mode 100644 tests/providers/openstack/services/networking/openstack_networking_service_test.py diff --git a/prowler/CHANGELOG.md b/prowler/CHANGELOG.md index ecbb529770..43bc888153 100644 --- a/prowler/CHANGELOG.md +++ b/prowler/CHANGELOG.md @@ -25,6 +25,7 @@ All notable changes to the **Prowler SDK** are documented in this file. - OpenStack compute 7 new checks [(#9944)](https://github.com/prowler-cloud/prowler/pull/9944) - CSA CCM 4.0 for the Alibaba Cloud provider [(#10061)](https://github.com/prowler-cloud/prowler/pull/10061) - ECS Exec (ECS-006) privilege escalation detection via `ecs:ExecuteCommand` + `ecs:DescribeTasks` [(#10066)](https://github.com/prowler-cloud/prowler/pull/10066) +- OpenStack network service with 6 security checks [(#9970)](https://github.com/prowler-cloud/prowler/pull/9970) - `--export-ocsf` CLI flag to upload OCSF scan results to Prowler Cloud [(#10095)](https://github.com/prowler-cloud/prowler/pull/10095) - `scan_id` field in OCSF `unmapped` output for ingestion correlation [(#10095)](https://github.com/prowler-cloud/prowler/pull/10095) - `defenderxdr_endpoint_privileged_user_exposed_credentials` check for M365 provider [(#10084)](https://github.com/prowler-cloud/prowler/pull/10084) diff --git a/prowler/providers/openstack/lib/security_groups.py b/prowler/providers/openstack/lib/security_groups.py new file mode 100644 index 0000000000..da546dfa24 --- /dev/null +++ b/prowler/providers/openstack/lib/security_groups.py @@ -0,0 +1,131 @@ +"""Helper utilities for OpenStack security group checks.""" + +from ipaddress import IPv4Network, IPv6Network, ip_network +from typing import List, Optional + +from prowler.providers.openstack.services.networking.networking_service import ( + SecurityGroupRule, +) + + +def check_security_group_rule( + rule: SecurityGroupRule, + protocol: Optional[str] = None, + ports: Optional[List[int]] = None, + any_address: bool = False, + direction: str = "ingress", +) -> bool: + """ + Check if a security group rule matches specified criteria. + + Args: + rule: SecurityGroupRule to check + protocol: Protocol to match (tcp/udp/icmp/None for any) + ports: List of ports to check + any_address: If True, only match 0.0.0.0/0 or ::/0. If False, match public IPs # noqa: E501 + direction: Direction to check (ingress/egress) + + Returns: + True if rule matches all criteria, False otherwise + """ + # Check direction + if rule.direction != direction: + return False + + # Check protocol + if protocol is not None: + # None protocol means all protocols in OpenStack + if rule.protocol is not None and rule.protocol != protocol: + return False + + # Check ports + if ports is not None and len(ports) > 0: + # If rule has no port range, it allows all ports (protocol-level rule) + if rule.port_range_min is None and rule.port_range_max is None: + # No port range means all ports for the protocol (or all + # protocols if protocol is also None). This always matches. + pass + else: + # Check if any of the target ports fall within the rule's range + port_matches = False + for port in ports: + if is_port_in_range( + port, rule.port_range_min, rule.port_range_max + ): # noqa: E501 + port_matches = True + break + if not port_matches: + return False + + # Check CIDR - must be publicly accessible + if rule.remote_ip_prefix: + if not is_cidr_public(rule.remote_ip_prefix, any_address=any_address): + return False + elif rule.remote_group_id: + # Remote group rules are not public + return False + else: + # No IP prefix or group means all IPs (0.0.0.0/0) + pass + + return True + + +def is_port_in_range( + port: int, range_min: Optional[int], range_max: Optional[int] +) -> bool: + """ + Check if a port falls within the specified range. + + Args: + port: Port number to check + range_min: Minimum port in range (None means no minimum) + range_max: Maximum port in range (None means no maximum) + + Returns: + True if port is in range, False otherwise + """ + if range_min is None and range_max is None: + return True + + if range_min is None: + return port <= range_max + + if range_max is None: + return port >= range_min + + return range_min <= port <= range_max + + +def is_cidr_public(cidr: str, any_address: bool = False) -> bool: + """ + Check if a CIDR block represents public/internet access. + + Args: + cidr: CIDR block to check (e.g., "0.0.0.0/0", "10.0.0.0/8") + any_address: If True, only match 0.0.0.0/0 or ::/0. + If False, match any globally routable IP. + + Returns: + True if CIDR represents public access, False otherwise + """ + if not cidr: + return False + + try: + network = ip_network(cidr, strict=False) + + if any_address: + # Only match 0.0.0.0/0 or ::/0 + if isinstance(network, IPv4Network): + return str(network) == "0.0.0.0/0" + elif isinstance(network, IPv6Network): + return str(network) == "::/0" + return False + else: + # Match any globally routable (public) IP + # is_global means not private, loopback, link-local, etc. + return network.is_global or str(network) in ["0.0.0.0/0", "::/0"] + + except (ValueError, TypeError): + return False diff --git a/prowler/providers/openstack/services/networking/__init__.py b/prowler/providers/openstack/services/networking/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/networking/networking_admin_state_down/__init__.py b/prowler/providers/openstack/services/networking/networking_admin_state_down/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down.metadata.json b/prowler/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down.metadata.json new file mode 100644 index 0000000000..f7a27aca98 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down.metadata.json @@ -0,0 +1,36 @@ +{ + "Provider": "openstack", + "CheckID": "networking_admin_state_down", + "CheckTitle": "Networks are administratively enabled", + "CheckType": [], + "ServiceName": "networking", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "OS::Neutron::Net", + "ResourceGroup": "network", + "Description": "**OpenStack networks** are evaluated to verify that **admin_state_up is True** (administratively enabled). Networks with admin_state_up=False cannot carry traffic and are typically disabled temporarily during maintenance or troubleshooting. Best practices recommend re-enabling networks promptly after maintenance to prevent service outages from forgotten disabled networks.", + "Risk": "Networks with admin_state_up=False cause complete connectivity loss for all attached instances, preventing access to databases, APIs, and storage. Forgotten disabled networks after maintenance result in prolonged outages that violate SLAs. Multi-tier applications (web-app-db) fail when inter-tier networks are disabled, breaking functionality even when instances are healthy.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/neutron/latest/admin/intro-os-networking.html" + ], + "Remediation": { + "Code": { + "CLI": "openstack network set --enable ", + "NativeIaC": "", + "Other": "**Enable network via Horizon:**\n1. Navigate to **Project > Network > Networks**\n2. For each network with Admin State = DOWN\n3. Click **Edit Network**\n4. Check the **Admin State** checkbox\n5. Click **Save**", + "Terraform": "```hcl\nresource \"openstack_networking_network_v2\" \"network\" {\n name = \"app-network\"\n admin_state_up = true # GOOD: Administratively enabled\n}\n```" + }, + "Recommendation": { + "Text": "Enable admin state on all production networks unless there is a documented maintenance window. Implement change management procedures to track network administrative state changes and ensure re-enablement after maintenance. Use monitoring alerts to detect when networks remain disabled longer than expected.", + "Url": "https://hub.prowler.com/check/networking_admin_state_down" + } + }, + "Categories": [ + "resilience" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "This check flags networks where `admin_state_up == False`. Networks are typically disabled temporarily during maintenance or troubleshooting. If networks remain disabled after maintenance windows, they should be re-enabled. Some networks may be intentionally kept disabled as part of decommissioning procedures - verify the operational status before re-enabling." +} diff --git a/prowler/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down.py b/prowler/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down.py new file mode 100644 index 0000000000..70bee11cd6 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down.py @@ -0,0 +1,40 @@ +"""OpenStack Network Admin State Check.""" + +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.networking.networking_client import ( + networking_client, +) + + +class networking_admin_state_down(Check): + """Ensure networks are administratively enabled.""" + + def execute(self) -> List[CheckReportOpenStack]: + """Execute networking_admin_state_down check. + + Iterates over all networks and verifies that admin_state_up is True, + meaning networks are administratively enabled and operational. + + Returns: + list[CheckReportOpenStack]: List of findings for each network. + """ + findings: List[CheckReportOpenStack] = [] + + for network in networking_client.networks: + report = CheckReportOpenStack(metadata=self.metadata(), resource=network) + report.resource_id = network.id + report.resource_name = network.name + report.region = network.region + + if not network.admin_state_up: + report.status = "FAIL" + report.status_extended = f"Network {network.name} ({network.id}) is administratively disabled (admin_state_up=False) and cannot carry traffic." + else: + report.status = "PASS" + report.status_extended = f"Network {network.name} ({network.id}) is administratively enabled." + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/networking/networking_client.py b/prowler/providers/openstack/services/networking/networking_client.py new file mode 100644 index 0000000000..32cdd2e867 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_client.py @@ -0,0 +1,6 @@ +from prowler.providers.common.provider import Provider +from prowler.providers.openstack.services.networking.networking_service import ( + Networking, +) + +networking_client = Networking(Provider.get_global_provider()) diff --git a/prowler/providers/openstack/services/networking/networking_port_security_disabled/__init__.py b/prowler/providers/openstack/services/networking/networking_port_security_disabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled.metadata.json b/prowler/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled.metadata.json new file mode 100644 index 0000000000..54540894c0 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled.metadata.json @@ -0,0 +1,39 @@ +{ + "Provider": "openstack", + "CheckID": "networking_port_security_disabled", + "CheckTitle": "Port security is enabled on networks and ports", + "CheckType": [], + "ServiceName": "networking", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "OS::Neutron::Net", + "ResourceGroup": "network", + "Description": "**OpenStack networks and ports** are evaluated to verify that **port security is enabled** (port_security_enabled=True). Port security prevents MAC and IP spoofing by enforcing anti-spoofing rules. When disabled, instances can forge source addresses, bypass network isolation, and enable man-in-the-middle attacks. Disabling is sometimes required for NFV/SR-IOV use cases.", + "Risk": "Disabled port security allows MAC/IP spoofing attacks, bypassing network isolation. Attackers can intercept traffic via ARP poisoning (MITM attacks), bypass security group rules by forging source IPs, and attack other tenants in multi-tenant environments (cross-tenant data exfiltration). Security groups cannot be enforced properly. Violates compliance requirements (PCI-DSS, HIPAA, SOC 2).", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/neutron/latest/admin/intro-os-networking.html", + "https://docs.openstack.org/neutron/latest/admin/config-ipam.html", + "https://docs.openstack.org/security-guide/networking/architecture.html" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "1. Navigate to Project > Network > Networks\n2. For each network, click Edit Network\n3. Ensure the Port Security checkbox is checked\n4. Click Save\n5. Navigate to Project > Network > Networks > Ports\n6. For each port, click Edit Port\n7. Ensure the Port Security checkbox is checked\n8. Click Save", + "Terraform": "```hcl\n# Terraform: ensure port security is enabled on networks and ports\n\n# GOOD: Network with port security enabled (default)\nresource \"openstack_networking_network_v2\" \"secure_network\" {\n name = \"prod-app-network\"\n admin_state_up = true\n # port_security_enabled defaults to true - explicitly set for clarity\n # No need to specify if keeping default\n}\n\n# GOOD: Explicitly enable port security\nresource \"openstack_networking_network_v2\" \"explicit_security\" {\n name = \"prod-web-network\"\n admin_state_up = true\n # Explicitly enable port security\n # Note: Check if your OpenStack provider version supports this attribute\n}\n\n# GOOD: Port with port security enabled\nresource \"openstack_networking_port_v2\" \"secure_port\" {\n name = \"instance-port-01\"\n network_id = openstack_networking_network_v2.secure_network.id\n admin_state_up = true\n \n # Port security enabled by default\n # Apply security groups\n security_group_ids = [\n openstack_networking_secgroup_v2.web_sg.id,\n ]\n \n fixed_ip {\n subnet_id = openstack_networking_subnet_v2.subnet.id\n }\n}\n\n# BAD: Port with port security disabled (security risk)\n# resource \"openstack_networking_port_v2\" \"insecure_port\" {\n# name = \"nfv-port-01\"\n# network_id = openstack_networking_network_v2.network.id\n# admin_state_up = true\n# \n# # DANGEROUS: Disabling port security allows MAC/IP spoofing\n# port_security_enabled = false\n# \n# # Security groups cannot be enforced without port security\n# # security_group_ids = [] # Must be empty when port_security_enabled = false\n# }\n\n# ACCEPTABLE: Disabled port security for legitimate NFV use case with documentation\nresource \"openstack_networking_port_v2\" \"nfv_port\" {\n name = \"nfv-vrouter-port\"\n network_id = openstack_networking_network_v2.nfv_network.id\n admin_state_up = true\n \n # Port security disabled for NFV virtual router\n # JUSTIFICATION: SR-IOV network function requires promiscuous mode\n # COMPENSATING CONTROLS: \n # - Isolated network (no shared tenants)\n # - Network-level ACLs via provider firewall\n # - Monitoring for anomalous traffic patterns\n port_security_enabled = false\n \n # Cannot use security groups with disabled port security\n security_group_ids = []\n}\n\n# Create dedicated isolated network for resources requiring disabled port security\nresource \"openstack_networking_network_v2\" \"nfv_network\" {\n name = \"nfv-isolated-network\"\n admin_state_up = true\n # Isolated network - not shared with other projects\n shared = false\n}\n\n# Validation: Check port security is enabled\nresource \"null_resource\" \"validate_port_security\" {\n depends_on = [openstack_networking_port_v2.secure_port]\n \n provisioner \"local-exec\" {\n command = <<-EOF\n # Verify port security is enabled\n PORT_SECURITY=$(openstack port show ${openstack_networking_port_v2.secure_port.id} -f json | jq -r '.port_security_enabled')\n if [ \"$PORT_SECURITY\" != \"true\" ]; then\n echo \"ERROR: Port security is disabled on ${openstack_networking_port_v2.secure_port.id}\"\n exit 1\n fi\n echo \"✓ Port security validation passed\"\n EOF\n }\n}\n```" + }, + "Recommendation": { + "Text": "Enable port security on all networks and ports to prevent MAC and IP spoofing attacks. For legitimate NFV or SR-IOV use cases requiring disabled port security, deploy on isolated networks with compensating controls like network-level ACLs and monitoring. Use allowed-address-pairs instead of disabling port security when additional IPs are needed.", + "Url": "https://hub.prowler.com/check/networking_port_security_disabled" + } + }, + "Categories": [ + "trust-boundaries", + "vulnerabilities" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "This check flags both networks and ports where `port_security_enabled == False`. Port security prevents MAC address spoofing and IP address spoofing by enforcing anti-spoofing rules. Legitimate use cases for disabled port security include: (1) Network Functions Virtualization (NFV) requiring promiscuous mode, (2) SR-IOV (Single Root I/O Virtualization) ports, (3) VLAN trunking ports, or (4) Load balancer VIP ports using VRRP. If your deployment has resources with disabled port security, verify they are documented exceptions with compensating security controls. Port security is enabled by default in OpenStack Neutron unless explicitly disabled." +} diff --git a/prowler/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled.py b/prowler/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled.py new file mode 100644 index 0000000000..0dacf8dd7d --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled.py @@ -0,0 +1,59 @@ +"""OpenStack Network Port Security Check.""" + +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.networking.networking_client import ( + networking_client, +) + + +class networking_port_security_disabled(Check): + """Ensure port security is enabled on networks and ports.""" + + def execute(self) -> List[CheckReportOpenStack]: + """Execute networking_port_security_disabled check. + + Iterates over all networks and ports and verifies that port security + is enabled to prevent MAC/IP spoofing attacks. + + Returns: + list[CheckReportOpenStack]: List of findings for each network/port. + """ + findings: List[CheckReportOpenStack] = [] + + # Check networks + for network in networking_client.networks: + report = CheckReportOpenStack(metadata=self.metadata(), resource=network) + report.resource_id = network.id + report.resource_name = network.name + report.region = network.region + + if not network.port_security_enabled: + report.status = "FAIL" + report.status_extended = f"Network {network.name} ({network.id}) has port security disabled, which allows MAC and IP address spoofing attacks." + else: + report.status = "PASS" + report.status_extended = ( + f"Network {network.name} ({network.id}) has port security enabled." + ) + + findings.append(report) + + # Check ports + for port in networking_client.ports: + report = CheckReportOpenStack(metadata=self.metadata(), resource=port) + report.resource_id = port.id + report.resource_name = port.name or f"port-{port.id[:8]}" + report.region = port.region + + if not port.port_security_enabled: + report.status = "FAIL" + report.status_extended = f"Port {report.resource_name} ({port.id}) on network {port.network_id} has port security disabled, which allows MAC and IP address spoofing." + else: + report.status = "PASS" + report.status_extended = f"Port {report.resource_name} ({port.id}) has port security enabled." + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/__init__.py b/prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet.metadata.json b/prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet.metadata.json new file mode 100644 index 0000000000..473dc9602d --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet.metadata.json @@ -0,0 +1,43 @@ +{ + "Provider": "openstack", + "CheckID": "networking_security_group_allows_all_ingress_from_internet", + "CheckTitle": "Security groups do not allow all ingress traffic from the Internet", + "CheckType": [], + "ServiceName": "networking", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "critical", + "ResourceType": "OS::Neutron::SecurityGroup", + "ResourceGroup": "network", + "Description": "**OpenStack security groups** are evaluated to verify that no rule allows **all ingress traffic** (any protocol, any port) from the Internet (0.0.0.0/0 or ::/0). A rule with no protocol and no port restriction is effectively a \"permit any\" firewall rule, completely bypassing network-level access controls. This is the most permissive possible configuration and should never be used in production.", + "Risk": "Allowing all inbound traffic from the Internet exposes every service running on the instance to unauthorized access. Attackers can discover and exploit any listening service including databases, management interfaces, internal APIs, and debugging tools. This bypasses defense-in-depth and is equivalent to having no firewall. Combined with misconfigurations or unpatched services, it enables initial access, lateral movement, data exfiltration, and full infrastructure compromise.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/neutron/latest/admin/intro-os-networking.html", + "https://docs.openstack.org/security-guide/networking/architecture.html", + "https://docs.openstack.org/api-ref/network/v2/", + "https://docs.openstack.org/neutron/latest/admin/config-rbac.html" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "1. Navigate to Network > Security Groups\n2. Locate the security group with unrestricted ingress\n3. Click Manage Rules\n4. Delete rules allowing all traffic from 0.0.0.0/0 or ::/0 with no protocol/port restriction\n5. Create specific rules for only the protocols and ports required (e.g., TCP 443 for HTTPS)\n6. Restrict source CIDRs to known IP ranges where possible\n7. Save changes and verify connectivity", + "Terraform": "```hcl\n# Terraform: create specific ingress rules instead of allowing all traffic\n\nresource \"openstack_networking_secgroup_v2\" \"web_servers\" {\n name = \"web-servers-sg\"\n description = \"Security group for web servers\"\n}\n\n# GOOD: Allow only HTTPS from the Internet\nresource \"openstack_networking_secgroup_rule_v2\" \"https_ingress\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 443\n port_range_max = 443\n remote_ip_prefix = \"0.0.0.0/0\"\n security_group_id = openstack_networking_secgroup_v2.web_servers.id\n}\n\n# GOOD: Allow SSH only from bastion security group\nresource \"openstack_networking_secgroup_rule_v2\" \"ssh_from_bastion\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 22\n port_range_max = 22\n remote_group_id = openstack_networking_secgroup_v2.bastion.id\n security_group_id = openstack_networking_secgroup_v2.web_servers.id\n}\n\n# BAD: Never allow all traffic from the Internet\n# resource \"openstack_networking_secgroup_rule_v2\" \"allow_all\" {\n# direction = \"ingress\"\n# ethertype = \"IPv4\"\n# remote_ip_prefix = \"0.0.0.0/0\" # DANGEROUS - no protocol/port restriction\n# security_group_id = openstack_networking_secgroup_v2.web_servers.id\n# }\n```" + }, + "Recommendation": { + "Text": "Remove rules that allow all ingress traffic from the Internet. Follow the principle of least privilege by creating specific rules for only the required protocols and ports. Use security group references (remote_group_id) instead of CIDRs where possible to restrict access to known infrastructure components.", + "Url": "https://hub.prowler.com/check/networking_security_group_allows_all_ingress_from_internet" + } + }, + "Categories": [ + "internet-exposed", + "trust-boundaries" + ], + "DependsOn": [], + "RelatedTo": [ + "networking_security_group_allows_ssh_from_internet", + "networking_security_group_allows_rdp_from_internet" + ], + "Notes": "This check specifically flags rules where protocol is unset (all protocols) AND port range is unset (all ports) AND the source is 0.0.0.0/0 or ::/0. Rules allowing all TCP or all UDP from the Internet are not flagged by this check but may be flagged by port-specific checks (SSH, RDP). In OpenStack, a rule with no remote_ip_prefix and no remote_group_id implies access from any source." +} diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet.py b/prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet.py new file mode 100644 index 0000000000..633815a129 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet.py @@ -0,0 +1,56 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.lib.security_groups import is_cidr_public +from prowler.providers.openstack.services.networking.networking_client import ( + networking_client, +) + + +class networking_security_group_allows_all_ingress_from_internet(Check): + """Ensure security groups do not allow all ingress traffic from the Internet.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for sg in networking_client.security_groups: + report = CheckReportOpenStack(metadata=self.metadata(), resource=sg) + report.resource_id = sg.id + report.resource_name = sg.name + report.region = sg.region + + all_ingress_exposed = False + exposed_rules = [] + + for rule in sg.security_group_rules: + # Only match rules that allow ALL protocols AND ALL ports + if rule.direction != "ingress": + continue + if rule.protocol is not None: + continue + if rule.port_range_min is not None or rule.port_range_max is not None: + continue + + # Check if from internet (0.0.0.0/0, ::/0, or None with no group) + if rule.remote_group_id: + continue + if rule.remote_ip_prefix: + if not is_cidr_public(rule.remote_ip_prefix, any_address=True): + continue + # else: no prefix and no group means all IPs + + all_ingress_exposed = True + cidr = rule.remote_ip_prefix or "0.0.0.0/0" + exposed_rules.append(f"rule {rule.id} ({cidr})") + + if all_ingress_exposed: + report.status = "FAIL" + rules_str = ", ".join(exposed_rules) + report.status_extended = f"Security group {sg.name} ({sg.id}) allows all ingress traffic (any protocol, any port) from the Internet via {rules_str}." + else: + report.status = "PASS" + report.status_extended = f"Security group {sg.name} ({sg.id}) does not allow all ingress traffic from the Internet." + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/__init__.py b/prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet.metadata.json b/prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet.metadata.json new file mode 100644 index 0000000000..340136c197 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet.metadata.json @@ -0,0 +1,43 @@ +{ + "Provider": "openstack", + "CheckID": "networking_security_group_allows_rdp_from_internet", + "CheckTitle": "Security groups do not allow RDP from the Internet", + "CheckType": [], + "ServiceName": "networking", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "OS::Neutron::SecurityGroup", + "ResourceGroup": "network", + "Description": "**OpenStack security groups** are evaluated to verify that RDP (port 3389) is **not exposed to the Internet** (0.0.0.0/0 or ::/0). Security groups act as virtual firewalls controlling Windows instance traffic. Unrestricted RDP access violates least privilege and creates significant attack surface. Best practices recommend restricting RDP to **known IP ranges**, **RD Gateway**, or **VPN**.", + "Risk": "Unrestricted RDP exposes Windows instances to brute-force attacks, password spraying, and RDP vulnerabilities (BlueKeep, DejaBlue). Compromised sessions enable ransomware deployment, credential theft, privilege escalation, and lateral movement. Successful compromise leads to domain controller access, Active Directory enumeration, and organization-wide data exfiltration.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/neutron/latest/admin/intro-os-networking.html", + "https://docs.openstack.org/security-guide/networking/architecture.html", + "https://docs.openstack.org/api-ref/network/v2/", + "https://www.cisa.gov/news-events/cybersecurity-advisories/aa19-168a", + "https://www.ncsc.gov.uk/guidance/preventing-lateral-movement" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "1. Navigate to Network > Security Groups\n2. Locate the security group with unrestricted RDP access\n3. Click Manage Rules\n4. Delete RDP rules with Remote 0.0.0.0/0 or ::/0\n5. Click Add Rule and create a new RDP rule\n6. Set Remote to CIDR with your trusted IP (e.g., 198.51.100.50/32)\n7. Save changes and verify connectivity", + "Terraform": "```hcl\n# Terraform: restrict RDP to known IP ranges, never 0.0.0.0/0\n\nresource \"openstack_networking_secgroup_v2\" \"windows_servers\" {\n name = \"windows-servers-sg\"\n description = \"Security group for Windows application servers\"\n}\n\n# GOOD: RDP restricted to corporate VPN IP range\nresource \"openstack_networking_secgroup_rule_v2\" \"rdp_from_vpn\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 3389\n port_range_max = 3389\n remote_ip_prefix = \"10.8.0.0/24\" # VPN gateway range\n security_group_id = openstack_networking_secgroup_v2.windows_servers.id\n}\n\n# GOOD: RDP from RD Gateway security group (reference-based rule)\nresource \"openstack_networking_secgroup_rule_v2\" \"rdp_from_rd_gateway\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 3389\n port_range_max = 3389\n remote_group_id = openstack_networking_secgroup_v2.rd_gateway.id # Reference\n security_group_id = openstack_networking_secgroup_v2.windows_servers.id\n}\n\n# Remote Desktop Gateway security group (least privilege)\nresource \"openstack_networking_secgroup_v2\" \"rd_gateway\" {\n name = \"rd-gateway-sg\"\n description = \"Security group for Remote Desktop Gateway\"\n}\n\n# RD Gateway accepts RDP from corporate network only\nresource \"openstack_networking_secgroup_rule_v2\" \"rd_gateway_rdp\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 3389\n port_range_max = 3389\n remote_ip_prefix = var.corporate_cidr # e.g., 198.51.100.0/24\n security_group_id = openstack_networking_secgroup_v2.rd_gateway.id\n}\n\n# RD Gateway also needs HTTPS for external RDG clients\nresource \"openstack_networking_secgroup_rule_v2\" \"rd_gateway_https\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 443\n port_range_max = 443\n remote_ip_prefix = var.corporate_cidr # Restrict to corporate IPs\n security_group_id = openstack_networking_secgroup_v2.rd_gateway.id\n}\n\n# BAD: Never do this - RDP from anywhere (0.0.0.0/0)\n# resource \"openstack_networking_secgroup_rule_v2\" \"rdp_from_anywhere\" {\n# direction = \"ingress\"\n# ethertype = \"IPv4\"\n# protocol = \"tcp\"\n# port_range_min = 3389\n# port_range_max = 3389\n# remote_ip_prefix = \"0.0.0.0/0\" # DANGEROUS - DO NOT USE\n# security_group_id = openstack_networking_secgroup_v2.windows_servers.id\n# }\n```" + }, + "Recommendation": { + "Text": "Restrict RDP access to known IP ranges using Remote Desktop Gateway or VPN instead of allowing 0.0.0.0/0. Enable Network Level Authentication (NLA) and implement multi-factor authentication for all RDP sessions. Deploy LAPS for unique local administrator passwords and patch Windows systems regularly to address RDP vulnerabilities.", + "Url": "https://hub.prowler.com/check/networking_security_group_allows_rdp_from_internet" + } + }, + "Categories": [ + "internet-exposed", + "trust-boundaries" + ], + "DependsOn": [], + "RelatedTo": [ + "networking_security_group_allows_ssh_from_internet" + ], + "Notes": "This check flags security groups with RDP (port 3389) ingress rules allowing 0.0.0.0/0 or ::/0. Some architectures legitimately require public RDP access (bastion hosts, RD Gateway, jump servers). Review findings in context of your Windows infrastructure. Rules allowing RDP from specific IPs (e.g., 198.51.100.0/24) or from other security groups (remote_group_id) are not flagged. IPv6 rules (::/0) are also checked. Port ranges that include port 3389 (e.g., 3000-4000) will trigger this check." +} diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet.py b/prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet.py new file mode 100644 index 0000000000..b8497fac67 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet.py @@ -0,0 +1,50 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.lib.security_groups import check_security_group_rule +from prowler.providers.openstack.services.networking.networking_client import ( + networking_client, +) + + +class networking_security_group_allows_rdp_from_internet(Check): + """Ensure security groups do not allow RDP from the Internet.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for sg in networking_client.security_groups: + report = CheckReportOpenStack(metadata=self.metadata(), resource=sg) + report.resource_id = sg.id + report.resource_name = sg.name + report.region = sg.region + + # Check if any rule allows RDP from 0.0.0.0/0 or ::/0 + rdp_exposed = False + exposed_rules = [] + + for rule in sg.security_group_rules: + if check_security_group_rule( + rule=rule, + protocol="tcp", + ports=[3389], + any_address=True, + direction="ingress", + ): + rdp_exposed = True + cidr = rule.remote_ip_prefix or "0.0.0.0/0" + exposed_rules.append( + f"rule {rule.id} ({rule.protocol}/{cidr}:{rule.port_range_min}-{rule.port_range_max})" + ) + + if rdp_exposed: + report.status = "FAIL" + rules_str = ", ".join(exposed_rules) + report.status_extended = f"Security group {sg.name} ({sg.id}) allows unrestricted RDP access (port 3389) from the Internet via {rules_str}." + else: + report.status = "PASS" + report.status_extended = f"Security group {sg.name} ({sg.id}) does not allow RDP (port 3389) from the Internet." + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/__init__.py b/prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet.metadata.json b/prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet.metadata.json new file mode 100644 index 0000000000..22d292d85e --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet.metadata.json @@ -0,0 +1,42 @@ +{ + "Provider": "openstack", + "CheckID": "networking_security_group_allows_ssh_from_internet", + "CheckTitle": "Security groups do not allow SSH from the Internet", + "CheckType": [], + "ServiceName": "networking", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "high", + "ResourceType": "OS::Neutron::SecurityGroup", + "ResourceGroup": "network", + "Description": "**OpenStack security groups** are evaluated to verify that SSH (port 22) is **not exposed to the Internet** (0.0.0.0/0 or ::/0). Security groups act as virtual firewalls controlling instance traffic. Unrestricted SSH access violates least privilege and creates significant attack surface. Best practices recommend restricting SSH to **known IP ranges**, **bastion hosts**, or **VPN gateways**.", + "Risk": "Unrestricted SSH exposes instances to brute-force attacks, password testing, and SSH vulnerability exploitation (CVE-2023-38408, CVE-2021-41617) for initial access. Compromised instances enable persistence, privilege escalation, lateral movement, crypto mining, DDoS botnets, data exfiltration, and ransomware deployment. SSH exposure bypasses defense-in-depth and enables direct production access.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/neutron/latest/admin/intro-os-networking.html", + "https://docs.openstack.org/security-guide/networking/architecture.html", + "https://docs.openstack.org/api-ref/network/v2/", + "https://docs.openstack.org/neutron/latest/admin/config-rbac.html" + ], + "Remediation": { + "Code": { + "CLI": "", + "NativeIaC": "", + "Other": "1. Navigate to Network > Security Groups\n2. Locate the security group with unrestricted SSH access\n3. Click Manage Rules\n4. Delete SSH rules with Remote 0.0.0.0/0 or ::/0\n5. Click Add Rule and create a new SSH rule\n6. Set Remote to CIDR with your trusted IP (e.g., 203.0.113.50/32)\n7. Save changes and verify connectivity", + "Terraform": "```hcl\n# Terraform: restrict SSH to known IP ranges, never 0.0.0.0/0\n\nresource \"openstack_networking_secgroup_v2\" \"app_servers\" {\n name = \"app-servers-sg\"\n description = \"Security group for application servers\"\n}\n\n# GOOD: SSH restricted to corporate office IP\nresource \"openstack_networking_secgroup_rule_v2\" \"ssh_from_office\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 22\n port_range_max = 22\n remote_ip_prefix = \"203.0.113.0/24\" # Corporate office network\n security_group_id = openstack_networking_secgroup_v2.app_servers.id\n}\n\n# GOOD: SSH from bastion security group (reference-based rule)\nresource \"openstack_networking_secgroup_rule_v2\" \"ssh_from_bastion\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 22\n port_range_max = 22\n remote_group_id = openstack_networking_secgroup_v2.bastion.id # Reference, not CIDR\n security_group_id = openstack_networking_secgroup_v2.app_servers.id\n}\n\n# Bastion security group (least privilege)\nresource \"openstack_networking_secgroup_v2\" \"bastion\" {\n name = \"bastion-sg\"\n description = \"Security group for bastion host - only entry point for SSH\"\n}\n\nresource \"openstack_networking_secgroup_rule_v2\" \"bastion_ssh\" {\n direction = \"ingress\"\n ethertype = \"IPv4\"\n protocol = \"tcp\"\n port_range_min = 22\n port_range_max = 22\n remote_ip_prefix = var.corporate_cidr # e.g., 203.0.113.0/24\n security_group_id = openstack_networking_secgroup_v2.bastion.id\n}\n\n# BAD: Never do this - SSH from anywhere (0.0.0.0/0)\n# resource \"openstack_networking_secgroup_rule_v2\" \"ssh_from_anywhere\" {\n# direction = \"ingress\"\n# ethertype = \"IPv4\"\n# protocol = \"tcp\"\n# port_range_min = 22\n# port_range_max = 22\n# remote_ip_prefix = \"0.0.0.0/0\" # DANGEROUS - DO NOT USE\n# security_group_id = openstack_networking_secgroup_v2.app_servers.id\n# }\n```" + }, + "Recommendation": { + "Text": "Restrict SSH access to known IP ranges using bastion hosts or VPN gateways instead of allowing 0.0.0.0/0. Implement multi-factor authentication, disable password authentication, and use SSH certificates for centralized key management. Monitor SSH logs for failed login attempts and enforce fail2ban or similar IP blocking mechanisms.", + "Url": "https://hub.prowler.com/check/networking_security_group_allows_ssh_from_internet" + } + }, + "Categories": [ + "internet-exposed", + "trust-boundaries" + ], + "DependsOn": [], + "RelatedTo": [ + "networking_security_group_allows_rdp_from_internet" + ], + "Notes": "This check flags security groups with SSH (port 22) ingress rules allowing 0.0.0.0/0 or ::/0. Some architectures legitimately require public SSH access (bastion hosts, CI/CD runners with dynamic IPs). Review findings in context of your security architecture. Rules allowing SSH from specific IPs (e.g., 203.0.113.0/24) or from other security groups (remote_group_id) are not flagged. IPv6 rules (::/0) are also checked. Port ranges that include port 22 (e.g., 20-25) will trigger this check." +} diff --git a/prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet.py b/prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet.py new file mode 100644 index 0000000000..b878dd3fe0 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet.py @@ -0,0 +1,50 @@ +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.lib.security_groups import check_security_group_rule +from prowler.providers.openstack.services.networking.networking_client import ( + networking_client, +) + + +class networking_security_group_allows_ssh_from_internet(Check): + """Ensure security groups do not allow SSH from the Internet.""" + + def execute(self) -> List[CheckReportOpenStack]: + findings: List[CheckReportOpenStack] = [] + + for sg in networking_client.security_groups: + report = CheckReportOpenStack(metadata=self.metadata(), resource=sg) + report.resource_id = sg.id + report.resource_name = sg.name + report.region = sg.region + + # Check if any rule allows SSH from 0.0.0.0/0 or ::/0 + ssh_exposed = False + exposed_rules = [] + + for rule in sg.security_group_rules: + if check_security_group_rule( + rule=rule, + protocol="tcp", + ports=[22], + any_address=True, + direction="ingress", + ): + ssh_exposed = True + cidr = rule.remote_ip_prefix or "0.0.0.0/0" + exposed_rules.append( + f"rule {rule.id} ({rule.protocol}/{cidr}:{rule.port_range_min}-{rule.port_range_max})" + ) + + if ssh_exposed: + report.status = "FAIL" + rules_str = ", ".join(exposed_rules) + report.status_extended = f"Security group {sg.name} ({sg.id}) allows unrestricted SSH access (port 22) from the Internet via {rules_str}." + else: + report.status = "PASS" + report.status_extended = f"Security group {sg.name} ({sg.id}) does not allow SSH (port 22) from the Internet." + + findings.append(report) + + return findings diff --git a/prowler/providers/openstack/services/networking/networking_service.py b/prowler/providers/openstack/services/networking/networking_service.py new file mode 100644 index 0000000000..e4ac9105bc --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_service.py @@ -0,0 +1,278 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import List, Optional + +from openstack import exceptions as openstack_exceptions + +from prowler.lib.logger import logger +from prowler.providers.openstack.lib.service.service import OpenStackService + + +class Networking(OpenStackService): + """Service wrapper using openstacksdk network (Neutron) APIs.""" + + def __init__(self, provider) -> None: + super().__init__(__class__.__name__, provider) + self.security_groups: List[SecurityGroup] = [] + self.networks: List[NetworkResource] = [] + self.subnets: List[Subnet] = [] + self.ports: List[Port] = [] + self._list_security_groups() + self._list_networks() + self._list_subnets() + self._list_ports() + + def _list_security_groups(self) -> None: + """List all security groups with rules across all audited regions.""" + logger.info("Networking - Listing security groups...") + for region, conn in self.regional_connections.items(): + try: + for sg in conn.network.security_groups(): + # Parse security group rules + rules = [] + for rule in getattr(sg, "security_group_rules", []): + # Rules are returned as dictionaries, use .get() instead of getattr() + if isinstance(rule, dict): + rules.append( + SecurityGroupRule( + id=rule.get("id", ""), + security_group_id=rule.get("security_group_id", ""), + direction=rule.get("direction", "ingress"), + protocol=rule.get("protocol", None), + ethertype=rule.get("ethertype", "IPv4"), + port_range_min=rule.get("port_range_min", None), + port_range_max=rule.get("port_range_max", None), + remote_ip_prefix=rule.get("remote_ip_prefix", None), + remote_group_id=rule.get("remote_group_id", None), + ) + ) + else: + # Fallback for object-style rules + rules.append( + SecurityGroupRule( + id=getattr(rule, "id", ""), + security_group_id=getattr( + rule, "security_group_id", "" + ), + direction=getattr(rule, "direction", "ingress"), + protocol=getattr(rule, "protocol", None), + ethertype=getattr(rule, "ethertype", "IPv4"), + port_range_min=getattr( + rule, "port_range_min", None + ), + port_range_max=getattr( + rule, "port_range_max", None + ), + remote_ip_prefix=getattr( + rule, "remote_ip_prefix", None + ), + remote_group_id=getattr( + rule, "remote_group_id", None + ), + ) + ) + + # Check if this is a default security group + is_default = getattr(sg, "name", "") == "default" + + self.security_groups.append( + SecurityGroup( + id=getattr(sg, "id", ""), + name=getattr(sg, "name", ""), + description=getattr(sg, "description", ""), + security_group_rules=rules, + project_id=getattr(sg, "project_id", ""), + region=region, + is_default=is_default, + tags=getattr(sg, "tags", []), + ) + ) + except openstack_exceptions.SDKException as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Failed to list security groups in region {region}: {error}" + ) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Unexpected error listing security groups in region {region}: {error}" + ) + + def _list_networks(self) -> None: + """List all networks across all audited regions.""" + logger.info("Networking - Listing networks...") + for region, conn in self.regional_connections.items(): + try: + for net in conn.network.networks(): + self.networks.append( + NetworkResource( + id=getattr(net, "id", ""), + name=getattr(net, "name", ""), + status=getattr(net, "status", ""), + admin_state_up=getattr(net, "is_admin_state_up", True), + shared=getattr(net, "is_shared", False), + external=getattr(net, "is_router_external", False), + port_security_enabled=getattr( + net, "is_port_security_enabled", True + ), + subnets=getattr(net, "subnet_ids", []), + project_id=getattr(net, "project_id", ""), + region=region, + tags=getattr(net, "tags", []), + ) + ) + except openstack_exceptions.SDKException as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Failed to list networks in region {region}: {error}" + ) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Unexpected error listing networks in region {region}: {error}" + ) + + def _list_subnets(self) -> None: + """List all subnets across all audited regions.""" + logger.info("Networking - Listing subnets...") + for region, conn in self.regional_connections.items(): + try: + for subnet in conn.network.subnets(): + self.subnets.append( + Subnet( + id=getattr(subnet, "id", ""), + name=getattr(subnet, "name", ""), + network_id=getattr(subnet, "network_id", ""), + ip_version=getattr(subnet, "ip_version", 4), + cidr=getattr(subnet, "cidr", ""), + gateway_ip=getattr(subnet, "gateway_ip", None), + enable_dhcp=getattr(subnet, "is_dhcp_enabled", True), + dns_nameservers=getattr(subnet, "dns_nameservers", []), + project_id=getattr(subnet, "project_id", ""), + region=region, + ) + ) + except openstack_exceptions.SDKException as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Failed to list subnets in region {region}: {error}" + ) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Unexpected error listing subnets in region {region}: {error}" + ) + + def _list_ports(self) -> None: + """List all ports across all audited regions.""" + logger.info("Networking - Listing ports...") + for region, conn in self.regional_connections.items(): + try: + for port in conn.network.ports(): + self.ports.append( + Port( + id=getattr(port, "id", ""), + name=getattr(port, "name", ""), + network_id=getattr(port, "network_id", ""), + mac_address=getattr(port, "mac_address", ""), + fixed_ips=getattr(port, "fixed_ips", []), + port_security_enabled=getattr( + port, "is_port_security_enabled", True + ), + security_groups=getattr(port, "security_groups", []), + device_owner=getattr(port, "device_owner", ""), + device_id=getattr(port, "device_id", ""), + project_id=getattr(port, "project_id", ""), + region=region, + ) + ) + except openstack_exceptions.SDKException as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Failed to list ports in region {region}: {error}" + ) + except Exception as error: + logger.error( + f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- " + f"Unexpected error listing ports in region {region}: {error}" + ) + + +@dataclass +class SecurityGroupRule: + """Represents an OpenStack security group rule.""" + + id: str + security_group_id: str + direction: str + protocol: Optional[str] + ethertype: str + port_range_min: Optional[int] + port_range_max: Optional[int] + remote_ip_prefix: Optional[str] + remote_group_id: Optional[str] + + +@dataclass +class SecurityGroup: + """Represents an OpenStack security group.""" + + id: str + name: str + description: str + security_group_rules: List[SecurityGroupRule] + project_id: str + region: str + is_default: bool + tags: List[str] + + +@dataclass +class NetworkResource: + """Represents an OpenStack network.""" + + id: str + name: str + status: str + admin_state_up: bool + shared: bool + external: bool + port_security_enabled: bool + subnets: List[str] + project_id: str + region: str + tags: List[str] + + +@dataclass +class Subnet: + """Represents an OpenStack subnet.""" + + id: str + name: str + network_id: str + ip_version: int + cidr: str + gateway_ip: Optional[str] + enable_dhcp: bool + dns_nameservers: List[str] + project_id: str + region: str + + +@dataclass +class Port: + """Represents an OpenStack network port.""" + + id: str + name: str + network_id: str + mac_address: str + fixed_ips: List[dict] + port_security_enabled: bool + security_groups: List[str] + device_owner: str + device_id: str + project_id: str + region: str diff --git a/prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/__init__.py b/prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled.metadata.json b/prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled.metadata.json new file mode 100644 index 0000000000..8eb8b5b1ab --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled.metadata.json @@ -0,0 +1,37 @@ +{ + "Provider": "openstack", + "CheckID": "networking_subnet_dhcp_disabled", + "CheckTitle": "DHCP is enabled on subnets", + "CheckType": [], + "ServiceName": "networking", + "SubServiceName": "", + "ResourceIdTemplate": "", + "Severity": "medium", + "ResourceType": "OS::Neutron::Subnet", + "ResourceGroup": "network", + "Description": "**OpenStack subnets** are evaluated to verify that **DHCP is enabled** (enable_dhcp=True). DHCP automatically assigns IP addresses, DNS servers, and gateway information to instances at boot. Subnets with DHCP disabled require manual configuration, increasing operational complexity and IP conflict risk. Enable DHCP to simplify deployment and ensure cloud-init functionality.", + "Risk": "Instances fail to acquire IP addresses and cannot communicate without manual intervention, breaking automated deployment pipelines (Heat, Terraform, Ansible). Manual configuration increases deployment time, error rates, and IP conflict risk. Cloud-init depends on DHCP for metadata service discovery; without DHCP, instances cannot retrieve SSH keys or user-data, breaking bootstrapping.", + "RelatedUrl": "", + "AdditionalURLs": [ + "https://docs.openstack.org/neutron/latest/admin/intro-os-networking.html", + "https://docs.openstack.org/neutron/2023.2/admin/config-dhcp-ha.html" + ], + "Remediation": { + "Code": { + "CLI": "openstack subnet set --dhcp ", + "NativeIaC": "", + "Other": "**Enable DHCP via Horizon Dashboard:**\n1. Navigate to **Project > Network > Networks**\n2. Click on the network containing the subnet\n3. Click the **Subnets** tab\n4. For each subnet, click **Edit Subnet**\n5. In the **Subnet Details** tab:\n - Ensure **Enable DHCP** checkbox is **checked**\n6. Click **Save**", + "Terraform": "```hcl\nresource \"openstack_networking_subnet_v2\" \"subnet\" {\n name = \"app-subnet\"\n network_id = openstack_networking_network_v2.network.id\n cidr = \"192.168.1.0/24\"\n ip_version = 4\n enable_dhcp = true # GOOD: DHCP enabled\n dns_nameservers = [\"8.8.8.8\", \"8.8.4.4\"]\n}\n```" + }, + "Recommendation": { + "Text": "Enable DHCP on all subnets unless there is a documented reason for static IP assignment. Use DHCP for automated IP management, simplified instance deployment, and proper cloud-init functionality. For environments requiring static IPs, use DHCP reservations or allowed-address-pairs instead of disabling DHCP entirely.", + "Url": "https://hub.prowler.com/check/networking_subnet_dhcp_disabled" + } + }, + "Categories": [ + "resilience" + ], + "DependsOn": [], + "RelatedTo": [], + "Notes": "This check flags subnets where `enable_dhcp == False`. Some environments intentionally disable DHCP for security (to prevent rogue DHCP servers) or to enforce static IP assignment. However, most cloud environments benefit from DHCP for automated instance configuration. Subnets connected to external provider networks or used for specific purposes (like load balancer VIPs) may legitimately have DHCP disabled." +} diff --git a/prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled.py b/prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled.py new file mode 100644 index 0000000000..e6f43d6a65 --- /dev/null +++ b/prowler/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled.py @@ -0,0 +1,42 @@ +"""OpenStack Network Subnet DHCP Check.""" + +from typing import List + +from prowler.lib.check.models import Check, CheckReportOpenStack +from prowler.providers.openstack.services.networking.networking_client import ( + networking_client, +) + + +class networking_subnet_dhcp_disabled(Check): + """Ensure DHCP is enabled on subnets.""" + + def execute(self) -> List[CheckReportOpenStack]: + """Execute networking_subnet_dhcp_disabled check. + + Iterates over all subnets and verifies that DHCP is enabled + to ensure instances can obtain IP addresses automatically. + + Returns: + list[CheckReportOpenStack]: List of findings for each subnet. + """ + findings: List[CheckReportOpenStack] = [] + + for subnet in networking_client.subnets: + report = CheckReportOpenStack(metadata=self.metadata(), resource=subnet) + report.resource_id = subnet.id + report.resource_name = subnet.name or f"subnet-{subnet.id[:8]}" + report.region = subnet.region + + if not subnet.enable_dhcp: + report.status = "FAIL" + report.status_extended = f"Subnet {subnet.name} ({subnet.id}) on network {subnet.network_id} has DHCP disabled, which may prevent instances from obtaining IP addresses automatically." + else: + report.status = "PASS" + report.status_extended = ( + f"Subnet {subnet.name} ({subnet.id}) has DHCP enabled." + ) + + findings.append(report) + + return findings diff --git a/tests/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down_test.py b/tests/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down_test.py new file mode 100644 index 0000000000..49f76e350f --- /dev/null +++ b/tests/providers/openstack/services/networking/networking_admin_state_down/networking_admin_state_down_test.py @@ -0,0 +1,128 @@ +"""Tests for network_admin_state_down check.""" + +from unittest import mock + +from prowler.providers.openstack.services.networking.networking_service import ( + NetworkResource, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_networking_admin_state_down: + def test_no_networks(self): + """Test when no networks exist.""" + network_client = mock.MagicMock() + network_client.networks = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down import ( + networking_admin_state_down, + ) + + check = networking_admin_state_down() + result = check.execute() + + assert len(result) == 0 + + def test_network_admin_state_up(self): + network_client = mock.MagicMock() + network_client.networks = [ + NetworkResource( + id="net-1", + name="production-network", + status="ACTIVE", + admin_state_up=True, + shared=False, + external=False, + port_security_enabled=True, + subnets=["subnet-1"], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down import ( + networking_admin_state_down, + ) + + check = networking_admin_state_down() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == "Network production-network (net-1) is administratively enabled." + ) + assert result[0].resource_id == "net-1" + assert result[0].resource_name == "production-network" + assert result[0].region == OPENSTACK_REGION + + def test_network_admin_state_down(self): + network_client = mock.MagicMock() + network_client.networks = [ + NetworkResource( + id="net-2", + name="disabled-network", + status="DOWN", + admin_state_up=False, + shared=False, + external=False, + port_security_enabled=True, + subnets=[], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down import ( + networking_admin_state_down, + ) + + check = networking_admin_state_down() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Network disabled-network (net-2) is administratively disabled (admin_state_up=False) and cannot carry traffic." + ) + assert result[0].resource_id == "net-2" + assert result[0].resource_name == "disabled-network" + assert result[0].region == OPENSTACK_REGION diff --git a/tests/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled_test.py b/tests/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled_test.py new file mode 100644 index 0000000000..10512a3dd2 --- /dev/null +++ b/tests/providers/openstack/services/networking/networking_port_security_disabled/networking_port_security_disabled_test.py @@ -0,0 +1,183 @@ +"""Tests for network_port_security_disabled check.""" + +from unittest import mock + +from prowler.providers.openstack.services.networking.networking_service import ( + NetworkResource, + Port, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_networking_port_security_disabled: + """Test suite for network_port_security_disabled check.""" + + def test_no_resources(self): + """Test when no networks or ports exist.""" + network_client = mock.MagicMock() + network_client.networks = [] + network_client.ports = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled import ( + networking_port_security_disabled, + ) + + check = networking_port_security_disabled() + result = check.execute() + + assert len(result) == 0 + + def test_network_port_security_enabled(self): + """Test network with port security enabled (PASS).""" + network_client = mock.MagicMock() + network_client.networks = [ + NetworkResource( + id="net-1", + name="secure-network", + status="ACTIVE", + admin_state_up=True, + shared=False, + external=False, + port_security_enabled=True, + subnets=["subnet-1"], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + tags=[], + ) + ] + network_client.ports = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled import ( + networking_port_security_disabled, + ) + + check = networking_port_security_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "net-1" + assert result[0].resource_name == "secure-network" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Network secure-network (net-1) has port security enabled." + ) + + def test_network_port_security_disabled(self): + """Test network with port security disabled (FAIL).""" + network_client = mock.MagicMock() + network_client.networks = [ + NetworkResource( + id="net-2", + name="insecure-network", + status="ACTIVE", + admin_state_up=True, + shared=False, + external=False, + port_security_enabled=False, + subnets=[], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + tags=[], + ) + ] + network_client.ports = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled import ( + networking_port_security_disabled, + ) + + check = networking_port_security_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "net-2" + assert result[0].resource_name == "insecure-network" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Network insecure-network (net-2) has port security disabled, which allows MAC and IP address spoofing attacks." + ) + + def test_port_security_disabled(self): + """Test port with security disabled (FAIL).""" + network_client = mock.MagicMock() + network_client.networks = [] + network_client.ports = [ + Port( + id="port-1", + name="nfv-port", + network_id="net-1", + mac_address="fa:16:3e:00:00:01", + fixed_ips=[{"ip_address": "192.168.1.10"}], + port_security_enabled=False, + security_groups=[], + device_owner="compute:nova", + device_id="instance-1", + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled import ( + networking_port_security_disabled, + ) + + check = networking_port_security_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "port-1" + assert result[0].resource_name == "nfv-port" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Port nfv-port (port-1) on network net-1 has port security disabled, which allows MAC and IP address spoofing." + ) diff --git a/tests/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet_test.py b/tests/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet_test.py new file mode 100644 index 0000000000..8d8731644e --- /dev/null +++ b/tests/providers/openstack/services/networking/networking_security_group_allows_all_ingress_from_internet/networking_security_group_allows_all_ingress_from_internet_test.py @@ -0,0 +1,495 @@ +"""Tests for networking_security_group_allows_all_ingress_from_internet check.""" + +from unittest import mock + +from prowler.providers.openstack.services.networking.networking_service import ( + SecurityGroup, + SecurityGroupRule, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + +CHECK_PATH = "prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet" + + +class Test_networking_security_group_allows_all_ingress_from_internet: + """Test suite for networking_security_group_allows_all_ingress_from_internet check.""" + + def test_no_security_groups(self): + """Test when no security groups exist.""" + network_client = mock.MagicMock() + network_client.security_groups = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 0 + + def test_security_group_with_specific_tcp_rule(self): + """Test SG with specific TCP port from internet (PASS - not all ingress).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-1", + name="web-servers", + description="Web servers security group", + security_group_rules=[ + SecurityGroupRule( + id="rule-1", + security_group_id="sg-1", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=80, + port_range_max=80, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-1" + assert result[0].resource_name == "web-servers" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group web-servers (sg-1) does not allow all ingress traffic from the Internet." + ) + + def test_security_group_with_all_ingress_ipv4(self): + """Test SG with all ingress from IPv4 internet (FAIL).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-2", + name="wide-open", + description="Wide open security group", + security_group_rules=[ + SecurityGroupRule( + id="rule-all", + security_group_id="sg-2", + direction="ingress", + protocol=None, + ethertype="IPv4", + port_range_min=None, + port_range_max=None, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-2" + assert result[0].resource_name == "wide-open" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group wide-open (sg-2) allows all ingress traffic (any protocol, any port) from the Internet via rule rule-all (0.0.0.0/0)." + ) + + def test_security_group_with_all_ingress_ipv6(self): + """Test SG with all ingress from IPv6 internet (FAIL).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-3", + name="ipv6-open", + description="IPv6 open security group", + security_group_rules=[ + SecurityGroupRule( + id="rule-all-v6", + security_group_id="sg-3", + direction="ingress", + protocol=None, + ethertype="IPv6", + port_range_min=None, + port_range_max=None, + remote_ip_prefix="::/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-3" + assert result[0].resource_name == "ipv6-open" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group ipv6-open (sg-3) allows all ingress traffic (any protocol, any port) from the Internet via rule rule-all-v6 (::/0)." + ) + + def test_security_group_with_all_ingress_from_security_group(self): + """Test SG with all ingress from another security group (PASS).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-4", + name="sg-referenced", + description="All ingress from SG", + security_group_rules=[ + SecurityGroupRule( + id="rule-sg-ref", + security_group_id="sg-4", + direction="ingress", + protocol=None, + ethertype="IPv4", + port_range_min=None, + port_range_max=None, + remote_ip_prefix=None, + remote_group_id="sg-bastion", + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-4" + assert result[0].resource_name == "sg-referenced" + assert result[0].region == OPENSTACK_REGION + + def test_security_group_with_no_prefix_no_group(self): + """Test SG with no remote_ip_prefix and no remote_group_id (FAIL). + + In OpenStack, a rule with no remote_ip_prefix and no remote_group_id + means traffic from any source is allowed. + """ + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-5", + name="implicit-open", + description="Implicitly open security group", + security_group_rules=[ + SecurityGroupRule( + id="rule-implicit", + security_group_id="sg-5", + direction="ingress", + protocol=None, + ethertype="IPv4", + port_range_min=None, + port_range_max=None, + remote_ip_prefix=None, + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-5" + assert result[0].resource_name == "implicit-open" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group implicit-open (sg-5) allows all ingress traffic (any protocol, any port) from the Internet via rule rule-implicit (0.0.0.0/0)." + ) + + def test_security_group_with_all_protocol_egress(self): + """Test SG with all-protocol egress rule (PASS - egress only).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-6", + name="egress-open", + description="All egress allowed", + security_group_rules=[ + SecurityGroupRule( + id="rule-egress", + security_group_id="sg-6", + direction="egress", + protocol=None, + ethertype="IPv4", + port_range_min=None, + port_range_max=None, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-6" + assert result[0].resource_name == "egress-open" + assert result[0].region == OPENSTACK_REGION + + def test_security_group_with_all_tcp_from_internet(self): + """Test SG with all TCP (not all protocols) from internet (PASS). + + This check only flags rules with NO protocol restriction (all protocols). + A rule allowing all TCP ports is not flagged by this check. + """ + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-7", + name="all-tcp", + description="All TCP ports open", + security_group_rules=[ + SecurityGroupRule( + id="rule-all-tcp", + security_group_id="sg-7", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=None, + port_range_max=None, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-7" + assert result[0].resource_name == "all-tcp" + assert result[0].region == OPENSTACK_REGION + + def test_multiple_security_groups_mixed(self): + """Test multiple security groups with mixed results.""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-pass", + name="secure-sg", + description="Secure SG", + security_group_rules=[ + SecurityGroupRule( + id="rule-pass", + security_group_id="sg-pass", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=443, + port_range_max=443, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ), + SecurityGroup( + id="sg-fail", + name="insecure-sg", + description="Insecure SG", + security_group_rules=[ + SecurityGroupRule( + id="rule-fail", + security_group_id="sg-fail", + direction="ingress", + protocol=None, + ethertype="IPv4", + port_range_min=None, + port_range_max=None, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + f"{CHECK_PATH}.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import ( + networking_security_group_allows_all_ingress_from_internet, + ) + + check = networking_security_group_allows_all_ingress_from_internet() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet_test.py b/tests/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet_test.py new file mode 100644 index 0000000000..52c5c28d6b --- /dev/null +++ b/tests/providers/openstack/services/networking/networking_security_group_allows_rdp_from_internet/networking_security_group_allows_rdp_from_internet_test.py @@ -0,0 +1,430 @@ +"""Tests for network_security_group_allows_rdp_from_internet check.""" + +from unittest import mock + +from prowler.providers.openstack.services.networking.networking_service import ( + SecurityGroup, + SecurityGroupRule, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_networking_security_group_allows_rdp_from_internet: + """Test suite for network_security_group_allows_rdp_from_internet check.""" + + def test_no_security_groups(self): + """Test when no security groups exist.""" + network_client = mock.MagicMock() + network_client.security_groups = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import ( + networking_security_group_allows_rdp_from_internet, + ) + + check = networking_security_group_allows_rdp_from_internet() + result = check.execute() + + assert len(result) == 0 + + def test_security_group_without_rdp_exposed(self): + """Test security group without RDP exposed to internet (PASS).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-1", + name="web-servers", + description="Web servers security group", + security_group_rules=[ + SecurityGroupRule( + id="rule-1", + security_group_id="sg-1", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=80, + port_range_max=80, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import ( + networking_security_group_allows_rdp_from_internet, + ) + + check = networking_security_group_allows_rdp_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-1" + assert result[0].resource_name == "web-servers" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group web-servers (sg-1) does not allow RDP (port 3389) from the Internet." + ) + + def test_security_group_with_rdp_from_ipv4_internet(self): + """Test security group with RDP exposed to IPv4 internet (FAIL).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-2", + name="windows-servers", + description="Windows servers", + security_group_rules=[ + SecurityGroupRule( + id="rule-rdp", + security_group_id="sg-2", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=3389, + port_range_max=3389, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import ( + networking_security_group_allows_rdp_from_internet, + ) + + check = networking_security_group_allows_rdp_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-2" + assert result[0].resource_name == "windows-servers" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group windows-servers (sg-2) allows unrestricted RDP access (port 3389) from the Internet via rule rule-rdp (tcp/0.0.0.0/0:3389-3389)." + ) + + def test_security_group_with_rdp_from_ipv6_internet(self): + """Test security group with RDP exposed to IPv6 internet (FAIL).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-3", + name="ipv6-windows", + description="IPv6 Windows servers", + security_group_rules=[ + SecurityGroupRule( + id="rule-rdp-ipv6", + security_group_id="sg-3", + direction="ingress", + protocol="tcp", + ethertype="IPv6", + port_range_min=3389, + port_range_max=3389, + remote_ip_prefix="::/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import ( + networking_security_group_allows_rdp_from_internet, + ) + + check = networking_security_group_allows_rdp_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-3" + assert result[0].resource_name == "ipv6-windows" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group ipv6-windows (sg-3) allows unrestricted RDP access (port 3389) from the Internet via rule rule-rdp-ipv6 (tcp/::/0:3389-3389)." + ) + + def test_security_group_with_rdp_from_restricted_cidr(self): + """Test security group with RDP from specific CIDR (PASS).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-4", + name="restricted-rdp", + description="RDP from specific IP", + security_group_rules=[ + SecurityGroupRule( + id="rule-restricted", + security_group_id="sg-4", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=3389, + port_range_max=3389, + remote_ip_prefix="198.51.100.0/24", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import ( + networking_security_group_allows_rdp_from_internet, + ) + + check = networking_security_group_allows_rdp_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-4" + assert result[0].resource_name == "restricted-rdp" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group restricted-rdp (sg-4) does not allow RDP (port 3389) from the Internet." + ) + + def test_security_group_with_rdp_port_range(self): + """Test security group with port range including RDP (FAIL).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-5", + name="port-range", + description="Port range including RDP", + security_group_rules=[ + SecurityGroupRule( + id="rule-range", + security_group_id="sg-5", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=3000, + port_range_max=4000, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import ( + networking_security_group_allows_rdp_from_internet, + ) + + check = networking_security_group_allows_rdp_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-5" + assert result[0].resource_name == "port-range" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group port-range (sg-5) allows unrestricted RDP access (port 3389) from the Internet via rule rule-range (tcp/0.0.0.0/0:3000-4000)." + ) + + def test_security_group_with_all_tcp_from_internet(self): + """Test security group allowing all TCP ports from internet (FAIL). + + In OpenStack Neutron, protocol=tcp with port_range_min=None and + port_range_max=None means all TCP ports are allowed. + """ + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-all-tcp", + name="all-tcp-open", + description="All TCP ports open", + security_group_rules=[ + SecurityGroupRule( + id="rule-all-tcp", + security_group_id="sg-all-tcp", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=None, + port_range_max=None, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import ( + networking_security_group_allows_rdp_from_internet, + ) + + check = networking_security_group_allows_rdp_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-all-tcp" + assert result[0].resource_name == "all-tcp-open" + assert result[0].region == OPENSTACK_REGION + + def test_multiple_security_groups_mixed(self): + """Test multiple security groups with mixed results.""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-pass", + name="secure-sg", + description="Secure SG", + security_group_rules=[], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ), + SecurityGroup( + id="sg-fail", + name="insecure-sg", + description="Insecure SG", + security_group_rules=[ + SecurityGroupRule( + id="rule-fail", + security_group_id="sg-fail", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=3389, + port_range_max=3389, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import ( + networking_security_group_allows_rdp_from_internet, + ) + + check = networking_security_group_allows_rdp_from_internet() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet_test.py b/tests/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet_test.py new file mode 100644 index 0000000000..64ec77ebe0 --- /dev/null +++ b/tests/providers/openstack/services/networking/networking_security_group_allows_ssh_from_internet/networking_security_group_allows_ssh_from_internet_test.py @@ -0,0 +1,540 @@ +"""Tests for network_security_group_allows_ssh_from_internet check.""" + +from unittest import mock + +from prowler.providers.openstack.services.networking.networking_service import ( + SecurityGroup, + SecurityGroupRule, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_networking_security_group_allows_ssh_from_internet: + """Test suite for network_security_group_allows_ssh_from_internet check.""" + + def test_no_security_groups(self): + """Test when no security groups exist.""" + network_client = mock.MagicMock() + network_client.security_groups = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 0 + + def test_security_group_without_ssh_exposed(self): + """Test security group without SSH exposed to internet (PASS).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-1", + name="web-servers", + description="Web servers security group", + security_group_rules=[ + SecurityGroupRule( + id="rule-1", + security_group_id="sg-1", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=80, + port_range_max=80, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-1" + assert result[0].resource_name == "web-servers" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group web-servers (sg-1) does not allow SSH (port 22) from the Internet." + ) + + def test_security_group_with_ssh_from_ipv4_internet(self): + """Test security group with SSH exposed to IPv4 internet (FAIL).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-2", + name="admin-servers", + description="Admin servers", + security_group_rules=[ + SecurityGroupRule( + id="rule-ssh", + security_group_id="sg-2", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=22, + port_range_max=22, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-2" + assert result[0].resource_name == "admin-servers" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group admin-servers (sg-2) allows unrestricted SSH access (port 22) from the Internet via rule rule-ssh (tcp/0.0.0.0/0:22-22)." + ) + + def test_security_group_with_ssh_from_ipv6_internet(self): + """Test security group with SSH exposed to IPv6 internet (FAIL).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-3", + name="ipv6-servers", + description="IPv6 servers", + security_group_rules=[ + SecurityGroupRule( + id="rule-ssh-ipv6", + security_group_id="sg-3", + direction="ingress", + protocol="tcp", + ethertype="IPv6", + port_range_min=22, + port_range_max=22, + remote_ip_prefix="::/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-3" + assert result[0].resource_name == "ipv6-servers" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group ipv6-servers (sg-3) allows unrestricted SSH access (port 22) from the Internet via rule rule-ssh-ipv6 (tcp/::/0:22-22)." + ) + + def test_security_group_with_ssh_from_restricted_cidr(self): + """Test security group with SSH from specific CIDR (PASS).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-4", + name="restricted-ssh", + description="SSH from specific IP", + security_group_rules=[ + SecurityGroupRule( + id="rule-restricted", + security_group_id="sg-4", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=22, + port_range_max=22, + remote_ip_prefix="203.0.113.0/24", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-4" + assert result[0].resource_name == "restricted-ssh" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group restricted-ssh (sg-4) does not allow SSH (port 22) from the Internet." + ) + + def test_security_group_with_ssh_port_range(self): + """Test security group with port range including SSH (FAIL).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-5", + name="port-range", + description="Port range including SSH", + security_group_rules=[ + SecurityGroupRule( + id="rule-range", + security_group_id="sg-5", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=20, + port_range_max=25, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-5" + assert result[0].resource_name == "port-range" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group port-range (sg-5) allows unrestricted SSH access (port 22) from the Internet via rule rule-range (tcp/0.0.0.0/0:20-25)." + ) + + def test_security_group_with_ssh_from_security_group(self): + """Test security group with SSH from another security group (PASS).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-6", + name="sg-referenced", + description="SSH from security group", + security_group_rules=[ + SecurityGroupRule( + id="rule-sg-ref", + security_group_id="sg-6", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=22, + port_range_max=22, + remote_ip_prefix=None, + remote_group_id="sg-bastion", + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-6" + assert result[0].resource_name == "sg-referenced" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group sg-referenced (sg-6) does not allow SSH (port 22) from the Internet." + ) + + def test_security_group_with_egress_ssh(self): + """Test security group with egress SSH rule (PASS).""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-7", + name="egress-only", + description="Egress SSH", + security_group_rules=[ + SecurityGroupRule( + id="rule-egress", + security_group_id="sg-7", + direction="egress", + protocol="tcp", + ethertype="IPv4", + port_range_min=22, + port_range_max=22, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "sg-7" + assert result[0].resource_name == "egress-only" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Security group egress-only (sg-7) does not allow SSH (port 22) from the Internet." + ) + + def test_security_group_with_all_tcp_from_internet(self): + """Test security group allowing all TCP ports from internet (FAIL). + + In OpenStack Neutron, protocol=tcp with port_range_min=None and + port_range_max=None means all TCP ports are allowed. + """ + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-all-tcp", + name="all-tcp-open", + description="All TCP ports open", + security_group_rules=[ + SecurityGroupRule( + id="rule-all-tcp", + security_group_id="sg-all-tcp", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=None, + port_range_max=None, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "sg-all-tcp" + assert result[0].resource_name == "all-tcp-open" + assert result[0].region == OPENSTACK_REGION + + def test_multiple_security_groups_mixed(self): + """Test multiple security groups with mixed results.""" + network_client = mock.MagicMock() + network_client.security_groups = [ + SecurityGroup( + id="sg-pass", + name="secure-sg", + description="Secure SG", + security_group_rules=[], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ), + SecurityGroup( + id="sg-fail", + name="insecure-sg", + description="Insecure SG", + security_group_rules=[ + SecurityGroupRule( + id="rule-fail", + security_group_id="sg-fail", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=22, + port_range_max=22, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ), + ], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + is_default=False, + tags=[], + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import ( + networking_security_group_allows_ssh_from_internet, + ) + + check = networking_security_group_allows_ssh_from_internet() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 diff --git a/tests/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled_test.py b/tests/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled_test.py new file mode 100644 index 0000000000..4375a991f9 --- /dev/null +++ b/tests/providers/openstack/services/networking/networking_subnet_dhcp_disabled/networking_subnet_dhcp_disabled_test.py @@ -0,0 +1,242 @@ +"""Tests for network_subnet_dhcp_disabled check.""" + +from unittest import mock + +from prowler.providers.openstack.services.networking.networking_service import Subnet +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class Test_networking_subnet_dhcp_disabled: + """Test suite for network_subnet_dhcp_disabled check.""" + + def test_no_subnets(self): + """Test when no subnets exist.""" + network_client = mock.MagicMock() + network_client.subnets = [] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import ( + networking_subnet_dhcp_disabled, + ) + + check = networking_subnet_dhcp_disabled() + result = check.execute() + + assert len(result) == 0 + + def test_subnet_dhcp_enabled(self): + """Test subnet with DHCP enabled (PASS).""" + network_client = mock.MagicMock() + network_client.subnets = [ + Subnet( + id="subnet-1", + name="production-subnet", + network_id="net-1", + ip_version=4, + cidr="192.168.1.0/24", + gateway_ip="192.168.1.1", + enable_dhcp=True, + dns_nameservers=["8.8.8.8", "8.8.4.4"], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import ( + networking_subnet_dhcp_disabled, + ) + + check = networking_subnet_dhcp_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "subnet-1" + assert result[0].resource_name == "production-subnet" + assert ( + result[0].status_extended + == "Subnet production-subnet (subnet-1) has DHCP enabled." + ) + assert result[0].region == OPENSTACK_REGION + + def test_subnet_dhcp_disabled(self): + """Test subnet with DHCP disabled (FAIL).""" + network_client = mock.MagicMock() + network_client.subnets = [ + Subnet( + id="subnet-2", + name="static-subnet", + network_id="net-2", + ip_version=4, + cidr="10.0.0.0/24", + gateway_ip="10.0.0.1", + enable_dhcp=False, + dns_nameservers=[], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import ( + networking_subnet_dhcp_disabled, + ) + + check = networking_subnet_dhcp_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "FAIL" + assert result[0].resource_id == "subnet-2" + assert result[0].resource_name == "static-subnet" + assert ( + result[0].status_extended + == "Subnet static-subnet (subnet-2) on network net-2 has DHCP disabled, which may prevent instances from obtaining IP addresses automatically." + ) + assert result[0].region == OPENSTACK_REGION + + def test_multiple_subnets_mixed_results(self): + """Test multiple subnets with mixed DHCP configurations.""" + network_client = mock.MagicMock() + network_client.subnets = [ + Subnet( + id="subnet-1", + name="dhcp-enabled-subnet", + network_id="net-1", + ip_version=4, + cidr="192.168.1.0/24", + gateway_ip="192.168.1.1", + enable_dhcp=True, + dns_nameservers=["8.8.8.8"], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + ), + Subnet( + id="subnet-2", + name="dhcp-disabled-subnet", + network_id="net-2", + ip_version=4, + cidr="10.0.0.0/24", + gateway_ip="10.0.0.1", + enable_dhcp=False, + dns_nameservers=[], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + ), + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import ( + networking_subnet_dhcp_disabled, + ) + + check = networking_subnet_dhcp_disabled() + result = check.execute() + + assert len(result) == 2 + assert len([r for r in result if r.status == "PASS"]) == 1 + assert len([r for r in result if r.status == "FAIL"]) == 1 + + pass_result = [r for r in result if r.status == "PASS"][0] + assert pass_result.resource_id == "subnet-1" + assert pass_result.resource_name == "dhcp-enabled-subnet" + assert pass_result.region == OPENSTACK_REGION + assert ( + pass_result.status_extended + == "Subnet dhcp-enabled-subnet (subnet-1) has DHCP enabled." + ) + + fail_result = [r for r in result if r.status == "FAIL"][0] + assert fail_result.resource_id == "subnet-2" + assert fail_result.resource_name == "dhcp-disabled-subnet" + assert fail_result.region == OPENSTACK_REGION + assert ( + fail_result.status_extended + == "Subnet dhcp-disabled-subnet (subnet-2) on network net-2 has DHCP disabled, which may prevent instances from obtaining IP addresses automatically." + ) + + def test_subnet_ipv6_dhcp_enabled(self): + """Test IPv6 subnet with DHCP enabled.""" + network_client = mock.MagicMock() + network_client.subnets = [ + Subnet( + id="subnet-ipv6", + name="ipv6-subnet", + network_id="net-1", + ip_version=6, + cidr="2001:db8::/64", + gateway_ip="2001:db8::1", + enable_dhcp=True, + dns_nameservers=["2001:4860:4860::8888"], + project_id=OPENSTACK_PROJECT_ID, + region=OPENSTACK_REGION, + ) + ] + + with ( + mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=set_mocked_openstack_provider(), + ), + mock.patch( + "prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client", + new=network_client, + ), + ): + from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import ( + networking_subnet_dhcp_disabled, + ) + + check = networking_subnet_dhcp_disabled() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert result[0].resource_id == "subnet-ipv6" + assert result[0].resource_name == "ipv6-subnet" + assert result[0].region == OPENSTACK_REGION + assert ( + result[0].status_extended + == "Subnet ipv6-subnet (subnet-ipv6) has DHCP enabled." + ) diff --git a/tests/providers/openstack/services/networking/openstack_networking_service_test.py b/tests/providers/openstack/services/networking/openstack_networking_service_test.py new file mode 100644 index 0000000000..e21acf32a2 --- /dev/null +++ b/tests/providers/openstack/services/networking/openstack_networking_service_test.py @@ -0,0 +1,542 @@ +"""Tests for OpenStack Network service.""" + +from unittest.mock import MagicMock, patch + +from openstack import exceptions as openstack_exceptions + +from prowler.providers.openstack.services.networking.networking_service import ( + Networking, + NetworkResource, + Port, + SecurityGroup, + SecurityGroupRule, + Subnet, +) +from tests.providers.openstack.openstack_fixtures import ( + OPENSTACK_PROJECT_ID, + OPENSTACK_REGION, + set_mocked_openstack_provider, +) + + +class TestNetworkingService: + """Test suite for Network service.""" + + def test_network_service_initialization(self): + """Test Network service initializes correctly.""" + provider = set_mocked_openstack_provider() + + with ( + patch.object(Networking, "_list_security_groups", return_value=[]), + patch.object(Networking, "_list_networks", return_value=[]), + patch.object(Networking, "_list_subnets", return_value=[]), + patch.object(Networking, "_list_ports", return_value=[]), + ): + network = Networking(provider) + + assert network.service_name == "Networking" + assert network.provider == provider + assert network.connection == provider.connection + assert network.regional_connections == provider.regional_connections + assert network.audited_regions == [OPENSTACK_REGION] + assert network.region == OPENSTACK_REGION + assert network.project_id == OPENSTACK_PROJECT_ID + assert network.security_groups == [] + assert network.networks == [] + assert network.subnets == [] + assert network.ports == [] + + def test_network_list_security_groups_success(self): + """Test listing security groups successfully.""" + provider = set_mocked_openstack_provider() + + # Mock security group rule + mock_rule = MagicMock() + mock_rule.id = "rule-1" + mock_rule.security_group_id = "sg-1" + mock_rule.direction = "ingress" + mock_rule.protocol = "tcp" + mock_rule.ethertype = "IPv4" + mock_rule.port_range_min = 22 + mock_rule.port_range_max = 22 + mock_rule.remote_ip_prefix = "0.0.0.0/0" + mock_rule.remote_group_id = None + + # Mock security group + mock_sg = MagicMock() + mock_sg.id = "sg-1" + mock_sg.name = "web-servers" + mock_sg.description = "Security group for web servers" + mock_sg.security_group_rules = [mock_rule] + mock_sg.project_id = OPENSTACK_PROJECT_ID + mock_sg.tags = ["production"] + + provider.connection.network.security_groups.return_value = [mock_sg] + + with ( + patch.object(Networking, "_list_networks", return_value=[]), + patch.object(Networking, "_list_subnets", return_value=[]), + patch.object(Networking, "_list_ports", return_value=[]), + ): + network = Networking(provider) + + assert len(network.security_groups) == 1 + assert isinstance(network.security_groups[0], SecurityGroup) + assert network.security_groups[0].id == "sg-1" + assert network.security_groups[0].name == "web-servers" + assert network.security_groups[0].is_default is False + assert len(network.security_groups[0].security_group_rules) == 1 + + rule = network.security_groups[0].security_group_rules[0] + assert isinstance(rule, SecurityGroupRule) + assert rule.id == "rule-1" + assert rule.direction == "ingress" + assert rule.protocol == "tcp" + assert rule.port_range_min == 22 + assert rule.port_range_max == 22 + assert rule.remote_ip_prefix == "0.0.0.0/0" + + def test_network_list_security_groups_default(self): + """Test listing default security group.""" + provider = set_mocked_openstack_provider() + + mock_sg = MagicMock() + mock_sg.id = "sg-default" + mock_sg.name = "default" + mock_sg.description = "Default security group" + mock_sg.security_group_rules = [] + mock_sg.project_id = OPENSTACK_PROJECT_ID + mock_sg.tags = [] + + provider.connection.network.security_groups.return_value = [mock_sg] + + with ( + patch.object(Networking, "_list_networks", return_value=[]), + patch.object(Networking, "_list_subnets", return_value=[]), + patch.object(Networking, "_list_ports", return_value=[]), + ): + network = Networking(provider) + + assert len(network.security_groups) == 1 + assert network.security_groups[0].name == "default" + assert network.security_groups[0].is_default is True + + def test_network_list_security_groups_empty(self): + """Test listing security groups when none exist.""" + provider = set_mocked_openstack_provider() + provider.connection.network.security_groups.return_value = [] + + with ( + patch.object(Networking, "_list_networks", return_value=[]), + patch.object(Networking, "_list_subnets", return_value=[]), + patch.object(Networking, "_list_ports", return_value=[]), + ): + network = Networking(provider) + + assert network.security_groups == [] + + def test_network_list_security_groups_sdk_exception(self): + """Test handling SDKException when listing security groups.""" + provider = set_mocked_openstack_provider() + provider.connection.network.security_groups.side_effect = ( + openstack_exceptions.SDKException("API error") + ) + + with ( + patch.object(Networking, "_list_networks", return_value=[]), + patch.object(Networking, "_list_subnets", return_value=[]), + patch.object(Networking, "_list_ports", return_value=[]), + ): + network = Networking(provider) + + assert network.security_groups == [] + + def test_network_list_networks_success(self): + """Test listing networks successfully.""" + provider = set_mocked_openstack_provider() + + mock_net = MagicMock() + mock_net.id = "net-1" + mock_net.name = "private-network" + mock_net.status = "ACTIVE" + mock_net.is_admin_state_up = True + mock_net.is_shared = False + mock_net.is_router_external = False + mock_net.is_port_security_enabled = True + mock_net.subnet_ids = ["subnet-1", "subnet-2"] + mock_net.project_id = OPENSTACK_PROJECT_ID + mock_net.tags = [] + + provider.connection.network.networks.return_value = [mock_net] + + with ( + patch.object(Networking, "_list_security_groups", return_value=[]), + patch.object(Networking, "_list_subnets", return_value=[]), + patch.object(Networking, "_list_ports", return_value=[]), + ): + network = Networking(provider) + + assert len(network.networks) == 1 + assert isinstance(network.networks[0], NetworkResource) + assert network.networks[0].id == "net-1" + assert network.networks[0].name == "private-network" + assert network.networks[0].port_security_enabled is True + + def test_network_list_subnets_success(self): + """Test listing subnets successfully.""" + provider = set_mocked_openstack_provider() + + mock_subnet = MagicMock() + mock_subnet.id = "subnet-1" + mock_subnet.name = "private-subnet" + mock_subnet.network_id = "net-1" + mock_subnet.ip_version = 4 + mock_subnet.cidr = "192.168.1.0/24" + mock_subnet.gateway_ip = "192.168.1.1" + mock_subnet.is_dhcp_enabled = True + mock_subnet.dns_nameservers = ["8.8.8.8", "8.8.4.4"] + mock_subnet.project_id = OPENSTACK_PROJECT_ID + + provider.connection.network.subnets.return_value = [mock_subnet] + + with ( + patch.object(Networking, "_list_security_groups", return_value=[]), + patch.object(Networking, "_list_networks", return_value=[]), + patch.object(Networking, "_list_ports", return_value=[]), + ): + network = Networking(provider) + + assert len(network.subnets) == 1 + assert isinstance(network.subnets[0], Subnet) + assert network.subnets[0].id == "subnet-1" + assert network.subnets[0].cidr == "192.168.1.0/24" + + def test_network_list_ports_success(self): + """Test listing ports successfully.""" + provider = set_mocked_openstack_provider() + + mock_port = MagicMock() + mock_port.id = "port-1" + mock_port.name = "instance-port" + mock_port.network_id = "net-1" + mock_port.mac_address = "fa:16:3e:00:00:01" + mock_port.fixed_ips = [{"ip_address": "192.168.1.10", "subnet_id": "subnet-1"}] + mock_port.is_port_security_enabled = True + mock_port.security_groups = ["sg-1"] + mock_port.device_owner = "compute:nova" + mock_port.device_id = "instance-1" + mock_port.project_id = OPENSTACK_PROJECT_ID + + provider.connection.network.ports.return_value = [mock_port] + + with ( + patch.object(Networking, "_list_security_groups", return_value=[]), + patch.object(Networking, "_list_networks", return_value=[]), + patch.object(Networking, "_list_subnets", return_value=[]), + ): + network = Networking(provider) + + assert len(network.ports) == 1 + assert isinstance(network.ports[0], Port) + assert network.ports[0].id == "port-1" + assert network.ports[0].port_security_enabled is True + assert network.ports[0].security_groups == ["sg-1"] + + def test_network_dataclasses_attributes(self): + """Test dataclass attributes are correctly set.""" + rule = SecurityGroupRule( + id="rule-1", + security_group_id="sg-1", + direction="ingress", + protocol="tcp", + ethertype="IPv4", + port_range_min=80, + port_range_max=80, + remote_ip_prefix="0.0.0.0/0", + remote_group_id=None, + ) + + assert rule.id == "rule-1" + assert rule.protocol == "tcp" + assert rule.port_range_min == 80 + + sg = SecurityGroup( + id="sg-1", + name="web", + description="Web servers", + security_group_rules=[rule], + project_id="project-1", + region="RegionOne", + is_default=False, + tags=["prod"], + ) + + assert sg.id == "sg-1" + assert len(sg.security_group_rules) == 1 + assert sg.is_default is False + + def test_networking_list_security_groups_multi_region(self): + """Test listing security groups across multiple regions.""" + provider = set_mocked_openstack_provider() + + mock_conn_uk1 = MagicMock() + mock_conn_de1 = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1} + + mock_sg_uk = MagicMock() + mock_sg_uk.id = "sg-uk" + mock_sg_uk.name = "web-uk" + mock_sg_uk.description = "UK web servers" + mock_sg_uk.security_group_rules = [] + mock_sg_uk.project_id = OPENSTACK_PROJECT_ID + mock_sg_uk.tags = [] + + mock_sg_de = MagicMock() + mock_sg_de.id = "sg-de" + mock_sg_de.name = "web-de" + mock_sg_de.description = "DE web servers" + mock_sg_de.security_group_rules = [] + mock_sg_de.project_id = OPENSTACK_PROJECT_ID + mock_sg_de.tags = [] + + mock_conn_uk1.network.security_groups.return_value = [mock_sg_uk] + mock_conn_de1.network.security_groups.return_value = [mock_sg_de] + mock_conn_uk1.network.networks.return_value = [] + mock_conn_de1.network.networks.return_value = [] + mock_conn_uk1.network.subnets.return_value = [] + mock_conn_de1.network.subnets.return_value = [] + mock_conn_uk1.network.ports.return_value = [] + mock_conn_de1.network.ports.return_value = [] + + network = Networking(provider) + + assert len(network.security_groups) == 2 + uk_sg = next(sg for sg in network.security_groups if sg.id == "sg-uk") + de_sg = next(sg for sg in network.security_groups if sg.id == "sg-de") + assert uk_sg.region == "UK1" + assert de_sg.region == "DE1" + + def test_networking_list_networks_multi_region(self): + """Test listing networks across multiple regions.""" + provider = set_mocked_openstack_provider() + + mock_conn_uk1 = MagicMock() + mock_conn_de1 = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1} + + mock_net_uk = MagicMock() + mock_net_uk.id = "net-uk" + mock_net_uk.name = "private-uk" + mock_net_uk.status = "ACTIVE" + mock_net_uk.is_admin_state_up = True + mock_net_uk.is_shared = False + mock_net_uk.is_router_external = False + mock_net_uk.is_port_security_enabled = True + mock_net_uk.subnet_ids = ["subnet-uk"] + mock_net_uk.project_id = OPENSTACK_PROJECT_ID + mock_net_uk.tags = [] + + mock_net_de = MagicMock() + mock_net_de.id = "net-de" + mock_net_de.name = "private-de" + mock_net_de.status = "ACTIVE" + mock_net_de.is_admin_state_up = True + mock_net_de.is_shared = False + mock_net_de.is_router_external = False + mock_net_de.is_port_security_enabled = True + mock_net_de.subnet_ids = ["subnet-de"] + mock_net_de.project_id = OPENSTACK_PROJECT_ID + mock_net_de.tags = [] + + mock_conn_uk1.network.security_groups.return_value = [] + mock_conn_de1.network.security_groups.return_value = [] + mock_conn_uk1.network.networks.return_value = [mock_net_uk] + mock_conn_de1.network.networks.return_value = [mock_net_de] + mock_conn_uk1.network.subnets.return_value = [] + mock_conn_de1.network.subnets.return_value = [] + mock_conn_uk1.network.ports.return_value = [] + mock_conn_de1.network.ports.return_value = [] + + network = Networking(provider) + + assert len(network.networks) == 2 + uk_net = next(n for n in network.networks if n.id == "net-uk") + de_net = next(n for n in network.networks if n.id == "net-de") + assert uk_net.region == "UK1" + assert de_net.region == "DE1" + + def test_networking_list_subnets_multi_region(self): + """Test listing subnets across multiple regions.""" + provider = set_mocked_openstack_provider() + + mock_conn_uk1 = MagicMock() + mock_conn_de1 = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1} + + mock_subnet_uk = MagicMock() + mock_subnet_uk.id = "subnet-uk" + mock_subnet_uk.name = "subnet-uk" + mock_subnet_uk.network_id = "net-uk" + mock_subnet_uk.ip_version = 4 + mock_subnet_uk.cidr = "10.0.0.0/24" + mock_subnet_uk.gateway_ip = "10.0.0.1" + mock_subnet_uk.is_dhcp_enabled = True + mock_subnet_uk.dns_nameservers = ["8.8.8.8"] + mock_subnet_uk.project_id = OPENSTACK_PROJECT_ID + + mock_subnet_de = MagicMock() + mock_subnet_de.id = "subnet-de" + mock_subnet_de.name = "subnet-de" + mock_subnet_de.network_id = "net-de" + mock_subnet_de.ip_version = 4 + mock_subnet_de.cidr = "10.1.0.0/24" + mock_subnet_de.gateway_ip = "10.1.0.1" + mock_subnet_de.is_dhcp_enabled = True + mock_subnet_de.dns_nameservers = ["8.8.4.4"] + mock_subnet_de.project_id = OPENSTACK_PROJECT_ID + + mock_conn_uk1.network.security_groups.return_value = [] + mock_conn_de1.network.security_groups.return_value = [] + mock_conn_uk1.network.networks.return_value = [] + mock_conn_de1.network.networks.return_value = [] + mock_conn_uk1.network.subnets.return_value = [mock_subnet_uk] + mock_conn_de1.network.subnets.return_value = [mock_subnet_de] + mock_conn_uk1.network.ports.return_value = [] + mock_conn_de1.network.ports.return_value = [] + + network = Networking(provider) + + assert len(network.subnets) == 2 + uk_subnet = next(s for s in network.subnets if s.id == "subnet-uk") + de_subnet = next(s for s in network.subnets if s.id == "subnet-de") + assert uk_subnet.region == "UK1" + assert de_subnet.region == "DE1" + + def test_networking_list_ports_multi_region(self): + """Test listing ports across multiple regions.""" + provider = set_mocked_openstack_provider() + + mock_conn_uk1 = MagicMock() + mock_conn_de1 = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1} + + mock_port_uk = MagicMock() + mock_port_uk.id = "port-uk" + mock_port_uk.name = "port-uk" + mock_port_uk.network_id = "net-uk" + mock_port_uk.mac_address = "fa:16:3e:00:00:01" + mock_port_uk.fixed_ips = [{"ip_address": "10.0.0.10", "subnet_id": "subnet-uk"}] + mock_port_uk.is_port_security_enabled = True + mock_port_uk.security_groups = ["sg-uk"] + mock_port_uk.device_owner = "compute:nova" + mock_port_uk.device_id = "instance-uk" + mock_port_uk.project_id = OPENSTACK_PROJECT_ID + + mock_port_de = MagicMock() + mock_port_de.id = "port-de" + mock_port_de.name = "port-de" + mock_port_de.network_id = "net-de" + mock_port_de.mac_address = "fa:16:3e:00:00:02" + mock_port_de.fixed_ips = [{"ip_address": "10.1.0.10", "subnet_id": "subnet-de"}] + mock_port_de.is_port_security_enabled = True + mock_port_de.security_groups = ["sg-de"] + mock_port_de.device_owner = "compute:nova" + mock_port_de.device_id = "instance-de" + mock_port_de.project_id = OPENSTACK_PROJECT_ID + + mock_conn_uk1.network.security_groups.return_value = [] + mock_conn_de1.network.security_groups.return_value = [] + mock_conn_uk1.network.networks.return_value = [] + mock_conn_de1.network.networks.return_value = [] + mock_conn_uk1.network.subnets.return_value = [] + mock_conn_de1.network.subnets.return_value = [] + mock_conn_uk1.network.ports.return_value = [mock_port_uk] + mock_conn_de1.network.ports.return_value = [mock_port_de] + + network = Networking(provider) + + assert len(network.ports) == 2 + uk_port = next(p for p in network.ports if p.id == "port-uk") + de_port = next(p for p in network.ports if p.id == "port-de") + assert uk_port.region == "UK1" + assert de_port.region == "DE1" + + def test_networking_multi_region_partial_failure(self): + """Test that a failing region doesn't prevent other regions from being listed.""" + provider = set_mocked_openstack_provider() + + mock_conn_ok = MagicMock() + mock_conn_fail = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_ok, "DE1": mock_conn_fail} + + mock_sg = MagicMock() + mock_sg.id = "sg-uk" + mock_sg.name = "web-uk" + mock_sg.description = "UK web servers" + mock_sg.security_group_rules = [] + mock_sg.project_id = OPENSTACK_PROJECT_ID + mock_sg.tags = [] + + mock_conn_ok.network.security_groups.return_value = [mock_sg] + mock_conn_fail.network.security_groups.side_effect = ( + openstack_exceptions.SDKException("API error in DE1") + ) + mock_conn_ok.network.networks.return_value = [] + mock_conn_fail.network.networks.side_effect = openstack_exceptions.SDKException( + "API error in DE1" + ) + mock_conn_ok.network.subnets.return_value = [] + mock_conn_fail.network.subnets.side_effect = openstack_exceptions.SDKException( + "API error in DE1" + ) + mock_conn_ok.network.ports.return_value = [] + mock_conn_fail.network.ports.side_effect = openstack_exceptions.SDKException( + "API error in DE1" + ) + + network = Networking(provider) + + assert len(network.security_groups) == 1 + assert network.security_groups[0].id == "sg-uk" + assert network.security_groups[0].region == "UK1" + + def test_networking_multi_region_one_empty(self): + """Test multi-region where one region has resources and the other is empty.""" + provider = set_mocked_openstack_provider() + + mock_conn_uk1 = MagicMock() + mock_conn_de1 = MagicMock() + + provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1} + + mock_net = MagicMock() + mock_net.id = "net-uk" + mock_net.name = "private-uk" + mock_net.status = "ACTIVE" + mock_net.is_admin_state_up = True + mock_net.is_shared = False + mock_net.is_router_external = False + mock_net.is_port_security_enabled = True + mock_net.subnet_ids = [] + mock_net.project_id = OPENSTACK_PROJECT_ID + mock_net.tags = [] + + mock_conn_uk1.network.security_groups.return_value = [] + mock_conn_de1.network.security_groups.return_value = [] + mock_conn_uk1.network.networks.return_value = [mock_net] + mock_conn_de1.network.networks.return_value = [] + mock_conn_uk1.network.subnets.return_value = [] + mock_conn_de1.network.subnets.return_value = [] + mock_conn_uk1.network.ports.return_value = [] + mock_conn_de1.network.ports.return_value = [] + + network = Networking(provider) + + assert len(network.networks) == 1 + assert network.networks[0].id == "net-uk" + assert network.networks[0].region == "UK1"