feat(azure/vm): add new check vm_scaleset_associated_load_balancer (#8181)

This commit is contained in:
Rubén De la Torre Vico
2025-07-08 10:40:43 +02:00
committed by GitHub
parent 3628e7b3e8
commit 3ceb86c4d9
6 changed files with 355 additions and 0 deletions
+2
View File
@@ -9,6 +9,8 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `vm_linux_enforce_ssh_authentication` check for Azure provider [(#8149)](https://github.com/prowler-cloud/prowler/pull/8149)
- `vm_ensure_using_approved_images` check for Azure provider [(#8168)](https://github.com/prowler-cloud/prowler/pull/8168)
- `vm_scaleset_associated_load_balancer` check for Azure provider [(#8181)](https://github.com/prowler-cloud/prowler/pull/8181)
### Changed
### Fixed
@@ -0,0 +1,30 @@
{
"Provider": "azure",
"CheckID": "vm_scaleset_associated_with_load_balancer",
"CheckTitle": "VM Scale Set Is Associated With Load Balancer",
"CheckType": [],
"ServiceName": "vm",
"SubServiceName": "scaleset",
"ResourceIdTemplate": "/subscriptions/{subscriptionId}/resourceGroups/{resourceGroupName}/providers/Microsoft.Compute/virtualMachineScaleSets/{vmScaleSetName}",
"Severity": "medium",
"ResourceType": "Microsoft.Compute/virtualMachineScaleSets",
"Description": "Ensure that your Azure virtual machine scale sets are using load balancers for traffic distribution.",
"Risk": "Without load balancer integration, Azure virtual machine scale sets may experience reduced availability and potential service disruptions during traffic spikes or instance failures, leading to degraded user experience and potential business impact.",
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/virtual-network/network-overview",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/azure/VirtualMachines/associated-load-balancers.html",
"Terraform": ""
},
"Recommendation": {
"Text": "Attach a load balancer to your Azure virtual machine scale set to ensure high availability and optimal traffic distribution.",
"Url": "https://docs.microsoft.com/en-us/azure/load-balancer/load-balancer-overview"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,36 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.vm.vm_client import vm_client
class vm_scaleset_associated_with_load_balancer(Check):
"""
Ensure that Azure virtual machine scale sets are associated with a load balancer backend pool.
This check evaluates whether each VM scale set is associated with at least one load balancer backend pool.
- PASS: The scale set is associated with a load balancer backend pool.
- FAIL: The scale set is not associated with any load balancer backend pool.
"""
def execute(self):
findings = []
for subscription, scale_sets in vm_client.vm_scale_sets.items():
for scale_set in scale_sets.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=scale_set
)
report.subscription = subscription
report.resource_id = scale_set.resource_id
report.resource_name = scale_set.resource_name
report.location = scale_set.location
if scale_set.load_balancer_backend_pools:
report.status = "PASS"
backend_pool_names = [
pool.split("/")[-1]
for pool in scale_set.load_balancer_backend_pools
]
report.status_extended = f"Scale set '{scale_set.resource_name}' in subscription '{subscription}' is associated with load balancer backend pool(s): {', '.join(backend_pool_names)}."
else:
report.status = "FAIL"
report.status_extended = f"Scale set '{scale_set.resource_name}' in subscription '{subscription}' is not associated with any load balancer backend pool."
findings.append(report)
return findings
@@ -15,6 +15,7 @@ class VirtualMachines(AzureService):
super().__init__(ComputeManagementClient, provider)
self.virtual_machines = self._get_virtual_machines()
self.disks = self._get_disks()
self.vm_scale_sets = self._get_vm_scale_sets()
def _get_virtual_machines(self):
logger.info("VirtualMachines - Getting virtual machines...")
@@ -155,6 +156,69 @@ class VirtualMachines(AzureService):
return disks
def _get_vm_scale_sets(self) -> dict[str, dict]:
"""
Get all needed information about VM scale sets.
Returns:
A nested dictionary with the following structure:
{
"subscription_name": {
"vm_scale_set_id": VirtualMachineScaleSet()
}
}
"""
logger.info(
"VirtualMachines - Getting VM scale sets and their load balancer associations..."
)
vm_scale_sets = {}
for subscription_name, client in self.clients.items():
try:
scale_sets = client.virtual_machine_scale_sets.list_all()
vm_scale_sets[subscription_name] = {}
for scale_set in scale_sets:
backend_pools = []
nic_configs = []
virtual_machine_profile = getattr(
scale_set, "virtual_machine_profile", None
)
if virtual_machine_profile:
network_profile = getattr(
virtual_machine_profile, "network_profile", None
)
if network_profile:
nic_configs = (
getattr(
network_profile,
"network_interface_configurations",
[],
)
or []
)
for nic in nic_configs:
ip_confs = getattr(nic, "ip_configurations", [])
for ipconf in ip_confs:
pools = getattr(
ipconf, "load_balancer_backend_address_pools", []
)
if pools:
for pool in pools:
if getattr(pool, "id", None):
backend_pools.append(pool.id)
vm_scale_sets[subscription_name][scale_set.id] = (
VirtualMachineScaleSet(
resource_id=scale_set.id,
resource_name=scale_set.name,
location=scale_set.location,
load_balancer_backend_pools=backend_pools,
)
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return vm_scale_sets
@dataclass
class UefiSettings:
@@ -219,3 +283,10 @@ class Disk(BaseModel):
vms_attached: list[str]
encryption_type: str
location: str
class VirtualMachineScaleSet(BaseModel):
resource_id: str
resource_name: str
location: str
load_balancer_backend_pools: list[str]
@@ -0,0 +1,216 @@
from unittest import mock
from uuid import uuid4
from prowler.providers.azure.services.vm.vm_service import VirtualMachineScaleSet
from tests.providers.azure.azure_fixtures import (
AZURE_SUBSCRIPTION_ID,
set_mocked_azure_provider,
)
class Test_vm_scaleset_associated_with_load_balancer:
def test_no_subscriptions(self):
vm_scale_sets = {}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.vm.vm_client.vm_client.vm_scale_sets",
new=vm_scale_sets,
),
):
from prowler.providers.azure.services.vm.vm_scaleset_associated_with_load_balancer.vm_scaleset_associated_with_load_balancer import (
vm_scaleset_associated_with_load_balancer,
)
check = vm_scaleset_associated_with_load_balancer()
result = check.execute()
assert len(result) == 0
def test_empty_scale_sets(self):
vm_scale_sets = {AZURE_SUBSCRIPTION_ID: {}}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.vm.vm_client.vm_client.vm_scale_sets",
new=vm_scale_sets,
),
):
from prowler.providers.azure.services.vm.vm_scaleset_associated_with_load_balancer.vm_scaleset_associated_with_load_balancer import (
vm_scaleset_associated_with_load_balancer,
)
check = vm_scaleset_associated_with_load_balancer()
result = check.execute()
assert len(result) == 0
def test_compliant_scale_set(self):
vmss_id = str(uuid4())
backend_pool_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/bepool"
vm_scale_sets = {
AZURE_SUBSCRIPTION_ID: {
vmss_id: VirtualMachineScaleSet(
resource_id=vmss_id,
resource_name="compliant-vmss",
location="eastus",
load_balancer_backend_pools=[backend_pool_id],
)
}
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.vm.vm_client.vm_client.vm_scale_sets",
new=vm_scale_sets,
),
):
from prowler.providers.azure.services.vm.vm_scaleset_associated_with_load_balancer.vm_scaleset_associated_with_load_balancer import (
vm_scaleset_associated_with_load_balancer,
)
check = vm_scaleset_associated_with_load_balancer()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].resource_id == vmss_id
assert result[0].resource_name == "compliant-vmss"
assert result[0].location == "eastus"
expected_status_extended = (
f"Scale set 'compliant-vmss' in subscription '{AZURE_SUBSCRIPTION_ID}' "
f"is associated with load balancer backend pool(s): bepool."
)
assert result[0].status_extended == expected_status_extended
def test_noncompliant_scale_set(self):
vmss_id = str(uuid4())
vm_scale_sets = {
AZURE_SUBSCRIPTION_ID: {
vmss_id: VirtualMachineScaleSet(
resource_id=vmss_id,
resource_name="noncompliant-vmss",
location="westeurope",
load_balancer_backend_pools=[],
)
}
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.vm.vm_client.vm_client.vm_scale_sets",
new=vm_scale_sets,
),
):
from prowler.providers.azure.services.vm.vm_scaleset_associated_with_load_balancer.vm_scaleset_associated_with_load_balancer import (
vm_scaleset_associated_with_load_balancer,
)
check = vm_scaleset_associated_with_load_balancer()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert result[0].resource_id == vmss_id
assert result[0].resource_name == "noncompliant-vmss"
assert result[0].location == "westeurope"
expected_status_extended = (
f"Scale set 'noncompliant-vmss' in subscription '{AZURE_SUBSCRIPTION_ID}' "
f"is not associated with any load balancer backend pool."
)
assert result[0].status_extended == expected_status_extended
def test_multiple_scale_sets(self):
compliant_id = str(uuid4())
noncompliant_id = str(uuid4())
backend_pool_id = f"/subscriptions/{AZURE_SUBSCRIPTION_ID}/resourceGroups/rg/providers/Microsoft.Network/loadBalancers/lb/backendAddressPools/bepool"
vm_scale_sets = {
AZURE_SUBSCRIPTION_ID: {
compliant_id: VirtualMachineScaleSet(
resource_id=compliant_id,
resource_name="compliant-vmss",
location="eastus",
load_balancer_backend_pools=[backend_pool_id],
),
noncompliant_id: VirtualMachineScaleSet(
resource_id=noncompliant_id,
resource_name="noncompliant-vmss",
location="westeurope",
load_balancer_backend_pools=[],
),
}
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.vm.vm_client.vm_client.vm_scale_sets",
new=vm_scale_sets,
),
):
from prowler.providers.azure.services.vm.vm_scaleset_associated_with_load_balancer.vm_scaleset_associated_with_load_balancer import (
vm_scaleset_associated_with_load_balancer,
)
check = vm_scaleset_associated_with_load_balancer()
result = check.execute()
assert len(result) == 2
for r in result:
if r.resource_name == "compliant-vmss":
expected_status_extended = (
f"Scale set 'compliant-vmss' in subscription '{AZURE_SUBSCRIPTION_ID}' "
f"is associated with load balancer backend pool(s): bepool."
)
assert r.status == "PASS"
assert r.status_extended == expected_status_extended
elif r.resource_name == "noncompliant-vmss":
expected_status_extended = (
f"Scale set 'noncompliant-vmss' in subscription '{AZURE_SUBSCRIPTION_ID}' "
f"is not associated with any load balancer backend pool."
)
assert r.status == "FAIL"
assert r.status_extended == expected_status_extended
def test_missing_attributes(self):
# Simulate a scale set with missing optional attributes
vmss_id = str(uuid4())
vm_scale_sets = {
AZURE_SUBSCRIPTION_ID: {
vmss_id: VirtualMachineScaleSet(
resource_id=vmss_id,
resource_name="",
location="",
load_balancer_backend_pools=[],
)
}
}
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_azure_provider(),
),
mock.patch(
"prowler.providers.azure.services.vm.vm_client.vm_client.vm_scale_sets",
new=vm_scale_sets,
),
):
from prowler.providers.azure.services.vm.vm_scaleset_associated_with_load_balancer.vm_scaleset_associated_with_load_balancer import (
vm_scaleset_associated_with_load_balancer,
)
check = vm_scaleset_associated_with_load_balancer()
result = check.execute()
assert len(result) == 1
expected_status_extended = f"Scale set '' in subscription '{AZURE_SUBSCRIPTION_ID}' is not associated with any load balancer backend pool."
assert result[0].status == "FAIL"
assert result[0].status_extended == expected_status_extended