Compare commits

...

16 Commits

Author SHA1 Message Date
Davidm4r 832516be2a fix(mcp_server): bump transitive requests to 2.33.1 (advisory 90553) (#11084) 2026-05-08 12:52:46 +02:00
Prowler Bot 34727a7237 chore(docs): Bump version to v5.25.3 (#11080)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-08 10:40:41 +02:00
Alejandro Bailo 4216a3e23a feat(ui): add cloud-gated custom alerts (#11003) 2026-05-08 10:36:43 +02:00
Pepe Fagoaga a59192e6f5 chore: changelog v5.25.3 (#11077) 2026-05-08 08:37:06 +02:00
Pepe Fagoaga 592bc6f6a8 chore: enable sponsor for prowler-cloud (#11076) 2026-05-08 08:25:28 +02:00
lydiavilchez 962ebac8e4 feat(googleworkspace): add Gmail consequence-based checks for attachment safety and spoofing (#10980) 2026-05-07 16:50:36 +02:00
Hugo Pereira Brito 2c5d47a8cd chore: route vulnerability references to canonical URLs (#10853)
Co-authored-by: Hugo P.Brito <hugopbrito@Mac.home>
2026-05-07 15:28:50 +01:00
Ivan Necheporenko bcaa6ac488 fix(sdk): scan every Azure subscription when display names collide (#10718)
Co-authored-by: Rubén De la Torre Vico <ruben@prowler.com>
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
2026-05-07 13:59:38 +02:00
Rubén De la Torre Vico 71683f3093 chore: remove pre-push from default install hook types (#11072) 2026-05-07 11:19:40 +02:00
Hugo Pereira Brito 2357af912d fix(ui): hide line numbers in CLI command remediation block (#11059) 2026-05-06 14:04:49 +01:00
Pedro Martín 7971b40f49 feat(api): ASD Essential Eight compliance framework support (#10982)
Co-authored-by: César Arroba <cesar@prowler.com>
2026-05-06 14:03:00 +02:00
Pedro Martín e585ae45bd feat(aws): rename Essential Eight to ASD Essential Eight (#11054)
Co-authored-by: César Arroba <cesar@prowler.com>
2026-05-06 13:11:29 +02:00
Hugo Pereira Brito 4d9921a9b7 fix: PR number in changelog entry for #10529 (#11057) 2026-05-06 11:53:31 +01:00
rchotacode 19b602c381 fix(oci): scan identity in known valid region (#10529)
Co-authored-by: Ronan Chota <ronan.chota@saic.com>
Co-authored-by: Hugo P.Brito <hugopbrit@gmail.com>
2026-05-06 11:19:19 +01:00
Pedro Martín 4c3e741af7 chore(pyproject): revert API changes (#11049)
Co-authored-by: César Arroba <cesar@prowler.com>
2026-05-06 12:09:46 +02:00
César Arroba 8affbf44ce revert(ci): drop workflow-level paths on required-check workflows (#11055) 2026-05-06 11:49:42 +02:00
508 changed files with 13001 additions and 1751 deletions
+15
View File
@@ -0,0 +1,15 @@
# These are supported funding model platforms
github: [prowler-cloud]
# patreon: # Replace with a single Patreon username
# open_collective: # Replace with a single Open Collective username
# ko_fi: # Replace with a single Ko-fi username
# tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
# community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
# liberapay: # Replace with a single Liberapay username
# issuehunt: # Replace with a single IssueHunt username
# lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cloud-foundry
# polar: # Replace with a single Polar username
# buy_me_a_coffee: # Replace with a single Buy Me a Coffee username
# thanks_dev: # Replace with a single thanks.dev username
# custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2']
-10
View File
@@ -5,20 +5,10 @@ on:
branches:
- 'master'
- 'v5.*'
paths:
- 'api/**'
- '.github/workflows/api-tests.yml'
- '.github/workflows/api-code-quality.yml'
- '.github/actions/setup-python-poetry/**'
pull_request:
branches:
- 'master'
- 'v5.*'
paths:
- 'api/**'
- '.github/workflows/api-tests.yml'
- '.github/workflows/api-code-quality.yml'
- '.github/actions/setup-python-poetry/**'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
-8
View File
@@ -5,18 +5,10 @@ on:
branches:
- 'master'
- 'v5.*'
paths:
- 'api/**'
- '.github/workflows/api-tests.yml'
- '.github/actions/setup-python-poetry/**'
pull_request:
branches:
- 'master'
- 'v5.*'
paths:
- 'api/**'
- '.github/workflows/api-tests.yml'
- '.github/actions/setup-python-poetry/**'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
-16
View File
@@ -5,26 +5,10 @@ on:
branches:
- 'master'
- 'v5.*'
paths:
- 'prowler/**'
- 'tests/**'
- 'pyproject.toml'
- 'poetry.lock'
- '.github/workflows/sdk-tests.yml'
- '.github/workflows/sdk-code-quality.yml'
- '.github/actions/setup-python-poetry/**'
pull_request:
branches:
- 'master'
- 'v5.*'
paths:
- 'prowler/**'
- 'tests/**'
- 'pyproject.toml'
- 'poetry.lock'
- '.github/workflows/sdk-tests.yml'
- '.github/workflows/sdk-code-quality.yml'
- '.github/actions/setup-python-poetry/**'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
-14
View File
@@ -5,24 +5,10 @@ on:
branches:
- 'master'
- 'v5.*'
paths:
- 'prowler/**'
- 'tests/**'
- 'pyproject.toml'
- 'poetry.lock'
- '.github/workflows/sdk-tests.yml'
- '.github/actions/setup-python-poetry/**'
pull_request:
branches:
- 'master'
- 'v5.*'
paths:
- 'prowler/**'
- 'tests/**'
- 'pyproject.toml'
- 'poetry.lock'
- '.github/workflows/sdk-tests.yml'
- '.github/actions/setup-python-poetry/**'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
-6
View File
@@ -5,16 +5,10 @@ on:
branches:
- 'master'
- 'v5.*'
paths:
- 'ui/**'
- '.github/workflows/ui-tests.yml'
pull_request:
branches:
- 'master'
- 'v5.*'
paths:
- 'ui/**'
- '.github/workflows/ui-tests.yml'
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
+4 -19
View File
@@ -6,7 +6,7 @@
# P40 — security scanners
# P50 — dependency validation
default_install_hook_types: [pre-commit, pre-push]
default_install_hook_types: [pre-commit]
repos:
## GENERAL (prek built-in — no external repo needed)
@@ -62,12 +62,7 @@ repos:
- id: autoflake
name: "SDK - autoflake"
files: { glob: ["{prowler,tests,dashboard,util,scripts}/**/*.py"] }
args:
[
"--in-place",
"--remove-all-unused-imports",
"--remove-unused-variable",
]
args: ["--in-place", "--remove-all-unused-imports", "--remove-unused-variable"]
priority: 20
- repo: https://github.com/pycqa/isort
@@ -179,8 +174,7 @@ repos:
language: system
types: [python]
files: '.*\.py'
exclude:
{ glob: ["{contrib,skills}/**", "**/.venv/**", "**/*_test.py"] }
exclude: { glob: ["{contrib,skills}/**", "**/.venv/**", "**/*_test.py"] }
priority: 40
- id: safety
@@ -190,16 +184,7 @@ repos:
entry: safety check --policy-file .safety-policy.yml
language: system
pass_filenames: false
files:
{
glob:
[
"**/pyproject.toml",
"**/poetry.lock",
"**/requirements*.txt",
".safety-policy.yml",
],
}
files: { glob: ["**/pyproject.toml", "**/poetry.lock", "**/requirements*.txt", ".safety-policy.yml"] }
priority: 40
- id: vulture
+2
View File
@@ -6,6 +6,8 @@ All notable changes to the **Prowler API** are documented in this file.
### 🚀 Added
- New `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
- ASD Essential Eight (AWS) compliance framework support [(#10982)](https://github.com/prowler-cloud/prowler/pull/10982)
- `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
### 🔐 Security
+4 -4
View File
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.3.2 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.3.4 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -6754,8 +6754,8 @@ uuid6 = "2024.7.10"
[package.source]
type = "git"
url = "https://github.com/prowler-cloud/prowler.git"
reference = "eb1b4190ab2d9c265b46c9ede0298b81bdcf35a8"
resolved_reference = "eb1b4190ab2d9c265b46c9ede0298b81bdcf35a8"
reference = "master"
resolved_reference = "16798e293da365965120961e6539e3a9756564f9"
[[package]]
name = "psutil"
@@ -9424,4 +9424,4 @@ files = [
[metadata]
lock-version = "2.1"
python-versions = ">=3.11,<3.13"
content-hash = "df8a20081fe91c40d071e508dbe19590c8b7ffb5dcc61e71cf30ed016bad5a34"
content-hash = "a3ab982d11a87d951ff15694d2ca7fd51f1f51a451abb0baa067ccf6966367a8"
+2 -2
View File
@@ -25,7 +25,7 @@ dependencies = [
"defusedxml==0.7.1",
"gunicorn==23.0.0",
"lxml==5.3.2",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@eb1b4190ab2d9c265b46c9ede0298b81bdcf35a8",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (==1.3.0)",
"sentry-sdk[django] (==2.56.0)",
@@ -63,6 +63,7 @@ docker = "7.1.0"
filelock = "3.20.3"
freezegun = "1.5.1"
mypy = "1.10.1"
prek = "0.3.9"
pylint = "3.2.5"
pytest = "9.0.3"
pytest-cov = "5.0.0"
@@ -74,4 +75,3 @@ ruff = "0.5.0"
safety = "3.7.0"
tqdm = "4.67.1"
vulture = "2.14"
prek = "0.3.9"
+4
View File
@@ -47,6 +47,9 @@ from prowler.lib.outputs.compliance.csa.csa_oraclecloud import OracleCloudCSA
from prowler.lib.outputs.compliance.ens.ens_aws import AWSENS
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.asd_essential_eight.asd_essential_eight_aws import (
ASDEssentialEightAWS,
)
from prowler.lib.outputs.compliance.iso27001.iso27001_aws import AWSISO27001
from prowler.lib.outputs.compliance.iso27001.iso27001_azure import AzureISO27001
from prowler.lib.outputs.compliance.iso27001.iso27001_gcp import GCPISO27001
@@ -100,6 +103,7 @@ COMPLIANCE_CLASS_MAP = {
(lambda name: name.startswith("ccc_"), CCC_AWS),
(lambda name: name.startswith("c5_"), AWSC5),
(lambda name: name.startswith("csa_"), AWSCSA),
(lambda name: name == "asd_essential_eight_aws", ASDEssentialEightAWS),
],
"azure": [
(lambda name: name.startswith("cis_"), AzureCIS),
+16
View File
@@ -134,6 +134,22 @@ prek installed at `.git/hooks/pre-commit`
If pre-commit hooks were previously installed, run `prek install --overwrite` to replace the existing hook. Otherwise, both tools will run on each commit.
</Warning>
#### Enable TruffleHog as a Pre-Push Hook
By default, only `pre-commit` hooks are installed. To enable [`TruffleHog`](https://github.com/trufflesecurity/trufflehog) secret scanning on every push, install the `pre-push` hook type explicitly:
```shell
prek install --hook-type pre-push
```
Successful installation should produce the following output:
```shell
prek installed at `.git/hooks/pre-push`
```
Once installed, TruffleHog runs before each push and blocks the operation when verified secrets are detected.
### Code Quality and Security Checks
Before merging pull requests, several automated checks and utilities ensure code security and updated dependencies:
@@ -121,8 +121,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.25.2"
PROWLER_API_VERSION="5.25.2"
PROWLER_UI_VERSION="5.25.3"
PROWLER_API_VERSION="5.25.3"
```
<Note>
+3 -3
View File
@@ -1009,7 +1009,7 @@ wheels = [
[[package]]
name = "requests"
version = "2.32.5"
version = "2.33.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
@@ -1017,9 +1017,9 @@ dependencies = [
{ name = "idna" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c9/74/b3ff8e6c8446842c3f5c837e9c3dfcfe2018ea6ecef224c710c85ef728f4/requests-2.32.5.tar.gz", hash = "sha256:dbba0bac56e100853db0ea71b82b4dfd5fe2bf6d3754a8893c3af500cec7d7cf", size = 134517, upload-time = "2025-08-18T20:46:02.573Z" }
sdist = { url = "https://files.pythonhosted.org/packages/5f/a4/98b9c7c6428a668bf7e42ebb7c79d576a1c3c1e3ae2d47e674b468388871/requests-2.33.1.tar.gz", hash = "sha256:18817f8c57c6263968bc123d237e3b8b08ac046f5456bd1e307ee8f4250d3517", size = 134120, upload-time = "2026-03-30T16:09:15.531Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" },
{ url = "https://files.pythonhosted.org/packages/d7/8e/7540e8a2036f79a125c1d2ebadf69ed7901608859186c856fa0388ef4197/requests-2.33.1-py3-none-any.whl", hash = "sha256:4e6d1ef462f3626a1f0a0a9c42dd93c63bad33f9f1c1937509b8c5c8718ab56a", size = 64947, upload-time = "2026-03-30T16:09:13.83Z" },
]
[[package]]
+11
View File
@@ -11,6 +11,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- ASD Essential Eight Maturity Model compliance framework for AWS (Maturity Level One, Nov 2023) [(#10808)](https://github.com/prowler-cloud/prowler/pull/10808)
- Update Vercel checks to return personalized finding status extended depending on billing plan and classify them with billing-plan categories [(#10663)](https://github.com/prowler-cloud/prowler/pull/10663)
- `bedrock_prompt_management_exists` check for AWS provider [(#10878)](https://github.com/prowler-cloud/prowler/pull/10878)
- 8 Gmail attachment safety and spoofing protection checks for Google Workspace provider using the Cloud Identity Policy API [(#10980)](https://github.com/prowler-cloud/prowler/pull/10980)
### 🔄 Changed
@@ -19,12 +20,14 @@ All notable changes to the **Prowler SDK** are documented in this file.
- AWS CodeBuild service now batches `BatchGetProjects` and `BatchGetBuilds` calls per region (up to 100 items per call) to reduce API call volume and prevent throttling-induced false positives in `codebuild_project_not_publicly_accessible` [(#10639)](https://github.com/prowler-cloud/prowler/pull/10639)
- `display_compliance_table` dispatch switched from substring `in` checks to `startswith` to prevent false matches between similarly named frameworks (e.g. `cisa` vs `cis`) [(#10301)](https://github.com/prowler-cloud/prowler/pull/10301)
- Restore the `ec2-imdsv1` category for EC2 IMDS checks to keep Attack Surface and findings filters aligned [(#10998)](https://github.com/prowler-cloud/prowler/pull/10998)
- Container image CVE findings and IaC findings now use official CVE, Prowler Hub, or GitHub Security Advisory URLs instead of Aqua advisory URLs in remediation and references; Trivy rule IDs map to Prowler Hub without the `AVD-` prefix so links resolve [(#10853)](https://github.com/prowler-cloud/prowler/pull/10853)
### 🐞 Fixed
- AWS SDK test isolation: autouse `mock_aws` fixture and leak detector in `conftest.py` to prevent tests from hitting real AWS endpoints, with idempotent organization setup for tests calling `set_mocked_aws_provider` multiple times [(#10605)](https://github.com/prowler-cloud/prowler/pull/10605)
- AWS `boto` user agent extra is now applied to every client [(#10944)](https://github.com/prowler-cloud/prowler/pull/10944)
- Image provider connection check no longer fails with a misleading `host='https'` resolution error when the registry URL includes an `http://` or `https://` scheme prefix [(#10950)](https://github.com/prowler-cloud/prowler/pull/10950)
- Azure subscriptions sharing the same display name are no longer collapsed into a single identity entry, so every subscription is scanned [(#10718)](https://github.com/prowler-cloud/prowler/pull/10718)
### 🔐 Security
@@ -33,6 +36,14 @@ All notable changes to the **Prowler SDK** are documented in this file.
---
## [5.25.3] (Prowler v5.25.3)
### 🐞 Fixed
- Oracle Cloud identity scans known or supplied regions to better support non Ashburn tenancies [(#10529)](https://github.com/prowler-cloud/prowler/pull/10529)
---
## [5.25.2] (Prowler v5.25.2)
### 🐞 Fixed
+7 -7
View File
@@ -57,6 +57,9 @@ from prowler.lib.check.models import CheckMetadata
from prowler.lib.cli.parser import ProwlerArgumentParser
from prowler.lib.logger import logger, set_logging_config
from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.compliance.asd_essential_eight.asd_essential_eight_aws import (
ASDEssentialEightAWS,
)
from prowler.lib.outputs.compliance.aws_well_architected.aws_well_architected import (
AWSWellArchitected,
)
@@ -90,9 +93,6 @@ from prowler.lib.outputs.compliance.csa.csa_oraclecloud import OracleCloudCSA
from prowler.lib.outputs.compliance.ens.ens_aws import AWSENS
from prowler.lib.outputs.compliance.ens.ens_azure import AzureENS
from prowler.lib.outputs.compliance.ens.ens_gcp import GCPENS
from prowler.lib.outputs.compliance.essential_eight.essential_eight_aws import (
EssentialEightAWS,
)
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.compliance.iso27001.iso27001_aws import AWSISO27001
from prowler.lib.outputs.compliance.iso27001.iso27001_azure import AzureISO27001
@@ -676,18 +676,18 @@ def prowler():
)
generated_outputs["compliance"].append(cis)
cis.batch_write_data_to_file()
elif compliance_name.startswith("essential_eight"):
elif compliance_name.startswith("asd_essential_eight"):
filename = (
f"{output_options.output_directory}/compliance/"
f"{output_options.output_filename}_{compliance_name}.csv"
)
essential_eight = EssentialEightAWS(
asd_essential_eight = ASDEssentialEightAWS(
findings=finding_outputs,
compliance=bulk_compliance_frameworks[compliance_name],
file_path=filename,
)
generated_outputs["compliance"].append(essential_eight)
essential_eight.batch_write_data_to_file()
generated_outputs["compliance"].append(asd_essential_eight)
asd_essential_eight.batch_write_data_to_file()
elif compliance_name == "mitre_attack_aws":
# Generate MITRE ATT&CK Finding Object
filename = (
@@ -1,5 +1,5 @@
{
"Framework": "Essential-Eight",
"Framework": "ASD-Essential-Eight",
"Name": "ASD Essential Eight Maturity Model - Maturity Level One (AWS)",
"Version": "Nov 2023",
"Provider": "AWS",
@@ -653,7 +653,9 @@
{
"Id": "3.1.3.4.1.1",
"Description": "Ensure protection against encrypted attachments from untrusted senders is enabled",
"Checks": [],
"Checks": [
"gmail_encrypted_attachment_protection_enabled"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -674,7 +676,9 @@
{
"Id": "3.1.3.4.1.2",
"Description": "Ensure protection against attachments with scripts from untrusted senders is enabled",
"Checks": [],
"Checks": [
"gmail_script_attachment_protection_enabled"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -695,7 +699,9 @@
{
"Id": "3.1.3.4.1.3",
"Description": "Ensure protection against anomalous attachment types in emails is enabled",
"Checks": [],
"Checks": [
"gmail_anomalous_attachment_protection_enabled"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -785,7 +791,9 @@
{
"Id": "3.1.3.4.3.1",
"Description": "Ensure protection against domain spoofing based on similar domain names is enabled",
"Checks": [],
"Checks": [
"gmail_domain_spoofing_protection_enabled"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -806,7 +814,9 @@
{
"Id": "3.1.3.4.3.2",
"Description": "Ensure protection against spoofing of employee names is enabled",
"Checks": [],
"Checks": [
"gmail_employee_name_spoofing_protection_enabled"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -827,7 +837,9 @@
{
"Id": "3.1.3.4.3.3",
"Description": "Ensure protection against inbound emails spoofing your domain is enabled",
"Checks": [],
"Checks": [
"gmail_inbound_domain_spoofing_protection_enabled"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -848,7 +860,9 @@
{
"Id": "3.1.3.4.3.4",
"Description": "Ensure protection against any unauthenticated emails is enabled",
"Checks": [],
"Checks": [
"gmail_unauthenticated_email_protection_enabled"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -869,7 +883,9 @@
{
"Id": "3.1.3.4.3.5",
"Description": "Ensure groups are protected from inbound emails spoofing your domain",
"Checks": [],
"Checks": [
"gmail_groups_spoofing_protection_enabled"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -649,7 +649,9 @@
{
"Id": "GWS.GMAIL.5.1",
"Description": "Protect against encrypted attachments from untrusted senders SHALL be enabled",
"Checks": [],
"Checks": [
"gmail_encrypted_attachment_protection_enabled"
],
"Attributes": [
{
"Section": "Gmail",
@@ -662,7 +664,9 @@
{
"Id": "GWS.GMAIL.5.2",
"Description": "Protect against attachments with scripts from untrusted senders SHALL be enabled",
"Checks": [],
"Checks": [
"gmail_script_attachment_protection_enabled"
],
"Attributes": [
{
"Section": "Gmail",
@@ -675,7 +679,9 @@
{
"Id": "GWS.GMAIL.5.3",
"Description": "Protect against anomalous attachment types in emails SHALL be enabled",
"Checks": [],
"Checks": [
"gmail_anomalous_attachment_protection_enabled"
],
"Attributes": [
{
"Section": "Gmail",
@@ -798,7 +804,9 @@
{
"Id": "GWS.GMAIL.7.1",
"Description": "Protect against domain spoofing based on similar domain names SHALL be enabled",
"Checks": [],
"Checks": [
"gmail_domain_spoofing_protection_enabled"
],
"Attributes": [
{
"Section": "Gmail",
@@ -811,7 +819,9 @@
{
"Id": "GWS.GMAIL.7.2",
"Description": "Protect against spoofing of employee names SHALL be enabled",
"Checks": [],
"Checks": [
"gmail_employee_name_spoofing_protection_enabled"
],
"Attributes": [
{
"Section": "Gmail",
@@ -824,7 +834,9 @@
{
"Id": "GWS.GMAIL.7.3",
"Description": "Protect against inbound emails spoofing your domain SHALL be enabled",
"Checks": [],
"Checks": [
"gmail_inbound_domain_spoofing_protection_enabled"
],
"Attributes": [
{
"Section": "Gmail",
@@ -837,7 +849,9 @@
{
"Id": "GWS.GMAIL.7.4",
"Description": "Protect against any unauthenticated emails SHALL be enabled",
"Checks": [],
"Checks": [
"gmail_unauthenticated_email_protection_enabled"
],
"Attributes": [
{
"Section": "Gmail",
@@ -850,7 +864,9 @@
{
"Id": "GWS.GMAIL.7.5",
"Description": "Protect your Groups from inbound emails spoofing your domain SHALL be enabled",
"Checks": [],
"Checks": [
"gmail_groups_spoofing_protection_enabled"
],
"Attributes": [
{
"Section": "Gmail",
+5 -2
View File
@@ -749,8 +749,11 @@ def execute(
if global_provider.type == "cloudflare":
is_finding_muted_args["account_id"] = finding.account_id
if global_provider.type == "azure":
is_finding_muted_args["subscription_id"] = (
global_provider.identity.subscriptions.get(finding.subscription)
is_finding_muted_args["subscription_id"] = finding.subscription
is_finding_muted_args["subscription_name"] = (
global_provider.identity.subscriptions.get(
finding.subscription, finding.subscription
)
)
is_finding_muted_args["finding"] = finding
finding.muted = global_provider.mutelist.is_finding_muted(
+8 -8
View File
@@ -102,7 +102,7 @@ class CIS_Requirement_Attribute(BaseModel):
References: str
class EssentialEight_Requirement_Attribute_MaturityLevel(str, Enum):
class ASDEssentialEight_Requirement_Attribute_MaturityLevel(str, Enum):
"""ASD Essential Eight Maturity Level"""
ML1 = "ML1"
@@ -110,14 +110,14 @@ class EssentialEight_Requirement_Attribute_MaturityLevel(str, Enum):
ML3 = "ML3"
class EssentialEight_Requirement_Attribute_AssessmentStatus(str, Enum):
class ASDEssentialEight_Requirement_Attribute_AssessmentStatus(str, Enum):
"""Essential Eight Requirement Attribute Assessment Status"""
Manual = "Manual"
Automated = "Automated"
class EssentialEight_Requirement_Attribute_CloudApplicability(str, Enum):
class ASDEssentialEight_Requirement_Attribute_CloudApplicability(str, Enum):
"""How well the ASD control maps to AWS cloud infrastructure."""
Full = "full"
@@ -127,13 +127,13 @@ class EssentialEight_Requirement_Attribute_CloudApplicability(str, Enum):
# Essential Eight Requirement Attribute
class EssentialEight_Requirement_Attribute(BaseModel):
class ASDEssentialEight_Requirement_Attribute(BaseModel):
"""ASD Essential Eight Requirement Attribute"""
Section: str
MaturityLevel: EssentialEight_Requirement_Attribute_MaturityLevel
AssessmentStatus: EssentialEight_Requirement_Attribute_AssessmentStatus
CloudApplicability: EssentialEight_Requirement_Attribute_CloudApplicability
MaturityLevel: ASDEssentialEight_Requirement_Attribute_MaturityLevel
AssessmentStatus: ASDEssentialEight_Requirement_Attribute_AssessmentStatus
CloudApplicability: ASDEssentialEight_Requirement_Attribute_CloudApplicability
MitigatedThreats: list[str]
Description: str
RationaleStatement: str
@@ -292,7 +292,7 @@ class Compliance_Requirement(BaseModel):
Name: Optional[str] = None
Attributes: list[
Union[
EssentialEight_Requirement_Attribute,
ASDEssentialEight_Requirement_Attribute,
CIS_Requirement_Attribute,
ENS_Requirement_Attribute,
ISO27001_2013_Requirement_Attribute,
@@ -4,7 +4,7 @@ from tabulate import tabulate
from prowler.config.config import orange_color
def get_essential_eight_table(
def get_asd_essential_eight_table(
findings: list,
bulk_checks_metadata: dict,
compliance_framework: str,
@@ -13,7 +13,7 @@ def get_essential_eight_table(
compliance_overview: bool,
):
sections = {}
essential_eight_compliance_table = {
asd_essential_eight_compliance_table = {
"Provider": [],
"Section": [],
"Status": [],
@@ -26,7 +26,7 @@ def get_essential_eight_table(
check = bulk_checks_metadata[finding.check_metadata.CheckID]
check_compliances = check.Compliance
for compliance in check_compliances:
if compliance.Framework == "Essential-Eight":
if compliance.Framework == "ASD-Essential-Eight":
for requirement in compliance.Requirements:
for attribute in requirement.Attributes:
section = attribute.Section
@@ -50,19 +50,19 @@ def get_essential_eight_table(
sections = dict(sorted(sections.items()))
for section in sections:
essential_eight_compliance_table["Provider"].append(compliance.Provider)
essential_eight_compliance_table["Section"].append(section)
asd_essential_eight_compliance_table["Provider"].append(compliance.Provider)
asd_essential_eight_compliance_table["Section"].append(section)
if sections[section]["FAIL"] > 0:
essential_eight_compliance_table["Status"].append(
asd_essential_eight_compliance_table["Status"].append(
f"{Fore.RED}FAIL({sections[section]['FAIL']}){Style.RESET_ALL}"
)
elif sections[section]["PASS"] > 0:
essential_eight_compliance_table["Status"].append(
asd_essential_eight_compliance_table["Status"].append(
f"{Fore.GREEN}PASS({sections[section]['PASS']}){Style.RESET_ALL}"
)
else:
essential_eight_compliance_table["Status"].append("-")
essential_eight_compliance_table["Muted"].append(
asd_essential_eight_compliance_table["Status"].append("-")
asd_essential_eight_compliance_table["Muted"].append(
f"{orange_color}{sections[section]['Muted']}{Style.RESET_ALL}"
)
if len(fail_count) + len(pass_count) + len(muted_count) > 1:
@@ -84,7 +84,7 @@ def get_essential_eight_table(
)
print(
tabulate(
essential_eight_compliance_table,
asd_essential_eight_compliance_table,
headers="keys",
tablefmt="rounded_grid",
)
@@ -1,13 +1,13 @@
from prowler.config.config import timestamp
from prowler.lib.check.compliance_models import Compliance
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.compliance.essential_eight.models import (
EssentialEightAWSModel,
from prowler.lib.outputs.compliance.asd_essential_eight.models import (
ASDEssentialEightAWSModel,
)
from prowler.lib.outputs.compliance.compliance_output import ComplianceOutput
from prowler.lib.outputs.finding import Finding
class EssentialEightAWS(ComplianceOutput):
class ASDEssentialEightAWS(ComplianceOutput):
"""
This class represents the AWS ASD Essential Eight compliance output.
@@ -41,7 +41,7 @@ class EssentialEightAWS(ComplianceOutput):
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = EssentialEightAWSModel(
compliance_row = ASDEssentialEightAWSModel(
Provider=finding.provider,
Description=compliance.Description,
AccountId=finding.account_uid,
@@ -77,7 +77,7 @@ class EssentialEightAWS(ComplianceOutput):
for requirement in compliance.Requirements:
if not requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = EssentialEightAWSModel(
compliance_row = ASDEssentialEightAWSModel(
Provider=compliance.Provider.lower(),
Description=compliance.Description,
AccountId="",
@@ -1,9 +1,9 @@
from pydantic.v1 import BaseModel
class EssentialEightAWSModel(BaseModel):
class ASDEssentialEightAWSModel(BaseModel):
"""
EssentialEightAWSModel generates a finding's output in AWS ASD Essential Eight Compliance format.
ASDEssentialEightAWSModel generates a finding's output in AWS ASD Essential Eight Compliance format.
"""
Provider: str
+5 -5
View File
@@ -1,6 +1,9 @@
import sys
from prowler.lib.logger import logger
from prowler.lib.outputs.compliance.asd_essential_eight.asd_essential_eight import (
get_asd_essential_eight_table,
)
from prowler.lib.outputs.compliance.c5.c5 import get_c5_table
from prowler.lib.outputs.compliance.ccc.ccc import get_ccc_table
from prowler.lib.outputs.compliance.cis.cis import get_cis_table
@@ -9,9 +12,6 @@ from prowler.lib.outputs.compliance.compliance_check import ( # noqa: F401 - re
)
from prowler.lib.outputs.compliance.csa.csa import get_csa_table
from prowler.lib.outputs.compliance.ens.ens import get_ens_table
from prowler.lib.outputs.compliance.essential_eight.essential_eight import (
get_essential_eight_table,
)
from prowler.lib.outputs.compliance.generic.generic_table import (
get_generic_compliance_table,
)
@@ -233,8 +233,8 @@ def display_compliance_table(
output_directory,
compliance_overview,
)
elif "essential_eight" in compliance_framework:
get_essential_eight_table(
elif "asd_essential_eight" in compliance_framework:
get_asd_essential_eight_table(
findings,
bulk_checks_metadata,
compliance_framework,
+4 -2
View File
@@ -187,9 +187,11 @@ class Finding(BaseModel):
output_data["account_uid"] = (
output_data["account_organization_uid"]
if "Tenant:" in check_output.subscription
else provider.identity.subscriptions[check_output.subscription]
else check_output.subscription
)
output_data["account_name"] = provider.identity.subscriptions.get(
check_output.subscription, check_output.subscription
)
output_data["account_name"] = check_output.subscription
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.location
+5 -2
View File
@@ -492,8 +492,11 @@ class HTML(Output):
"""
try:
printed_subscriptions = []
for key, value in provider.identity.subscriptions.items():
intermediate = f"{key} : {value}"
for (
subscription_id,
display_name,
) in provider.identity.subscriptions.items():
intermediate = f"{display_name} : {subscription_id}"
printed_subscriptions.append(intermediate)
# check if identity is str(coming from SP) or dict(coming from browser or)
+5 -2
View File
@@ -82,8 +82,11 @@ class Slack:
logo = gcp_logo
elif provider.type == "azure":
printed_subscriptions = []
for key, value in provider.identity.subscriptions.items():
intermediate = f"- *{key}: {value}*\n"
for (
subscription_id,
display_name,
) in provider.identity.subscriptions.items():
intermediate = f"- *{subscription_id}: {display_name}*\n"
printed_subscriptions.append(intermediate)
identity = f"Azure Subscriptions:\n{''.join(printed_subscriptions)}"
logo = azure_logo
+6 -2
View File
@@ -185,9 +185,13 @@ def display_summary_table(
print(
f"\n{entity_type} {Fore.YELLOW}{audited_entities}{Style.RESET_ALL} Scan Results (severity columns are for fails only):"
)
if provider == "azure":
if provider.type == "azure":
scanned_subscriptions = ", ".join(
f"{display_name} ({subscription_id})"
for subscription_id, display_name in provider.identity.subscriptions.items()
)
print(
f"\nSubscriptions scanned: {Fore.YELLOW}{' '.join(provider.identity.subscriptions.keys())}{Style.RESET_ALL}"
f"\nSubscriptions scanned: {Fore.YELLOW}{scanned_subscriptions}{Style.RESET_ALL}"
)
print(tabulate(findings_table, headers="keys", tablefmt="rounded_grid"))
print(
@@ -0,0 +1,90 @@
import re
from urllib.parse import parse_qs, urlparse
AQUA_REFERENCE_HOST = "avd.aquasec.com"
GITHUB_ADVISORY_URL = "https://github.com/advisories/{advisory_id}"
PROWLER_HUB_CHECK_URL = "https://hub.prowler.com/check/{check_id}"
_CVE_ID_PATTERN = re.compile(r"^CVE-\d{4}-\d+$", re.IGNORECASE)
_GHSA_ID_PATTERN = re.compile(r"^GHSA(?:-[a-z0-9]{4}){3}$", re.IGNORECASE)
def _dedupe_preserve_order(urls: list[str]) -> list[str]:
seen: set[str] = set()
ordered_urls: list[str] = []
for url in urls:
if not url or not url.strip():
continue
normalized_url = url.strip()
if normalized_url in seen:
continue
seen.add(normalized_url)
ordered_urls.append(normalized_url)
return ordered_urls
def _is_aqua_reference(url: str) -> bool:
return AQUA_REFERENCE_HOST in urlparse(url).netloc.lower()
def _build_cve_org_url(vulnerability_id: str) -> str:
return f"https://www.cve.org/CVERecord?id={vulnerability_id.upper()}"
def build_finding_reference_url(finding_id: str) -> str:
"""Map a Trivy finding ID to a stable, real reference URL.
- CVE-XXXX-NNNN cve.org record
- GHSA- github.com/advisories
- everything else hub.prowler.com/check/<id>, stripping a leading
"AVD-" prefix because Prowler Hub indexes Trivy rules by the
non-prefixed ID (e.g., "AWS-0001" not "AVD-AWS-0001").
"""
normalized = finding_id.strip().upper()
if _CVE_ID_PATTERN.match(normalized):
return _build_cve_org_url(normalized)
if _GHSA_ID_PATTERN.match(normalized):
return GITHUB_ADVISORY_URL.format(advisory_id=normalized)
hub_id = normalized[4:] if normalized.startswith("AVD-") else normalized
return PROWLER_HUB_CHECK_URL.format(check_id=hub_id)
def _is_cve_org_url(url: str, vulnerability_id: str) -> bool:
parsed_url = urlparse(url)
if parsed_url.netloc.lower() != "www.cve.org":
return False
query_value = parse_qs(parsed_url.query).get("id", [""])[0]
return query_value.upper() == vulnerability_id.upper()
def resolve_vulnerability_reference_urls(
vulnerability_id: str,
references: list[str] | None = None,
primary_url: str = "",
) -> tuple[str, list[str]]:
"""Resolve non-Aqua vulnerability URLs, prioritizing official CVE destinations."""
candidate_urls = list(references or [])
if primary_url and primary_url not in candidate_urls:
candidate_urls.append(primary_url)
filtered_urls = _dedupe_preserve_order(
[url for url in candidate_urls if not _is_aqua_reference(url)]
)
if not _CVE_ID_PATTERN.match(vulnerability_id):
return "", filtered_urls
cve_org_urls = [
url for url in filtered_urls if _is_cve_org_url(url, vulnerability_id)
]
recommendation_url = (
cve_org_urls[0] if cve_org_urls else _build_cve_org_url(vulnerability_id)
)
return recommendation_url, [recommendation_url]
+29 -18
View File
@@ -441,8 +441,8 @@ class AzureProvider(Provider):
None
"""
printed_subscriptions = []
for key, value in self._identity.subscriptions.items():
intermediate = key + ": " + value
for subscription_id, display_name in self._identity.subscriptions.items():
intermediate = display_name + ": " + subscription_id
printed_subscriptions.append(intermediate)
report_lines = [
f"Azure Tenant Domain: {Fore.YELLOW}{self._identity.tenant_domain}{Style.RESET_ALL} Azure Tenant ID: {Fore.YELLOW}{self._identity.tenant_ids[0]}{Style.RESET_ALL}",
@@ -969,19 +969,30 @@ class AzureProvider(Provider):
)
if not subscription_ids:
logger.info("Scanning all the Azure subscriptions...")
for subscription in subscriptions_client.subscriptions.list():
# TODO: get tags or labels
# TODO: fill with AzureSubscription
identity.subscriptions.update(
{subscription.display_name: subscription.subscription_id}
)
# TODO: get tags or labels
# TODO: fill with AzureSubscription
subscription_pairs = [
(subscription.display_name, subscription.subscription_id)
for subscription in subscriptions_client.subscriptions.list()
]
else:
logger.info("Scanning the subscriptions passed as argument ...")
for id in subscription_ids:
subscription = subscriptions_client.subscriptions.get(
subscription_id=id
subscription_pairs = [
(
subscriptions_client.subscriptions.get(
subscription_id=id
).display_name,
id,
)
identity.subscriptions.update({subscription.display_name: id})
for id in subscription_ids
]
# Key the subscriptions dict by subscription ID (which is
# guaranteed unique) and store the display name as the value.
# This avoids collisions when multiple subscriptions share
# the same display name.
for display_name, subscription_id in subscription_pairs:
identity.subscriptions[subscription_id] = display_name
# If there are no subscriptions listed -> checks are not going to be run against any resource
if not identity.subscriptions:
@@ -1017,28 +1028,28 @@ class AzureProvider(Provider):
Returns:
A dictionary containing the locations available for each subscription. The dictionary
has subscription display names as keys and lists of location names as values.
has subscription IDs as keys and lists of location names as values.
Examples:
>>> provider = AzureProvider(...)
>>> provider.get_locations()
{
'Subscription 1': ['eastus', 'eastus2', 'westus', 'westus2'],
'Subscription 2': ['eastus', 'eastus2', 'westus', 'westus2']
'sub-id-1': ['eastus', 'eastus2', 'westus', 'westus2'],
'sub-id-2': ['eastus', 'eastus2', 'westus', 'westus2']
}
"""
credentials = self.session
subscription_client = SubscriptionClient(credentials)
locations = {}
for display_name, subscription_id in self._identity.subscriptions.items():
locations[display_name] = []
for subscription_id, display_name in self._identity.subscriptions.items():
locations[subscription_id] = []
# List locations for each subscription
for location in subscription_client.subscriptions.list_locations(
subscription_id
):
locations[display_name].append(location.name)
locations[subscription_id].append(location.name)
return locations
@@ -8,17 +8,23 @@ class AzureMutelist(Mutelist):
self,
finding: Check_Report_Azure,
subscription_id: str,
subscription_name: str = "",
) -> bool:
return self.is_muted(
subscription_id, # support Azure Subscription ID in mutelist
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
) or self.is_muted(
finding.subscription, # support Azure Subscription Name in mutelist
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
unroll_dict(unroll_tags(finding.resource_tags)),
)
account_names = [subscription_id]
for account_name in (subscription_name, finding.subscription):
if account_name and account_name not in account_names:
account_names.append(account_name)
tags = unroll_dict(unroll_tags(finding.resource_tags))
for account_name in account_names:
if self.is_muted(
account_name,
finding.check_metadata.CheckID,
finding.location,
finding.resource_name,
tags,
):
return True
return False
@@ -49,15 +49,15 @@ class AzureService:
if "GraphServiceClient" in str(service):
clients.update({identity.tenant_domain: service(credentials=session)})
elif "LogsQueryClient" in str(service):
for display_name, id in identity.subscriptions.items():
clients.update({display_name: service(credential=session)})
for subscription_id, display_name in identity.subscriptions.items():
clients.update({subscription_id: service(credential=session)})
else:
for display_name, id in identity.subscriptions.items():
for subscription_id, display_name in identity.subscriptions.items():
clients.update(
{
display_name: service(
subscription_id: service(
credential=session,
subscription_id=id,
subscription_id=subscription_id,
base_url=region_config.base_url,
credential_scopes=region_config.credential_scopes,
)
@@ -36,7 +36,7 @@ class AISearch(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return aisearch_services
@@ -9,20 +9,23 @@ class aisearch_service_not_publicly_accessible(Check):
findings = []
for (
subscription_name,
subscription_id,
aisearch_services,
) in aisearch_client.aisearch_services.items():
subscription_name = aisearch_client.subscriptions.get(
subscription_id, subscription_id
)
for aisearch_service in aisearch_services.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=aisearch_service
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} allows public access."
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} ({subscription_id}) allows public access."
if not aisearch_service.public_network_access:
report.status = "PASS"
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} does not allows public access."
report.status_extended = f"AISearch Service {aisearch_service.name} from subscription {subscription_name} ({subscription_id}) does not allows public access."
findings.append(report)
@@ -6,16 +6,19 @@ class aks_cluster_rbac_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for subscription_id, clusters in aks_client.clusters.items():
subscription_name = aks_client.subscriptions.get(
subscription_id, subscription_id
)
for cluster in clusters.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=cluster)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"RBAC is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
report.status_extended = f"RBAC is enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'."
if not cluster.rbac_enabled:
report.status = "FAIL"
report.status_extended = f"RBAC is not enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
report.status_extended = f"RBAC is not enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -6,17 +6,20 @@ class aks_clusters_created_with_private_nodes(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for subscription_id, clusters in aks_client.clusters.items():
subscription_name = aks_client.subscriptions.get(
subscription_id, subscription_id
)
for cluster in clusters.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=cluster)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Cluster '{cluster.name}' was created with private nodes in subscription '{subscription_name}'"
report.status_extended = f"Cluster '{cluster.name}' was created with private nodes in subscription '{subscription_name} ({subscription_id})'"
for agent_pool in cluster.agent_pool_profiles:
if getattr(agent_pool, "enable_node_public_ip", True):
report.status = "FAIL"
report.status_extended = f"Cluster '{cluster.name}' was not created with private nodes in subscription '{subscription_name}'"
report.status_extended = f"Cluster '{cluster.name}' was not created with private nodes in subscription '{subscription_name} ({subscription_id})'"
break
findings.append(report)
@@ -6,18 +6,21 @@ class aks_clusters_public_access_disabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for subscription_id, clusters in aks_client.clusters.items():
subscription_name = aks_client.subscriptions.get(
subscription_id, subscription_id
)
for cluster in clusters.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=cluster)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Public access to nodes is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'"
report.status_extended = f"Public access to nodes is enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'"
if cluster.private_fqdn:
for agent_pool in cluster.agent_pool_profiles:
if not getattr(agent_pool, "enable_node_public_ip", False):
report.status = "PASS"
report.status_extended = f"Public access to nodes is disabled for cluster '{cluster.name}' in subscription '{subscription_name}'"
report.status_extended = f"Public access to nodes is disabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'"
findings.append(report)
@@ -6,16 +6,19 @@ class aks_network_policy_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, clusters in aks_client.clusters.items():
for subscription_id, clusters in aks_client.clusters.items():
subscription_name = aks_client.subscriptions.get(
subscription_id, subscription_id
)
for cluster_id, cluster in clusters.items():
report = Check_Report_Azure(metadata=self.metadata(), resource=cluster)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Network policy is enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
report.status_extended = f"Network policy is enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'."
if not getattr(cluster, "network_policy", False):
report.status = "FAIL"
report.status_extended = f"Network policy is not enabled for cluster '{cluster.name}' in subscription '{subscription_name}'."
report.status_extended = f"Network policy is not enabled for cluster '{cluster.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -17,14 +17,14 @@ class AKS(AzureService):
logger.info("AKS - Getting clusters...")
clusters = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
clusters_list = client.managed_clusters.list()
clusters.update({subscription_name: {}})
clusters.update({subscription_id: {}})
for cluster in clusters_list:
if getattr(cluster, "kubernetes_version", None):
clusters[subscription_name].update(
clusters[subscription_id].update(
{
cluster.id: Cluster(
id=cluster.id,
@@ -60,7 +60,7 @@ class AKS(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return clusters
@@ -147,7 +147,7 @@ class APIM(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return instances
@@ -50,9 +50,11 @@ class apim_threat_detection_llm_jacking(Check):
],
)
# 1. Aggregate logs from all APIM instances first
all_llm_logs: List[LogsQueryLogEntry] = []
for subscription, instances in apim_client.instances.items():
subscription_name = apim_client.subscriptions.get(
subscription, subscription
)
all_llm_logs: List[LogsQueryLogEntry] = []
for instance in instances:
if instance.log_analytics_workspace_id:
logs = apim_client.get_llm_operations_logs(
@@ -60,7 +62,8 @@ class apim_threat_detection_llm_jacking(Check):
)
all_llm_logs.extend(logs)
# 2. Perform a single, global analysis on all collected logs
# Analyze logs only within the current subscription to avoid
# cross-subscription attribution when scanning multiple subscriptions.
potential_llm_jacking_attackers = {}
for log in all_llm_logs:
operation_name = log.operation_id
@@ -91,19 +94,17 @@ class apim_threat_detection_llm_jacking(Check):
report = Check_Report_Azure(self.metadata(), resource=resource)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Potential LLM Jacking attack detected from IP address {principal_ip} with a threshold of {action_ratio}."
report.status_extended = f"Potential LLM Jacking attack detected from IP address {principal_ip} in subscription {subscription_name} ({subscription}) with an action ratio of {action_ratio}, above the configured threshold of {threshold}."
findings.append(report)
# 4. If no threats were found after checking all principals, create a single PASS report
# If no threats were found after checking all principals, create a single PASS report.
if not found_potential_llm_jacking_attackers:
report = Check_Report_Azure(self.metadata(), resource={})
report.resource_name = subscription
report.resource_id = (
f"/subscriptions/{apim_client.subscriptions[subscription]}"
)
report.resource_name = subscription_name
report.resource_id = f"/subscriptions/{subscription}"
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"No potential LLM Jacking attacks detected across all monitored APIM instances in the last {threat_detection_minutes} minutes."
report.status_extended = f"No potential LLM Jacking attacks detected across monitored APIM instances in subscription {subscription_name} ({subscription}) in the last {threat_detection_minutes} minutes."
findings.append(report)
return findings
@@ -7,18 +7,21 @@ class app_client_certificates_on(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Clients are required to present a certificate for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Clients are required to present a certificate for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if app.client_cert_mode != "Required":
report.status = "FAIL"
report.status_extended = f"Clients are not required to present a certificate for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Clients are not required to present a certificate for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,18 +7,21 @@ class app_ensure_auth_is_set_up(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Authentication is set up for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Authentication is set up for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if not app.auth_enabled:
report.status = "FAIL"
report.status_extended = f"Authentication is not set up for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Authentication is not set up for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,18 +7,21 @@ class app_ensure_http_is_redirected_to_https(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"HTTP is redirected to HTTPS for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"HTTP is redirected to HTTPS for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if not app.https_only:
report.status = "FAIL"
report.status_extended = f"HTTP is not redirected to HTTPS for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"HTTP is not redirected to HTTPS for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,9 +7,12 @@ class app_ensure_java_version_is_latest(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
linux_framework = getattr(app.configurations, "linux_fx_version", "")
windows_framework_version = getattr(
@@ -18,19 +21,19 @@ class app_ensure_java_version_is_latest(Check):
if "java" in linux_framework.lower() or windows_framework_version:
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
java_latest_version = app_client.audit_config.get(
"java_latest_version", "17"
)
report.status_extended = f"Java version is set to '{f'java{windows_framework_version}' if windows_framework_version else linux_framework}', but should be set to 'java {java_latest_version}' for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Java version is set to '{f'java{windows_framework_version}' if windows_framework_version else linux_framework}', but should be set to 'java {java_latest_version}' for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if (
f"java{java_latest_version}" in linux_framework
or java_latest_version == windows_framework_version
):
report.status = "PASS"
report.status_extended = f"Java version is set to 'java {java_latest_version}' for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Java version is set to 'java {java_latest_version}' for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,9 +7,12 @@ class app_ensure_php_version_is_latest(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
framework = getattr(app.configurations, "linux_fx_version", "")
@@ -17,14 +20,14 @@ class app_ensure_php_version_is_latest(Check):
app.configurations, "php_version", ""
):
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
php_latest_version = app_client.audit_config.get(
"php_latest_version", "8.2"
)
report.status_extended = f"PHP version is set to '{framework}', the latest version that you could use is the '{php_latest_version}' version, for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"PHP version is set to '{framework}', the latest version that you could use is the '{php_latest_version}' version, for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if (
php_latest_version in framework
@@ -32,7 +35,7 @@ class app_ensure_php_version_is_latest(Check):
== php_latest_version
):
report.status = "PASS"
report.status_extended = f"PHP version is set to '{php_latest_version}' for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"PHP version is set to '{php_latest_version}' for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,9 +7,12 @@ class app_ensure_python_version_is_latest(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
framework = getattr(app.configurations, "linux_fx_version", "")
@@ -17,12 +20,12 @@ class app_ensure_python_version_is_latest(Check):
app.configurations, "python_version", ""
):
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
python_latest_version = app_client.audit_config.get(
"python_latest_version", "3.12"
)
report.status_extended = f"Python version is '{framework}', the latest version that you could use is the '{python_latest_version}' version, for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Python version is '{framework}', the latest version that you could use is the '{python_latest_version}' version, for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if (
python_latest_version in framework
@@ -30,7 +33,7 @@ class app_ensure_python_version_is_latest(Check):
== python_latest_version
):
report.status = "PASS"
report.status_extended = f"Python version is set to '{python_latest_version}' for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Python version is set to '{python_latest_version}' for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,20 +7,23 @@ class app_ensure_using_http20(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"HTTP/2.0 is not enabled for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"HTTP/2.0 is not enabled for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if app.configurations and getattr(
app.configurations, "http20_enabled", False
):
report.status = "PASS"
report.status_extended = f"HTTP/2.0 is enabled for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"HTTP/2.0 is enabled for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,21 +7,24 @@ class app_ftp_deployment_disabled(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"FTP is enabled for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"FTP is enabled for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if (
app.configurations
and getattr(app.configurations, "ftps_state", "AllAllowed")
!= "AllAllowed"
):
report.status = "PASS"
report.status_extended = f"FTP is disabled for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"FTP is disabled for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,23 +7,24 @@ class app_function_access_keys_configured(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
if function.function_keys is not None:
report = Check_Report_Azure(
metadata=self.metadata(), resource=function
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Function {function.name} does not have function keys configured."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) does not have function keys configured."
if len(function.function_keys) > 0:
report.status = "PASS"
report.status_extended = (
f"Function {function.name} has function keys configured."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has function keys configured."
findings.append(report)
@@ -7,19 +7,20 @@ class app_function_application_insights_enabled(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
if function.enviroment_variables is not None:
report = Check_Report_Azure(
metadata=self.metadata(), resource=function
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = (
f"Function {function.name} is not using Application Insights."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is not using Application Insights."
if function.enviroment_variables.get(
"APPINSIGHTS_INSTRUMENTATIONKEY", None
@@ -27,9 +28,7 @@ class app_function_application_insights_enabled(Check):
"APPLICATIONINSIGHTS_CONNECTION_STRING", None
):
report.status = "PASS"
report.status_extended = (
f"Function {function.name} is using Application Insights."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is using Application Insights."
findings.append(report)
@@ -7,19 +7,20 @@ class app_function_ftps_deployment_disabled(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=function)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Function {function.name} has {'FTP' if function.ftps_state == 'AllAllowed' else 'FTPS' if function.ftps_state == 'FtpsOnly' else 'FTP or FTPS'} deployment enabled"
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has {'FTP' if function.ftps_state == 'AllAllowed' else 'FTPS' if function.ftps_state == 'FtpsOnly' else 'FTP or FTPS'} deployment enabled."
if function.ftps_state == "Disabled":
report.status = "PASS"
report.status_extended = (
f"Function {function.name} has FTP and FTPS deployment disabled"
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has FTP and FTPS deployment disabled."
findings.append(report)
@@ -7,18 +7,26 @@ class app_function_identity_is_configured(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=function)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Function {function.name} does not have a managed identity enabled."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) does not have a managed identity enabled."
if function.identity:
identity_type = (
function.identity.type
if getattr(function.identity, "type", "")
else "managed"
)
report.status = "PASS"
report.status_extended = f"Function {function.name} has a {function.identity.type if getattr(function.identity, 'type', '') else 'managed'} identity enabled."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has a {identity_type} identity enabled."
findings.append(report)
@@ -14,22 +14,25 @@ class app_function_identity_without_admin_privileges(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
if function.identity:
report = Check_Report_Azure(
metadata=self.metadata(), resource=function
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Function {function.name} has a managed identity enabled but without admin privileges."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has a managed identity enabled but without admin privileges."
admin_roles_assigned = []
for role_assignment in iam_client.role_assignments[
subscription_name
subscription_id
].values():
if (
role_assignment.agent_id == function.identity.principal_id
@@ -43,8 +46,8 @@ class app_function_identity_without_admin_privileges(Check):
):
admin_roles_assigned.append(
getattr(
iam_client.roles[subscription_name].get(
f"/subscriptions/{iam_client.subscriptions[subscription_name]}/providers/Microsoft.Authorization/roleDefinitions/{role_assignment.role_id}"
iam_client.roles[subscription_id].get(
f"/subscriptions/{subscription_id}/providers/Microsoft.Authorization/roleDefinitions/{role_assignment.role_id}"
),
"name",
"",
@@ -53,7 +56,7 @@ class app_function_identity_without_admin_privileges(Check):
if admin_roles_assigned:
report.status = "FAIL"
report.status_extended = f"Function {function.name} has a managed identity enabled and it is configure with admin privileges using {'roles: ' + ', '.join(admin_roles_assigned) if len(admin_roles_assigned) > 1 else 'role ' + admin_roles_assigned[0]}."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has a managed identity enabled and it is configure with admin privileges using {'roles: ' + ', '.join(admin_roles_assigned) if len(admin_roles_assigned) > 1 else 'role ' + admin_roles_assigned[0]}."
findings.append(report)
@@ -7,19 +7,20 @@ class app_function_latest_runtime_version(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
if function.enviroment_variables is not None:
report = Check_Report_Azure(
metadata=self.metadata(), resource=function
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = (
f"Function {function.name} is using the latest runtime."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is using the latest runtime."
if (
function.enviroment_variables.get(
@@ -28,7 +29,7 @@ class app_function_latest_runtime_version(Check):
!= "~4"
):
report.status = "FAIL"
report.status_extended = f"Function {function.name} is not using the latest runtime. The current runtime is '{function.enviroment_variables.get('FUNCTIONS_EXTENSION_VERSION', '')}' and should be '~4'."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is not using the latest runtime. The current runtime is '{function.enviroment_variables.get('FUNCTIONS_EXTENSION_VERSION', '')}' and should be '~4'."
findings.append(report)
@@ -7,22 +7,21 @@ class app_function_not_publicly_accessible(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=function)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = (
f"Function {function.name} is publicly accessible."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is publicly accessible."
if not function.public_access:
report.status = "PASS"
report.status_extended = (
f"Function {function.name} is not publicly accessible."
)
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) is not publicly accessible."
findings.append(report)
@@ -7,18 +7,21 @@ class app_function_vnet_integration_enabled(Check):
findings = []
for (
subscription_name,
subscription_id,
functions,
) in app_client.functions.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for function in functions.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=function)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Function {function.name} does not have virtual network integration enabled."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) does not have virtual network integration enabled."
if function.vnet_subnet_id:
report.status = "PASS"
report.status_extended = f"Function {function.name} has Virtual Network integration enabled with subnet '{function.vnet_subnet_id}' enabled."
report.status_extended = f"Function {function.name} from subscription {subscription_name} ({subscription_id}) has Virtual Network integration enabled with subnet '{function.vnet_subnet_id}' enabled."
findings.append(report)
@@ -6,25 +6,28 @@ class app_http_logs_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, apps in app_client.apps.items():
for subscription_id, apps in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
if "functionapp" not in app.kind:
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
if not app.monitor_diagnostic_settings:
report.status_extended = f"App {app.name} does not have a diagnostic setting in subscription {subscription_name}."
report.status_extended = f"App {app.name} does not have a diagnostic setting in subscription {subscription_name} ({subscription_id})."
else:
for diagnostic_setting in app.monitor_diagnostic_settings:
report.status_extended = f"App {app.name} does not have HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name}"
report.status_extended = f"App {app.name} does not have HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name} ({subscription_id})"
for log in diagnostic_setting.logs:
if log.category == "AppServiceHTTPLogs" and log.enabled:
report.status = "PASS"
report.status_extended = f"App {app.name} has HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name}"
report.status_extended = f"App {app.name} has HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name} ({subscription_id})"
break
elif log.category_group == "allLogs" and log.enabled:
report.status = "PASS"
report.status_extended = f"App {app.name} has allLogs category group which includes HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name}"
report.status_extended = f"App {app.name} has allLogs category group which includes HTTP Logs enabled in diagnostic setting {diagnostic_setting.name} in subscription {subscription_name} ({subscription_id})"
break
findings.append(report)
@@ -7,20 +7,23 @@ class app_minimum_tls_version_12(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Minimum TLS version is not set to 1.2 for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Minimum TLS version is not set to 1.2 for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
if app.configurations and getattr(
app.configurations, "min_tls_version", ""
) in ["1.2", "1.3"]:
report.status = "PASS"
report.status_extended = f"Minimum TLS version is set to {app.configurations.min_tls_version} for app '{app.name}' in subscription '{subscription_name}'."
report.status_extended = f"Minimum TLS version is set to {app.configurations.min_tls_version} for app '{app.name}' in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -7,18 +7,21 @@ class app_register_with_identity(Check):
findings = []
for (
subscription_name,
subscription_id,
apps,
) in app_client.apps.items():
subscription_name = app_client.subscriptions.get(
subscription_id, subscription_id
)
for app in apps.values():
report = Check_Report_Azure(metadata=self.metadata(), resource=app)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"App '{app.name}' in subscription '{subscription_name}' has an identity configured."
report.status_extended = f"App '{app.name}' in subscription '{subscription_name} ({subscription_id})' has an identity configured."
if not app.identity:
report.status = "FAIL"
report.status_extended = f"App '{app.name}' in subscription '{subscription_name}' does not have an identity configured."
report.status_extended = f"App '{app.name}' in subscription '{subscription_name} ({subscription_id})' does not have an identity configured."
findings.append(report)
@@ -20,10 +20,10 @@ class App(AzureService):
logger.info("App - Getting apps...")
apps = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
apps_list = client.web_apps.list()
apps.update({subscription_name: {}})
apps.update({subscription_id: {}})
for app in apps_list:
# Filter function apps
@@ -41,7 +41,7 @@ class App(AzureService):
resource_group_name=app.resource_group, name=app.name
)
apps[subscription_name].update(
apps[subscription_id].update(
{
app.id: WebApp(
resource_id=app.id,
@@ -81,7 +81,7 @@ class App(AzureService):
getattr(app, "client_cert_mode", "Ignore"),
),
monitor_diagnostic_settings=self._get_app_monitor_settings(
app.name, app.resource_group, subscription_name
app.name, app.resource_group, subscription_id
),
https_only=getattr(app, "https_only", False),
identity=ManagedServiceIdentity(
@@ -106,7 +106,7 @@ class App(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return apps
@@ -115,17 +115,17 @@ class App(AzureService):
logger.info("Function - Getting functions...")
functions = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
functions_list = client.web_apps.list()
functions.update({subscription_name: {}})
functions.update({subscription_id: {}})
for function in functions_list:
# Filter function apps
if getattr(function, "kind", "").startswith("functionapp"):
# List host keys
host_keys = self._get_function_host_keys(
subscription_name, function.resource_group, function.name
subscription_id, function.resource_group, function.name
)
if host_keys is not None:
function_keys = getattr(host_keys, "function_keys", {})
@@ -133,16 +133,16 @@ class App(AzureService):
function_keys = None
application_settings = self._list_application_settings(
subscription_name, function.resource_group, function.name
subscription_id, function.resource_group, function.name
)
function_config = self._get_function_config(
subscription_name,
subscription_id,
function.resource_group,
function.name,
)
functions[subscription_name].update(
functions[subscription_id].update(
{
function.id: FunctionApp(
id=function.id,
@@ -175,7 +175,7 @@ class App(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return functions
@@ -200,13 +200,13 @@ class App(AzureService):
monitor_diagnostics_settings = []
try:
monitor_diagnostics_settings = monitor_client.diagnostic_settings_with_uri(
self.subscriptions[subscription],
f"subscriptions/{self.subscriptions[subscription]}/resourceGroups/{resource_group}/providers/Microsoft.Web/sites/{app_name}",
subscription,
f"subscriptions/{subscription}/resourceGroups/{resource_group}/providers/Microsoft.Web/sites/{app_name}",
monitor_client.clients[subscription],
)
except Exception as error:
logger.error(
f"Subscription name: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return monitor_diagnostics_settings
@@ -8,19 +8,20 @@ class appinsights_ensure_is_configured(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription_name, components in appinsights_client.components.items():
for subscription_id, components in appinsights_client.components.items():
subscription_name = appinsights_client.subscriptions.get(
subscription_id, subscription_id
)
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.status = "PASS"
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{appinsights_client.subscriptions[subscription_name]}"
)
report.status_extended = f"There is at least one AppInsight configured in subscription {subscription_name}."
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status_extended = f"There is at least one AppInsight configured in subscription {subscription_name} ({subscription_id})."
if len(components) < 1:
report.status = "FAIL"
report.status_extended = f"There are no AppInsight configured in subscription {subscription_name}."
report.status_extended = f"There are no AppInsight configured in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -15,13 +15,13 @@ class AppInsights(AzureService):
logger.info("AppInsights - Getting components...")
components = {}
for subscription_name, client in self.clients.items():
for subscription_id, client in self.clients.items():
try:
components_list = client.components.list()
components.update({subscription_name: {}})
components.update({subscription_id: {}})
for component in components_list:
components[subscription_name].update(
components[subscription_id].update(
{
component.app_id: Component(
resource_id=component.id,
@@ -35,7 +35,7 @@ class AppInsights(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription_id} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return components
@@ -9,17 +9,20 @@ class containerregistry_admin_user_disabled(Check):
findings = []
for subscription, registries in containerregistry_client.registries.items():
subscription_name = containerregistry_client.subscriptions.get(
subscription, subscription
)
for container_registry_info in registries.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=container_registry_info
)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} has its admin user enabled."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) has its admin user enabled."
if not container_registry_info.admin_user_enabled:
report.status = "PASS"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} has its admin user disabled."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) has its admin user disabled."
findings.append(report)
@@ -9,17 +9,20 @@ class containerregistry_not_publicly_accessible(Check):
findings = []
for subscription, registries in containerregistry_client.registries.items():
subscription_name = containerregistry_client.subscriptions.get(
subscription, subscription
)
for container_registry_info in registries.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=container_registry_info
)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} allows unrestricted network access."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) allows unrestricted network access."
if not container_registry_info.public_network_access:
report.status = "PASS"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} does not allow unrestricted network access."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) does not allow unrestricted network access."
findings.append(report)
@@ -64,7 +64,7 @@ class ContainerRegistry(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return registries
@@ -81,13 +81,13 @@ class ContainerRegistry(AzureService):
monitor_diagnostics_settings = []
try:
monitor_diagnostics_settings = monitor_client.diagnostic_settings_with_uri(
self.subscriptions[subscription],
f"subscriptions/{self.subscriptions[subscription]}/resourceGroups/{resource_group}/providers/Microsoft.ContainerRegistry/registries/{registry_name}",
subscription,
f"subscriptions/{subscription}/resourceGroups/{resource_group}/providers/Microsoft.ContainerRegistry/registries/{registry_name}",
monitor_client.clients[subscription],
)
except Exception as error:
logger.error(
f"Subscription name: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return monitor_diagnostics_settings
@@ -9,17 +9,20 @@ class containerregistry_uses_private_link(Check):
findings = []
for subscription, registries in containerregistry_client.registries.items():
subscription_name = containerregistry_client.subscriptions.get(
subscription, subscription
)
for container_registry_info in registries.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=container_registry_info
)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} does not use a private link."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) does not use a private link."
if container_registry_info.private_endpoint_connections:
report.status = "PASS"
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} uses a private link."
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription_name} ({subscription}) uses a private link."
findings.append(report)
@@ -6,14 +6,17 @@ class cosmosdb_account_firewall_use_selected_networks(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
subscription_name = cosmosdb_client.subscriptions.get(
subscription, subscription
)
for account in accounts:
report = Check_Report_Azure(metadata=self.metadata(), resource=account)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} has firewall rules that allow access from all networks."
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) has firewall rules that allow access from all networks."
if account.is_virtual_network_filter_enabled:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} has firewall rules that allow access only from selected networks."
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) has firewall rules that allow access only from selected networks."
findings.append(report)
return findings
@@ -6,14 +6,17 @@ class cosmosdb_account_use_aad_and_rbac(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
subscription_name = cosmosdb_client.subscriptions.get(
subscription, subscription
)
for account in accounts:
report = Check_Report_Azure(metadata=self.metadata(), resource=account)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is not using AAD and RBAC"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) is not using AAD and RBAC"
if account.disable_local_auth:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is using AAD and RBAC"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) is using AAD and RBAC"
findings.append(report)
return findings
@@ -6,14 +6,17 @@ class cosmosdb_account_use_private_endpoints(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, accounts in cosmosdb_client.accounts.items():
subscription_name = cosmosdb_client.subscriptions.get(
subscription, subscription
)
for account in accounts:
report = Check_Report_Azure(metadata=self.metadata(), resource=account)
report.subscription = subscription
report.status = "FAIL"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is not using private endpoints connections"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) is not using private endpoints connections"
if account.private_endpoint_connections:
report.status = "PASS"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription} is using private endpoints connections"
report.status_extended = f"CosmosDB account {account.name} from subscription {subscription_name} ({subscription}) is using private endpoints connections"
findings.append(report)
return findings
@@ -48,7 +48,7 @@ class CosmosDB(AzureService):
)
except Exception as error:
logger.error(
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"Subscription ID: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return accounts
@@ -17,6 +17,9 @@ class databricks_workspace_cmk_encryption_enabled(Check):
def execute(self):
findings = []
for subscription, workspaces in databricks_client.workspaces.items():
subscription_name = databricks_client.subscriptions.get(
subscription, subscription
)
for workspace in workspaces.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=workspace
@@ -25,9 +28,9 @@ class databricks_workspace_cmk_encryption_enabled(Check):
enc = workspace.managed_disk_encryption
if enc:
report.status = "PASS"
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription} has customer-managed key (CMK) encryption enabled with key {enc.key_vault_uri}/{enc.key_name}/{enc.key_version}."
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription_name} ({subscription}) has customer-managed key (CMK) encryption enabled with key {enc.key_vault_uri}/{enc.key_name}/{enc.key_version}."
else:
report.status = "FAIL"
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription} does not have customer-managed key (CMK) encryption enabled."
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription_name} ({subscription}) does not have customer-managed key (CMK) encryption enabled."
findings.append(report)
return findings
@@ -17,6 +17,9 @@ class databricks_workspace_vnet_injection_enabled(Check):
def execute(self):
findings = []
for subscription, workspaces in databricks_client.workspaces.items():
subscription_name = databricks_client.subscriptions.get(
subscription, subscription
)
for workspace in workspaces.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=workspace
@@ -24,9 +27,9 @@ class databricks_workspace_vnet_injection_enabled(Check):
report.subscription = subscription
if workspace.custom_managed_vnet_id:
report.status = "PASS"
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription} is deployed in a customer-managed VNet ({workspace.custom_managed_vnet_id})."
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription_name} ({subscription}) is deployed in a customer-managed VNet ({workspace.custom_managed_vnet_id})."
else:
report.status = "FAIL"
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription} is not deployed in a customer-managed VNet (VNet Injection is not enabled)."
report.status_extended = f"Databricks workspace {workspace.name} in subscription {subscription_name} ({subscription}) is not deployed in a customer-managed VNet (VNet Injection is not enabled)."
findings.append(report)
return findings
@@ -7,9 +7,12 @@ class defender_additional_email_configured_with_a_security_contact(Check):
findings = []
for (
subscription_name,
subscription_id,
security_contact_configurations,
) in defender_client.security_contact_configurations.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for contact_configuration in security_contact_configurations.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=contact_configuration
@@ -19,14 +22,14 @@ class defender_additional_email_configured_with_a_security_contact(Check):
if contact_configuration.name
else "Security Contact"
)
report.subscription = subscription_name
report.subscription = subscription_id
if len(contact_configuration.emails) > 0:
report.status = "PASS"
report.status_extended = f"There is another correct email configured for subscription {subscription_name}."
report.status_extended = f"There is another correct email configured for subscription {subscription_name} ({subscription_id})."
else:
report.status = "FAIL"
report.status_extended = f"There is not another correct email configured for subscription {subscription_name}."
report.status_extended = f"There is not another correct email configured for subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_assessments_vm_endpoint_protection_installed(Check):
findings = []
for (
subscription_name,
subscription_id,
assessments,
) in defender_client.assessments.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if (
"Install endpoint protection solution on virtual machines"
in assessments
@@ -20,9 +23,9 @@ class defender_assessments_vm_endpoint_protection_installed(Check):
"Install endpoint protection solution on virtual machines"
],
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Endpoint protection is set up in all VMs in subscription {subscription_name}."
report.status_extended = f"Endpoint protection is set up in all VMs in subscription {subscription_name} ({subscription_id})."
if (
assessments[
@@ -31,7 +34,7 @@ class defender_assessments_vm_endpoint_protection_installed(Check):
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Endpoint protection is not set up in all VMs in subscription {subscription_name}."
report.status_extended = f"Endpoint protection is not set up in all VMs in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -24,9 +24,12 @@ class defender_attack_path_notifications_properly_configured(Check):
min_risk_index = risk_levels.index(min_risk_level)
for (
subscription_name,
subscription_id,
security_contact_configurations,
) in defender_client.security_contact_configurations.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for contact_configuration in security_contact_configurations.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=contact_configuration
@@ -36,21 +39,21 @@ class defender_attack_path_notifications_properly_configured(Check):
if contact_configuration.name
else "Security Contact"
)
report.subscription = subscription_name
report.subscription = subscription_id
actual_risk_level = getattr(
contact_configuration, "attack_path_minimal_risk_level", None
)
if not actual_risk_level or actual_risk_level not in risk_levels:
report.status = "FAIL"
report.status_extended = f"Attack path notifications are not enabled in subscription {subscription_name} for security contact {contact_configuration.name}."
report.status_extended = f"Attack path notifications are not enabled in subscription {subscription_name} ({subscription_id}) for security contact {contact_configuration.name}."
else:
actual_risk_index = risk_levels.index(actual_risk_level)
if actual_risk_index <= min_risk_index:
report.status = "PASS"
report.status_extended = f"Attack path notifications are enabled with minimal risk level {actual_risk_level} in subscription {subscription_name} for security contact {contact_configuration.name}."
report.status_extended = f"Attack path notifications are enabled with minimal risk level {actual_risk_level} in subscription {subscription_name} ({subscription_id}) for security contact {contact_configuration.name}."
else:
report.status = "FAIL"
report.status_extended = f"Attack path notifications are enabled with minimal risk level {actual_risk_level} in subscription {subscription_name} for security contact {contact_configuration.name}."
report.status_extended = f"Attack path notifications are enabled with minimal risk level {actual_risk_level} in subscription {subscription_name} ({subscription_id}) for security contact {contact_configuration.name}."
findings.append(report)
return findings
@@ -7,21 +7,24 @@ class defender_auto_provisioning_log_analytics_agent_vms_on(Check):
findings = []
for (
subscription_name,
subscription_id,
auto_provisioning_settings,
) in defender_client.auto_provisioning_settings.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for auto_provisioning_setting in auto_provisioning_settings.values():
report = Check_Report_Azure(
metadata=self.metadata(),
resource=auto_provisioning_setting,
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Defender Auto Provisioning Log Analytics Agents from subscription {subscription_name} is set to ON."
report.status_extended = f"Defender Auto Provisioning Log Analytics Agents from subscription {subscription_name} ({subscription_id}) is set to ON."
if auto_provisioning_setting.auto_provision != "On":
report.status = "FAIL"
report.status_extended = f"Defender Auto Provisioning Log Analytics Agents from subscription {subscription_name} is set to OFF."
report.status_extended = f"Defender Auto Provisioning Log Analytics Agents from subscription {subscription_name} ({subscription_id}) is set to OFF."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_auto_provisioning_vulnerabilty_assessments_machines_on(Check):
findings = []
for (
subscription_name,
subscription_id,
assessments,
) in defender_client.assessments.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if (
"Machines should have a vulnerability assessment solution"
in assessments
@@ -20,9 +23,9 @@ class defender_auto_provisioning_vulnerabilty_assessments_machines_on(Check):
"Machines should have a vulnerability assessment solution"
],
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Vulnerability assessment is set up in all VMs in subscription {subscription_name}."
report.status_extended = f"Vulnerability assessment is set up in all VMs in subscription {subscription_name} ({subscription_id})."
if (
assessments[
@@ -31,7 +34,7 @@ class defender_auto_provisioning_vulnerabilty_assessments_machines_on(Check):
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Vulnerability assessment is not set up in all VMs in subscription {subscription_name}."
report.status_extended = f"Vulnerability assessment is not set up in all VMs in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_container_images_resolved_vulnerabilities(Check):
findings = []
for (
subscription_name,
subscription_id,
assessments,
) in defender_client.assessments.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if (
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
in assessments
@@ -28,9 +31,9 @@ class defender_container_images_resolved_vulnerabilities(Check):
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
],
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"Azure running container images do not have unresolved vulnerabilities in subscription '{subscription_name}'."
report.status_extended = f"Azure running container images do not have unresolved vulnerabilities in subscription '{subscription_name} ({subscription_id})'."
if (
assessments[
"Azure running container images should have vulnerabilities resolved (powered by Microsoft Defender Vulnerability Management)"
@@ -38,7 +41,7 @@ class defender_container_images_resolved_vulnerabilities(Check):
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"Azure running container images have unresolved vulnerabilities in subscription '{subscription_name}'."
report.status_extended = f"Azure running container images have unresolved vulnerabilities in subscription '{subscription_name} ({subscription_id})'."
findings.append(report)
@@ -6,20 +6,21 @@ class defender_container_images_scan_enabled(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "Containers" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["Containers"]
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = (
f"Container image scan is enabled in subscription {subscription}."
)
report.status_extended = f"Container image scan is enabled in subscription {subscription_name} ({subscription})."
if not pricings["Containers"].extensions.get(
"ContainerRegistriesVulnerabilityAssessments"
):
report.status = "FAIL"
report.status_extended = f"Container image scan is disabled in subscription {subscription}."
report.status_extended = f"Container image scan is disabled in subscription {subscription_name} ({subscription})."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_app_services_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "AppServices" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["AppServices"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_app_services_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan App Services"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for App Services from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for App Services from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["AppServices"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for App Services from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for App Services from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_arm_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "Arm" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["Arm"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_arm_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan ARM"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for ARM from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for ARM from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["Arm"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for ARM from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for ARM from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,16 +6,19 @@ class defender_ensure_defender_for_azure_sql_databases_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "SqlServers" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["SqlServers"]
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Azure SQL DB Servers from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Azure SQL DB Servers from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["SqlServers"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Azure SQL DB Servers from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Azure SQL DB Servers from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,16 +6,19 @@ class defender_ensure_defender_for_containers_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "Containers" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["Containers"]
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Containers from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Containers from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["Containers"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Containers from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Containers from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_cosmosdb_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "CosmosDbs" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["CosmosDbs"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_cosmosdb_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan Cosmos DB"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Cosmos DB from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Cosmos DB from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["CosmosDbs"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Cosmos DB from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Cosmos DB from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_databases_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if (
"SqlServers" in pricings
and "SqlServerVirtualMachines" in pricings
@@ -17,7 +20,7 @@ class defender_ensure_defender_for_databases_is_on(Check):
)
report.subscription = subscription
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Databases from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Databases from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if (
pricings["SqlServers"].pricing_tier != "Standard"
or pricings["SqlServerVirtualMachines"].pricing_tier != "Standard"
@@ -26,7 +29,7 @@ class defender_ensure_defender_for_databases_is_on(Check):
or pricings["CosmosDbs"].pricing_tier != "Standard"
):
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Databases from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Databases from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_dns_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "Dns" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["Dns"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_dns_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan DNS"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for DNS from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for DNS from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["Dns"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for DNS from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for DNS from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_keyvault_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "KeyVaults" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(), resource=pricings["KeyVaults"]
@@ -13,10 +16,10 @@ class defender_ensure_defender_for_keyvault_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan KeyVaults"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for KeyVaults from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for KeyVaults from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["KeyVaults"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for KeyVaults from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for KeyVaults from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_os_relational_databases_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "OpenSourceRelationalDatabases" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -14,10 +17,10 @@ class defender_ensure_defender_for_os_relational_databases_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan Open-Source Relational Databases"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Open-Source Relational Databases from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Open-Source Relational Databases from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["OpenSourceRelationalDatabases"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Open-Source Relational Databases from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Open-Source Relational Databases from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_server_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "VirtualMachines" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -14,10 +17,10 @@ class defender_ensure_defender_for_server_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan Servers"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Servers from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Servers from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["VirtualMachines"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Servers from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Servers from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_sql_servers_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "SqlServerVirtualMachines" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -14,10 +17,10 @@ class defender_ensure_defender_for_sql_servers_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan SQL Server VMs"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for SQL Server VMs from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for SQL Server VMs from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["SqlServerVirtualMachines"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for SQL Server VMs from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for SQL Server VMs from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -6,6 +6,9 @@ class defender_ensure_defender_for_storage_is_on(Check):
def execute(self) -> Check_Report_Azure:
findings = []
for subscription, pricings in defender_client.pricings.items():
subscription_name = defender_client.subscriptions.get(
subscription, subscription
)
if "StorageAccounts" in pricings:
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -14,10 +17,10 @@ class defender_ensure_defender_for_storage_is_on(Check):
report.subscription = subscription
report.resource_name = "Defender plan Storage Accounts"
report.status = "PASS"
report.status_extended = f"Defender plan Defender for Storage Accounts from subscription {subscription} is set to ON (pricing tier standard)."
report.status_extended = f"Defender plan Defender for Storage Accounts from subscription {subscription_name} ({subscription}) is set to ON (pricing tier standard)."
if pricings["StorageAccounts"].pricing_tier != "Standard":
report.status = "FAIL"
report.status_extended = f"Defender plan Defender for Storage Accounts from subscription {subscription} is set to OFF (pricing tier not standard)."
report.status_extended = f"Defender plan Defender for Storage Accounts from subscription {subscription_name} ({subscription}) is set to OFF (pricing tier not standard)."
findings.append(report)
return findings
@@ -7,18 +7,19 @@ class defender_ensure_iot_hub_defender_is_on(Check):
findings = []
for (
subscription_name,
subscription_id,
iot_security_solutions,
) in defender_client.iot_security_solutions.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if not iot_security_solutions:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.status = "FAIL"
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{defender_client.subscriptions[subscription_name]}"
)
report.status_extended = f"No IoT Security Solutions found in the subscription {subscription_name}."
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status_extended = f"No IoT Security Solutions found in the subscription {subscription_name} ({subscription_id})."
findings.append(report)
else:
for iot_security_solution in iot_security_solutions.values():
@@ -26,13 +27,13 @@ class defender_ensure_iot_hub_defender_is_on(Check):
metadata=self.metadata(),
resource=iot_security_solution,
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"The security solution {iot_security_solution.name} is enabled in subscription {subscription_name}."
report.status_extended = f"The security solution {iot_security_solution.name} is enabled in subscription {subscription_name} ({subscription_id})."
if iot_security_solution.status != "Enabled":
report.status = "FAIL"
report.status_extended = f"The security solution {iot_security_solution.name} is disabled in subscription {subscription_name}"
report.status_extended = f"The security solution {iot_security_solution.name} is disabled in subscription {subscription_name} ({subscription_id})"
findings.append(report)
@@ -7,29 +7,30 @@ class defender_ensure_mcas_is_enabled(Check):
findings = []
for (
subscription_name,
subscription_id,
settings,
) in defender_client.settings.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if "MCAS" not in settings:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{defender_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"Microsoft Defender for Cloud Apps not exists for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Cloud Apps not exists for subscription {subscription_name} ({subscription_id})."
else:
report = Check_Report_Azure(
metadata=self.metadata(), resource=settings["MCAS"]
)
report.subscription = subscription_name
report.subscription = subscription_id
if settings["MCAS"].enabled:
report.status = "PASS"
report.status_extended = f"Microsoft Defender for Cloud Apps is enabled for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Cloud Apps is enabled for subscription {subscription_name} ({subscription_id})."
else:
report.status = "FAIL"
report.status_extended = f"Microsoft Defender for Cloud Apps is disabled for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Cloud Apps is disabled for subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_ensure_notify_alerts_severity_is_high(Check):
findings = []
for (
subscription_name,
subscription_id,
security_contact_configurations,
) in defender_client.security_contact_configurations.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for contact_configuration in security_contact_configurations.values():
report = Check_Report_Azure(
metadata=self.metadata(), resource=contact_configuration
@@ -19,16 +22,16 @@ class defender_ensure_notify_alerts_severity_is_high(Check):
if contact_configuration.name
else "Security Contact"
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "FAIL"
report.status_extended = f"Notifications are not enabled for alerts with a minimum severity of high or lower in subscription {subscription_name}."
report.status_extended = f"Notifications are not enabled for alerts with a minimum severity of high or lower in subscription {subscription_name} ({subscription_id})."
if (
contact_configuration.alert_minimal_severity
and contact_configuration.alert_minimal_severity != "Critical"
):
report.status = "PASS"
report.status_extended = f"Notifications are enabled for alerts with a minimum severity of high or lower ({contact_configuration.alert_minimal_severity}) in subscription {subscription_name}."
report.status_extended = f"Notifications are enabled for alerts with a minimum severity of high or lower ({contact_configuration.alert_minimal_severity}) in subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_ensure_notify_emails_to_owners(Check):
findings = []
for (
subscription_name,
subscription_id,
security_contact_configurations,
) in defender_client.security_contact_configurations.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
for contact_configuration in security_contact_configurations.values():
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -20,16 +23,16 @@ class defender_ensure_notify_emails_to_owners(Check):
if contact_configuration.name
else "Security Contact"
)
report.subscription = subscription_name
report.subscription = subscription_id
if (
contact_configuration.notifications_by_role.state
and "Owner" in contact_configuration.notifications_by_role.roles
):
report.status = "PASS"
report.status_extended = f"The Owner role is notified for subscription {subscription_name}."
report.status_extended = f"The Owner role is notified for subscription {subscription_name} ({subscription_id})."
else:
report.status = "FAIL"
report.status_extended = f"The Owner role is not notified for subscription {subscription_name}."
report.status_extended = f"The Owner role is not notified for subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,9 +7,12 @@ class defender_ensure_system_updates_are_applied(Check):
findings = []
for (
subscription_name,
subscription_id,
assessments,
) in defender_client.assessments.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if (
"Log Analytics agent should be installed on virtual machines"
in assessments
@@ -23,9 +26,9 @@ class defender_ensure_system_updates_are_applied(Check):
"System updates should be installed on your machines"
],
)
report.subscription = subscription_name
report.subscription = subscription_id
report.status = "PASS"
report.status_extended = f"System updates are applied for all the VMs in the subscription {subscription_name}."
report.status_extended = f"System updates are applied for all the VMs in the subscription {subscription_name} ({subscription_id})."
if (
assessments[
@@ -42,7 +45,7 @@ class defender_ensure_system_updates_are_applied(Check):
== "Unhealthy"
):
report.status = "FAIL"
report.status_extended = f"System updates are not applied for all the VMs in the subscription {subscription_name}."
report.status_extended = f"System updates are not applied for all the VMs in the subscription {subscription_name} ({subscription_id})."
findings.append(report)
@@ -7,29 +7,30 @@ class defender_ensure_wdatp_is_enabled(Check):
findings = []
for (
subscription_name,
subscription_id,
settings,
) in defender_client.settings.items():
subscription_name = defender_client.subscriptions.get(
subscription_id, subscription_id
)
if "WDATP" not in settings:
report = Check_Report_Azure(metadata=self.metadata(), resource={})
report.subscription = subscription_name
report.resource_name = subscription_name
report.resource_id = (
f"/subscriptions/{defender_client.subscriptions[subscription_name]}"
)
report.subscription = subscription_id
report.resource_name = subscription_id
report.resource_id = f"/subscriptions/{subscription_id}"
report.status = "FAIL"
report.status_extended = f"Microsoft Defender for Endpoint integration not exists for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Endpoint integration not exists for subscription {subscription_name} ({subscription_id})."
else:
report = Check_Report_Azure(
metadata=self.metadata(), resource=settings["WDATP"]
)
report.subscription = subscription_name
report.subscription = subscription_id
if settings["WDATP"].enabled:
report.status = "PASS"
report.status_extended = f"Microsoft Defender for Endpoint integration is enabled for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Endpoint integration is enabled for subscription {subscription_name} ({subscription_id})."
else:
report.status = "FAIL"
report.status_extended = f"Microsoft Defender for Endpoint integration is disabled for subscription {subscription_name}."
report.status_extended = f"Microsoft Defender for Endpoint integration is disabled for subscription {subscription_name} ({subscription_id})."
findings.append(report)

Some files were not shown because too many files have changed in this diff Show More