fix(sdk): scan every Azure subscription when display names collide (#10718)

Co-authored-by: Rubén De la Torre Vico <ruben@prowler.com>
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
This commit is contained in:
Ivan Necheporenko
2026-05-07 13:59:38 +02:00
committed by GitHub
parent 71683f3093
commit bcaa6ac488
339 changed files with 3210 additions and 1235 deletions
+1
View File
@@ -25,6 +25,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- AWS SDK test isolation: autouse `mock_aws` fixture and leak detector in `conftest.py` to prevent tests from hitting real AWS endpoints, with idempotent organization setup for tests calling `set_mocked_aws_provider` multiple times [(#10605)](https://github.com/prowler-cloud/prowler/pull/10605)
- AWS `boto` user agent extra is now applied to every client [(#10944)](https://github.com/prowler-cloud/prowler/pull/10944)
- Image provider connection check no longer fails with a misleading `host='https'` resolution error when the registry URL includes an `http://` or `https://` scheme prefix [(#10950)](https://github.com/prowler-cloud/prowler/pull/10950)
- Azure subscriptions sharing the same display name are no longer collapsed into a single identity entry, so every subscription is scanned [(#10718)](https://github.com/prowler-cloud/prowler/pull/10718)
### 🔐 Security
+5 -2
View File
@@ -749,8 +749,11 @@ def execute(
if global_provider.type == "cloudflare":
is_finding_muted_args["account_id"] = finding.account_id
if global_provider.type == "azure":
is_finding_muted_args["subscription_id"] = (
global_provider.identity.subscriptions.get(finding.subscription)
is_finding_muted_args["subscription_id"] = finding.subscription
is_finding_muted_args["subscription_name"] = (
global_provider.identity.subscriptions.get(
finding.subscription, finding.subscription
)
)
is_finding_muted_args["finding"] = finding
finding.muted = global_provider.mutelist.is_finding_muted(
+4 -2
View File
@@ -187,9 +187,11 @@ class Finding(BaseModel):
output_data["account_uid"] = (
output_data["account_organization_uid"]
if "Tenant:" in check_output.subscription
else provider.identity.subscriptions[check_output.subscription]
else check_output.subscription
)
output_data["account_name"] = provider.identity.subscriptions.get(
check_output.subscription, check_output.subscription
)
output_data["account_name"] = check_output.subscription
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.location
+5 -2
View File
@@ -492,8 +492,11 @@ class HTML(Output):
"""
try:
printed_subscriptions = []
for key, value in provider.identity.subscriptions.items():
intermediate = f"{key} : {value}"
for (
subscription_id,
display_name,
) in provider.identity.subscriptions.items():
intermediate = f"{display_name} : {subscription_id}"
printed_subscriptions.append(intermediate)
# check if identity is str(coming from SP) or dict(coming from browser or)
+5 -2
View File
@@ -82,8 +82,11 @@ class Slack:
logo = gcp_logo
elif provider.type == "azure":
printed_subscriptions = []
for key, value in provider.identity.subscriptions.items():
intermediate = f"- *{key}: {value}*\n"
for (
subscription_id,
display_name,
) in provider.identity.subscriptions.items():
intermediate = f"- *{subscription_id}: {display_name}*\n"
printed_subscriptions.append(intermediate)
identity = f"Azure Subscriptions:\n{''.join(printed_subscriptions)}"
logo = azure_logo
+6 -2
View File
@@ -185,9 +185,13 @@ def display_summary_table(
print(
f"\n{entity_type} {Fore.YELLOW}{audited_entities}{Style.RESET_ALL} Scan Results (severity columns are for fails only):"
)
if provider == "azure":
if provider.type == "azure":
scanned_subscriptions = ", ".join(
f"{display_name} ({subscription_id})"
for subscription_id, display_name in provider.identity.subscriptions.items()
)
print(
f"\nSubscriptions scanned: {Fore.YELLOW}{' '.join(provider.identity.subscriptions.keys())}{Style.RESET_ALL}"
f"\nSubscriptions scanned: {Fore.YELLOW}{scanned_subscriptions}{Style.RESET_ALL}"
)
print(tabulate(findings_table, headers="keys", tablefmt="rounded_grid"))
print(
+29 -18
View File
@@ -441,8 +441,8 @@ class AzureProvider(Provider):
None
"""
printed_subscriptions = []
for key, value in self._identity.subscriptions.items():
intermediate = key + ": " + value
for subscription_id, display_name in self._identity.subscriptions.items():
intermediate = display_name + ": " + subscription_id
printed_subscriptions.append(intermediate)
report_lines = [
f"Azure Tenant Domain: {Fore.YELLOW}{self._identity.tenant_domain}{Style.RESET_ALL} Azure Tenant ID: {Fore.YELLOW}{self._identity.tenant_ids[0]}{Style.RESET_ALL}",
@@ -969,19 +969,30 @@ class AzureProvider(Provider):
)
if not subscription_ids:
logger.info("Scanning all the Azure subscriptions...")
for subscription in subscriptions_client.subscriptions.list():
# TODO: get tags or labels
# TODO: fill with AzureSubscription
identity.subscriptions.update(
{subscription.display_name: subscription.subscription_id}
)
# TODO: get tags or labels
# TODO: fill with AzureSubscription
subscription_pairs = [
(subscription.display_name, subscription.subscription_id)
for subscription in subscriptions_client.subscriptions.list()
]
else:
logger.info("Scanning the subscriptions passed as argument ...")
for id in subscription_ids:
subscription = subscriptions_client.subscriptions.get(
subscription_id=id
subscription_pairs = [
(
subscriptions_client.subscriptions.get(
subscription_id=id
).display_name,
id,
)
identity.subscriptions.update({subscription.display_name: id})
for id in subscription_ids
]
# Key the subscriptions dict by subscription ID (which is
# guaranteed unique) and store the display name as the value.
# This avoids collisions when multiple subscriptions share
# the same display name.
for display_name, subscription_id in subscription_pairs:
identity.subscriptions[subscription_id] = display_name
# If there are no subscriptions listed -> checks are not going to be run against any resource
if not identity.subscriptions:
@@ -1017,28 +1028,28 @@ class AzureProvider(Provider):
Returns:
A dictionary containing the locations available for each subscription. The dictionary
has subscription display names as keys and lists of location names as values.
has subscription IDs as keys and lists of location names as values.
Examples:
>>> provider = AzureProvider(...)
>>> provider.get_locations()
{
'Subscription 1': ['eastus', 'eastus2', 'westus', 'westus2'],
'Subscription 2': ['eastus', 'eastus2', 'westus', 'westus2']
'sub-id-1': ['eastus', 'eastus2', 'westus', 'westus2'],
'sub-id-2': ['eastus', 'eastus2', 'westus', 'westus2']
}
"""
credentials = self.session
subscription_client = SubscriptionClient(credentials)
locations = {}
for display_name, subscription_id in self._identity.subscriptions.items():
locations[display_name] = []
for subscription_id, display_name in self._identity.subscriptions.items():
locations[subscription_id] = []
# List locations for each subscription
for location in subscription_client.subscriptions.list_locations(
subscription_id
):
locations[display_name].append(location.name)
locations[subscription_id].append(location.name)
return locations
@@ -8,17 +8,23 @@ class AzureMutelist(Mutelist):
self,
finding: Check_Report_Azure,
subscription_id: str,
subscription_name: str = "",
) -> bool:
return self.is_muted(
subscription_id, # support Azure Subscription ID in mutelist
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
) or self.is_muted(
finding.subscription, # support Azure Subscription Name in mutelist
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)
account_names = [subscription_id]
for account_name in (subscription_name, finding.subscription):
if account_name and account_name not in account_names:
account_names.append(account_name)
tags = unroll_dict(unroll_tags(finding.resource_tags))
for account_name in account_names:
if self.is_muted(
account_name,
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
tags,
):
return True
return False
@@ -49,15 +49,15 @@ class AzureService:
if "GraphServiceClient" in str(service):
clients.update({identity.tenant_domain: service(credentials=session)})
elif "LogsQueryClient" in str(service):
for display_name, id in identity.subscriptions.items():
clients.update({display_name: service(credential=session)})
for subscription_id, display_name in identity.subscriptions.items():
clients.update({subscription_id: service(credential=session)})
else:
for display_name, id in identity.subscriptions.items():
for subscription_id, display_name in identity.subscriptions.items():
clients.update(
{
display_name: service(
subscription_id: service(
credential=session,
subscription_id=id,
subscription_id=subscription_id,
base_url=region_config.base_url,
credential_scopes=region_config.credential_scopes,
)
@@ -36,7 +36,7 @@ class AISearch(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return aisearch_services
@@ -9,20 +9,23 @@ class aisearch_service_not_publicly_accessible(Check):
findings = []
for (
subscription_name,
subscription_id,
aisearch_services,
) in aisearch_client.aisearch_services.items():
subscription_name = aisearch_client.subscriptions.get(
subscription_id, subscription_id
)
for aisearch_service in aisearch_services.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=aisearch_service
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} allows public access."
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} ({subscription_id}) allows public access."
if not aisearch_service.public_network_access:
report.status = "PASS"
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} does not allows public access."
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} ({subscription_id}) does not allows public access."
findings.append(report)
@@ -6,16 +6,19 @@ class aks_cluster_rbac_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for subscription_id, clusters in aks_client.clusters.items():
subscription_name = aks_client.subscriptions.get(
subscription_id, subscription_id
)
for cluster in clusters.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=cluster)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"RBAC is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
report.status_extended = f"RBAC is enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'."
if not cluster.rbac_enabled:
report.status = "FAIL"
report.status_extended = f"RBAC is not enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
report.status_extended = f"RBAC is not enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -6,17 +6,20 @@ class aks_clusters_created_with_private_nodes(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for subscription_id, clusters in aks_client.clusters.items():
subscription_name = aks_client.subscriptions.get(
subscription_id, subscription_id
)
for cluster in clusters.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=cluster)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Cluster '{cluster.name}' was created with private nodes in subscription '{subscription_name}'"
report.status_extended = f"Cluster '{cluster.name}' was created with private nodes in subscription '{subscription_name} ({subscription_id})'"
for agent_pool in cluster.agent_pool_profiles:
if getattr(agent_pool, "enable_node_public_ip", True):
report.status = "FAIL"
report.status_extended = f"Cluster '{cluster.name}' was not created with private nodes in subscription '{subscription_name}'"
report.status_extended = f"Cluster '{cluster.name}' was not created with private nodes in subscription '{subscription_name} ({subscription_id})'"
break
findings.append(report)
@@ -6,18 +6,21 @@ class aks_clusters_public_access_disabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for subscription_id, clusters in aks_client.clusters.items():
subscription_name = aks_client.subscriptions.get(
subscription_id, subscription_id
)
for cluster in clusters.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=cluster)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Public access to nodes is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'"
report.status_extended = f"Public access to nodes is enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'"
if cluster.private_fqdn:
for agent_pool in cluster.agent_pool_profiles:
if not getattr(agent_pool, "enable_node_public_ip", False):
report.status = "PASS"
report.status_extended = f"Public access to nodes is disabled for cluster '{cluster.name}' in subscription '{subscription_name}'"
report.status_extended = f"Public access to nodes is disabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'"
findings.append(report)
@@ -6,16 +6,19 @@ class aks_network_policy_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for subscription_id, clusters in aks_client.clusters.items():
subscription_name = aks_client.subscriptions.get(
subscription_id, subscription_id
)
for cluster_id, cluster in clusters.items():
report = Check_Report_Azure(metadata=self.metadata(), resource=cluster)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Network policy is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
report.status_extended = f"Network policy is enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'."
if not getattr(cluster, "network_policy", False):
report.status = "FAIL"
report.status_extended = f"Network policy is not enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
report.status_extended = f"Network policy is not enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -17,14 +17,14 @@ class AKS(AzureService):
logger.info("AKS - Getting clusters...")
clusters = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
clusters_list = client.managed_clusters.list()
clusters.update({subscription_name: {}})
clusters.update({subscription_id: {}})
for cluster in clusters_list:
if getattr(cluster, "kubernetes_version", None):
clusters[subscription_name].update(
clusters[subscription_id].update(
{
cluster.id: Cluster(
id=cluster.id,
@@ -60,7 +60,7 @@ class AKS(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return clusters
@@ -147,7 +147,7 @@ class APIM(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return instances
@@ -50,9 +50,11 @@ class apim_threat_detection_llm_jacking(Check):
],
)
# 1. Aggregate logs from all APIM instances first
all_llm_logs: List[LogsQueryLogEntry] = []
for subscription, instances in apim_client.instances.items():
subscription_name = apim_client.subscriptions.get(
subscription, subscription
)
all_llm_logs: List[LogsQueryLogEntry] = []
for instance in instances:
if instance.log_analytics_workspace_id:
logs = apim_client.get_llm_operations_logs(
@@ -60,7 +62,8 @@ class apim_threat_detection_llm_jacking(Check):
)
all_llm_logs.extend(logs)
# 2. Perform a single, global analysis on all collected logs
# Analyze logs only within the current subscription to avoid
# cross-subscription attribution when scanning multiple subscriptions.
potential_llm_jacking_attackers = {}
for log in all_llm_logs:
operation_name = log.operation_id
@@ -91,19 +94,17 @@ class apim_threat_detection_llm_jacking(Check):
report = Check_Report_Azure(self.metadata(), resource=resource)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Potential LLM Jacking attack detected from IP address {principal_ip} with a threshold of {action_ratio}."
report.status_extended = f"Potential LLM Jacking attack detected from IP address {principal_ip} in subscription {subscription_name} ({subscription}) with an action ratio of {action_ratio}, above the configured threshold of {threshold}."
findings.append(report)
# 4. If no threats were found after checking all principals, create a single PASS report
# If no threats were found after checking all principals, create a single PASS report.
if not found_potential_llm_jacking_attackers:
report = Check_Report_Azure(self.metadata(), resource={})
report.resource_name = subscription
report.resource_id = (
f"/subscriptions/{apim_client.subscriptions[subscription]}"
)
report.resource_name = subscription_name
report.resource_id = f"/subscriptions/{subscription}"
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"No potential LLM Jacking attacks detected across all monitored APIM instances in the last {threat_detection_minutes} minutes."
report.status_extended = f"No potential LLM Jacking attacks detected across monitored APIM instances in subscription {subscription_name} ({subscription}) in the last {threat_detection_minutes} minutes."
findings.append(report)
return findings
@@ -7,18 +7,21 @@ class app_client_certificates_on(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Clients are required to present a certificate for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Clients are required to present a certificate for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if app.client_cert_mode != "Required":
report.status = "FAIL"
report.status_extended = f"Clients are not required to present a certificate for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Clients are not required to present a certificate for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,18 +7,21 @@ class app_ensure_auth_is_set_up(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Authentication is set up for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Authentication is set up for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if not app.auth_enabled:
report.status = "FAIL"
report.status_extended = f"Authentication is not set up for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Authentication is not set up for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,18 +7,21 @@ class app_ensure_http_is_redirected_to_https(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"HTTP is redirected to HTTPS for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"HTTP is redirected to HTTPS for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if not app.https_only:
report.status = "FAIL"
report.status_extended = f"HTTP is not redirected to HTTPS for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"HTTP is not redirected to HTTPS for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,9 +7,12 @@ class app_ensure_java_version_is_latest(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
linux_framework = getattr(app.configurations, "linux_fx_version", "")
windows_framework_version = getattr(
@@ -18,19 +21,19 @@ class app_ensure_java_version_is_latest(Check):
if "java" in linux_framework.lower() or windows_framework_version:
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
java_latest_version = app_client.audit_config.get(
"java_latest_version", "17"
)
report.status_extended = f"Java version is set to '{f'java{windows_framework_version}' if windows_framework_version else linux_framework}', but should be set to 'java {java_latest_version}' for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Java version is set to '{f'java{windows_framework_version}' if windows_framework_version else linux_framework}', but should be set to 'java {java_latest_version}' for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if (
f"java{java_latest_version}" in linux_framework
or java_latest_version == windows_framework_version
):
report.status = "PASS"
report.status_extended = f"Java version is set to 'java {java_latest_version}' for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Java version is set to 'java {java_latest_version}' for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,9 +7,12 @@ class app_ensure_php_version_is_latest(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
framework = getattr(app.configurations, "linux_fx_version", "")
@@ -17,14 +20,14 @@ class app_ensure_php_version_is_latest(Check):
app.configurations, "php_version", ""
):
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
php_latest_version = app_client.audit_config.get(
"php_latest_version", "8.2"
)
report.status_extended = f"PHP version is set to '{framework}', the latest version that you could use is the '{php_latest_version}' version, for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"PHP version is set to '{framework}', the latest version that you could use is the '{php_latest_version}' version, for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if (
php_latest_version in framework
@@ -32,7 +35,7 @@ class app_ensure_php_version_is_latest(Check):
== php_latest_version
):
report.status = "PASS"
report.status_extended = f"PHP version is set to '{php_latest_version}' for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"PHP version is set to '{php_latest_version}' for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,9 +7,12 @@ class app_ensure_python_version_is_latest(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
framework = getattr(app.configurations, "linux_fx_version", "")
@@ -17,12 +20,12 @@ class app_ensure_python_version_is_latest(Check):
app.configurations, "python_version", ""
):
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
python_latest_version = app_client.audit_config.get(
"python_latest_version", "3.12"
)
report.status_extended = f"Python version is '{framework}', the latest version that you could use is the '{python_latest_version}' version, for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Python version is '{framework}', the latest version that you could use is the '{python_latest_version}' version, for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if (
python_latest_version in framework
@@ -30,7 +33,7 @@ class app_ensure_python_version_is_latest(Check):
== python_latest_version
):
report.status = "PASS"
report.status_extended = f"Python version is set to '{python_latest_version}' for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Python version is set to '{python_latest_version}' for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,20 +7,23 @@ class app_ensure_using_http20(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"HTTP/2.0 is not enabled for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"HTTP/2.0 is not enabled for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if app.configurations and getattr(
app.configurations, "http20_enabled", False
):
report.status = "PASS"
report.status_extended = f"HTTP/2.0 is enabled for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"HTTP/2.0 is enabled for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,21 +7,24 @@ class app_ftp_deployment_disabled(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"FTP is enabled for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"FTP is enabled for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if (
app.configurations
and getattr(app.configurations, "ftps_state", "AllAllowed")
!= "AllAllowed"
):
report.status = "PASS"
report.status_extended = f"FTP is disabled for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"FTP is disabled for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,23 +7,24 @@ class app_function_access_keys_configured(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
if function.function_keys is not None:
report = Check_Report_Azure(
metadata=self.metadata(), resource=function
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Function {function.name} does not have function keys configured."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) does not have function keys configured."
if len(function.function_keys) > 0:
report.status = "PASS"
report.status_extended = (
f"Function {function.name} has function keys configured."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has function keys configured."
findings.append(report)
@@ -7,19 +7,20 @@ class app_function_application_insights_enabled(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
if function.enviroment_variables is not None:
report = Check_Report_Azure(
metadata=self.metadata(), resource=function
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = (
f"Function {function.name} is not using Application Insights."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is not using Application Insights."
if function.enviroment_variables.get(
"APPINSIGHTS_INSTRUMENTATIONKEY", None
@@ -27,9 +28,7 @@ class app_function_application_insights_enabled(Check):
"APPLICATIONINSIGHTS_CONNECTION_STRING", None
):
report.status = "PASS"
report.status_extended = (
f"Function {function.name} is using Application Insights."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is using Application Insights."
findings.append(report)
@@ -7,19 +7,20 @@ class app_function_ftps_deployment_disabled(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=function)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Function {function.name} has {'FTP' if function.ftps_state == 'AllAllowed' else 'FTPS' if function.ftps_state == 'FtpsOnly' else 'FTP or FTPS'} deployment enabled"
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has {'FTP' if function.ftps_state == 'AllAllowed' else 'FTPS' if function.ftps_state == 'FtpsOnly' else 'FTP or FTPS'} deployment enabled."
if function.ftps_state == "Disabled":
report.status = "PASS"
report.status_extended = (
f"Function {function.name} has FTP and FTPS deployment disabled"
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has FTP and FTPS deployment disabled."
findings.append(report)
@@ -7,18 +7,26 @@ class app_function_identity_is_configured(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=function)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Function {function.name} does not have a managed identity enabled."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) does not have a managed identity enabled."
if function.identity:
identity_type = (
function.identity.type
if getattr(function.identity, "type", "")
else "managed"
)
report.status = "PASS"
report.status_extended = f"Function {function.name} has a {function.identity.type if getattr(function.identity, 'type', '') else 'managed'} identity enabled."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has a {identity_type} identity enabled."
findings.append(report)
@@ -14,22 +14,25 @@ class app_function_identity_without_admin_privileges(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
if function.identity:
report = Check_Report_Azure(
metadata=self.metadata(), resource=function
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Function {function.name} has a managed identity enabled but without admin privileges."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has a managed identity enabled but without admin privileges."
admin_roles_assigned = []
for role_assignment in iam_client.role_assignments[
subscription_name
subscription_id
].values():
if (
role_assignment.agent_id == function.identity.principal_id
@@ -43,8 +46,8 @@ class app_function_identity_without_admin_privileges(Check):
):
admin_roles_assigned.append(
getattr(
iam_client.roles[subscription_name].get(
f"/subscriptions/{iam_client.subscriptions[subscription_name]}/providers/Microsoft.Authorization/roleDefinitions/{role_assignment.role_id}"
iam_client.roles[subscription_id].get(
f"/subscriptions/{subscription_id}/providers/Microsoft.Authorization/roleDefinitions/{role_assignment.role_id}"
),
"name",
"",
@@ -53,7 +56,7 @@ class app_function_identity_without_admin_privileges(Check):
if admin_roles_assigned:
report.status = "FAIL"
report.status_extended = f"Function {function.name} has a managed identity enabled and it is configure with admin privileges using {'roles: ' + ', '.join(admin_roles_assigned) if len(admin_roles_assigned) > 1 else 'role ' + admin_roles_assigned[0]}."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has a managed identity enabled and it is configure with admin privileges using {'roles: ' + ', '.join(admin_roles_assigned) if len(admin_roles_assigned) > 1 else 'role ' + admin_roles_assigned[0]}."
findings.append(report)
@@ -7,19 +7,20 @@ class app_function_latest_runtime_version(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
if function.enviroment_variables is not None:
report = Check_Report_Azure(
metadata=self.metadata(), resource=function
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = (
f"Function {function.name} is using the latest runtime."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is using the latest runtime."
if (
function.enviroment_variables.get(
@@ -28,7 +29,7 @@ class app_function_latest_runtime_version(Check):
!= "~4"
):
report.status = "FAIL"
report.status_extended = f"Function {function.name} is not using the latest runtime. The current runtime is '{function.enviroment_variables.get('FUNCTIONS_EXTENSION_VERSION', '')}' and should be '~4'."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is not using the latest runtime. The current runtime is '{function.enviroment_variables.get('FUNCTIONS_EXTENSION_VERSION', '')}' and should be '~4'."
findings.append(report)
@@ -7,22 +7,21 @@ class app_function_not_publicly_accessible(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=function)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = (
f"Function {function.name} is publicly accessible."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is publicly accessible."
if not function.public_access:
report.status = "PASS"
report.status_extended = (
f"Function {function.name} is not publicly accessible."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is not publicly accessible."
findings.append(report)
@@ -7,18 +7,21 @@ class app_function_vnet_integration_enabled(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=function)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Function {function.name} does not have virtual network integration enabled."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) does not have virtual network integration enabled."
if function.vnet_subnet_id:
report.status = "PASS"
report.status_extended = f"Function {function.name} has Virtual Network integration enabled with subnet '{function.vnet_subnet_id}' enabled."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has Virtual Network integration enabled with subnet '{function.vnet_subnet_id}' enabled."
findings.append(report)
@@ -6,25 +6,28 @@ class app_http_logs_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, apps in app_client.apps.items():
for subscription_id, apps in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
if "functionapp" not in app.kind:
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
if not app.monitor_diagnostic_settings:
report.status_extended = f"App {app.name} does not have a diagnostic setting in subscription {subscription_name}."
report.status_extended = f"App {app.name} does not have a diagnostic setting in subscription {subscription_name} ({subscription_id})."
else:
for diagnostic_setting in app.monitor_diagnostic_settings:
report.status_extended = f"App {app.name} does not have HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name}"
report.status_extended = f"App {app.name} does not have HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name} ({subscription_id})"
for log in diagnostic_setting.logs:
if log.category == "AppServiceHTTPLogs" and log.enabled:
report.status = "PASS"
report.status_extended = f"App {app.name} has HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name}"
report.status_extended = f"App {app.name} has HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name} ({subscription_id})"
break
elif log.category_group == "allLogs" and log.enabled:
report.status = "PASS"
report.status_extended = f"App {app.name} has allLogs category group which includes HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name}"
report.status_extended = f"App {app.name} has allLogs category group which includes HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name} ({subscription_id})"
break
findings.append(report)
@@ -7,20 +7,23 @@ class app_minimum_tls_version_12(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Minimum TLS version is not set to 1.2 for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Minimum TLS version is not set to 1.2 for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if app.configurations and getattr(
app.configurations, "min_tls_version", ""
) in ["1.2", "1.3"]:
report.status = "PASS"
report.status_extended = f"Minimum TLS version is set to {app.configurations.min_tls_version} for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Minimum TLS version is set to {app.configurations.min_tls_version} for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,18 +7,21 @@ class app_register_with_identity(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"App '{app.name}' in subscription '{subscription_name}' has an identity configured."
report.status_extended = f"App '{app.name}' in subscription '{subscription_name} ({subscription_id})' has an identity configured."
if not app.identity:
report.status = "FAIL"
report.status_extended = f"App '{app.name}' in subscription '{subscription_name}' does not have an identity configured."
report.status_extended = f"App '{app.name}' in subscription '{subscription_name} ({subscription_id})' does not have an identity configured."
findings.append(report)
@@ -20,10 +20,10 @@ class App(AzureService):
logger.info("App - Getting apps...")
apps = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
apps_list = client.web_apps.list()
apps.update({subscription_name: {}})
apps.update({subscription_id: {}})
for app in apps_list:
# Filter function apps
@@ -41,7 +41,7 @@ class App(AzureService):
resource_group_name=app.resource_group, name=app.name
)
apps[subscription_name].update(
apps[subscription_id].update(
{
app.id: WebApp(
resource_id=app.id,
@@ -81,7 +81,7 @@ class App(AzureService):
getattr(app, "client_cert_mode", "Ignore"),
),
monitor_diagnostic_settings=self._get_app_monitor_settings(
app.name, app.resource_group, subscription_name
app.name, app.resource_group, subscription_id
),
https_only=getattr(app, "https_only", False),
identity=ManagedServiceIdentity(
@@ -106,7 +106,7 @@ class App(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return apps
@@ -115,17 +115,17 @@ class App(AzureService):
logger.info("Function - Getting functions...")
functions = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
functions_list = client.web_apps.list()
functions.update({subscription_name: {}})
functions.update({subscription_id: {}})
for function in functions_list:
# Filter function apps
if getattr(function, "kind", "").startswith("functionapp"):
# List host keys
host_keys = self._get_function_host_keys(
subscription_name, function.resource_group, function.name
subscription_id, function.resource_group, function.name
)
if host_keys is not None:
function_keys = getattr(host_keys, "function_keys", {})
@@ -133,16 +133,16 @@ class App(AzureService):
function_keys = None
application_settings = self._list_application_settings(
subscription_name, function.resource_group, function.name
subscription_id, function.resource_group, function.name
)
function_config = self._get_function_config(
subscription_name,
subscription_id,
function.resource_group,
function.name,
)
functions[subscription_name].update(
functions[subscription_id].update(
{
function.id: FunctionApp(
id=function.id,
@@ -175,7 +175,7 @@ class App(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return functions
@@ -200,13 +200,13 @@ class App(AzureService):
monitor_diagnostics_settings = []
try:
monitor_diagnostics_settings = monitor_client.diagnostic_settings_with_uri(
self.subscriptions[subscription],
f"subscriptions/{self.subscriptions[subscription]}/resourceGroups/{resource_group}/providers/Microsoft.Web/sites/{app_name}",
subscription,
f"subscriptions/{subscription}/resourceGroups/{resource_group}/providers/Microsoft.Web/sites/{app_name}",
monitor_client.clients[subscription],
)
except Exception as error:
logger.error(
f"Subscription name: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return monitor_diagnostics_settings
@@ -8,19 +8,20 @@ class appinsights_ensure_is_configured(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, components in appinsights_client.components.items():
for subscription_id, components in appinsights_client.components.items():
subscription_name = appinsights_client.subscriptions.get(
subscription_id, subscription_id
)
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{appinsights_client.subscriptions[subscription_name]}"
)
report.status_extended = f"There is at least one AppInsight configured in subscription {subscription_name}."
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status_extended = f"There is at least one AppInsight configured in subscription {subscription_name} ({subscription_id})."
if len(components) < 1:
report.status = "FAIL"
report.status_extended = f"There are no AppInsight configured in subscription {subscription_name}."
report.status_extended = f"There are no AppInsight configured in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -15,13 +15,13 @@ class AppInsights(AzureService):
logger.info("AppInsights - Getting components...")
components = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
components_list = client.components.list()
components.update({subscription_name: {}})
components.update({subscription_id: {}})
for component in components_list:
components[subscription_name].update(
components[subscription_id].update(
{
component.app_id: Component(
resource_id=component.id,
@@ -35,7 +35,7 @@ class AppInsights(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return components
@@ -9,17 +9,20 @@ class containerregistry_admin_user_disabled(Check):
findings = []
for subscription, registries in containerregistry_client.registries.items():
subscription_name = containerregistry_client.subscriptions.get(
subscription, subscription
)
for container_registry_info in registries.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=container_registry_info
)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} has its admin user enabled."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) has its admin user enabled."
if not container_registry_info.admin_user_enabled:
report.status = "PASS"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} has its admin user disabled."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) has its admin user disabled."
findings.append(report)
@@ -9,17 +9,20 @@ class containerregistry_not_publicly_accessible(Check):
findings = []
for subscription, registries in containerregistry_client.registries.items():
subscription_name = containerregistry_client.subscriptions.get(
subscription, subscription
)
for container_registry_info in registries.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=container_registry_info
)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} allows unrestricted network access."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) allows unrestricted network access."
if not container_registry_info.public_network_access:
report.status = "PASS"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} does not allow unrestricted network access."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) does not allow unrestricted network access."
findings.append(report)
@@ -64,7 +64,7 @@ class ContainerRegistry(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return registries
@@ -81,13 +81,13 @@ class ContainerRegistry(AzureService):
monitor_diagnostics_settings = []
try:
monitor_diagnostics_settings = monitor_client.diagnostic_settings_with_uri(
self.subscriptions[subscription],
f"subscriptions/{self.subscriptions[subscription]}/resourceGroups/{resource_group}/providers/Microsoft.ContainerRegistry/registries/{registry_name}",
subscription,
f"subscriptions/{subscription}/resourceGroups/{resource_group}/providers/Microsoft.ContainerRegistry/registries/{registry_name}",
monitor_client.clients[subscription],
)
except Exception as error:
logger.error(
f"Subscription name: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return monitor_diagnostics_settings
@@ -9,17 +9,20 @@ class containerregistry_uses_private_link(Check):
findings = []
for subscription, registries in containerregistry_client.registries.items():
subscription_name = containerregistry_client.subscriptions.get(
subscription, subscription
)
for container_registry_info in registries.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=container_registry_info
)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} does not use a private link."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) does not use a private link."
if container_registry_info.private_endpoint_connections:
report.status = "PASS"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} uses a private link."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) uses a private link."
findings.append(report)
@@ -6,14 +6,17 @@ class cosmosdb_account_firewall_use_selected_networks(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
subscription_name = cosmosdb_client.subscriptions.get(
subscription, subscription
)
for account in accounts:
report = Check_Report_Azure(metadata=self.metadata(), resource=account)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} has firewall rules that allow access from all networks."
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) has firewall rules that allow access from all networks."
if account.is_virtual_network_filter_enabled:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} has firewall rules that allow access only from selected networks."
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) has firewall rules that allow access only from selected networks."
findings.append(report)
return findings
@@ -6,14 +6,17 @@ class cosmosdb_account_use_aad_and_rbac(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
subscription_name = cosmosdb_client.subscriptions.get(
subscription, subscription
)
for account in accounts:
report = Check_Report_Azure(metadata=self.metadata(), resource=account)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is not using AAD and RBAC"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) is not using AAD and RBAC"
if account.disable_local_auth:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is using AAD and RBAC"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) is using AAD and RBAC"
findings.append(report)
return findings
@@ -6,14 +6,17 @@ class cosmosdb_account_use_private_endpoints(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
subscription_name = cosmosdb_client.subscriptions.get(
subscription, subscription
)
for account in accounts:
report = Check_Report_Azure(metadata=self.metadata(), resource=account)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is not using private endpoints connections"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) is not using private endpoints connections"
if account.private_endpoint_connections:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is using private endpoints connections"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) is using private endpoints connections"
findings.append(report)
return findings
@@ -48,7 +48,7 @@ class CosmosDB(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return accounts
@@ -17,6 +17,9 @@ class databricks_workspace_cmk_encryption_enabled(Check):
def execute(self):
findings = []
for subscription, workspaces in databricks_client.workspaces.items():
subscription_name = databricks_client.subscriptions.get(
subscription, subscription
)
for workspace in workspaces.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=workspace
@@ -25,9 +28,9 @@ class databricks_workspace_cmk_encryption_enabled(Check):
enc = workspace.managed_disk_encryption
if enc:
report.status = "PASS"
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription} has customer-managed key (CMK) encryption enabled with key {enc.key_vault_uri}/{enc.key_name}/{enc.key_version}."
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription_name} ({subscription}) has customer-managed key (CMK) encryption enabled with key {enc.key_vault_uri}/{enc.key_name}/{enc.key_version}."
else:
report.status = "FAIL"
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription} does not have customer-managed key (CMK) encryption enabled."
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription_name} ({subscription}) does not have customer-managed key (CMK) encryption enabled."
findings.append(report)
return findings
@@ -17,6 +17,9 @@ class databricks_workspace_vnet_injection_enabled(Check):
def execute(self):
findings = []
for subscription, workspaces in databricks_client.workspaces.items():
subscription_name = databricks_client.subscriptions.get(
subscription, subscription
)
for workspace in workspaces.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=workspace
@@ -24,9 +27,9 @@ class databricks_workspace_vnet_injection_enabled(Check):
report.subscription = subscription
if workspace.custom_managed_vnet_id:
report.status = "PASS"
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription} is deployed in a customer-managed VNet ({workspace.custom_managed_vnet_id})."
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription_name} ({subscription}) is deployed in a customer-managed VNet ({workspace.custom_managed_vnet_id})."
else:
report.status = "FAIL"
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription} is not deployed in a customer-managed VNet (VNet Injection is not enabled)."
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription_name} ({subscription}) is not deployed in a customer-managed VNet (VNet Injection is not enabled)."
findings.append(report)
return findings
@@ -7,9 +7,12 @@ class defender_additional_email_configured_with_a_security_contact(Check):
findings = []
for (
subscription_name,
subscription_id,
security_contact_configurations,
) in defender_client.security_contact_configurations.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for contact_configuration in security_contact_configurations.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=contact_configuration
@@ -19,14 +22,14 @@ class defender_additional_email_configured_with_a_security_contact(Check):
if contact_configuration.name
else "Security Contact"
)
report.subscription = subscription_name
report.subscription = subscription_id
if len(contact_configuration.emails) > 0:
report.status = "PASS"
report.status_extended = f"There is another correct email configured for subscription {subscription_name}."
report.status_extended = f"There is another correct email configured for subscription {subscription_name} ({subscription_id})."
else:
report.status = "FAIL"
report.status_extended = f"There is not another correct email configured for subscription {subscription_name}."
report.status_extended = f"There is not another correct email configured for subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_assessments_vm_endpoint_protection_installed(Check):
findings = []
for (
subscription_name,
subscription_id,
assessments,
) in defender_client.assessments.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if (
"Install endpoint protection solution on virtual machines"
in assessments
@@ -20,9 +23,9 @@ class defender_assessments_vm_endpoint_protection_installed(Check):
"Install endpoint protection solution on virtual machines"
],
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Endpoint protection is set up in all VMs in subscription {subscription_name}."
report.status_extended = f"Endpoint protection is set up in all VMs in subscription {subscription_name} ({subscription_id})."
if (
assessments[
@@ -31,7 +34,7 @@ class defender_assessments_vm_endpoint_protection_installed(Check):
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Endpoint protection is not set up in all VMs in subscription {subscription_name}."
report.status_extended = f"Endpoint protection is not set up in all VMs in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -24,9 +24,12 @@ class defender_attack_path_notifications_properly_configured(Check):
min_risk_index = risk_levels.index(min_risk_level)
for (
subscription_name,
subscription_id,
security_contact_configurations,
) in defender_client.security_contact_configurations.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for contact_configuration in security_contact_configurations.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=contact_configuration
@@ -36,21 +39,21 @@ class defender_attack_path_notifications_properly_configured(Check):
if contact_configuration.name
else "Security Contact"
)
report.subscription = subscription_name
report.subscription = subscription_id
actual_risk_level = getattr(
contact_configuration, "attack_path_minimal_risk_level", None
)
if not actual_risk_level or actual_risk_level not in risk_levels:
report.status = "FAIL"
report.status_extended = f"Attack path notifications are not enabled in subscription {subscription_name} for security contact {contact_configuration.name}."
report.status_extended = f"Attack path notifications are not enabled in subscription {subscription_name} ({subscription_id}) for security contact {contact_configuration.name}."
else:
actual_risk_index = risk_levels.index(actual_risk_level)
if actual_risk_index <= min_risk_index:
report.status = "PASS"
report.status_extended = f"Attack path notifications are enabled with minimal risk level {actual_risk_level} in subscription {subscription_name} for security contact {contact_configuration.name}."
report.status_extended = f"Attack path notifications are enabled with minimal risk level {actual_risk_level} in subscription {subscription_name} ({subscription_id}) for security contact {contact_configuration.name}."
else:
report.status = "FAIL"
report.status_extended = f"Attack path notifications are enabled with minimal risk level {actual_risk_level} in subscription {subscription_name} for security contact {contact_configuration.name}."
report.status_extended = f"Attack path notifications are enabled with minimal risk level {actual_risk_level} in subscription {subscription_name} ({subscription_id}) for security contact {contact_configuration.name}."
findings.append(report)
return findings
@@ -7,21 +7,24 @@ class defender_auto_provisioning_log_analytics_agent_vms_on(Check):
findings = []
for (
subscription_name,
subscription_id,
auto_provisioning_settings,
) in defender_client.auto_provisioning_settings.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for auto_provisioning_setting in auto_provisioning_settings.values():
report = Check_Report_Azure(
metadata=self.metadata(),
resource=auto_provisioning_setting,
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Defender Auto Provisioning Log Analytics Agents from subscription {subscription_name} is set to ON."
report.status_extended = f"Defender Auto Provisioning Log Analytics Agents from subscription {subscription_name} ({subscription_id}) is set to ON."
if auto_provisioning_setting.auto_provision != "On":
report.status = "FAIL"
report.status_extended = f"Defender Auto Provisioning Log Analytics Agents from subscription {subscription_name} is set to OFF."
report.status_extended = f"Defender Auto Provisioning Log Analytics Agents from subscription {subscription_name} ({subscription_id}) is set to OFF."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_auto_provisioning_vulnerabilty_assessments_machines_on(Check):
findings = []
for (
subscription_name,
subscription_id,
assessments,
) in defender_client.assessments.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if (
"Machines should have a vulnerability assessment solution"
in assessments
@@ -20,9 +23,9 @@ class defender_auto_provisioning_vulnerabilty_assessments_machines_on(Check):
"Machines should have a vulnerability assessment solution"
],
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Vulnerability assessment is set up in all VMs in subscription {subscription_name}."
report.status_extended = f"Vulnerability assessment is set up in all VMs in subscription {subscription_name} ({subscription_id})."
if (
assessments[
@@ -31,7 +34,7 @@ class defender_auto_provisioning_vulnerabilty_assessments_machines_on(Check):
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Vulnerability assessment is not set up in all VMs in subscription {subscription_name}."
report.status_extended = f"Vulnerability assessment is not set up in all VMs in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_container_images_resolved_vulnerabilities(Check):
findings = []
for (
subscription_name,
subscription_id,
assessments,
) in defender_client.assessments.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if (
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
in assessments
@@ -28,9 +31,9 @@ class defender_container_images_resolved_vulnerabilities(Check):
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
],
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Azure running container images do not have unresolved vulnerabilities in subscription '{subscription_name}'."
report.status_extended = f"Azure running container images do not have unresolved vulnerabilities in subscription '{subscription_name} ({subscription_id})'."
if (
assessments[
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
@@ -38,7 +41,7 @@ class defender_container_images_resolved_vulnerabilities(Check):
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Azure running container images have unresolved vulnerabilities in subscription '{subscription_name}'."
report.status_extended = f"Azure running container images have unresolved vulnerabilities in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -6,20 +6,21 @@ class defender_container_images_scan_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "Containers" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["Containers"]
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = (
f"Container image scan is enabled in subscription {subscription}."
)
report.status_extended = f"Container image scan is enabled in subscription {subscription_name} ({subscription})."
if not pricings["Containers"].extensions.get(
"ContainerRegistriesVulnerabilityAssessments"
):
report.status = "FAIL"
report.status_extended = f"Container image scan is disabled in subscription {subscription}."
report.status_extended = f"Container image scan is disabled in subscription {subscription_name} ({subscription})."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_app_services_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "AppServices" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["AppServices"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_app_services_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan App Services"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for App Services from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for App Services from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["AppServices"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for App Services from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for App Services from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_arm_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "Arm" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["Arm"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_arm_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan ARM"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for ARM from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for ARM from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["Arm"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for ARM from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for ARM from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,16 +6,19 @@ class defender_ensure_defender_for_azure_sql_databases_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "SqlServers" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["SqlServers"]
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Azure SQL DB Servers from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Azure SQL DB Servers from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["SqlServers"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Azure SQL DB Servers from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Azure SQL DB Servers from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,16 +6,19 @@ class defender_ensure_defender_for_containers_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "Containers" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["Containers"]
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Containers from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Containers from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["Containers"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Containers from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Containers from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_cosmosdb_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "CosmosDbs" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["CosmosDbs"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_cosmosdb_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan Cosmos DB"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Cosmos DB from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Cosmos DB from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["CosmosDbs"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Cosmos DB from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Cosmos DB from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_databases_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if (
"SqlServers" in pricings
and "SqlServerVirtualMachines" in pricings
@@ -17,7 +20,7 @@ class defender_ensure_defender_for_databases_is_on(Check):
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Databases from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Databases from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if (
pricings["SqlServers"].pricing_tier != "Standard"
or pricings["SqlServerVirtualMachines"].pricing_tier != "Standard"
@@ -26,7 +29,7 @@ class defender_ensure_defender_for_databases_is_on(Check):
or pricings["CosmosDbs"].pricing_tier != "Standard"
):
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Databases from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Databases from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_dns_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "Dns" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["Dns"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_dns_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan DNS"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for DNS from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for DNS from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["Dns"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for DNS from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for DNS from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_keyvault_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "KeyVaults" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["KeyVaults"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_keyvault_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan KeyVaults"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for KeyVaults from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for KeyVaults from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["KeyVaults"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for KeyVaults from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for KeyVaults from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_os_relational_databases_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "OpenSourceRelationalDatabases" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -14,10 +17,10 @@ class defender_ensure_defender_for_os_relational_databases_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan Open-Source Relational Databases"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Open-Source Relational Databases from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Open-Source Relational Databases from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["OpenSourceRelationalDatabases"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Open-Source Relational Databases from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Open-Source Relational Databases from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_server_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "VirtualMachines" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -14,10 +17,10 @@ class defender_ensure_defender_for_server_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan Servers"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Servers from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Servers from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["VirtualMachines"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Servers from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Servers from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_sql_servers_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "SqlServerVirtualMachines" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -14,10 +17,10 @@ class defender_ensure_defender_for_sql_servers_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan SQL Server VMs"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for SQL Server VMs from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for SQL Server VMs from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["SqlServerVirtualMachines"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for SQL Server VMs from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for SQL Server VMs from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_storage_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "StorageAccounts" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -14,10 +17,10 @@ class defender_ensure_defender_for_storage_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan Storage Accounts"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Storage Accounts from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Storage Accounts from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["StorageAccounts"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Storage Accounts from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Storage Accounts from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -7,18 +7,19 @@ class defender_ensure_iot_hub_defender_is_on(Check):
findings = []
for (
subscription_name,
subscription_id,
iot_security_solutions,
) in defender_client.iot_security_solutions.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if not iot_security_solutions:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.status = "FAIL"
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{defender_client.subscriptions[subscription_name]}"
)
report.status_extended = f"No IoT Security Solutions found in the subscription {subscription_name}."
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status_extended = f"No IoT Security Solutions found in the subscription {subscription_name} ({subscription_id})."
findings.append(report)
else:
for iot_security_solution in iot_security_solutions.values():
@@ -26,13 +27,13 @@ class defender_ensure_iot_hub_defender_is_on(Check):
metadata=self.metadata(),
resource=iot_security_solution,
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"The security solution {iot_security_solution.name} is enabled in subscription {subscription_name}."
report.status_extended = f"The security solution {iot_security_solution.name} is enabled in subscription {subscription_name} ({subscription_id})."
if iot_security_solution.status != "Enabled":
report.status = "FAIL"
report.status_extended = f"The security solution {iot_security_solution.name} is disabled in subscription {subscription_name}"
report.status_extended = f"The security solution {iot_security_solution.name} is disabled in subscription {subscription_name} ({subscription_id})"
findings.append(report)
@@ -7,29 +7,30 @@ class defender_ensure_mcas_is_enabled(Check):
findings = []
for (
subscription_name,
subscription_id,
settings,
) in defender_client.settings.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if "MCAS" not in settings:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{defender_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"Microsoft Defender for Cloud Apps not exists for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Cloud Apps not exists for subscription {subscription_name} ({subscription_id})."
else:
report = Check_Report_Azure(
metadata=self.metadata(), resource=settings["MCAS"]
)
report.subscription = subscription_name
report.subscription = subscription_id
if settings["MCAS"].enabled:
report.status = "PASS"
report.status_extended = f"Microsoft Defender for Cloud Apps is enabled for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Cloud Apps is enabled for subscription {subscription_name} ({subscription_id})."
else:
report.status = "FAIL"
report.status_extended = f"Microsoft Defender for Cloud Apps is disabled for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Cloud Apps is disabled for subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_ensure_notify_alerts_severity_is_high(Check):
findings = []
for (
subscription_name,
subscription_id,
security_contact_configurations,
) in defender_client.security_contact_configurations.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for contact_configuration in security_contact_configurations.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=contact_configuration
@@ -19,16 +22,16 @@ class defender_ensure_notify_alerts_severity_is_high(Check):
if contact_configuration.name
else "Security Contact"
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Notifications are not enabled for alerts with a minimum severity of high or lower in subscription {subscription_name}."
report.status_extended = f"Notifications are not enabled for alerts with a minimum severity of high or lower in subscription {subscription_name} ({subscription_id})."
if (
contact_configuration.alert_minimal_severity
and contact_configuration.alert_minimal_severity != "Critical"
):
report.status = "PASS"
report.status_extended = f"Notifications are enabled for alerts with a minimum severity of high or lower ({contact_configuration.alert_minimal_severity}) in subscription {subscription_name}."
report.status_extended = f"Notifications are enabled for alerts with a minimum severity of high or lower ({contact_configuration.alert_minimal_severity}) in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_ensure_notify_emails_to_owners(Check):
findings = []
for (
subscription_name,
subscription_id,
security_contact_configurations,
) in defender_client.security_contact_configurations.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for contact_configuration in security_contact_configurations.values():
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -20,16 +23,16 @@ class defender_ensure_notify_emails_to_owners(Check):
if contact_configuration.name
else "Security Contact"
)
report.subscription = subscription_name
report.subscription = subscription_id
if (
contact_configuration.notifications_by_role.state
and "Owner" in contact_configuration.notifications_by_role.roles
):
report.status = "PASS"
report.status_extended = f"The Owner role is notified for subscription {subscription_name}."
report.status_extended = f"The Owner role is notified for subscription {subscription_name} ({subscription_id})."
else:
report.status = "FAIL"
report.status_extended = f"The Owner role is not notified for subscription {subscription_name}."
report.status_extended = f"The Owner role is not notified for subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_ensure_system_updates_are_applied(Check):
findings = []
for (
subscription_name,
subscription_id,
assessments,
) in defender_client.assessments.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if (
"Log Analytics agent should be installed on virtual machines"
in assessments
@@ -23,9 +26,9 @@ class defender_ensure_system_updates_are_applied(Check):
"System updates should be installed on your machines"
],
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"System updates are applied for all the VMs in the subscription {subscription_name}."
report.status_extended = f"System updates are applied for all the VMs in the subscription {subscription_name} ({subscription_id})."
if (
assessments[
@@ -42,7 +45,7 @@ class defender_ensure_system_updates_are_applied(Check):
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"System updates are not applied for all the VMs in the subscription {subscription_name}."
report.status_extended = f"System updates are not applied for all the VMs in the subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,29 +7,30 @@ class defender_ensure_wdatp_is_enabled(Check):
findings = []
for (
subscription_name,
subscription_id,
settings,
) in defender_client.settings.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if "WDATP" not in settings:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{defender_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"Microsoft Defender for Endpoint integration not exists for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Endpoint integration not exists for subscription {subscription_name} ({subscription_id})."
else:
report = Check_Report_Azure(
metadata=self.metadata(), resource=settings["WDATP"]
)
report.subscription = subscription_name
report.subscription = subscription_id
if settings["WDATP"].enabled:
report.status = "PASS"
report.status_extended = f"Microsoft Defender for Endpoint integration is enabled for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Endpoint integration is enabled for subscription {subscription_name} ({subscription_id})."
else:
report.status = "FAIL"
report.status_extended = f"Microsoft Defender for Endpoint integration is disabled for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Endpoint integration is disabled for subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -30,14 +30,14 @@ class Defender(AzureService):
def _get_pricings(self):
logger.info("Defender - Getting pricings...")
pricings = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
pricings_list = client.pricings.list(
scope_id=f"subscriptions/{self.subscriptions[subscription_name]}"
scope_id=f"subscriptions/{subscription_id}"
)
pricings.update({subscription_name: {}})
pricings.update({subscription_id: {}})
for pricing in pricings_list.value:
pricings[subscription_name].update(
pricings[subscription_id].update(
{
pricing.name: Pricing(
resource_id=pricing.id,
@@ -60,23 +60,23 @@ class Defender(AzureService):
except ResourceNotFoundError as error:
if "Subscription Not Registered" in error.message:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: Subscription Not Registered - Please register to Microsoft.Security in order to view your security status"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: Subscription Not Registered - Please register to Microsoft.Security in order to view your security status"
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return pricings
def _get_auto_provisioning_settings(self):
logger.info("Defender - Getting auto provisioning settings...")
auto_provisioning = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
auto_provisioning_settings = client.auto_provisioning_settings.list()
auto_provisioning.update({subscription_name: {}})
auto_provisioning.update({subscription_id: {}})
for ap in auto_provisioning_settings:
auto_provisioning[subscription_name].update(
auto_provisioning[subscription_id].update(
{
ap.name: AutoProvisioningSetting(
resource_id=ap.id,
@@ -89,25 +89,25 @@ class Defender(AzureService):
except ClientAuthenticationError as error:
if "Subscription Not Registered" in error.message:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: Subscription Not Registered - Please register to Microsoft.Security in order to view your security status"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: Subscription Not Registered - Please register to Microsoft.Security in order to view your security status"
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return auto_provisioning
def _get_assessments(self):
logger.info("Defender - Getting assessments...")
assessments = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
assessments_list = client.assessments.list(
f"subscriptions/{self.subscriptions[subscription_name]}"
f"subscriptions/{subscription_id}"
)
assessments.update({subscription_name: {}})
assessments.update({subscription_id: {}})
for assessment in assessments_list:
assessments[subscription_name].update(
assessments[subscription_id].update(
{
assessment.display_name: Assesment(
resource_id=assessment.id,
@@ -120,19 +120,19 @@ class Defender(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return assessments
def _get_settings(self):
logger.info("Defender - Getting settings...")
settings = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
settings_list = client.settings.list()
settings.update({subscription_name: {}})
settings.update({subscription_id: {}})
for setting in settings_list:
settings[subscription_name].update(
settings[subscription_id].update(
{
setting.name: Setting(
resource_id=setting.id,
@@ -146,11 +146,11 @@ class Defender(AzureService):
except ClientAuthenticationError as error:
if "Subscription Not Registered" in error.message:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: Subscription Not Registered - Please register to Microsoft.Security in order to view your security status"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: Subscription Not Registered - Please register to Microsoft.Security in order to view your security status"
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return settings
@@ -166,7 +166,7 @@ class Defender(AzureService):
"""
logger.info("Defender - Getting security contacts...")
security_contacts = {}
for subscription_name, subscription_id in self.subscriptions.items():
for subscription_id, display_name in self.subscriptions.items():
try:
url = f"https://management.azure.com/subscriptions/{subscription_id}/providers/Microsoft.Security/securityContacts?api-version=2023-12-01-preview"
headers = {
@@ -176,7 +176,7 @@ class Defender(AzureService):
response = requests.get(url, headers=headers)
response.raise_for_status()
contact_configurations = response.json().get("value", [])
security_contacts[subscription_name] = {}
security_contacts[subscription_id] = {}
for contact_configuration in contact_configurations:
props = contact_configuration.get("properties", {})
@@ -204,7 +204,7 @@ class Defender(AzureService):
if value is not None:
alert_minimal_severity = value
security_contacts[subscription_name][
security_contacts[subscription_id][
contact_configuration.get("name", "default")
] = SecurityContactConfiguration(
id=contact_configuration.get("id", ""),
@@ -221,21 +221,21 @@ class Defender(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return security_contacts
def _get_iot_security_solutions(self):
logger.info("Defender - Getting IoT Security Solutions...")
iot_security_solutions = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
iot_security_solutions_list = (
client.iot_security_solution.list_by_subscription()
)
iot_security_solutions.update({subscription_name: {}})
iot_security_solutions.update({subscription_id: {}})
for iot_security_solution in iot_security_solutions_list:
iot_security_solutions[subscription_name].update(
iot_security_solutions[subscription_id].update(
{
iot_security_solution.id: IoTSecuritySolution(
resource_id=iot_security_solution.id,
@@ -246,7 +246,7 @@ class Defender(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return iot_security_solutions
@@ -257,22 +257,22 @@ class Defender(AzureService):
Returns:
A dictionary of JIT policies for each subscription. The format will be:
{
"subscription_name": {
"subscription_id": {
"jit_policy_id": JITPolicy
}
}
"""
logger.info("Defender - Getting JIT policies...")
jit_policies = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
jit_policies[subscription_name] = {}
jit_policies[subscription_id] = {}
policies = client.jit_network_access_policies.list()
for policy in policies:
vm_ids = set()
for vm in getattr(policy, "virtual_machines", []):
vm_ids.add(vm.id)
jit_policies[subscription_name].update(
jit_policies[subscription_id].update(
{
policy.id: JITPolicy(
id=policy.id,
@@ -284,7 +284,7 @@ class Defender(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return jit_policies
@@ -20,10 +20,13 @@ class entra_user_with_vm_access_has_mfa(Check):
for users in entra_client.users.values():
for user in users.values():
for (
subscription_name,
subscription_id,
role_assigns,
) in iam_client.role_assignments.items():
if (user.id, subscription_name) in already_reported:
subscription_name = entra_client.subscriptions.get(
subscription_id, subscription_id
)
if (user.id, subscription_id) in already_reported:
continue
for assignment in role_assigns.values():
@@ -44,15 +47,15 @@ class entra_user_with_vm_access_has_mfa(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=user
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"User {user.name} without MFA can access VMs in subscription {subscription_name}"
report.status_extended = f"User {user.name} without MFA can access VMs in subscription {subscription_name} ({subscription_id})"
if user.is_mfa_capable:
report.status = "PASS"
report.status_extended = f"User {user.name} can access VMs in subscription {subscription_name} but it has MFA."
report.status_extended = f"User {user.name} can access VMs in subscription {subscription_name} ({subscription_id}) but it has MFA."
findings.append(report)
already_reported.add((user.id, subscription_name))
already_reported.add((user.id, subscription_id))
break
return findings
@@ -8,6 +8,7 @@ class iam_custom_role_has_permissions_to_administer_resource_locks(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, roles in iam_client.custom_roles.items():
subscription_name = iam_client.subscriptions.get(subscription, subscription)
exits_role_with_permission_over_locks = False
for custom_role in roles.values():
@@ -18,7 +19,7 @@ class iam_custom_role_has_permissions_to_administer_resource_locks(Check):
)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Role {custom_role.name} from subscription {subscription} has no permission to administer resource locks."
report.status_extended = f"Role {custom_role.name} from subscription {subscription_name} ({subscription}) has no permission to administer resource locks."
for permission_item in custom_role.permissions:
if exits_role_with_permission_over_locks:
@@ -26,7 +27,7 @@ class iam_custom_role_has_permissions_to_administer_resource_locks(Check):
for action in permission_item.actions:
if search("^Microsoft.Authorization/locks/.*", action):
report.status = "PASS"
report.status_extended = f"Role {custom_role.name} from subscription {subscription} has permission to administer resource locks."
report.status_extended = f"Role {custom_role.name} from subscription {subscription_name} ({subscription}) has permission to administer resource locks."
exits_role_with_permission_over_locks = True
break
findings.append(report)
@@ -6,11 +6,14 @@ class iam_role_user_access_admin_restricted(Check):
def execute(self):
findings = []
for subscription_name, assignments in iam_client.role_assignments.items():
for subscription_id, assignments in iam_client.role_assignments.items():
subscription_name = iam_client.subscriptions.get(
subscription_id, subscription_id
)
for assignment in assignments.values():
role_assignment_name = getattr(
iam_client.roles[subscription_name].get(
f"/subscriptions/{iam_client.subscriptions[subscription_name]}/providers/Microsoft.Authorization/roleDefinitions/{assignment.role_id}"
iam_client.roles[subscription_id].get(
f"/subscriptions/{subscription_id}/providers/Microsoft.Authorization/roleDefinitions/{assignment.role_id}"
),
"name",
"",
@@ -18,12 +21,12 @@ class iam_role_user_access_admin_restricted(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=assignment
)
report.subscription = subscription_name
report.subscription = subscription_id
if role_assignment_name == "User Access Administrator":
report.status = "FAIL"
report.status_extended = f"Role assignment {assignment.name} in subscription {subscription_name} grants User Access Administrator role to {getattr(assignment, 'agent_type', '')} {getattr(assignment, 'agent_id', '')}."
report.status_extended = f"Role assignment {assignment.name} in subscription {subscription_name} ({subscription_id}) grants User Access Administrator role to {getattr(assignment, 'agent_type', '')} {getattr(assignment, 'agent_id', '')}."
else:
report.status = "PASS"
report.status_extended = f"Role assignment {assignment.name} in subscription {subscription_name} does not grant User Access Administrator role."
report.status_extended = f"Role assignment {assignment.name} in subscription {subscription_name} ({subscription_id}) does not grant User Access Administrator role."
findings.append(report)
return findings
@@ -23,7 +23,7 @@ class IAM(AzureService):
builtin_roles.update({subscription: {}})
custom_roles.update({subscription: {}})
all_roles = client.role_definitions.list(
scope=f"/subscriptions/{self.subscriptions[subscription]}",
scope=f"/subscriptions/{subscription}",
)
for role in all_roles:
if role.role_type == "CustomRole":
@@ -53,7 +53,7 @@ class IAM(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return builtin_roles, custom_roles
@@ -83,7 +83,7 @@ class IAM(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return role_assignments
@@ -8,20 +8,21 @@ class iam_subscription_roles_owner_custom_not_created(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, roles in iam_client.custom_roles.items():
subscription_name = iam_client.subscriptions.get(subscription, subscription)
for custom_role in roles.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=custom_role
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"Role {custom_role.name} from subscription {subscription} is not a custom owner role."
report.status_extended = f"Role {custom_role.name} from subscription {subscription_name} ({subscription}) is not a custom owner role."
for scope in custom_role.assignable_scopes:
if search("^/.*", scope):
for permission_item in custom_role.permissions:
for action in permission_item.actions:
if action == "*":
report.status = "FAIL"
report.status_extended = f"Role {custom_role.name} from subscription {subscription} is a custom owner role."
report.status_extended = f"Role {custom_role.name} from subscription {subscription_name} ({subscription}) is a custom owner role."
break
findings.append(report)
@@ -17,6 +17,9 @@ class keyvault_access_only_through_private_endpoints(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
if (
keyvault.properties
@@ -29,9 +32,9 @@ class keyvault_access_only_through_private_endpoints(Check):
if keyvault.properties.public_network_access_disabled:
report.status = "PASS"
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription} has public network access disabled and is using private endpoints."
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription_name} ({subscription}) has public network access disabled and is using private endpoints."
else:
report.status = "FAIL"
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription} has public network access enabled while using private endpoints."
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription_name} ({subscription}) has public network access enabled while using private endpoints."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class keyvault_key_expiration_set_in_non_rbac(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
if not keyvault.properties.enable_rbac_authorization:
for key in keyvault.keys or []:
@@ -17,9 +20,9 @@ class keyvault_key_expiration_set_in_non_rbac(Check):
report.subscription = subscription
if not key.attributes.expires:
report.status = "FAIL"
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription} does not have an expiration date set."
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) does not have an expiration date set."
else:
report.status = "PASS"
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription} has an expiration date set."
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) has an expiration date set."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class keyvault_key_rotation_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
for key in keyvault.keys or []:
report = Check_Report_Azure(metadata=self.metadata(), resource=key)
@@ -19,9 +22,9 @@ class keyvault_key_rotation_enabled(Check):
)
):
report.status = "PASS"
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription} has a rotation policy set."
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) has a rotation policy set."
else:
report.status = "FAIL"
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription} does not have a rotation policy set."
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) does not have a rotation policy set."
findings.append(report)
return findings
@@ -6,12 +6,15 @@ class keyvault_logging_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, key_vaults in keyvault_client.key_vaults.items():
for subscription_id, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription_id, subscription_id
)
for keyvault in key_vaults:
report = Check_Report_Azure(metadata=self.metadata(), resource=keyvault)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Key Vault {keyvault.name} in subscription {subscription_name} does not have a diagnostic setting with audit logging."
report.status_extended = f"Key Vault {keyvault.name} in subscription {subscription_name} ({subscription_id}) does not have a diagnostic setting with audit logging."
for diagnostic_setting in keyvault.monitor_diagnostic_settings or []:
has_audit = False
has_all_logs = False
@@ -22,7 +25,7 @@ class keyvault_logging_enabled(Check):
has_all_logs = True
if has_audit and has_all_logs:
report.status = "PASS"
report.status_extended = f"Key Vault {keyvault.name} in subscription {subscription_name} has a diagnostic setting with audit logging."
report.status_extended = f"Key Vault {keyvault.name} in subscription {subscription_name} ({subscription_id}) has a diagnostic setting with audit logging."
break
findings.append(report)
@@ -6,6 +6,9 @@ class keyvault_non_rbac_secret_expiration_set(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
if not keyvault.properties.enable_rbac_authorization:
for secret in keyvault.secrets or []:
@@ -17,9 +20,9 @@ class keyvault_non_rbac_secret_expiration_set(Check):
report.subscription = subscription
if not secret.attributes.expires:
report.status = "FAIL"
report.status_extended = f"Secret {secret.name} in Key Vault {keyvault.name} from subscription {subscription} does not have an expiration date set."
report.status_extended = f"Secret {secret.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) does not have an expiration date set."
else:
report.status = "PASS"
report.status_extended = f"Secret {secret.name} in Key Vault {keyvault.name} from subscription {subscription} has an expiration date set."
report.status_extended = f"Secret {secret.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) has an expiration date set."
findings.append(report)
return findings
@@ -6,16 +6,19 @@ class keyvault_private_endpoints(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
report = Check_Report_Azure(metadata=self.metadata(), resource=keyvault)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription} is not using private endpoints."
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription_name} ({subscription}) is not using private endpoints."
if (
keyvault.properties
and keyvault.properties.private_endpoint_connections
):
report.status = "PASS"
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription} is using private endpoints."
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription_name} ({subscription}) is using private endpoints."
findings.append(report)
return findings
@@ -6,16 +6,19 @@ class keyvault_rbac_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
report = Check_Report_Azure(metadata=self.metadata(), resource=keyvault)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription} is not using RBAC for access control."
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription_name} ({subscription}) is not using RBAC for access control."
if (
keyvault.properties
and keyvault.properties.enable_rbac_authorization
):
report.status = "PASS"
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription} is using RBAC for access control."
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription_name} ({subscription}) is using RBAC for access control."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class keyvault_rbac_key_expiration_set(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
if keyvault.properties.enable_rbac_authorization:
for key in keyvault.keys or []:
@@ -17,9 +20,9 @@ class keyvault_rbac_key_expiration_set(Check):
report.subscription = subscription
if not key.attributes.expires:
report.status = "FAIL"
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription} does not have an expiration date set."
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) does not have an expiration date set."
else:
report.status = "PASS"
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription} has an expiration date set."
report.status_extended = f"Key {key.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) has an expiration date set."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class keyvault_rbac_secret_expiration_set(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
if keyvault.properties.enable_rbac_authorization:
for secret in keyvault.secrets or []:
@@ -17,9 +20,9 @@ class keyvault_rbac_secret_expiration_set(Check):
report.subscription = subscription
if not secret.attributes.expires:
report.status = "FAIL"
report.status_extended = f"Secret {secret.name} in Key Vault {keyvault.name} from subscription {subscription} does not have an expiration date set."
report.status_extended = f"Secret {secret.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) does not have an expiration date set."
else:
report.status = "PASS"
report.status_extended = f"Secret {secret.name} in Key Vault {keyvault.name} from subscription {subscription} has an expiration date set."
report.status_extended = f"Secret {secret.name} in Key Vault {keyvault.name} from subscription {subscription_name} ({subscription}) has an expiration date set."
findings.append(report)
return findings
@@ -6,16 +6,19 @@ class keyvault_recoverable(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, key_vaults in keyvault_client.key_vaults.items():
subscription_name = keyvault_client.subscriptions.get(
subscription, subscription
)
for keyvault in key_vaults:
report = Check_Report_Azure(metadata=self.metadata(), resource=keyvault)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription} is not recoverable."
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription_name} ({subscription}) is not recoverable."
if (
keyvault.properties.enable_soft_delete
and keyvault.properties.enable_purge_protection
):
report.status = "PASS"
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription} is recoverable."
report.status_extended = f"Keyvault {keyvault.name} from subscription {subscription_name} ({subscription}) is recoverable."
findings.append(report)
return findings
@@ -56,7 +56,7 @@ class KeyVault(AzureService):
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return key_vaults
@@ -172,7 +172,7 @@ class KeyVault(AzureService):
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
try:
@@ -204,7 +204,7 @@ class KeyVault(AzureService):
# TODO: handle different errors here since we are catching all HTTP Errors here
except HttpResponseError:
logger.warning(
f"Subscription name: {subscription} -- has no access policy configured for keyvault {keyvault_name}"
f"Subscription ID: {subscription} -- has no access policy configured for keyvault {keyvault_name}"
)
return keys
@@ -256,7 +256,7 @@ class KeyVault(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return secrets
@@ -268,13 +268,13 @@ class KeyVault(AzureService):
monitor_diagnostics_settings = []
try:
monitor_diagnostics_settings = monitor_client.diagnostic_settings_with_uri(
self.subscriptions[subscription],
f"subscriptions/{self.subscriptions[subscription]}/resourceGroups/{resource_group}/providers/Microsoft.KeyVault/vaults/{keyvault_name}",
subscription,
f"subscriptions/{subscription}/resourceGroups/{resource_group}/providers/Microsoft.KeyVault/vaults/{keyvault_name}",
monitor_client.clients[subscription],
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return monitor_diagnostics_settings
@@ -8,9 +8,12 @@ class monitor_alert_create_policy_assignment(Check):
findings = []
for (
subscription_name,
subscription_id,
activity_log_alerts,
) in monitor_client.alert_rules.items():
subscription_name = monitor_client.subscriptions.get(
subscription_id, subscription_id
)
for alert_rule in activity_log_alerts:
if check_alert_rule(
alert_rule, "Microsoft.Authorization/policyAssignments/write"
@@ -18,19 +21,17 @@ class monitor_alert_create_policy_assignment(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=alert_rule
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"There is an alert configured for creating Policy Assignments in subscription {subscription_name}."
report.status_extended = f"There is an alert configured for creating Policy Assignments in subscription {subscription_name} ({subscription_id})."
break
else:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{monitor_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"There is not an alert for creating Policy Assignments in subscription {subscription_name}."
report.status_extended = f"There is not an alert for creating Policy Assignments in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -8,9 +8,12 @@ class monitor_alert_create_update_nsg(Check):
findings = []
for (
subscription_name,
subscription_id,
activity_log_alerts,
) in monitor_client.alert_rules.items():
subscription_name = monitor_client.subscriptions.get(
subscription_id, subscription_id
)
for alert_rule in activity_log_alerts:
if check_alert_rule(
alert_rule, "Microsoft.Network/networkSecurityGroups/write"
@@ -18,19 +21,17 @@ class monitor_alert_create_update_nsg(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=alert_rule
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"There is an alert configured for creating/updating Network Security Groups in subscription {subscription_name}."
report.status_extended = f"There is an alert configured for creating/updating Network Security Groups in subscription {subscription_name} ({subscription_id})."
break
else:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{monitor_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"There is not an alert for creating/updating Network Security Groups in subscription {subscription_name}."
report.status_extended = f"There is not an alert for creating/updating Network Security Groups in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -8,9 +8,12 @@ class monitor_alert_create_update_public_ip_address_rule(Check):
findings = []
for (
subscription_name,
subscription_id,
activity_log_alerts,
) in monitor_client.alert_rules.items():
subscription_name = monitor_client.subscriptions.get(
subscription_id, subscription_id
)
for alert_rule in activity_log_alerts:
if check_alert_rule(
alert_rule, "Microsoft.Network/publicIPAddresses/write"
@@ -18,19 +21,17 @@ class monitor_alert_create_update_public_ip_address_rule(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=alert_rule
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"There is an alert configured for creating/updating Public IP address rule in subscription {subscription_name}."
report.status_extended = f"There is an alert configured for creating/updating Public IP address rule in subscription {subscription_name} ({subscription_id})."
break
else:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{monitor_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"There is not an alert for creating/updating Public IP address rule in subscription {subscription_name}."
report.status_extended = f"There is not an alert for creating/updating Public IP address rule in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -8,9 +8,12 @@ class monitor_alert_create_update_security_solution(Check):
findings = []
for (
subscription_name,
subscription_id,
activity_log_alerts,
) in monitor_client.alert_rules.items():
subscription_name = monitor_client.subscriptions.get(
subscription_id, subscription_id
)
for alert_rule in activity_log_alerts:
if check_alert_rule(
alert_rule, "Microsoft.Security/securitySolutions/write"
@@ -18,19 +21,17 @@ class monitor_alert_create_update_security_solution(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=alert_rule
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"There is an alert configured for creating/updating Security Solution in subscription {subscription_name}."
report.status_extended = f"There is an alert configured for creating/updating Security Solution in subscription {subscription_name} ({subscription_id})."
break
else:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{monitor_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"There is not an alert for creating/updating Security Solution in subscription {subscription_name}."
report.status_extended = f"There is not an alert for creating/updating Security Solution in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -8,9 +8,12 @@ class monitor_alert_create_update_sqlserver_fr(Check):
findings = []
for (
subscription_name,
subscription_id,
activity_log_alerts,
) in monitor_client.alert_rules.items():
subscription_name = monitor_client.subscriptions.get(
subscription_id, subscription_id
)
for alert_rule in activity_log_alerts:
if check_alert_rule(
alert_rule, "Microsoft.Sql/servers/firewallRules/write"
@@ -18,19 +21,17 @@ class monitor_alert_create_update_sqlserver_fr(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=alert_rule
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"There is an alert configured for creating/updating SQL Server firewall rule in subscription {subscription_name}."
report.status_extended = f"There is an alert configured for creating/updating SQL Server firewall rule in subscription {subscription_name} ({subscription_id})."
break
else:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{monitor_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"There is not an alert for creating/updating SQL Server firewall rule in subscription {subscription_name}."
report.status_extended = f"There is not an alert for creating/updating SQL Server firewall rule in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -8,9 +8,12 @@ class monitor_alert_delete_nsg(Check):
findings = []
for (
subscription_name,
subscription_id,
activity_log_alerts,
) in monitor_client.alert_rules.items():
subscription_name = monitor_client.subscriptions.get(
subscription_id, subscription_id
)
for alert_rule in activity_log_alerts:
if check_alert_rule(
alert_rule, "Microsoft.Network/networkSecurityGroups/delete"
@@ -20,19 +23,17 @@ class monitor_alert_delete_nsg(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=alert_rule
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"There is an alert configured for deleting Network Security Groups in subscription {subscription_name}."
report.status_extended = f"There is an alert configured for deleting Network Security Groups in subscription {subscription_name} ({subscription_id})."
break
else:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{monitor_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"There is not an alert for deleting Network Security Groups in subscription {subscription_name}."
report.status_extended = f"There is not an alert for deleting Network Security Groups in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -8,9 +8,12 @@ class monitor_alert_delete_policy_assignment(Check):
findings = []
for (
subscription_name,
subscription_id,
activity_log_alerts,
) in monitor_client.alert_rules.items():
subscription_name = monitor_client.subscriptions.get(
subscription_id, subscription_id
)
for alert_rule in activity_log_alerts:
if check_alert_rule(
alert_rule, "Microsoft.Authorization/policyAssignments/delete"
@@ -18,19 +21,17 @@ class monitor_alert_delete_policy_assignment(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=alert_rule
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"There is an alert configured for deleting policy assignment in subscription {subscription_name}."
report.status_extended = f"There is an alert configured for deleting policy assignment in subscription {subscription_name} ({subscription_id})."
break
else:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{monitor_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"There is not an alert for deleting policy assignment in subscription {subscription_name}."
report.status_extended = f"There is not an alert for deleting policy assignment in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -8,9 +8,12 @@ class monitor_alert_delete_public_ip_address_rule(Check):
findings = []
for (
subscription_name,
subscription_id,
activity_log_alerts,
) in monitor_client.alert_rules.items():
subscription_name = monitor_client.subscriptions.get(
subscription_id, subscription_id
)
for alert_rule in activity_log_alerts:
if check_alert_rule(
alert_rule, "Microsoft.Network/publicIPAddresses/delete"
@@ -18,19 +21,17 @@ class monitor_alert_delete_public_ip_address_rule(Check):
report = Check_Report_Azure(
metadata=self.metadata(), resource=alert_rule
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"There is an alert configured for deleting public IP address rule in subscription {subscription_name}."
report.status_extended = f"There is an alert configured for deleting public IP address rule in subscription {subscription_name} ({subscription_id})."
break
else:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{monitor_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"There is not an alert for deleting public IP address rule in subscription {subscription_name}."
report.status_extended = f"There is not an alert for deleting public IP address rule in subscription {subscription_name} ({subscription_id})."
findings.append(report)

Some files were not shown because too many files have changed in this diff Show More