feat(openstack): add networking service with 6 checks (#9970)

Co-authored-by: Andoni Alonso <14891798+andoniaf@users.noreply.github.com>
This commit is contained in:
Daniel Barranquero
2026-03-02 11:55:37 +01:00
committed by GitHub
parent 8adb4f43ad
commit d3ba93f0c0
30 changed files with 3513 additions and 0 deletions

View File

@@ -0,0 +1,128 @@
"""Tests for network_admin_state_down check."""
from unittest import mock
from prowler.providers.openstack.services.networking.networking_service import (
NetworkResource,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_networking_admin_state_down:
def test_no_networks(self):
"""Test when no networks exist."""
network_client = mock.MagicMock()
network_client.networks = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down import (
networking_admin_state_down,
)
check = networking_admin_state_down()
result = check.execute()
assert len(result) == 0
def test_network_admin_state_up(self):
network_client = mock.MagicMock()
network_client.networks = [
NetworkResource(
id="net-1",
name="production-network",
status="ACTIVE",
admin_state_up=True,
shared=False,
external=False,
port_security_enabled=True,
subnets=["subnet-1"],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down import (
networking_admin_state_down,
)
check = networking_admin_state_down()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Network production-network (net-1) is administratively enabled."
)
assert result[0].resource_id == "net-1"
assert result[0].resource_name == "production-network"
assert result[0].region == OPENSTACK_REGION
def test_network_admin_state_down(self):
network_client = mock.MagicMock()
network_client.networks = [
NetworkResource(
id="net-2",
name="disabled-network",
status="DOWN",
admin_state_up=False,
shared=False,
external=False,
port_security_enabled=True,
subnets=[],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_admin_state_down.networking_admin_state_down import (
networking_admin_state_down,
)
check = networking_admin_state_down()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Network disabled-network (net-2) is administratively disabled (admin_state_up=False) and cannot carry traffic."
)
assert result[0].resource_id == "net-2"
assert result[0].resource_name == "disabled-network"
assert result[0].region == OPENSTACK_REGION

View File

@@ -0,0 +1,183 @@
"""Tests for network_port_security_disabled check."""
from unittest import mock
from prowler.providers.openstack.services.networking.networking_service import (
NetworkResource,
Port,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_networking_port_security_disabled:
"""Test suite for network_port_security_disabled check."""
def test_no_resources(self):
"""Test when no networks or ports exist."""
network_client = mock.MagicMock()
network_client.networks = []
network_client.ports = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled import (
networking_port_security_disabled,
)
check = networking_port_security_disabled()
result = check.execute()
assert len(result) == 0
def test_network_port_security_enabled(self):
"""Test network with port security enabled (PASS)."""
network_client = mock.MagicMock()
network_client.networks = [
NetworkResource(
id="net-1",
name="secure-network",
status="ACTIVE",
admin_state_up=True,
shared=False,
external=False,
port_security_enabled=True,
subnets=["subnet-1"],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
tags=[],
)
]
network_client.ports = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled import (
networking_port_security_disabled,
)
check = networking_port_security_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "net-1"
assert result[0].resource_name == "secure-network"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Network secure-network (net-1) has port security enabled."
)
def test_network_port_security_disabled(self):
"""Test network with port security disabled (FAIL)."""
network_client = mock.MagicMock()
network_client.networks = [
NetworkResource(
id="net-2",
name="insecure-network",
status="ACTIVE",
admin_state_up=True,
shared=False,
external=False,
port_security_enabled=False,
subnets=[],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
tags=[],
)
]
network_client.ports = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled import (
networking_port_security_disabled,
)
check = networking_port_security_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "net-2"
assert result[0].resource_name == "insecure-network"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Network insecure-network (net-2) has port security disabled, which allows MAC and IP address spoofing attacks."
)
def test_port_security_disabled(self):
"""Test port with security disabled (FAIL)."""
network_client = mock.MagicMock()
network_client.networks = []
network_client.ports = [
Port(
id="port-1",
name="nfv-port",
network_id="net-1",
mac_address="fa:16:3e:00:00:01",
fixed_ips=[{"ip_address": "192.168.1.10"}],
port_security_enabled=False,
security_groups=[],
device_owner="compute:nova",
device_id="instance-1",
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_port_security_disabled.networking_port_security_disabled import (
networking_port_security_disabled,
)
check = networking_port_security_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "port-1"
assert result[0].resource_name == "nfv-port"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Port nfv-port (port-1) on network net-1 has port security disabled, which allows MAC and IP address spoofing."
)

View File

@@ -0,0 +1,495 @@
"""Tests for networking_security_group_allows_all_ingress_from_internet check."""
from unittest import mock
from prowler.providers.openstack.services.networking.networking_service import (
SecurityGroup,
SecurityGroupRule,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
CHECK_PATH = "prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet"
class Test_networking_security_group_allows_all_ingress_from_internet:
"""Test suite for networking_security_group_allows_all_ingress_from_internet check."""
def test_no_security_groups(self):
"""Test when no security groups exist."""
network_client = mock.MagicMock()
network_client.security_groups = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 0
def test_security_group_with_specific_tcp_rule(self):
"""Test SG with specific TCP port from internet (PASS - not all ingress)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-1",
name="web-servers",
description="Web servers security group",
security_group_rules=[
SecurityGroupRule(
id="rule-1",
security_group_id="sg-1",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=80,
port_range_max=80,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-1"
assert result[0].resource_name == "web-servers"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group web-servers (sg-1) does not allow all ingress traffic from the Internet."
)
def test_security_group_with_all_ingress_ipv4(self):
"""Test SG with all ingress from IPv4 internet (FAIL)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-2",
name="wide-open",
description="Wide open security group",
security_group_rules=[
SecurityGroupRule(
id="rule-all",
security_group_id="sg-2",
direction="ingress",
protocol=None,
ethertype="IPv4",
port_range_min=None,
port_range_max=None,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-2"
assert result[0].resource_name == "wide-open"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group wide-open (sg-2) allows all ingress traffic (any protocol, any port) from the Internet via rule rule-all (0.0.0.0/0)."
)
def test_security_group_with_all_ingress_ipv6(self):
"""Test SG with all ingress from IPv6 internet (FAIL)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-3",
name="ipv6-open",
description="IPv6 open security group",
security_group_rules=[
SecurityGroupRule(
id="rule-all-v6",
security_group_id="sg-3",
direction="ingress",
protocol=None,
ethertype="IPv6",
port_range_min=None,
port_range_max=None,
remote_ip_prefix="::/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-3"
assert result[0].resource_name == "ipv6-open"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group ipv6-open (sg-3) allows all ingress traffic (any protocol, any port) from the Internet via rule rule-all-v6 (::/0)."
)
def test_security_group_with_all_ingress_from_security_group(self):
"""Test SG with all ingress from another security group (PASS)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-4",
name="sg-referenced",
description="All ingress from SG",
security_group_rules=[
SecurityGroupRule(
id="rule-sg-ref",
security_group_id="sg-4",
direction="ingress",
protocol=None,
ethertype="IPv4",
port_range_min=None,
port_range_max=None,
remote_ip_prefix=None,
remote_group_id="sg-bastion",
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-4"
assert result[0].resource_name == "sg-referenced"
assert result[0].region == OPENSTACK_REGION
def test_security_group_with_no_prefix_no_group(self):
"""Test SG with no remote_ip_prefix and no remote_group_id (FAIL).
In OpenStack, a rule with no remote_ip_prefix and no remote_group_id
means traffic from any source is allowed.
"""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-5",
name="implicit-open",
description="Implicitly open security group",
security_group_rules=[
SecurityGroupRule(
id="rule-implicit",
security_group_id="sg-5",
direction="ingress",
protocol=None,
ethertype="IPv4",
port_range_min=None,
port_range_max=None,
remote_ip_prefix=None,
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-5"
assert result[0].resource_name == "implicit-open"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group implicit-open (sg-5) allows all ingress traffic (any protocol, any port) from the Internet via rule rule-implicit (0.0.0.0/0)."
)
def test_security_group_with_all_protocol_egress(self):
"""Test SG with all-protocol egress rule (PASS - egress only)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-6",
name="egress-open",
description="All egress allowed",
security_group_rules=[
SecurityGroupRule(
id="rule-egress",
security_group_id="sg-6",
direction="egress",
protocol=None,
ethertype="IPv4",
port_range_min=None,
port_range_max=None,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-6"
assert result[0].resource_name == "egress-open"
assert result[0].region == OPENSTACK_REGION
def test_security_group_with_all_tcp_from_internet(self):
"""Test SG with all TCP (not all protocols) from internet (PASS).
This check only flags rules with NO protocol restriction (all protocols).
A rule allowing all TCP ports is not flagged by this check.
"""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-7",
name="all-tcp",
description="All TCP ports open",
security_group_rules=[
SecurityGroupRule(
id="rule-all-tcp",
security_group_id="sg-7",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=None,
port_range_max=None,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-7"
assert result[0].resource_name == "all-tcp"
assert result[0].region == OPENSTACK_REGION
def test_multiple_security_groups_mixed(self):
"""Test multiple security groups with mixed results."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-pass",
name="secure-sg",
description="Secure SG",
security_group_rules=[
SecurityGroupRule(
id="rule-pass",
security_group_id="sg-pass",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=443,
port_range_max=443,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
),
SecurityGroup(
id="sg-fail",
name="insecure-sg",
description="Insecure SG",
security_group_rules=[
SecurityGroupRule(
id="rule-fail",
security_group_id="sg-fail",
direction="ingress",
protocol=None,
ethertype="IPv4",
port_range_min=None,
port_range_max=None,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
f"{CHECK_PATH}.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_all_ingress_from_internet.networking_security_group_allows_all_ingress_from_internet import (
networking_security_group_allows_all_ingress_from_internet,
)
check = networking_security_group_allows_all_ingress_from_internet()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,430 @@
"""Tests for network_security_group_allows_rdp_from_internet check."""
from unittest import mock
from prowler.providers.openstack.services.networking.networking_service import (
SecurityGroup,
SecurityGroupRule,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_networking_security_group_allows_rdp_from_internet:
"""Test suite for network_security_group_allows_rdp_from_internet check."""
def test_no_security_groups(self):
"""Test when no security groups exist."""
network_client = mock.MagicMock()
network_client.security_groups = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import (
networking_security_group_allows_rdp_from_internet,
)
check = networking_security_group_allows_rdp_from_internet()
result = check.execute()
assert len(result) == 0
def test_security_group_without_rdp_exposed(self):
"""Test security group without RDP exposed to internet (PASS)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-1",
name="web-servers",
description="Web servers security group",
security_group_rules=[
SecurityGroupRule(
id="rule-1",
security_group_id="sg-1",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=80,
port_range_max=80,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import (
networking_security_group_allows_rdp_from_internet,
)
check = networking_security_group_allows_rdp_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-1"
assert result[0].resource_name == "web-servers"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group web-servers (sg-1) does not allow RDP (port 3389) from the Internet."
)
def test_security_group_with_rdp_from_ipv4_internet(self):
"""Test security group with RDP exposed to IPv4 internet (FAIL)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-2",
name="windows-servers",
description="Windows servers",
security_group_rules=[
SecurityGroupRule(
id="rule-rdp",
security_group_id="sg-2",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=3389,
port_range_max=3389,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import (
networking_security_group_allows_rdp_from_internet,
)
check = networking_security_group_allows_rdp_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-2"
assert result[0].resource_name == "windows-servers"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group windows-servers (sg-2) allows unrestricted RDP access (port 3389) from the Internet via rule rule-rdp (tcp/0.0.0.0/0:3389-3389)."
)
def test_security_group_with_rdp_from_ipv6_internet(self):
"""Test security group with RDP exposed to IPv6 internet (FAIL)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-3",
name="ipv6-windows",
description="IPv6 Windows servers",
security_group_rules=[
SecurityGroupRule(
id="rule-rdp-ipv6",
security_group_id="sg-3",
direction="ingress",
protocol="tcp",
ethertype="IPv6",
port_range_min=3389,
port_range_max=3389,
remote_ip_prefix="::/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import (
networking_security_group_allows_rdp_from_internet,
)
check = networking_security_group_allows_rdp_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-3"
assert result[0].resource_name == "ipv6-windows"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group ipv6-windows (sg-3) allows unrestricted RDP access (port 3389) from the Internet via rule rule-rdp-ipv6 (tcp/::/0:3389-3389)."
)
def test_security_group_with_rdp_from_restricted_cidr(self):
"""Test security group with RDP from specific CIDR (PASS)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-4",
name="restricted-rdp",
description="RDP from specific IP",
security_group_rules=[
SecurityGroupRule(
id="rule-restricted",
security_group_id="sg-4",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=3389,
port_range_max=3389,
remote_ip_prefix="198.51.100.0/24",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import (
networking_security_group_allows_rdp_from_internet,
)
check = networking_security_group_allows_rdp_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-4"
assert result[0].resource_name == "restricted-rdp"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group restricted-rdp (sg-4) does not allow RDP (port 3389) from the Internet."
)
def test_security_group_with_rdp_port_range(self):
"""Test security group with port range including RDP (FAIL)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-5",
name="port-range",
description="Port range including RDP",
security_group_rules=[
SecurityGroupRule(
id="rule-range",
security_group_id="sg-5",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=3000,
port_range_max=4000,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import (
networking_security_group_allows_rdp_from_internet,
)
check = networking_security_group_allows_rdp_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-5"
assert result[0].resource_name == "port-range"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group port-range (sg-5) allows unrestricted RDP access (port 3389) from the Internet via rule rule-range (tcp/0.0.0.0/0:3000-4000)."
)
def test_security_group_with_all_tcp_from_internet(self):
"""Test security group allowing all TCP ports from internet (FAIL).
In OpenStack Neutron, protocol=tcp with port_range_min=None and
port_range_max=None means all TCP ports are allowed.
"""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-all-tcp",
name="all-tcp-open",
description="All TCP ports open",
security_group_rules=[
SecurityGroupRule(
id="rule-all-tcp",
security_group_id="sg-all-tcp",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=None,
port_range_max=None,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import (
networking_security_group_allows_rdp_from_internet,
)
check = networking_security_group_allows_rdp_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-all-tcp"
assert result[0].resource_name == "all-tcp-open"
assert result[0].region == OPENSTACK_REGION
def test_multiple_security_groups_mixed(self):
"""Test multiple security groups with mixed results."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-pass",
name="secure-sg",
description="Secure SG",
security_group_rules=[],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
),
SecurityGroup(
id="sg-fail",
name="insecure-sg",
description="Insecure SG",
security_group_rules=[
SecurityGroupRule(
id="rule-fail",
security_group_id="sg-fail",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=3389,
port_range_max=3389,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_rdp_from_internet.networking_security_group_allows_rdp_from_internet import (
networking_security_group_allows_rdp_from_internet,
)
check = networking_security_group_allows_rdp_from_internet()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,540 @@
"""Tests for network_security_group_allows_ssh_from_internet check."""
from unittest import mock
from prowler.providers.openstack.services.networking.networking_service import (
SecurityGroup,
SecurityGroupRule,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_networking_security_group_allows_ssh_from_internet:
"""Test suite for network_security_group_allows_ssh_from_internet check."""
def test_no_security_groups(self):
"""Test when no security groups exist."""
network_client = mock.MagicMock()
network_client.security_groups = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 0
def test_security_group_without_ssh_exposed(self):
"""Test security group without SSH exposed to internet (PASS)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-1",
name="web-servers",
description="Web servers security group",
security_group_rules=[
SecurityGroupRule(
id="rule-1",
security_group_id="sg-1",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=80,
port_range_max=80,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-1"
assert result[0].resource_name == "web-servers"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group web-servers (sg-1) does not allow SSH (port 22) from the Internet."
)
def test_security_group_with_ssh_from_ipv4_internet(self):
"""Test security group with SSH exposed to IPv4 internet (FAIL)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-2",
name="admin-servers",
description="Admin servers",
security_group_rules=[
SecurityGroupRule(
id="rule-ssh",
security_group_id="sg-2",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=22,
port_range_max=22,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-2"
assert result[0].resource_name == "admin-servers"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group admin-servers (sg-2) allows unrestricted SSH access (port 22) from the Internet via rule rule-ssh (tcp/0.0.0.0/0:22-22)."
)
def test_security_group_with_ssh_from_ipv6_internet(self):
"""Test security group with SSH exposed to IPv6 internet (FAIL)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-3",
name="ipv6-servers",
description="IPv6 servers",
security_group_rules=[
SecurityGroupRule(
id="rule-ssh-ipv6",
security_group_id="sg-3",
direction="ingress",
protocol="tcp",
ethertype="IPv6",
port_range_min=22,
port_range_max=22,
remote_ip_prefix="::/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-3"
assert result[0].resource_name == "ipv6-servers"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group ipv6-servers (sg-3) allows unrestricted SSH access (port 22) from the Internet via rule rule-ssh-ipv6 (tcp/::/0:22-22)."
)
def test_security_group_with_ssh_from_restricted_cidr(self):
"""Test security group with SSH from specific CIDR (PASS)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-4",
name="restricted-ssh",
description="SSH from specific IP",
security_group_rules=[
SecurityGroupRule(
id="rule-restricted",
security_group_id="sg-4",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=22,
port_range_max=22,
remote_ip_prefix="203.0.113.0/24",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-4"
assert result[0].resource_name == "restricted-ssh"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group restricted-ssh (sg-4) does not allow SSH (port 22) from the Internet."
)
def test_security_group_with_ssh_port_range(self):
"""Test security group with port range including SSH (FAIL)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-5",
name="port-range",
description="Port range including SSH",
security_group_rules=[
SecurityGroupRule(
id="rule-range",
security_group_id="sg-5",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=20,
port_range_max=25,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-5"
assert result[0].resource_name == "port-range"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group port-range (sg-5) allows unrestricted SSH access (port 22) from the Internet via rule rule-range (tcp/0.0.0.0/0:20-25)."
)
def test_security_group_with_ssh_from_security_group(self):
"""Test security group with SSH from another security group (PASS)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-6",
name="sg-referenced",
description="SSH from security group",
security_group_rules=[
SecurityGroupRule(
id="rule-sg-ref",
security_group_id="sg-6",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=22,
port_range_max=22,
remote_ip_prefix=None,
remote_group_id="sg-bastion",
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-6"
assert result[0].resource_name == "sg-referenced"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group sg-referenced (sg-6) does not allow SSH (port 22) from the Internet."
)
def test_security_group_with_egress_ssh(self):
"""Test security group with egress SSH rule (PASS)."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-7",
name="egress-only",
description="Egress SSH",
security_group_rules=[
SecurityGroupRule(
id="rule-egress",
security_group_id="sg-7",
direction="egress",
protocol="tcp",
ethertype="IPv4",
port_range_min=22,
port_range_max=22,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "sg-7"
assert result[0].resource_name == "egress-only"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Security group egress-only (sg-7) does not allow SSH (port 22) from the Internet."
)
def test_security_group_with_all_tcp_from_internet(self):
"""Test security group allowing all TCP ports from internet (FAIL).
In OpenStack Neutron, protocol=tcp with port_range_min=None and
port_range_max=None means all TCP ports are allowed.
"""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-all-tcp",
name="all-tcp-open",
description="All TCP ports open",
security_group_rules=[
SecurityGroupRule(
id="rule-all-tcp",
security_group_id="sg-all-tcp",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=None,
port_range_max=None,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "sg-all-tcp"
assert result[0].resource_name == "all-tcp-open"
assert result[0].region == OPENSTACK_REGION
def test_multiple_security_groups_mixed(self):
"""Test multiple security groups with mixed results."""
network_client = mock.MagicMock()
network_client.security_groups = [
SecurityGroup(
id="sg-pass",
name="secure-sg",
description="Secure SG",
security_group_rules=[],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
),
SecurityGroup(
id="sg-fail",
name="insecure-sg",
description="Insecure SG",
security_group_rules=[
SecurityGroupRule(
id="rule-fail",
security_group_id="sg-fail",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=22,
port_range_max=22,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
),
],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
is_default=False,
tags=[],
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_security_group_allows_ssh_from_internet.networking_security_group_allows_ssh_from_internet import (
networking_security_group_allows_ssh_from_internet,
)
check = networking_security_group_allows_ssh_from_internet()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1

View File

@@ -0,0 +1,242 @@
"""Tests for network_subnet_dhcp_disabled check."""
from unittest import mock
from prowler.providers.openstack.services.networking.networking_service import Subnet
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class Test_networking_subnet_dhcp_disabled:
"""Test suite for network_subnet_dhcp_disabled check."""
def test_no_subnets(self):
"""Test when no subnets exist."""
network_client = mock.MagicMock()
network_client.subnets = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import (
networking_subnet_dhcp_disabled,
)
check = networking_subnet_dhcp_disabled()
result = check.execute()
assert len(result) == 0
def test_subnet_dhcp_enabled(self):
"""Test subnet with DHCP enabled (PASS)."""
network_client = mock.MagicMock()
network_client.subnets = [
Subnet(
id="subnet-1",
name="production-subnet",
network_id="net-1",
ip_version=4,
cidr="192.168.1.0/24",
gateway_ip="192.168.1.1",
enable_dhcp=True,
dns_nameservers=["8.8.8.8", "8.8.4.4"],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import (
networking_subnet_dhcp_disabled,
)
check = networking_subnet_dhcp_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "subnet-1"
assert result[0].resource_name == "production-subnet"
assert (
result[0].status_extended
== "Subnet production-subnet (subnet-1) has DHCP enabled."
)
assert result[0].region == OPENSTACK_REGION
def test_subnet_dhcp_disabled(self):
"""Test subnet with DHCP disabled (FAIL)."""
network_client = mock.MagicMock()
network_client.subnets = [
Subnet(
id="subnet-2",
name="static-subnet",
network_id="net-2",
ip_version=4,
cidr="10.0.0.0/24",
gateway_ip="10.0.0.1",
enable_dhcp=False,
dns_nameservers=[],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import (
networking_subnet_dhcp_disabled,
)
check = networking_subnet_dhcp_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == "subnet-2"
assert result[0].resource_name == "static-subnet"
assert (
result[0].status_extended
== "Subnet static-subnet (subnet-2) on network net-2 has DHCP disabled, which may prevent instances from obtaining IP addresses automatically."
)
assert result[0].region == OPENSTACK_REGION
def test_multiple_subnets_mixed_results(self):
"""Test multiple subnets with mixed DHCP configurations."""
network_client = mock.MagicMock()
network_client.subnets = [
Subnet(
id="subnet-1",
name="dhcp-enabled-subnet",
network_id="net-1",
ip_version=4,
cidr="192.168.1.0/24",
gateway_ip="192.168.1.1",
enable_dhcp=True,
dns_nameservers=["8.8.8.8"],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
Subnet(
id="subnet-2",
name="dhcp-disabled-subnet",
network_id="net-2",
ip_version=4,
cidr="10.0.0.0/24",
gateway_ip="10.0.0.1",
enable_dhcp=False,
dns_nameservers=[],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
),
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import (
networking_subnet_dhcp_disabled,
)
check = networking_subnet_dhcp_disabled()
result = check.execute()
assert len(result) == 2
assert len([r for r in result if r.status == "PASS"]) == 1
assert len([r for r in result if r.status == "FAIL"]) == 1
pass_result = [r for r in result if r.status == "PASS"][0]
assert pass_result.resource_id == "subnet-1"
assert pass_result.resource_name == "dhcp-enabled-subnet"
assert pass_result.region == OPENSTACK_REGION
assert (
pass_result.status_extended
== "Subnet dhcp-enabled-subnet (subnet-1) has DHCP enabled."
)
fail_result = [r for r in result if r.status == "FAIL"][0]
assert fail_result.resource_id == "subnet-2"
assert fail_result.resource_name == "dhcp-disabled-subnet"
assert fail_result.region == OPENSTACK_REGION
assert (
fail_result.status_extended
== "Subnet dhcp-disabled-subnet (subnet-2) on network net-2 has DHCP disabled, which may prevent instances from obtaining IP addresses automatically."
)
def test_subnet_ipv6_dhcp_enabled(self):
"""Test IPv6 subnet with DHCP enabled."""
network_client = mock.MagicMock()
network_client.subnets = [
Subnet(
id="subnet-ipv6",
name="ipv6-subnet",
network_id="net-1",
ip_version=6,
cidr="2001:db8::/64",
gateway_ip="2001:db8::1",
enable_dhcp=True,
dns_nameservers=["2001:4860:4860::8888"],
project_id=OPENSTACK_PROJECT_ID,
region=OPENSTACK_REGION,
)
]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_openstack_provider(),
),
mock.patch(
"prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled.networking_client",
new=network_client,
),
):
from prowler.providers.openstack.services.networking.networking_subnet_dhcp_disabled.networking_subnet_dhcp_disabled import (
networking_subnet_dhcp_disabled,
)
check = networking_subnet_dhcp_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == "subnet-ipv6"
assert result[0].resource_name == "ipv6-subnet"
assert result[0].region == OPENSTACK_REGION
assert (
result[0].status_extended
== "Subnet ipv6-subnet (subnet-ipv6) has DHCP enabled."
)

View File

@@ -0,0 +1,542 @@
"""Tests for OpenStack Network service."""
from unittest.mock import MagicMock, patch
from openstack import exceptions as openstack_exceptions
from prowler.providers.openstack.services.networking.networking_service import (
Networking,
NetworkResource,
Port,
SecurityGroup,
SecurityGroupRule,
Subnet,
)
from tests.providers.openstack.openstack_fixtures import (
OPENSTACK_PROJECT_ID,
OPENSTACK_REGION,
set_mocked_openstack_provider,
)
class TestNetworkingService:
"""Test suite for Network service."""
def test_network_service_initialization(self):
"""Test Network service initializes correctly."""
provider = set_mocked_openstack_provider()
with (
patch.object(Networking, "_list_security_groups", return_value=[]),
patch.object(Networking, "_list_networks", return_value=[]),
patch.object(Networking, "_list_subnets", return_value=[]),
patch.object(Networking, "_list_ports", return_value=[]),
):
network = Networking(provider)
assert network.service_name == "Networking"
assert network.provider == provider
assert network.connection == provider.connection
assert network.regional_connections == provider.regional_connections
assert network.audited_regions == [OPENSTACK_REGION]
assert network.region == OPENSTACK_REGION
assert network.project_id == OPENSTACK_PROJECT_ID
assert network.security_groups == []
assert network.networks == []
assert network.subnets == []
assert network.ports == []
def test_network_list_security_groups_success(self):
"""Test listing security groups successfully."""
provider = set_mocked_openstack_provider()
# Mock security group rule
mock_rule = MagicMock()
mock_rule.id = "rule-1"
mock_rule.security_group_id = "sg-1"
mock_rule.direction = "ingress"
mock_rule.protocol = "tcp"
mock_rule.ethertype = "IPv4"
mock_rule.port_range_min = 22
mock_rule.port_range_max = 22
mock_rule.remote_ip_prefix = "0.0.0.0/0"
mock_rule.remote_group_id = None
# Mock security group
mock_sg = MagicMock()
mock_sg.id = "sg-1"
mock_sg.name = "web-servers"
mock_sg.description = "Security group for web servers"
mock_sg.security_group_rules = [mock_rule]
mock_sg.project_id = OPENSTACK_PROJECT_ID
mock_sg.tags = ["production"]
provider.connection.network.security_groups.return_value = [mock_sg]
with (
patch.object(Networking, "_list_networks", return_value=[]),
patch.object(Networking, "_list_subnets", return_value=[]),
patch.object(Networking, "_list_ports", return_value=[]),
):
network = Networking(provider)
assert len(network.security_groups) == 1
assert isinstance(network.security_groups[0], SecurityGroup)
assert network.security_groups[0].id == "sg-1"
assert network.security_groups[0].name == "web-servers"
assert network.security_groups[0].is_default is False
assert len(network.security_groups[0].security_group_rules) == 1
rule = network.security_groups[0].security_group_rules[0]
assert isinstance(rule, SecurityGroupRule)
assert rule.id == "rule-1"
assert rule.direction == "ingress"
assert rule.protocol == "tcp"
assert rule.port_range_min == 22
assert rule.port_range_max == 22
assert rule.remote_ip_prefix == "0.0.0.0/0"
def test_network_list_security_groups_default(self):
"""Test listing default security group."""
provider = set_mocked_openstack_provider()
mock_sg = MagicMock()
mock_sg.id = "sg-default"
mock_sg.name = "default"
mock_sg.description = "Default security group"
mock_sg.security_group_rules = []
mock_sg.project_id = OPENSTACK_PROJECT_ID
mock_sg.tags = []
provider.connection.network.security_groups.return_value = [mock_sg]
with (
patch.object(Networking, "_list_networks", return_value=[]),
patch.object(Networking, "_list_subnets", return_value=[]),
patch.object(Networking, "_list_ports", return_value=[]),
):
network = Networking(provider)
assert len(network.security_groups) == 1
assert network.security_groups[0].name == "default"
assert network.security_groups[0].is_default is True
def test_network_list_security_groups_empty(self):
"""Test listing security groups when none exist."""
provider = set_mocked_openstack_provider()
provider.connection.network.security_groups.return_value = []
with (
patch.object(Networking, "_list_networks", return_value=[]),
patch.object(Networking, "_list_subnets", return_value=[]),
patch.object(Networking, "_list_ports", return_value=[]),
):
network = Networking(provider)
assert network.security_groups == []
def test_network_list_security_groups_sdk_exception(self):
"""Test handling SDKException when listing security groups."""
provider = set_mocked_openstack_provider()
provider.connection.network.security_groups.side_effect = (
openstack_exceptions.SDKException("API error")
)
with (
patch.object(Networking, "_list_networks", return_value=[]),
patch.object(Networking, "_list_subnets", return_value=[]),
patch.object(Networking, "_list_ports", return_value=[]),
):
network = Networking(provider)
assert network.security_groups == []
def test_network_list_networks_success(self):
"""Test listing networks successfully."""
provider = set_mocked_openstack_provider()
mock_net = MagicMock()
mock_net.id = "net-1"
mock_net.name = "private-network"
mock_net.status = "ACTIVE"
mock_net.is_admin_state_up = True
mock_net.is_shared = False
mock_net.is_router_external = False
mock_net.is_port_security_enabled = True
mock_net.subnet_ids = ["subnet-1", "subnet-2"]
mock_net.project_id = OPENSTACK_PROJECT_ID
mock_net.tags = []
provider.connection.network.networks.return_value = [mock_net]
with (
patch.object(Networking, "_list_security_groups", return_value=[]),
patch.object(Networking, "_list_subnets", return_value=[]),
patch.object(Networking, "_list_ports", return_value=[]),
):
network = Networking(provider)
assert len(network.networks) == 1
assert isinstance(network.networks[0], NetworkResource)
assert network.networks[0].id == "net-1"
assert network.networks[0].name == "private-network"
assert network.networks[0].port_security_enabled is True
def test_network_list_subnets_success(self):
"""Test listing subnets successfully."""
provider = set_mocked_openstack_provider()
mock_subnet = MagicMock()
mock_subnet.id = "subnet-1"
mock_subnet.name = "private-subnet"
mock_subnet.network_id = "net-1"
mock_subnet.ip_version = 4
mock_subnet.cidr = "192.168.1.0/24"
mock_subnet.gateway_ip = "192.168.1.1"
mock_subnet.is_dhcp_enabled = True
mock_subnet.dns_nameservers = ["8.8.8.8", "8.8.4.4"]
mock_subnet.project_id = OPENSTACK_PROJECT_ID
provider.connection.network.subnets.return_value = [mock_subnet]
with (
patch.object(Networking, "_list_security_groups", return_value=[]),
patch.object(Networking, "_list_networks", return_value=[]),
patch.object(Networking, "_list_ports", return_value=[]),
):
network = Networking(provider)
assert len(network.subnets) == 1
assert isinstance(network.subnets[0], Subnet)
assert network.subnets[0].id == "subnet-1"
assert network.subnets[0].cidr == "192.168.1.0/24"
def test_network_list_ports_success(self):
"""Test listing ports successfully."""
provider = set_mocked_openstack_provider()
mock_port = MagicMock()
mock_port.id = "port-1"
mock_port.name = "instance-port"
mock_port.network_id = "net-1"
mock_port.mac_address = "fa:16:3e:00:00:01"
mock_port.fixed_ips = [{"ip_address": "192.168.1.10", "subnet_id": "subnet-1"}]
mock_port.is_port_security_enabled = True
mock_port.security_groups = ["sg-1"]
mock_port.device_owner = "compute:nova"
mock_port.device_id = "instance-1"
mock_port.project_id = OPENSTACK_PROJECT_ID
provider.connection.network.ports.return_value = [mock_port]
with (
patch.object(Networking, "_list_security_groups", return_value=[]),
patch.object(Networking, "_list_networks", return_value=[]),
patch.object(Networking, "_list_subnets", return_value=[]),
):
network = Networking(provider)
assert len(network.ports) == 1
assert isinstance(network.ports[0], Port)
assert network.ports[0].id == "port-1"
assert network.ports[0].port_security_enabled is True
assert network.ports[0].security_groups == ["sg-1"]
def test_network_dataclasses_attributes(self):
"""Test dataclass attributes are correctly set."""
rule = SecurityGroupRule(
id="rule-1",
security_group_id="sg-1",
direction="ingress",
protocol="tcp",
ethertype="IPv4",
port_range_min=80,
port_range_max=80,
remote_ip_prefix="0.0.0.0/0",
remote_group_id=None,
)
assert rule.id == "rule-1"
assert rule.protocol == "tcp"
assert rule.port_range_min == 80
sg = SecurityGroup(
id="sg-1",
name="web",
description="Web servers",
security_group_rules=[rule],
project_id="project-1",
region="RegionOne",
is_default=False,
tags=["prod"],
)
assert sg.id == "sg-1"
assert len(sg.security_group_rules) == 1
assert sg.is_default is False
def test_networking_list_security_groups_multi_region(self):
"""Test listing security groups across multiple regions."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_sg_uk = MagicMock()
mock_sg_uk.id = "sg-uk"
mock_sg_uk.name = "web-uk"
mock_sg_uk.description = "UK web servers"
mock_sg_uk.security_group_rules = []
mock_sg_uk.project_id = OPENSTACK_PROJECT_ID
mock_sg_uk.tags = []
mock_sg_de = MagicMock()
mock_sg_de.id = "sg-de"
mock_sg_de.name = "web-de"
mock_sg_de.description = "DE web servers"
mock_sg_de.security_group_rules = []
mock_sg_de.project_id = OPENSTACK_PROJECT_ID
mock_sg_de.tags = []
mock_conn_uk1.network.security_groups.return_value = [mock_sg_uk]
mock_conn_de1.network.security_groups.return_value = [mock_sg_de]
mock_conn_uk1.network.networks.return_value = []
mock_conn_de1.network.networks.return_value = []
mock_conn_uk1.network.subnets.return_value = []
mock_conn_de1.network.subnets.return_value = []
mock_conn_uk1.network.ports.return_value = []
mock_conn_de1.network.ports.return_value = []
network = Networking(provider)
assert len(network.security_groups) == 2
uk_sg = next(sg for sg in network.security_groups if sg.id == "sg-uk")
de_sg = next(sg for sg in network.security_groups if sg.id == "sg-de")
assert uk_sg.region == "UK1"
assert de_sg.region == "DE1"
def test_networking_list_networks_multi_region(self):
"""Test listing networks across multiple regions."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_net_uk = MagicMock()
mock_net_uk.id = "net-uk"
mock_net_uk.name = "private-uk"
mock_net_uk.status = "ACTIVE"
mock_net_uk.is_admin_state_up = True
mock_net_uk.is_shared = False
mock_net_uk.is_router_external = False
mock_net_uk.is_port_security_enabled = True
mock_net_uk.subnet_ids = ["subnet-uk"]
mock_net_uk.project_id = OPENSTACK_PROJECT_ID
mock_net_uk.tags = []
mock_net_de = MagicMock()
mock_net_de.id = "net-de"
mock_net_de.name = "private-de"
mock_net_de.status = "ACTIVE"
mock_net_de.is_admin_state_up = True
mock_net_de.is_shared = False
mock_net_de.is_router_external = False
mock_net_de.is_port_security_enabled = True
mock_net_de.subnet_ids = ["subnet-de"]
mock_net_de.project_id = OPENSTACK_PROJECT_ID
mock_net_de.tags = []
mock_conn_uk1.network.security_groups.return_value = []
mock_conn_de1.network.security_groups.return_value = []
mock_conn_uk1.network.networks.return_value = [mock_net_uk]
mock_conn_de1.network.networks.return_value = [mock_net_de]
mock_conn_uk1.network.subnets.return_value = []
mock_conn_de1.network.subnets.return_value = []
mock_conn_uk1.network.ports.return_value = []
mock_conn_de1.network.ports.return_value = []
network = Networking(provider)
assert len(network.networks) == 2
uk_net = next(n for n in network.networks if n.id == "net-uk")
de_net = next(n for n in network.networks if n.id == "net-de")
assert uk_net.region == "UK1"
assert de_net.region == "DE1"
def test_networking_list_subnets_multi_region(self):
"""Test listing subnets across multiple regions."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_subnet_uk = MagicMock()
mock_subnet_uk.id = "subnet-uk"
mock_subnet_uk.name = "subnet-uk"
mock_subnet_uk.network_id = "net-uk"
mock_subnet_uk.ip_version = 4
mock_subnet_uk.cidr = "10.0.0.0/24"
mock_subnet_uk.gateway_ip = "10.0.0.1"
mock_subnet_uk.is_dhcp_enabled = True
mock_subnet_uk.dns_nameservers = ["8.8.8.8"]
mock_subnet_uk.project_id = OPENSTACK_PROJECT_ID
mock_subnet_de = MagicMock()
mock_subnet_de.id = "subnet-de"
mock_subnet_de.name = "subnet-de"
mock_subnet_de.network_id = "net-de"
mock_subnet_de.ip_version = 4
mock_subnet_de.cidr = "10.1.0.0/24"
mock_subnet_de.gateway_ip = "10.1.0.1"
mock_subnet_de.is_dhcp_enabled = True
mock_subnet_de.dns_nameservers = ["8.8.4.4"]
mock_subnet_de.project_id = OPENSTACK_PROJECT_ID
mock_conn_uk1.network.security_groups.return_value = []
mock_conn_de1.network.security_groups.return_value = []
mock_conn_uk1.network.networks.return_value = []
mock_conn_de1.network.networks.return_value = []
mock_conn_uk1.network.subnets.return_value = [mock_subnet_uk]
mock_conn_de1.network.subnets.return_value = [mock_subnet_de]
mock_conn_uk1.network.ports.return_value = []
mock_conn_de1.network.ports.return_value = []
network = Networking(provider)
assert len(network.subnets) == 2
uk_subnet = next(s for s in network.subnets if s.id == "subnet-uk")
de_subnet = next(s for s in network.subnets if s.id == "subnet-de")
assert uk_subnet.region == "UK1"
assert de_subnet.region == "DE1"
def test_networking_list_ports_multi_region(self):
"""Test listing ports across multiple regions."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_port_uk = MagicMock()
mock_port_uk.id = "port-uk"
mock_port_uk.name = "port-uk"
mock_port_uk.network_id = "net-uk"
mock_port_uk.mac_address = "fa:16:3e:00:00:01"
mock_port_uk.fixed_ips = [{"ip_address": "10.0.0.10", "subnet_id": "subnet-uk"}]
mock_port_uk.is_port_security_enabled = True
mock_port_uk.security_groups = ["sg-uk"]
mock_port_uk.device_owner = "compute:nova"
mock_port_uk.device_id = "instance-uk"
mock_port_uk.project_id = OPENSTACK_PROJECT_ID
mock_port_de = MagicMock()
mock_port_de.id = "port-de"
mock_port_de.name = "port-de"
mock_port_de.network_id = "net-de"
mock_port_de.mac_address = "fa:16:3e:00:00:02"
mock_port_de.fixed_ips = [{"ip_address": "10.1.0.10", "subnet_id": "subnet-de"}]
mock_port_de.is_port_security_enabled = True
mock_port_de.security_groups = ["sg-de"]
mock_port_de.device_owner = "compute:nova"
mock_port_de.device_id = "instance-de"
mock_port_de.project_id = OPENSTACK_PROJECT_ID
mock_conn_uk1.network.security_groups.return_value = []
mock_conn_de1.network.security_groups.return_value = []
mock_conn_uk1.network.networks.return_value = []
mock_conn_de1.network.networks.return_value = []
mock_conn_uk1.network.subnets.return_value = []
mock_conn_de1.network.subnets.return_value = []
mock_conn_uk1.network.ports.return_value = [mock_port_uk]
mock_conn_de1.network.ports.return_value = [mock_port_de]
network = Networking(provider)
assert len(network.ports) == 2
uk_port = next(p for p in network.ports if p.id == "port-uk")
de_port = next(p for p in network.ports if p.id == "port-de")
assert uk_port.region == "UK1"
assert de_port.region == "DE1"
def test_networking_multi_region_partial_failure(self):
"""Test that a failing region doesn't prevent other regions from being listed."""
provider = set_mocked_openstack_provider()
mock_conn_ok = MagicMock()
mock_conn_fail = MagicMock()
provider.regional_connections = {"UK1": mock_conn_ok, "DE1": mock_conn_fail}
mock_sg = MagicMock()
mock_sg.id = "sg-uk"
mock_sg.name = "web-uk"
mock_sg.description = "UK web servers"
mock_sg.security_group_rules = []
mock_sg.project_id = OPENSTACK_PROJECT_ID
mock_sg.tags = []
mock_conn_ok.network.security_groups.return_value = [mock_sg]
mock_conn_fail.network.security_groups.side_effect = (
openstack_exceptions.SDKException("API error in DE1")
)
mock_conn_ok.network.networks.return_value = []
mock_conn_fail.network.networks.side_effect = openstack_exceptions.SDKException(
"API error in DE1"
)
mock_conn_ok.network.subnets.return_value = []
mock_conn_fail.network.subnets.side_effect = openstack_exceptions.SDKException(
"API error in DE1"
)
mock_conn_ok.network.ports.return_value = []
mock_conn_fail.network.ports.side_effect = openstack_exceptions.SDKException(
"API error in DE1"
)
network = Networking(provider)
assert len(network.security_groups) == 1
assert network.security_groups[0].id == "sg-uk"
assert network.security_groups[0].region == "UK1"
def test_networking_multi_region_one_empty(self):
"""Test multi-region where one region has resources and the other is empty."""
provider = set_mocked_openstack_provider()
mock_conn_uk1 = MagicMock()
mock_conn_de1 = MagicMock()
provider.regional_connections = {"UK1": mock_conn_uk1, "DE1": mock_conn_de1}
mock_net = MagicMock()
mock_net.id = "net-uk"
mock_net.name = "private-uk"
mock_net.status = "ACTIVE"
mock_net.is_admin_state_up = True
mock_net.is_shared = False
mock_net.is_router_external = False
mock_net.is_port_security_enabled = True
mock_net.subnet_ids = []
mock_net.project_id = OPENSTACK_PROJECT_ID
mock_net.tags = []
mock_conn_uk1.network.security_groups.return_value = []
mock_conn_de1.network.security_groups.return_value = []
mock_conn_uk1.network.networks.return_value = [mock_net]
mock_conn_de1.network.networks.return_value = []
mock_conn_uk1.network.subnets.return_value = []
mock_conn_de1.network.subnets.return_value = []
mock_conn_uk1.network.ports.return_value = []
mock_conn_de1.network.ports.return_value = []
network = Networking(provider)
assert len(network.networks) == 1
assert network.networks[0].id == "net-uk"
assert network.networks[0].region == "UK1"