Compare commits

...

35 Commits

Author SHA1 Message Date
Adrián Jesús Peña Rodríguez
221310ba94 fix: update unittest 2025-04-22 17:27:53 +02:00
Adrián Jesús Peña Rodríguez
33135f6c9a chore: update lock files 2025-04-22 17:05:03 +02:00
Adrián Jesús Peña Rodríguez
4aaacc9e16 feat: add bash to debian 2025-04-22 16:12:22 +02:00
Adrián Jesús Peña Rodríguez
165d09768f feat: change api base image to debian 2025-04-22 16:12:22 +02:00
Adrián Jesús Peña Rodríguez
6b8af51c23 fix: update base image 2025-04-22 16:12:08 +02:00
Adrián Jesús Peña Rodríguez
e40591a25f chore: change name 2025-04-22 16:12:08 +02:00
Adrián Jesús Peña Rodríguez
d6ac87341b chore: update changelog and spec 2025-04-22 16:12:08 +02:00
Adrián Jesús Peña Rodríguez
d431af6186 feat: change to domain_id 2025-04-22 16:12:08 +02:00
Adrián Jesús Peña Rodríguez
7b3b728207 feat: add m365 to API 2025-04-22 16:11:47 +02:00
César Arroba
cfab4dcef5 chore: pass labels on PR merge trigger (#7558) 2025-04-22 16:11:47 +02:00
César Arroba
cd48037670 chore: revert pass labels (#7556) 2025-04-22 16:11:47 +02:00
César Arroba
ab7e352029 chore: pass labels as json is required (#7555) 2025-04-22 16:11:47 +02:00
César Arroba
79ce899812 chore: fix merged PR action, incorrect order on payload (#7554) 2025-04-22 16:11:47 +02:00
César Arroba
29d4159b8c chore: pass labels (#7553) 2025-04-22 16:11:47 +02:00
César Arroba
aaf7188365 chore: fix json body (#7552) 2025-04-22 16:11:47 +02:00
César Arroba
6cbba3c5b1 chore: fix trigger (#7551) 2025-04-22 16:11:47 +02:00
César Arroba
9510b4c3ef chore(gha): trigger cloud pull-request when a PR is merged (#7212) 2025-04-22 16:11:47 +02:00
Felix Dreissig
c7754c4331 fix(aws): remove SHA-1 from ACM insecure key algorithms (#7547) 2025-04-22 16:11:47 +02:00
Daniel Barranquero
7800320189 feat(defender): add new check defender_antiphishing_policy_configured (#7453) 2025-04-22 16:11:47 +02:00
Daniel Barranquero
972c88e421 feat(defender): add new check defender_malware_policy_notifications_internal_users_malware_enabled (#7435)
Co-authored-by: HugoPBrito <hugopbrit@gmail.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-04-22 16:11:47 +02:00
Daniel Barranquero
3ee420769a feat(defender): add service and new check defender_malware_policy_common_attachments_filter_enabled (#7425)
Co-authored-by: HugoPBrito <hugopbrit@gmail.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-04-22 16:11:47 +02:00
Daniel Barranquero
f8d9873655 feat(exchange): add new check exchange_mailbox_audit_bypass_disabled (#7418)
Co-authored-by: HugoPBrito <hugopbrit@gmail.com>
Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
2025-04-22 16:11:47 +02:00
Daniel Barranquero
3f5508847a feat(exchange): add service and new check exchange_organization_mailbox_auditing_enabled (#7408)
Co-authored-by: HugoPBrito <hugopbrit@gmail.com>
Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
2025-04-22 16:11:47 +02:00
Hugo Pereira Brito
ed54b39211 feat(teams): add new check teams_email_sending_to_channel_disabled (#7533)
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-04-22 16:11:47 +02:00
HugoPBrito
a6dd97b6bc feat: pwsh setup and print credentials comprobation 2025-04-21 17:54:12 +02:00
HugoPBrito
b6e1551397 Revert "feat: remove provider_id and adapt logic"
This reverts commit 3d9d118d2c.
2025-04-21 17:49:30 +02:00
HugoPBrito
c8567b5d2b Revert "fix: restore 2 tests"
This reverts commit 361269370a.
2025-04-21 17:49:12 +02:00
HugoPBrito
361269370a fix: restore 2 tests 2025-04-21 15:02:53 +02:00
HugoPBrito
3d9d118d2c feat: remove provider_id and adapt logic 2025-04-21 14:58:13 +02:00
HugoPBrito
b50a91d3e6 feat: enhance json parse logging 2025-04-21 13:23:37 +02:00
HugoPBrito
0843c49475 feat: add tests 2025-04-21 12:52:32 +02:00
HugoPBrito
3fe65b7a03 fix: missing provider_id 2025-04-21 11:40:53 +02:00
HugoPBrito
68369ca54f chore: remove old implementation 2025-04-21 11:31:18 +02:00
HugoPBrito
63f4b4f7d4 feat: ensure pwsh user belongs to tenant 2025-04-16 19:10:57 +02:00
HugoPBrito
754a0748da fix(m365): test_connection 2025-04-16 16:38:15 +02:00
66 changed files with 4992 additions and 2102 deletions

View File

@@ -0,0 +1,34 @@
name: Prowler - Merged Pull Request
on:
pull_request_target:
branches: ['master']
types: ['closed']
jobs:
trigger-cloud-pull-request:
name: Trigger Cloud Pull Request
if: github.event.pull_request.merged == true && github.repository == 'prowler-cloud/prowler'
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set short git commit SHA
id: vars
run: |
shortSha=$(git rev-parse --short ${{ github.sha }})
echo "SHORT_SHA=${shortSha}" >> $GITHUB_ENV
- name: Trigger pull request
uses: peter-evans/repository-dispatch@ff45666b9427631e3450c54a1bcbee4d9ff4d7c0 # v3.0.0
with:
token: ${{ secrets.PROWLER_BOT_ACCESS_TOKEN }}
repository: ${{ secrets.CLOUD_DISPATCH }}
event-type: prowler-pull-request-merged
client-payload: '{
"PROWLER_COMMIT_SHA": "${{ github.sha }}",
"PROWLER_COMMIT_SHORT_SHA": "${{ env.SHORT_SHA }}",
"PROWLER_PR_TITLE": "${{ github.event.pull_request.title }}",
"PROWLER_PR_LABELS": ${{ toJson(github.event.pull_request.labels.*.name) }},
"PROWLER_PR_BODY": ${{ toJson(github.event.pull_request.body) }}
}'

View File

@@ -21,6 +21,7 @@ on:
paths-ignore:
- 'ui/**'
- 'api/**'
- '.github/**'
pull_request:
branches:
- "master"
@@ -30,6 +31,7 @@ on:
paths-ignore:
- 'ui/**'
- 'api/**'
- '.github/**'
schedule:
- cron: '00 12 * * *'

View File

@@ -3,6 +3,14 @@
All notable changes to the **Prowler API** are documented in this file.
## [v1.7.0] (UNRELEASED)
### Added
- Added M365 as a new provider [(#7563)](https://github.com/prowler-cloud/prowler/pull/7563).
---
## [v1.6.0] (Prowler v5.5.0)
### Added

View File

@@ -1,13 +1,34 @@
FROM python:3.12.8-alpine3.20 AS build
FROM python:3.12-slim-bookworm AS build
LABEL maintainer="https://github.com/prowler-cloud/api"
# hadolint ignore=DL3018
RUN apk --no-cache add gcc python3-dev musl-dev linux-headers curl-dev
# hadolint ignore=DL3008
RUN apt-get update && apt-get install -y --no-install-recommends \
gcc python3-dev bash \
libcurl4-openssl-dev ca-certificates less ncurses-base \
krb5-multidev libgcc-s1 gettext libssl3 \
libstdc++6 tzdata liburcu8 zlib1g libicu72 curl openssh-client \
&& rm -rf /var/lib/apt/lists/*
# Install PowerShell
RUN ARCH=$(uname -m) && \
if [ "$ARCH" = "x86_64" ]; then \
curl -L https://github.com/PowerShell/PowerShell/releases/download/v7.5.0/powershell-7.5.0-linux-x64.tar.gz -o /tmp/powershell.tar.gz ; \
elif [ "$ARCH" = "aarch64" ]; then \
curl -L https://github.com/PowerShell/PowerShell/releases/download/v7.5.0/powershell-7.5.0-linux-arm64.tar.gz -o /tmp/powershell.tar.gz ; \
else \
echo "Unsupported architecture: $ARCH" && exit 1 ; \
fi && \
mkdir -p /opt/microsoft/powershell/7 && \
tar zxf /tmp/powershell.tar.gz -C /opt/microsoft/powershell/7 && \
chmod +x /opt/microsoft/powershell/7/pwsh && \
ln -s /opt/microsoft/powershell/7/pwsh /usr/bin/pwsh && \
rm /tmp/powershell.tar.gz
# Add prowler user
RUN addgroup --gid 1000 prowler && \
adduser --uid 1000 --gid 1000 --disabled-password --gecos "" prowler
RUN apk --no-cache upgrade && \
addgroup -g 1000 prowler && \
adduser -D -u 1000 -G prowler prowler
USER prowler
WORKDIR /home/prowler
@@ -17,7 +38,7 @@ COPY pyproject.toml ./
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir poetry
COPY src/backend/ ./backend/
COPY src/backend/ ./backend/
ENV PATH="/home/prowler/.local/bin:$PATH"
@@ -30,12 +51,13 @@ COPY docker-entrypoint.sh ./docker-entrypoint.sh
WORKDIR /home/prowler/backend
# Development image
# hadolint ignore=DL3006
FROM build AS dev
USER 0
# hadolint ignore=DL3018
RUN apk --no-cache add curl vim
USER root
# hadolint ignore=DL3008
RUN apt-get update && apt-get install -y --no-install-recommends \
curl vim \
&& rm -rf /var/lib/apt/lists/*
USER prowler

1877
api/poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -46,6 +46,7 @@ coverage = "7.5.4"
django-silk = "5.3.2"
docker = "7.1.0"
freezegun = "1.5.1"
marshmallow = ">=3.15.0,<4.0.0"
mypy = "1.10.1"
pylint = "3.2.5"
pytest = "8.2.2"

View File

@@ -0,0 +1,28 @@
# Generated by Django 5.1.7 on 2025-04-16 08:47
from django.db import migrations
import api.db_utils
class Migration(migrations.Migration):
dependencies = [
("api", "0016_finding_compliance_resource_details_and_more"),
]
operations = [
migrations.AlterField(
model_name="provider",
name="provider",
field=api.db_utils.ProviderEnumField(
choices=[
("aws", "AWS"),
("azure", "Azure"),
("gcp", "GCP"),
("kubernetes", "Kubernetes"),
("m365", "M365"),
],
default="aws",
),
),
]

View File

@@ -191,6 +191,7 @@ class Provider(RowLevelSecurityProtectedModel):
AZURE = "azure", _("Azure")
GCP = "gcp", _("GCP")
KUBERNETES = "kubernetes", _("Kubernetes")
M365 = "m365", _("M365")
@staticmethod
def validate_aws_uid(value):
@@ -214,6 +215,15 @@ class Provider(RowLevelSecurityProtectedModel):
pointer="/data/attributes/uid",
)
@staticmethod
def validate_m365_uid(value):
if not re.match(r"^[a-zA-Z0-9-]+\.onmicrosoft\.com$", value):
raise ModelValidationError(
detail="M365 tenant ID must be a valid domain.",
code="m365-uid",
pointer="/data/attributes/uid",
)
@staticmethod
def validate_gcp_uid(value):
if not re.match(r"^[a-z][a-z0-9-]{5,29}$", value):

View File

@@ -83,11 +83,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -99,6 +101,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -106,6 +109,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -450,11 +454,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -466,6 +472,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -473,6 +480,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -962,11 +970,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -978,6 +988,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -985,6 +996,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -1395,11 +1407,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -1411,6 +1425,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -1418,6 +1433,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -2047,11 +2063,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -2063,6 +2081,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -2070,6 +2089,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -2204,11 +2224,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -2220,6 +2242,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -2227,6 +2250,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -2377,11 +2401,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -2393,6 +2419,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -2400,6 +2427,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -2863,11 +2891,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider__in]
schema:
@@ -3441,11 +3471,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -3457,6 +3489,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -3464,6 +3497,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -4167,11 +4201,13 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
- in: query
name: filter[provider_type__in]
schema:
@@ -4183,6 +4219,7 @@ paths:
- azure
- gcp
- kubernetes
- m365
description: |-
Multiple values may be separated by commas.
@@ -4190,6 +4227,7 @@ paths:
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
explode: false
style: form
- in: query
@@ -8347,6 +8385,33 @@ components:
- client_id
- client_secret
- tenant_id
- type: object
title: M365 Static Credentials
properties:
client_id:
type: string
description: The Azure application (client) ID for authentication
in Azure AD.
client_secret:
type: string
description: The client secret associated with the application
(client) ID, providing secure access.
tenant_id:
type: string
description: The Azure tenant ID, representing the directory
where the application is registered.
user:
type: email
description: User microsoft email address.
encrypted_password:
type: string
description: User encrypted password.
required:
- client_id
- client_secret
- tenant_id
- user
- encrypted_password
- type: object
title: GCP Static Credentials
properties:
@@ -8814,12 +8879,14 @@ components:
- azure
- gcp
- kubernetes
- m365
type: string
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
uid:
type: string
title: Unique identifier for the provider, set by the provider
@@ -8926,12 +8993,14 @@ components:
- azure
- gcp
- kubernetes
- m365
type: string
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
uid:
type: string
title: Unique identifier for the provider, set by the provider
@@ -8969,12 +9038,14 @@ components:
- azure
- gcp
- kubernetes
- m365
type: string
description: |-
* `aws` - AWS
* `azure` - Azure
* `gcp` - GCP
* `kubernetes` - Kubernetes
* `m365` - M365
uid:
type: string
minLength: 3
@@ -9559,6 +9630,33 @@ components:
- client_id
- client_secret
- tenant_id
- type: object
title: M365 Static Credentials
properties:
client_id:
type: string
description: The Azure application (client) ID for authentication
in Azure AD.
client_secret:
type: string
description: The client secret associated with the application
(client) ID, providing secure access.
tenant_id:
type: string
description: The Azure tenant ID, representing the directory where
the application is registered.
user:
type: email
description: User microsoft email address.
encrypted_password:
type: string
description: User encrypted password.
required:
- client_id
- client_secret
- tenant_id
- user
- encrypted_password
- type: object
title: GCP Static Credentials
properties:
@@ -9741,6 +9839,33 @@ components:
- client_id
- client_secret
- tenant_id
- type: object
title: M365 Static Credentials
properties:
client_id:
type: string
description: The Azure application (client) ID for authentication
in Azure AD.
client_secret:
type: string
description: The client secret associated with the application
(client) ID, providing secure access.
tenant_id:
type: string
description: The Azure tenant ID, representing the directory
where the application is registered.
user:
type: email
description: User microsoft email address.
encrypted_password:
type: string
description: User encrypted password.
required:
- client_id
- client_secret
- tenant_id
- user
- encrypted_password
- type: object
title: GCP Static Credentials
properties:
@@ -9939,6 +10064,33 @@ components:
- client_id
- client_secret
- tenant_id
- type: object
title: M365 Static Credentials
properties:
client_id:
type: string
description: The Azure application (client) ID for authentication
in Azure AD.
client_secret:
type: string
description: The client secret associated with the application
(client) ID, providing secure access.
tenant_id:
type: string
description: The Azure tenant ID, representing the directory where
the application is registered.
user:
type: email
description: User microsoft email address.
encrypted_password:
type: string
description: User encrypted password.
required:
- client_id
- client_secret
- tenant_id
- user
- encrypted_password
- type: object
title: GCP Static Credentials
properties:

View File

@@ -19,6 +19,7 @@ from prowler.providers.aws.aws_provider import AwsProvider
from prowler.providers.azure.azure_provider import AzureProvider
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
class TestMergeDicts:
@@ -104,6 +105,7 @@ class TestReturnProwlerProvider:
(Provider.ProviderChoices.GCP.value, GcpProvider),
(Provider.ProviderChoices.AZURE.value, AzureProvider),
(Provider.ProviderChoices.KUBERNETES.value, KubernetesProvider),
(Provider.ProviderChoices.M365.value, M365Provider),
],
)
def test_return_prowler_provider(self, provider_type, expected_provider):
@@ -176,6 +178,10 @@ class TestGetProwlerProviderKwargs:
Provider.ProviderChoices.KUBERNETES.value,
{"context": "provider_uid"},
),
(
Provider.ProviderChoices.M365.value,
{},
),
],
)
def test_get_prowler_provider_kwargs(self, provider_type, expected_extra_kwargs):

View File

@@ -11,6 +11,7 @@ from prowler.providers.azure.azure_provider import AzureProvider
from prowler.providers.common.models import Connection
from prowler.providers.gcp.gcp_provider import GcpProvider
from prowler.providers.kubernetes.kubernetes_provider import KubernetesProvider
from prowler.providers.m365.m365_provider import M365Provider
class CustomOAuth2Client(OAuth2Client):
@@ -51,14 +52,14 @@ def merge_dicts(default_dict: dict, replacement_dict: dict) -> dict:
def return_prowler_provider(
provider: Provider,
) -> [AwsProvider | AzureProvider | GcpProvider | KubernetesProvider]:
) -> [AwsProvider | AzureProvider | GcpProvider | KubernetesProvider | M365Provider]:
"""Return the Prowler provider class based on the given provider type.
Args:
provider (Provider): The provider object containing the provider type and associated secrets.
Returns:
AwsProvider | AzureProvider | GcpProvider | KubernetesProvider: The corresponding provider class.
AwsProvider | AzureProvider | GcpProvider | KubernetesProvider | M365Provider: The corresponding provider class.
Raises:
ValueError: If the provider type specified in `provider.provider` is not supported.
@@ -72,6 +73,8 @@ def return_prowler_provider(
prowler_provider = AzureProvider
case Provider.ProviderChoices.KUBERNETES.value:
prowler_provider = KubernetesProvider
case Provider.ProviderChoices.M365.value:
prowler_provider = M365Provider
case _:
raise ValueError(f"Provider type {provider.provider} not supported")
return prowler_provider
@@ -104,15 +107,15 @@ def get_prowler_provider_kwargs(provider: Provider) -> dict:
def initialize_prowler_provider(
provider: Provider,
) -> AwsProvider | AzureProvider | GcpProvider | KubernetesProvider:
) -> AwsProvider | AzureProvider | GcpProvider | KubernetesProvider | M365Provider:
"""Initialize a Prowler provider instance based on the given provider type.
Args:
provider (Provider): The provider object containing the provider type and associated secrets.
Returns:
AwsProvider | AzureProvider | GcpProvider | KubernetesProvider: An instance of the corresponding provider class
(`AwsProvider`, `AzureProvider`, `GcpProvider`, or `KubernetesProvider`) initialized with the
AwsProvider | AzureProvider | GcpProvider | KubernetesProvider | M365Provider: An instance of the corresponding provider class
(`AwsProvider`, `AzureProvider`, `GcpProvider`, `KubernetesProvider` or `M365Provider`) initialized with the
provider's secrets.
"""
prowler_provider = return_prowler_provider(provider)
@@ -130,10 +133,12 @@ def prowler_provider_connection_test(provider: Provider) -> Connection:
Connection: A connection object representing the result of the connection test for the specified provider.
"""
prowler_provider = return_prowler_provider(provider)
try:
prowler_provider_kwargs = provider.secret.secret
except Provider.secret.RelatedObjectDoesNotExist as secret_error:
return Connection(is_connected=False, error=secret_error)
return prowler_provider.test_connection(
**prowler_provider_kwargs, provider_id=provider.uid, raise_on_exception=False
)

View File

@@ -0,0 +1,172 @@
from drf_spectacular.utils import extend_schema_field
from rest_framework_json_api import serializers
@extend_schema_field(
{
"oneOf": [
{
"type": "object",
"title": "AWS Static Credentials",
"properties": {
"aws_access_key_id": {
"type": "string",
"description": "The AWS access key ID. Required for environments where no IAM role is being "
"assumed and direct AWS access is needed.",
},
"aws_secret_access_key": {
"type": "string",
"description": "The AWS secret access key. Must accompany 'aws_access_key_id' to authorize "
"access to AWS resources.",
},
"aws_session_token": {
"type": "string",
"description": "The session token associated with temporary credentials. Only needed for "
"session-based or temporary AWS access.",
},
},
"required": ["aws_access_key_id", "aws_secret_access_key"],
},
{
"type": "object",
"title": "AWS Assume Role",
"properties": {
"role_arn": {
"type": "string",
"description": "The Amazon Resource Name (ARN) of the role to assume. Required for AWS role "
"assumption.",
},
"external_id": {
"type": "string",
"description": "An identifier to enhance security for role assumption.",
},
"aws_access_key_id": {
"type": "string",
"description": "The AWS access key ID. Only required if the environment lacks pre-configured "
"AWS credentials.",
},
"aws_secret_access_key": {
"type": "string",
"description": "The AWS secret access key. Required if 'aws_access_key_id' is provided or if "
"no AWS credentials are pre-configured.",
},
"aws_session_token": {
"type": "string",
"description": "The session token for temporary credentials, if applicable.",
},
"session_duration": {
"type": "integer",
"minimum": 900,
"maximum": 43200,
"default": 3600,
"description": "The duration (in seconds) for the role session.",
},
"role_session_name": {
"type": "string",
"description": "An identifier for the role session, useful for tracking sessions in AWS logs. "
"The regex used to validate this parameter is a string of characters consisting of "
"upper- and lower-case alphanumeric characters with no spaces. You can also include "
"underscores or any of the following characters: =,.@-\n\n"
"Examples:\n"
"- MySession123\n"
"- User_Session-1\n"
"- Test.Session@2",
"pattern": "^[a-zA-Z0-9=,.@_-]+$",
},
},
"required": ["role_arn", "external_id"],
},
{
"type": "object",
"title": "Azure Static Credentials",
"properties": {
"client_id": {
"type": "string",
"description": "The Azure application (client) ID for authentication in Azure AD.",
},
"client_secret": {
"type": "string",
"description": "The client secret associated with the application (client) ID, providing "
"secure access.",
},
"tenant_id": {
"type": "string",
"description": "The Azure tenant ID, representing the directory where the application is "
"registered.",
},
},
"required": ["client_id", "client_secret", "tenant_id"],
},
{
"type": "object",
"title": "M365 Static Credentials",
"properties": {
"client_id": {
"type": "string",
"description": "The Azure application (client) ID for authentication in Azure AD.",
},
"client_secret": {
"type": "string",
"description": "The client secret associated with the application (client) ID, providing "
"secure access.",
},
"tenant_id": {
"type": "string",
"description": "The Azure tenant ID, representing the directory where the application is "
"registered.",
},
"user": {
"type": "email",
"description": "User microsoft email address.",
},
"encrypted_password": {
"type": "string",
"description": "User encrypted password.",
},
},
"required": [
"client_id",
"client_secret",
"tenant_id",
"user",
"encrypted_password",
],
},
{
"type": "object",
"title": "GCP Static Credentials",
"properties": {
"client_id": {
"type": "string",
"description": "The client ID from Google Cloud, used to identify the application for GCP "
"access.",
},
"client_secret": {
"type": "string",
"description": "The client secret associated with the GCP client ID, required for secure "
"access.",
},
"refresh_token": {
"type": "string",
"description": "A refresh token that allows the application to obtain new access tokens for "
"extended use.",
},
},
"required": ["client_id", "client_secret", "refresh_token"],
},
{
"type": "object",
"title": "Kubernetes Static Credentials",
"properties": {
"kubeconfig_content": {
"type": "string",
"description": "The content of the Kubernetes kubeconfig file, encoded as a string.",
}
},
"required": ["kubeconfig_content"],
},
]
}
)
class ProviderSecretField(serializers.JSONField):
pass

View File

@@ -42,6 +42,7 @@ from api.v1.serializer_utils.integrations import (
IntegrationCredentialField,
S3ConfigSerializer,
)
from api.v1.serializer_utils.providers import ProviderSecretField
# Tokens
@@ -1141,6 +1142,8 @@ class BaseWriteProviderSecretSerializer(BaseWriteSerializer):
serializer = GCPProviderSecret(data=secret)
elif provider_type == Provider.ProviderChoices.KUBERNETES.value:
serializer = KubernetesProviderSecret(data=secret)
elif provider_type == Provider.ProviderChoices.M365.value:
serializer = M365ProviderSecret(data=secret)
else:
raise serializers.ValidationError(
{"provider": f"Provider type not supported {provider_type}"}
@@ -1180,6 +1183,17 @@ class AzureProviderSecret(serializers.Serializer):
resource_name = "provider-secrets"
class M365ProviderSecret(serializers.Serializer):
client_id = serializers.CharField()
client_secret = serializers.CharField()
tenant_id = serializers.CharField()
user = serializers.EmailField()
encrypted_password = serializers.CharField()
class Meta:
resource_name = "provider-secrets"
class GCPProviderSecret(serializers.Serializer):
client_id = serializers.CharField()
client_secret = serializers.CharField()
@@ -1211,141 +1225,6 @@ class AWSRoleAssumptionProviderSecret(serializers.Serializer):
resource_name = "provider-secrets"
@extend_schema_field(
{
"oneOf": [
{
"type": "object",
"title": "AWS Static Credentials",
"properties": {
"aws_access_key_id": {
"type": "string",
"description": "The AWS access key ID. Required for environments where no IAM role is being "
"assumed and direct AWS access is needed.",
},
"aws_secret_access_key": {
"type": "string",
"description": "The AWS secret access key. Must accompany 'aws_access_key_id' to authorize "
"access to AWS resources.",
},
"aws_session_token": {
"type": "string",
"description": "The session token associated with temporary credentials. Only needed for "
"session-based or temporary AWS access.",
},
},
"required": ["aws_access_key_id", "aws_secret_access_key"],
},
{
"type": "object",
"title": "AWS Assume Role",
"properties": {
"role_arn": {
"type": "string",
"description": "The Amazon Resource Name (ARN) of the role to assume. Required for AWS role "
"assumption.",
},
"external_id": {
"type": "string",
"description": "An identifier to enhance security for role assumption.",
},
"aws_access_key_id": {
"type": "string",
"description": "The AWS access key ID. Only required if the environment lacks pre-configured "
"AWS credentials.",
},
"aws_secret_access_key": {
"type": "string",
"description": "The AWS secret access key. Required if 'aws_access_key_id' is provided or if "
"no AWS credentials are pre-configured.",
},
"aws_session_token": {
"type": "string",
"description": "The session token for temporary credentials, if applicable.",
},
"session_duration": {
"type": "integer",
"minimum": 900,
"maximum": 43200,
"default": 3600,
"description": "The duration (in seconds) for the role session.",
},
"role_session_name": {
"type": "string",
"description": "An identifier for the role session, useful for tracking sessions in AWS logs. "
"The regex used to validate this parameter is a string of characters consisting of "
"upper- and lower-case alphanumeric characters with no spaces. You can also include "
"underscores or any of the following characters: =,.@-\n\n"
"Examples:\n"
"- MySession123\n"
"- User_Session-1\n"
"- Test.Session@2",
"pattern": "^[a-zA-Z0-9=,.@_-]+$",
},
},
"required": ["role_arn", "external_id"],
},
{
"type": "object",
"title": "Azure Static Credentials",
"properties": {
"client_id": {
"type": "string",
"description": "The Azure application (client) ID for authentication in Azure AD.",
},
"client_secret": {
"type": "string",
"description": "The client secret associated with the application (client) ID, providing "
"secure access.",
},
"tenant_id": {
"type": "string",
"description": "The Azure tenant ID, representing the directory where the application is "
"registered.",
},
},
"required": ["client_id", "client_secret", "tenant_id"],
},
{
"type": "object",
"title": "GCP Static Credentials",
"properties": {
"client_id": {
"type": "string",
"description": "The client ID from Google Cloud, used to identify the application for GCP "
"access.",
},
"client_secret": {
"type": "string",
"description": "The client secret associated with the GCP client ID, required for secure "
"access.",
},
"refresh_token": {
"type": "string",
"description": "A refresh token that allows the application to obtain new access tokens for "
"extended use.",
},
},
"required": ["client_id", "client_secret", "refresh_token"],
},
{
"type": "object",
"title": "Kubernetes Static Credentials",
"properties": {
"kubeconfig_content": {
"type": "string",
"description": "The content of the Kubernetes kubeconfig file, encoded as a string.",
}
},
"required": ["kubeconfig_content"],
},
]
}
)
class ProviderSecretField(serializers.JSONField):
pass
class ProviderSecretSerializer(RLSSerializer):
"""
Serializer for the ProviderSecret model.

View File

@@ -399,7 +399,6 @@ mainConfig:
[
"RSA-1024",
"P-192",
"SHA-1",
]
# AWS EKS Configuration

1854
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -8,8 +8,14 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Add SOC2 compliance framework to Azure [(#7489)](https://github.com/prowler-cloud/prowler/pull/7489).
- Add check for unused Service Accounts in GCP [(#7419)](https://github.com/prowler-cloud/prowler/pull/7419).
- Add Powershell to Microsoft365 [(#7331)](https://github.com/prowler-cloud/prowler/pull/7331).
- Add service Defender to Microsoft365 with one check for Common Attachments filter enabled in Malware Policies [(#7425)](https://github.com/prowler-cloud/prowler/pull/7425).
- Add check for Antiphishing Policy well configured in service Defender in M365 [(#7453)](https://github.com/prowler-cloud/prowler/pull/7453).
- Add check for Notifications for Internal users enabled in Malware Policies from service Defender in M365 [(#7435)](https://github.com/prowler-cloud/prowler/pull/7435).
- Support CLOUDSDK_AUTH_ACCESS_TOKEN in GCP [(#7495)](https://github.com/prowler-cloud/prowler/pull/7495).
- Add Powershell to Microsoft365 [(#7331)](https://github.com/prowler-cloud/prowler/pull/7331)
- Add service Exchange to Microsoft365 with one check for Organizations Mailbox Auditing enabled [(#7408)](https://github.com/prowler-cloud/prowler/pull/7408)
- Add check for Bypass Disable in every Mailbox for service Defender in M365 [(#7418)](https://github.com/prowler-cloud/prowler/pull/7418)
- Add new check `teams_email_sending_to_channel_disabled` [(#7533)](https://github.com/prowler-cloud/prowler/pull/7533)
### Fixed

View File

@@ -327,7 +327,6 @@ aws:
[
"RSA-1024",
"P-192",
"SHA-1",
]
# AWS EKS Configuration

View File

@@ -5,6 +5,8 @@ import re
import subprocess
import threading
from prowler.lib.logger import logger
class PowerShellSession:
"""
@@ -179,14 +181,30 @@ class PowerShellSession:
Returns:
dict: Parsed JSON object if found, otherwise an empty dictionary.
Raises:
JSONDecodeError: If the JSON parsing fails.
Example:
>>> json_parse_output('Some text {"key": "value"} more text')
{"key": "value"}
"""
if output == "":
return {}
json_match = re.search(r"(\[.*\]|\{.*\})", output, re.DOTALL)
if json_match:
return json.loads(json_match.group(1))
return {}
if not json_match:
logger.warning(
f"Could not parse PowerShell output as JSON.\nOriginal output: {output}",
)
return {}
else:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError as error:
logger.error(
f"Error parsing PowerShell output as JSON: {str(error)}\nOriginal output: {output}",
)
return {}
def close(self) -> None:
"""

View File

@@ -14,7 +14,7 @@ class acm_certificates_with_secure_key_algorithms(Check):
report.status = "PASS"
report.status_extended = f"ACM Certificate {certificate.id} for {certificate.name} uses a secure key algorithm ({certificate.key_algorithm})."
if certificate.key_algorithm in acm_client.audit_config.get(
"insecure_key_algorithms", ["RSA-1024", "P-192", "SHA-1"]
"insecure_key_algorithms", ["RSA-1024", "P-192"]
):
report.status = "FAIL"
report.status_extended = f"ACM Certificate {certificate.id} for {certificate.name} does not use a secure key algorithm ({certificate.key_algorithm})."

View File

@@ -94,7 +94,7 @@ class M365BaseException(ProwlerException):
"message": "Tenant Id is required for Microsoft 365 static credentials. Make sure you are using the correct credentials.",
"remediation": "Check the Microsoft 365 Tenant ID and ensure it is properly set up.",
},
(6022, "M365MissingEnvironmentUserCredentialsError"): {
(6022, "M365MissingEnvironmentCredentialsError"): {
"message": "User and Password environment variables are needed to use Credentials authentication method.",
"remediation": "Ensure your environment variables are properly set up.",
},
@@ -102,6 +102,18 @@ class M365BaseException(ProwlerException):
"message": "User or Password environment variables are not correct.",
"remediation": "Ensure you are using the right credentials.",
},
(6024, "M365NotValidUserError"): {
"message": "The provided M365 User is not valid.",
"remediation": "Check the M365 User and ensure it is a valid user.",
},
(6025, "M365NotValidEncryptedPasswordError"): {
"message": "The provided M365 Encrypted Password is not valid.",
"remediation": "Check the M365 Encrypted Password and ensure it is a valid password.",
},
(6026, "M365UserNotBelongingToTenantError"): {
"message": "The provided M365 User does not belong to the specified tenant.",
"remediation": "Check the M365 User email domain and ensure it belongs to the specified tenant.",
},
}
def __init__(self, code, file=None, original_exception=None, message=None):
@@ -279,7 +291,7 @@ class M365NotTenantIdButClientIdAndClientSecretError(M365CredentialsError):
)
class M365MissingEnvironmentUserCredentialsError(M365CredentialsError):
class M365MissingEnvironmentCredentialsError(M365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
6022, file=file, original_exception=original_exception, message=message
@@ -291,3 +303,24 @@ class M365EnvironmentUserCredentialsError(M365CredentialsError):
super().__init__(
6023, file=file, original_exception=original_exception, message=message
)
class M365NotValidUserError(M365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
6024, file=file, original_exception=original_exception, message=message
)
class M365NotValidEncryptedPasswordError(M365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
6025, file=file, original_exception=original_exception, message=message
)
class M365UserNotBelongingToTenantError(M365CredentialsError):
def __init__(self, file=None, original_exception=None, message=None):
super().__init__(
6026, file=file, original_exception=original_exception, message=message
)

View File

@@ -1,6 +1,11 @@
import os
import msal
from prowler.lib.powershell.powershell import PowerShellSession
from prowler.providers.m365.exceptions.exceptions import (
M365UserNotBelongingToTenantError,
)
from prowler.providers.m365.models import M365Credentials
@@ -102,7 +107,21 @@ class M365PowerShell(PowerShellSession):
scopes=["https://graph.microsoft.com/.default"],
)
return "access_token" in result
if result is None:
return False
if "access_token" not in result:
return False
# Validate user credentials belong to tenant
user_domain = credentials.user.split("@")[1]
if not credentials.provider_id.endswith(user_domain):
raise M365UserNotBelongingToTenantError(
file=os.path.basename(__file__),
message="The provided M365 User does not belong to the specified tenant.",
)
return True
def connect_microsoft_teams(self) -> dict:
"""
@@ -169,3 +188,102 @@ class M365PowerShell(PowerShellSession):
return self.execute(
"Get-AdminAuditLogConfig | Select-Object UnifiedAuditLogIngestionEnabled | ConvertTo-Json"
)
def get_malware_filter_policy(self) -> dict:
"""
Get Defender Malware Filter Policy.
Retrieves the current Defender anti-malware filter policy settings.
Returns:
dict: Malware filter policy settings in JSON format.
Example:
>>> get_malware_filter_policy()
{
"EnableFileFilter": true,
"Identity": "Default"
}
"""
return self.execute("Get-MalwareFilterPolicy | ConvertTo-Json")
def get_antiphishing_policy(self) -> dict:
"""
Get Defender Antiphishing Policy.
Retrieves the current Defender anti-phishing policy settings.
Returns:
dict: Antiphishing policy settings in JSON format.
Example:
>>> get_antiphishing_policy()
{
"EnableSpoofIntelligence": true,
"AuthenticationFailAction": "Quarantine",
"DmarcRejectAction": "Quarantine",
"DmarcQuarantineAction": "Quarantine",
"EnableFirstContactSafetyTips": true,
"EnableUnauthenticatedSender": true,
"EnableViaTag": true,
"HonorDmarcPolicy": true,
"IsDefault": false
}
"""
return self.execute("Get-AntiPhishPolicy | ConvertTo-Json")
def get_antiphishing_rules(self) -> dict:
"""
Get Defender Antiphishing Rules.
Retrieves the current Defender anti-phishing rules.
Returns:
dict: Antiphishing rules in JSON format.
Example:
>>> get_antiphishing_rules()
{
"Name": "Rule1",
"State": Enabled,
}
"""
return self.execute("Get-AntiPhishRule | ConvertTo-Json")
def get_organization_config(self) -> dict:
"""
Get Exchange Online Organization Configuration.
Retrieves the current Exchange Online organization configuration settings.
Returns:
dict: Organization configuration settings in JSON format.
Example:
>>> get_organization_config()
{
"Name": "MyOrganization",
"Guid": "12345678-1234-1234-1234-123456789012"
"AuditDisabled": false
}
"""
return self.execute("Get-OrganizationConfig | ConvertTo-Json")
def get_mailbox_audit_config(self) -> dict:
"""
Get Exchange Online Mailbox Audit Configuration.
Retrieves the current mailbox audit configuration settings for Exchange Online.
Returns:
dict: Mailbox audit configuration settings in JSON format.
Example:
>>> get_mailbox_audit_config()
{
"Name": "MyMailbox",
"Id": "12345678-1234-1234-1234-123456789012",
"AuditBypassEnabled": false
}
"""
return self.execute("Get-MailboxAuditBypassAssociation | ConvertTo-Json")

View File

@@ -1,6 +1,5 @@
import asyncio
import os
import re
from argparse import ArgumentTypeError
from os import getenv
from uuid import UUID
@@ -40,12 +39,14 @@ from prowler.providers.m365.exceptions.exceptions import (
M365HTTPResponseError,
M365InteractiveBrowserCredentialError,
M365InvalidProviderIdError,
M365MissingEnvironmentUserCredentialsError,
M365MissingEnvironmentCredentialsError,
M365NoAuthenticationMethodError,
M365NotTenantIdButClientIdAndClientSecretError,
M365NotValidClientIdError,
M365NotValidClientSecretError,
M365NotValidEncryptedPasswordError,
M365NotValidTenantIdError,
M365NotValidUserError,
M365SetUpRegionConfigError,
M365SetUpSessionError,
M365TenantIdAndClientIdNotBelongingToClientSecretError,
@@ -105,10 +106,10 @@ class M365Provider(Provider):
def __init__(
self,
sp_env_auth: bool,
env_auth: bool,
az_cli_auth: bool,
browser_auth: bool,
sp_env_auth: bool = False,
env_auth: bool = False,
az_cli_auth: bool = False,
browser_auth: bool = False,
tenant_id: str = None,
client_id: str = None,
client_secret: str = None,
@@ -187,9 +188,6 @@ class M365Provider(Provider):
self._region_config,
)
# Set up PowerShell session credentials
self._credentials = self.setup_powershell(env_auth, m365_credentials)
# Set up the identity
self._identity = self.setup_identity(
az_cli_auth,
@@ -199,6 +197,13 @@ class M365Provider(Provider):
client_id,
)
# Set up PowerShell session credentials
self._credentials = self.setup_powershell(
env_auth=env_auth,
m365_credentials=m365_credentials,
provider_id=self.identity.tenant_domain,
)
# Audit Config
if config_content:
self._audit_config = config_content
@@ -294,8 +299,8 @@ class M365Provider(Provider):
M365BrowserAuthNoTenantIDError: If browser authentication is enabled but the tenant ID is not found.
"""
if not client_id and not client_secret and not user and not encrypted_password:
if not browser_auth and tenant_id:
if not client_id and not client_secret:
if not browser_auth and tenant_id and not env_auth:
raise M365BrowserAuthNoFlagError(
file=os.path.basename(__file__),
message="M365 tenant ID error: browser authentication flag (--browser-auth) not found",
@@ -315,6 +320,12 @@ class M365Provider(Provider):
file=os.path.basename(__file__),
message="M365 Tenant ID (--tenant-id) is required for browser authentication mode",
)
elif env_auth:
if not user or not encrypted_password or not tenant_id:
raise M365MissingEnvironmentCredentialsError(
file=os.path.basename(__file__),
message="M365 provider requires AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID, M365_USER and M365_ENCRYPTED_PASSWORD environment variables to be set when using --env-auth",
)
else:
if not tenant_id:
raise M365NotTenantIdButClientIdAndClientSecretError(
@@ -362,7 +373,7 @@ class M365Provider(Provider):
@staticmethod
def setup_powershell(
env_auth: bool = False, m365_credentials: dict = {}
env_auth: bool = False, m365_credentials: dict = {}, provider_id: str = None
) -> M365Credentials:
"""Gets the M365 credentials.
@@ -382,6 +393,7 @@ class M365Provider(Provider):
client_id=m365_credentials.get("client_id", ""),
client_secret=m365_credentials.get("client_secret", ""),
tenant_id=m365_credentials.get("tenant_id", ""),
provider_id=provider_id,
)
elif env_auth:
m365_user = getenv("M365_USER")
@@ -394,7 +406,7 @@ class M365Provider(Provider):
logger.critical(
"M365 provider: Missing M365_USER or M365_ENCRYPTED_PASSWORD environment variables needed for credentials authentication"
)
raise M365MissingEnvironmentUserCredentialsError(
raise M365MissingEnvironmentCredentialsError(
file=os.path.basename(__file__),
message="Missing M365_USER or M365_ENCRYPTED_PASSWORD environment variables required for credentials authentication.",
)
@@ -404,6 +416,7 @@ class M365Provider(Provider):
client_id=client_id,
client_secret=client_secret,
tenant_id=tenant_id,
provider_id=provider_id,
)
if credentials:
@@ -434,8 +447,11 @@ class M365Provider(Provider):
f"M365 Region: {Fore.YELLOW}{self.region_config.name}{Style.RESET_ALL}",
f"M365 Tenant Domain: {Fore.YELLOW}{self._identity.tenant_domain}{Style.RESET_ALL} M365 Tenant ID: {Fore.YELLOW}{self._identity.tenant_id}{Style.RESET_ALL}",
f"M365 Identity Type: {Fore.YELLOW}{self._identity.identity_type}{Style.RESET_ALL} M365 Identity ID: {Fore.YELLOW}{self._identity.identity_id}{Style.RESET_ALL}",
f"M365 User: {Fore.YELLOW}{self.credentials.user}{Style.RESET_ALL}",
]
if self.credentials and self.credentials.user:
report_lines.append(
f"M365 User: {Fore.YELLOW}{self.credentials.user}{Style.RESET_ALL}"
)
report_title = (
f"{Style.BRIGHT}Using the M365 credentials below:{Style.RESET_ALL}"
)
@@ -466,6 +482,9 @@ class M365Provider(Provider):
- tenant_id: The M365 Active Directory tenant ID.
- client_id: The M365 client ID.
- client_secret: The M365 client secret
- user: The M365 user email
- encrypted_password: The M365 encrypted password
- provider_id: The M365 provider ID (in this case the Tenant ID).
region_config (M365RegionConfig): The region configuration object.
Returns:
@@ -587,11 +606,12 @@ class M365Provider(Provider):
browser_auth: bool = False,
tenant_id: str = None,
region: str = "M365Global",
raise_on_exception=True,
client_id=None,
client_secret=None,
user=None,
encrypted_password=None,
raise_on_exception: bool = True,
client_id: str = None,
client_secret: str = None,
user: str = None,
encrypted_password: str = None,
provider_id: str = None,
) -> Connection:
"""Test connection to M365 subscription.
@@ -610,6 +630,7 @@ class M365Provider(Provider):
client_secret (str): The M365 client secret.
user (str): The M365 user email.
encrypted_password (str): The M365 encrypted_password.
provider_id (str): The M365 provider ID (in this case the Tenant ID).
Returns:
@@ -649,11 +670,22 @@ class M365Provider(Provider):
# Get the dict from the static credentials
m365_credentials = None
if tenant_id and client_id and client_secret:
m365_credentials = M365Provider.validate_static_credentials(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,
)
if not user and not encrypted_password:
m365_credentials = M365Provider.validate_static_credentials(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,
user="user",
encrypted_password="encrypted_password",
)
else:
m365_credentials = M365Provider.validate_static_credentials(
tenant_id=tenant_id,
client_id=client_id,
client_secret=client_secret,
user=user,
encrypted_password=encrypted_password,
)
# Set up the M365 session
credentials = M365Provider.setup_session(
@@ -668,7 +700,19 @@ class M365Provider(Provider):
GraphServiceClient(credentials=credentials)
logger.info("M365 provider: Connection to M365 successful")
logger.info("M365 provider: Connection to MSGraph successful")
# Set up PowerShell credentials
if user and encrypted_password:
M365Provider.setup_powershell(
env_auth,
m365_credentials,
provider_id,
)
else:
logger.info("M365 provider: PowerShell authentication not required")
logger.info("M365 provider: Connection to PowerShell successful")
return Connection(is_connected=True)
@@ -896,7 +940,11 @@ class M365Provider(Provider):
@staticmethod
def validate_static_credentials(
tenant_id: str = None, client_id: str = None, client_secret: str = None
tenant_id: str = None,
client_id: str = None,
client_secret: str = None,
user: str = None,
encrypted_password: str = None,
) -> dict:
"""
Validates the static credentials for the M365 provider.
@@ -905,6 +953,8 @@ class M365Provider(Provider):
tenant_id (str): The M365 Active Directory tenant ID.
client_id (str): The M365 client ID.
client_secret (str): The M365 client secret.
user (str): The M365 user email.
encrypted_password (str): The M365 encrypted password.
Raises:
M365NotValidTenantIdError: If the provided M365 Tenant ID is not valid.
@@ -934,19 +984,36 @@ class M365Provider(Provider):
file=os.path.basename(__file__),
message="The provided M365 Client ID is not valid.",
)
# Validate the Client Secret
if not re.match("^[a-zA-Z0-9._~-]+$", client_secret):
if not client_secret:
raise M365NotValidClientSecretError(
file=os.path.basename(__file__),
message="The provided M365 Client Secret is not valid.",
)
# Validate the User
if not user:
raise M365NotValidUserError(
file=os.path.basename(__file__),
message="The provided M365 User is not valid.",
)
# Validate the Encrypted Password
if not encrypted_password:
raise M365NotValidEncryptedPasswordError(
file=os.path.basename(__file__),
message="The provided M365 Encrypted Password is not valid.",
)
try:
M365Provider.verify_client(tenant_id, client_id, client_secret)
return {
"tenant_id": tenant_id,
"client_id": client_id,
"client_secret": client_secret,
"user": user,
"encrypted_password": encrypted_password,
}
except M365NotValidTenantIdError as tenant_id_error:
logger.error(

View File

@@ -25,6 +25,7 @@ class M365Credentials(BaseModel):
client_id: str = ""
client_secret: str = ""
tenant_id: str = ""
provider_id: str = ""
class M365OutputOptions(ProviderOutputOptions):

View File

@@ -1,7 +1,6 @@
from asyncio import gather, get_event_loop
from typing import List, Optional
from msgraph.generated.models.o_data_errors.o_data_error import ODataError
from pydantic import BaseModel
from prowler.lib.logger import logger
@@ -40,20 +39,6 @@ class AdminCenter(M365Service):
license_details = await self.client.users.by_user_id(
user.id
).license_details.get()
try:
mailbox_settings = await self.client.users.by_user_id(
user.id
).mailbox_settings.get()
mailbox_settings.user_purpose
except ODataError as error:
if error.error.code == "MailboxNotEnabledForRESTAPI":
logger.warning(
f"MailboxNotEnabledForRESTAPI for user {user.id}"
)
else:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
users.update(
{
user.id: User(

View File

@@ -0,0 +1,30 @@
{
"Provider": "m365",
"CheckID": "defender_antiphishing_policy_configured",
"CheckTitle": "Ensure anti-phishing policies are properly configured and active.",
"CheckType": [],
"ServiceName": "defender",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "low",
"ResourceType": "Defender Anti-Phishing Policy",
"Description": "Ensure that anti-phishing policies are created and configured for specific users, groups, or domains, taking precedence over the default policy. This check verifies the existence of rules within policies and validates specific policy settings such as spoof intelligence, DMARC actions, safety tips, and unauthenticated sender actions.",
"Risk": "Without anti-phishing policies, organizations may rely solely on default settings, which might not adequately protect against phishing attacks targeted at specific users, groups, or domains. This increases the risk of successful phishing attempts and potential data breaches.",
"RelatedUrl": "https://learn.microsoft.com/en-us/microsoft-365/security/office-365-security/set-up-anti-phishing-policies?view=o365-worldwide",
"Remediation": {
"Code": {
"CLI": "$params = @{Name='<policy_name>';PhishThresholdLevel=3;EnableTargetedUserProtection=$true;EnableOrganizationDomainsProtection=$true;EnableMailboxIntelligence=$true;EnableMailboxIntelligenceProtection=$true;EnableSpoofIntelligence=$true;TargetedUserProtectionAction='Quarantine';TargetedDomainProtectionAction='Quarantine';MailboxIntelligenceProtectionAction='Quarantine';TargetedUserQuarantineTag='DefaultFullAccessWithNotificationPolicy';MailboxIntelligenceQuarantineTag='DefaultFullAccessWithNotificationPolicy';TargetedDomainQuarantineTag='DefaultFullAccessWithNotificationPolicy';EnableFirstContactSafetyTips=$true;EnableSimilarUsersSafetyTips=$true;EnableSimilarDomainsSafetyTips=$true;EnableUnusualCharactersSafetyTips=$true;HonorDmarcPolicy=$true}; New-AntiPhishPolicy @params; New-AntiPhishRule -Name $params.Name -AntiPhishPolicy $params.Name -RecipientDomainIs (Get-AcceptedDomain).Name -Priority 0",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft 365 Defender https://security.microsoft.com. 2. Click to expand Email & collaboration and select Policies & rules. 3. On the Policies & rules page select Threat policies. 4. Under Policies, select Anti-phishing 5. Ensure policies have rules with the state set to 'on' and validate settings: spoof intelligence enabled, spoof intelligence action set to 'Quarantine', DMARC reject and quarantine actions, safety tips enabled, unauthenticated sender action enabled, show tag enabled, and honor DMARC policy enabled. If not, modify them to be as recommended.",
"Terraform": ""
},
"Recommendation": {
"Text": "Create and configure anti-phishing policies for specific users, groups, or domains to enhance protection against phishing attacks.",
"Url": "https://learn.microsoft.com/en-us/microsoft-365/security/office-365-security/set-up-anti-phishing-policies?view=o365-worldwide"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,59 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defender.defender_client import defender_client
class defender_antiphishing_policy_configured(Check):
"""
Check if an anti-phishing policy is established and properly configured in the Defender service.
Attributes:
metadata: Metadata associated with the check (inherited from Check).
"""
def execute(self) -> List[CheckReportM365]:
"""
Execute the check to verify if an anti-phishing policy is established and properly configured.
This method checks the Defender anti-phishing policies to ensure they are configured
according to best practices.
Returns:
List[CheckReportM365]: A list of reports containing the result of the check.
"""
findings = []
for policy_name, policy in defender_client.antiphishing_policies.items():
report = CheckReportM365(
metadata=self.metadata(),
resource=policy,
resource_name="Defender Anti-Phishing Policy",
resource_id=policy_name,
)
report.status = "FAIL"
report.status_extended = (
f"Anti-phishing policy {policy_name} is not properly configured."
)
if (
not policy.default
and policy_name in defender_client.antiphising_rules
and defender_client.antiphising_rules[policy_name].state.lower()
== "enabled"
) or policy.default:
if (
policy.spoof_intelligence
and policy.spoof_intelligence_action.lower() == "quarantine"
and policy.dmarc_reject_action.lower() == "quarantine"
and policy.dmarc_quarantine_action.lower() == "quarantine"
and policy.safety_tips
and policy.unauthenticated_sender_action
and policy.show_tag
and policy.honor_dmarc_policy
):
report.status = "PASS"
report.status_extended = f"Anti-phishing policy {policy_name} is properly configured and enabled."
findings.append(report)
return findings

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.m365.services.defender.defender_service import Defender
defender_client = Defender(Provider.get_global_provider())

View File

@@ -0,0 +1,30 @@
{
"Provider": "m365",
"CheckID": "defender_malware_policy_common_attachments_filter_enabled",
"CheckTitle": "Ensure the Common Attachment Types Filter is enabled.",
"CheckType": [],
"ServiceName": "defender",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "critical",
"ResourceType": "Defender Malware Policy",
"Description": "Ensure that the Common Attachment Types Filter is enabled in anti-malware policies to block known and custom malicious file types from being attached to emails.",
"Risk": "If this setting is not enabled, users may receive emails with malicious attachments that could contain malware, increasing the risk of endpoint infection or data compromise.",
"RelatedUrl": "https://learn.microsoft.com/en-us/defender-office-365/anti-malware-policies-configure?view=o365-worldwide",
"Remediation": {
"Code": {
"CLI": "Set-MalwareFilterPolicy -Identity Default -EnableFileFilter $true",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft 365 Defender https://security.microsoft.com. 2. Click to expand Email & collaboration and select Policies & rules. 3. On the Policies & rules page select Threat policies. 4. Under Policies, select Anti-malware and click on the Default (Default) policy. 5. On the policy page, scroll to the bottom and click Edit protection settings. 6. Check the option Enable the common attachments filter. 7. Click Save.",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable the common attachment types filter in your default or custom anti-malware policy to prevent the delivery of emails with potentially dangerous attachments.",
"Url": "https://learn.microsoft.com/en-us/powershell/module/exchange/set-malwarefilterpolicy?view=exchange-ps"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,54 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defender.defender_client import defender_client
class defender_malware_policy_common_attachments_filter_enabled(Check):
"""
Check if the Common Attachment Types Filter is enabled in the Defender anti-malware policy.
Attributes:
metadata: Metadata associated with the check (inherited from Check).
"""
def execute(self) -> List[CheckReportM365]:
"""
Execute the check to verify if the Common Attachment Types Filter is enabled.
This method checks the Defender anti-malware policy to determine if the
Common Attachment Types Filter is enabled.
Returns:
List[CheckReportM365]: A list of reports containing the result of the check.
"""
findings = []
if not defender_client.malware_policies:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender Malware Policy",
resource_id="defenderMalwarePolicy",
)
report.status = "FAIL"
report.status_extended = "Common Attachment Types Filter is not enabled."
findings.append(report)
else:
for policy in defender_client.malware_policies:
report = CheckReportM365(
metadata=self.metadata(),
resource=policy,
resource_name="Defender Malware Policy",
resource_id="defenderMalwarePolicy",
)
report.status = "FAIL"
report.status_extended = f"Common Attachment Types Filter is not enabled in anti-malware policy {policy.identity}."
if policy.enable_file_filter:
report.status = "PASS"
report.status_extended = f"Common Attachment Types Filter is enabled in anti-malware policy {policy.identity}."
break
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "m365",
"CheckID": "defender_malware_policy_notifications_internal_users_malware_enabled",
"CheckTitle": "Ensure notifications for internal users sending malware is Enabled",
"CheckType": [],
"ServiceName": "defender",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Defender Malware Policy",
"Description": "Verify that Exchange Online Protection (EOP) is configured to notify admins of malicious activity from internal users.",
"Risk": "If notifications for internal users sending malware are not enabled, administrators may not be aware of potential threats originating from within the organization, increasing the risk of undetected malicious activities.",
"RelatedUrl": "https://learn.microsoft.com/en-us/defender-office-365/anti-malware-protection-about",
"Remediation": {
"Code": {
"CLI": "Set-MalwareFilterPolicy -Identity Default -EnableInternalSenderAdminNotifications $true -InternalSenderAdminAddress 'admin@example.com'",
"NativeIaC": "",
"Other": "1. Connect to Exchange Online using Connect-ExchangeOnline. 2. Execute the command: Get-MalwareFilterPolicy | fl Identity, EnableInternalSenderAdminNotifications, InternalSenderAdminAddress. 3. Ensure 'Notify an admin about undelivered messages from internal senders' is set to On and that at least one email address is listed under Administrator email address.",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable notifications for internal users sending malware in your Defender Malware Policy to ensure admins are alerted of potential threats.",
"Url": "https://learn.microsoft.com/en-us/powershell/module/exchange/set-malwarefilterpolicy?view=exchange-ps"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,62 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.defender.defender_client import defender_client
class defender_malware_policy_notifications_internal_users_malware_enabled(Check):
"""
Check if notifications for internal users sending malware are enabled in the Defender anti-malware policy.
Attributes:
metadata: Metadata associated with the check (inherited from Check).
"""
def execute(self) -> List[CheckReportM365]:
"""
Execute the check to verify if notifications for internal users sending malware are enabled.
This method checks the Defender anti-malware policy to determine if notifications for internal users
sending malware are enabled.
Returns:
List[CheckReportM365]: A list of reports containing the result of the check.
"""
findings = []
if not defender_client.malware_policies:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Defender Malware Policy",
resource_id="defenderMalwarePolicy",
)
report.status = "FAIL"
report.status_extended = (
"Notifications for internal users sending malware are not enabled."
)
findings.append(report)
else:
for policy in defender_client.malware_policies:
report = CheckReportM365(
metadata=self.metadata(),
resource=policy,
resource_name="Defender Malware Policy",
resource_id="defenderMalwarePolicy",
)
report.status = "FAIL"
report.status_extended = (
"Notifications for internal users sending malware are not enabled."
)
if policy.enable_internal_sender_admin_notifications:
if policy.internal_sender_admin_address:
report.status = "PASS"
report.status_extended = "Notifications for internal users sending malware are enabled."
break
else:
report.status = "FAIL"
report.status_extended = "Notifications for internal users sending malware are enabled, but no email addresses are configured."
findings.append(report)
return findings

View File

@@ -0,0 +1,113 @@
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.m365.lib.service.service import M365Service
from prowler.providers.m365.m365_provider import M365Provider
class Defender(M365Service):
def __init__(self, provider: M365Provider):
super().__init__(provider)
self.powershell.connect_exchange_online()
self.malware_policies = self._get_malware_filter_policy()
self.antiphishing_policies = self._get_antiphising_policy()
self.antiphising_rules = self._get_antiphising_rules()
self.powershell.close()
def _get_malware_filter_policy(self):
logger.info("M365 - Getting Defender malware filter policy...")
malware_policies = []
try:
malware_policy = self.powershell.get_malware_filter_policy()
if isinstance(malware_policy, dict):
malware_policy = [malware_policy]
for policy in malware_policy:
if policy:
malware_policies.append(
DefenderMalwarePolicy(
enable_file_filter=policy.get("EnableFileFilter", True),
identity=policy.get("Identity", ""),
enable_internal_sender_admin_notifications=policy.get(
"EnableInternalSenderAdminNotifications", False
),
internal_sender_admin_address=policy.get(
"InternalSenderAdminAddress", ""
),
)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return malware_policies
def _get_antiphising_policy(self):
logger.info("Microsoft365 - Getting Defender antiphishing policy...")
antiphishing_policies = {}
try:
antiphishing_policy = self.powershell.get_antiphishing_policy()
if isinstance(antiphishing_policy, dict):
antiphishing_policy = [antiphishing_policy]
for policy in antiphishing_policy:
if policy:
antiphishing_policies[policy.get("Name", "")] = AntiphishingPolicy(
spoof_intelligence=policy.get("EnableSpoofIntelligence", True),
spoof_intelligence_action=policy.get(
"AuthenticationFailAction", ""
),
dmarc_reject_action=policy.get("DmarcRejectAction", ""),
dmarc_quarantine_action=policy.get("DmarcQuarantineAction", ""),
safety_tips=policy.get("EnableFirstContactSafetyTips", True),
unauthenticated_sender_action=policy.get(
"EnableUnauthenticatedSender", True
),
show_tag=policy.get("EnableViaTag", True),
honor_dmarc_policy=policy.get("HonorDmarcPolicy", True),
default=policy.get("IsDefault", False),
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return antiphishing_policies
def _get_antiphising_rules(self):
logger.info("Microsoft365 - Getting Defender antiphishing rules...")
antiphishing_rules = {}
try:
antiphishing_rule = self.powershell.get_antiphishing_rules()
if isinstance(antiphishing_rule, dict):
antiphishing_rule = [antiphishing_rule]
for rule in antiphishing_rule:
if rule:
antiphishing_rules[rule.get("Name", "")] = AntiphishingRule(
state=rule.get("State", ""),
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return antiphishing_rules
class DefenderMalwarePolicy(BaseModel):
enable_file_filter: bool
identity: str
enable_internal_sender_admin_notifications: bool
internal_sender_admin_address: str
class AntiphishingPolicy(BaseModel):
spoof_intelligence: bool
spoof_intelligence_action: str
dmarc_reject_action: str
dmarc_quarantine_action: str
safety_tips: bool
unauthenticated_sender_action: bool
show_tag: bool
honor_dmarc_policy: bool
default: bool
class AntiphishingRule(BaseModel):
state: str

View File

@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.m365.services.exchange.exchange_service import Exchange
exchange_client = Exchange(Provider.get_global_provider())

View File

@@ -0,0 +1,30 @@
{
"Provider": "m365",
"CheckID": "exchange_mailbox_audit_bypass_disabled",
"CheckTitle": "Ensure 'AuditBypassEnabled' is not enabled on any mailbox in the organization.",
"CheckType": [],
"ServiceName": "exchange",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Exchange Mailboxes",
"Description": "Ensure that no mailboxes in the organization have 'AuditBypassEnabled' set to true. This setting prevents mailbox audit logging and can allow unauthorized access without traceability.",
"Risk": "If 'AuditBypassEnabled' is set to true for any mailbox, access to those mailboxes won't be logged, creating a blind spot in forensic analysis and increasing the risk of undetected malicious activity.",
"RelatedUrl": "https://learn.microsoft.com/en-us/powershell/module/exchange/get-mailboxauditbypassassociation?view=exchange-ps",
"Remediation": {
"Code": {
"CLI": "$MBXAudit = Get-MailboxAuditBypassAssociation -ResultSize unlimited | Where-Object { $_.AuditBypassEnabled -eq $true }; foreach ($mailbox in $MBXAudit) { $mailboxName = $mailbox.Name; Set-MailboxAuditBypassAssociation -Identity $mailboxName -AuditBypassEnabled $false; Write-Host \"Audit Bypass disabled for mailbox Identity: $mailboxName\" -ForegroundColor Green }",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Ensure that no mailboxes have 'AuditBypassEnabled' enabled to guarantee full audit logging for all mailbox activities.",
"Url": "https://learn.microsoft.com/en-us/powershell/module/exchange/set-mailboxauditbypassassociation?view=exchange-ps"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,39 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.exchange.exchange_client import exchange_client
class exchange_mailbox_audit_bypass_disabled(Check):
"""Verify if Exchange mailbox auditing is enabled.
This check ensures that mailbox auditing is not bypassed and is properly enabled.
"""
def execute(self) -> List[CheckReportM365]:
"""Run the check to validate Exchange mailbox auditing.
Iterates through the mailbox configurations to determine if auditing is enabled
and generates a report for each mailbox.
Returns:
List[CheckReportM365]: A list of reports with the audit status for each mailbox.
"""
findings = []
for mailbox_config in exchange_client.mailboxes_config:
report = CheckReportM365(
metadata=self.metadata(),
resource=mailbox_config,
resource_name=mailbox_config.name,
resource_id=mailbox_config.id,
)
report.status = "FAIL"
report.status_extended = f"Exchange mailbox auditing is bypassed and not enabled for mailbox: {mailbox_config.name}."
if not mailbox_config.audit_bypass_enabled:
report.status = "PASS"
report.status_extended = f"Exchange mailbox auditing is enabled for mailbox: {mailbox_config.name}."
findings.append(report)
return findings

View File

@@ -0,0 +1,30 @@
{
"Provider": "m365",
"CheckID": "exchange_organization_mailbox_auditing_enabled",
"CheckTitle": "Ensure AuditDisabled organizationally is set to False.",
"CheckType": [],
"ServiceName": "exchange",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Exchange Organization Configuration",
"Description": "Ensure that the AuditDisabled property is set to False at the organizational level in Exchange Online. This enables mailbox auditing by default for all mailboxes and overrides individual mailbox settings.",
"Risk": "If mailbox auditing is disabled at the organization level, no mailbox actions are audited, limiting forensic investigation capabilities and exposing the organization to undetected malicious activity.",
"RelatedUrl": "https://learn.microsoft.com/en-us/purview/audit-mailboxes?view=o365-worldwide",
"Remediation": {
"Code": {
"CLI": "Set-OrganizationConfig -AuditDisabled $false",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Set AuditDisabled to False at the organization level to ensure mailbox auditing is always enforced.",
"Url": "https://learn.microsoft.com/en-us/powershell/module/exchange/set-organizationconfig?view=exchange-ps#-auditdisabled"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,44 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.exchange.exchange_client import exchange_client
class exchange_organization_mailbox_auditing_enabled(Check):
"""Check if Exchange mailbox auditing is enabled.
Attributes:
metadata: Metadata associated with the check (inherited from Check).
"""
def execute(self) -> List[CheckReportM365]:
"""Execute the check for Exchange mailbox auditing.
This method checks if mailbox auditing is enabled in the Exchange organization configuration.
Returns:
List[CheckReportM365]: A list of reports containing the result of the check.
"""
findings = []
organization_config = exchange_client.organization_config
if organization_config:
report = CheckReportM365(
metadata=self.metadata(),
resource=organization_config,
resource_name=organization_config.name,
resource_id=organization_config.guid,
)
report.status = "FAIL"
report.status_extended = (
"Exchange mailbox auditing is not enabled on your organization."
)
if not organization_config.audit_disabled:
report.status = "PASS"
report.status_extended = (
"Exchange mailbox auditing is enabled on your organization."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,66 @@
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.m365.lib.service.service import M365Service
from prowler.providers.m365.m365_provider import M365Provider
class Exchange(M365Service):
def __init__(self, provider: M365Provider):
super().__init__(provider)
self.powershell.connect_exchange_online()
self.organization_config = self._get_organization_config()
self.mailboxes_config = self._get_mailbox_audit_config()
self.powershell.close()
def _get_organization_config(self):
logger.info("Microsoft365 - Getting Exchange Organization configuration...")
organization_config = None
try:
organization_configuration = self.powershell.get_organization_config()
if organization_configuration:
organization_config = Organization(
name=organization_configuration.get("Name", ""),
guid=organization_configuration.get("Guid", ""),
audit_disabled=organization_configuration.get(
"AuditDisabled", False
),
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return organization_config
def _get_mailbox_audit_config(self):
logger.info("Microsoft365 - Getting mailbox audit configuration...")
mailboxes_config = []
try:
mailbox_audit_data = self.powershell.get_mailbox_audit_config()
for mailbox_audit_config in mailbox_audit_data:
mailboxes_config.append(
MailboxAuditConfig(
name=mailbox_audit_config.get("Name", ""),
id=mailbox_audit_config.get("Id", ""),
audit_bypass_enabled=mailbox_audit_config.get(
"AuditBypassEnabled", True
),
)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return mailboxes_config
class Organization(BaseModel):
name: str
guid: str
audit_disabled: bool
class MailboxAuditConfig(BaseModel):
name: str
id: str
audit_bypass_enabled: bool

View File

@@ -21,21 +21,22 @@ class purview_audit_log_search_enabled(Check):
"""
findings = []
audit_log_config = purview_client.audit_log_config
report = CheckReportM365(
metadata=self.metadata(),
resource=audit_log_config if audit_log_config else {},
resource_name="Purview Settings",
resource_id="purviewSettings",
)
report.status = "FAIL"
report.status_extended = "Purview audit log search is not enabled."
if audit_log_config:
report = CheckReportM365(
metadata=self.metadata(),
resource=audit_log_config if audit_log_config else {},
resource_name="Purview Settings",
resource_id="purviewSettings",
)
report.status = "FAIL"
report.status_extended = "Purview audit log search is not enabled."
if purview_client.audit_log_config and getattr(
purview_client.audit_log_config, "audit_log_search", False
):
report.status = "PASS"
report.status_extended = "Purview audit log search is enabled."
if purview_client.audit_log_config and getattr(
purview_client.audit_log_config, "audit_log_search", False
):
report.status = "PASS"
report.status_extended = "Purview audit log search is enabled."
findings.append(report)
findings.append(report)
return findings

View File

@@ -17,11 +17,12 @@ class Purview(M365Service):
audit_log_config = None
try:
audit_log_config_response = self.powershell.get_audit_log_config()
audit_log_config = AuditLogConfig(
audit_log_search=audit_log_config_response.get(
"UnifiedAuditLogIngestionEnabled", False
if audit_log_config_response:
audit_log_config = AuditLogConfig(
audit_log_search=audit_log_config_response.get(
"UnifiedAuditLogIngestionEnabled", False
)
)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"

View File

@@ -0,0 +1,30 @@
{
"Provider": "m365",
"CheckID": "teams_email_sending_to_channel_disabled",
"CheckTitle": "Ensure users are not be able to email the channel directly.",
"CheckType": [],
"ServiceName": "teams",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "Teams Settings",
"Description": "Ensure users can not send emails to channel email addresses.",
"Risk": "Allowing users to send emails to Teams channel email addresses introduces a security risk, as these addresses are outside the tenants domain and lack proper security controls. This creates a potential attack vector where threat actors could exploit the channel email to deliver malicious content or spam.",
"RelatedUrl": "https://learn.microsoft.com/en-us/powershell/module/teams/get-csteamsclientconfiguration?view=teams-ps",
"Remediation": {
"Code": {
"CLI": "Set-CsTeamsClientConfiguration -Identity Global -AllowEmailIntoChannel $false",
"NativeIaC": "",
"Other": "1. Navigate to Microsoft Teams admin center https://admin.teams.microsoft.com. 2. Click to expand Teams select Teams settings. 3. Under email integration set Users can send emails to a channel email address to Off.",
"Terraform": ""
},
"Recommendation": {
"Text": "Disable the ability for users to send emails to Teams channel email addresses to reduce the risk of external abuse and enhance control over organizational communications.",
"Url": "https://learn.microsoft.com/en-us/powershell/module/teams/get-csteamsclientconfiguration?view=teams-ps"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,42 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.teams.teams_client import teams_client
class teams_email_sending_to_channel_disabled(Check):
"""Check if
Attributes:
metadata: Metadata associated with the check (inherited from Check).
"""
def execute(self) -> List[CheckReportM365]:
"""Execute the check for
This method checks if
Returns:
List[CheckReportM365]: A list of reports containing the result of the check.
"""
findings = []
teams_settings = teams_client.teams_settings
if teams_settings:
report = CheckReportM365(
metadata=self.metadata(),
resource=teams_settings if teams_settings else {},
resource_name="Teams Settings",
resource_id="teamsSettings",
)
report.status = "FAIL"
report.status_extended = "Users can send emails to channel email addresses."
if teams_settings and not teams_settings.allow_email_into_channel:
report.status = "PASS"
report.status_extended = (
"Users can not send emails to channel email addresses."
)
findings.append(report)
return findings

View File

@@ -22,46 +22,47 @@ class teams_external_file_sharing_restricted(Check):
List[CheckReportM365]: A list of reports containing the result of the check.
"""
findings = []
cloud_storage_settings = teams_client.teams_settings.cloud_storage_settings
report = CheckReportM365(
metadata=self.metadata(),
resource=cloud_storage_settings if cloud_storage_settings else {},
resource_name="Cloud Storage Settings",
resource_id="cloudStorageSettings",
)
report.status = "FAIL"
report.status_extended = "External file sharing is not restricted to only approved cloud storage services."
if teams_client.teams_settings:
cloud_storage_settings = teams_client.teams_settings.cloud_storage_settings
report = CheckReportM365(
metadata=self.metadata(),
resource=cloud_storage_settings if cloud_storage_settings else {},
resource_name="Cloud Storage Settings",
resource_id="cloudStorageSettings",
)
report.status = "FAIL"
report.status_extended = "External file sharing is not restricted to only approved cloud storage services."
allowed_services = teams_client.audit_config.get(
"allowed_cloud_storage_services", []
)
if cloud_storage_settings:
# Get storage services from CloudStorageSettings class items
storage_services = [
attr
for attr, type in CloudStorageSettings.__annotations__.items()
if type is bool
]
allowed_services = teams_client.audit_config.get(
"allowed_cloud_storage_services", []
)
if cloud_storage_settings:
# Get storage services from CloudStorageSettings class items
storage_services = [
attr
for attr, type in CloudStorageSettings.__annotations__.items()
if type is bool
]
# Check if all services are disabled when no allowed services are specified
# or if all enabled services are in the allowed list
if (
not allowed_services
and all(
not getattr(cloud_storage_settings, service, True)
for service in storage_services
)
) or (
allowed_services
and not any(
getattr(cloud_storage_settings, service, True)
and service not in allowed_services
for service in storage_services
)
):
report.status = "PASS"
report.status_extended = "External file sharing is restricted to only approved cloud storage services."
# Check if all services are disabled when no allowed services are specified
# or if all enabled services are in the allowed list
if (
not allowed_services
and all(
not getattr(cloud_storage_settings, service, True)
for service in storage_services
)
) or (
allowed_services
and not any(
getattr(cloud_storage_settings, service, True)
and service not in allowed_services
for service in storage_services
)
):
report.status = "PASS"
report.status_extended = "External file sharing is restricted to only approved cloud storage services."
findings.append(report)
findings.append(report)
return findings

View File

@@ -14,17 +14,22 @@ class Teams(M365Service):
def _get_teams_client_configuration(self):
logger.info("M365 - Getting Teams settings...")
settings = self.powershell.get_teams_settings()
teams_settings = None
try:
teams_settings = TeamsSettings(
cloud_storage_settings=CloudStorageSettings(
allow_box=settings.get("AllowBox", True),
allow_drop_box=settings.get("AllowDropBox", True),
allow_egnyte=settings.get("AllowEgnyte", True),
allow_google_drive=settings.get("AllowGoogleDrive", True),
allow_share_file=settings.get("AllowShareFile", True),
settings = self.powershell.get_teams_settings()
if settings:
teams_settings = TeamsSettings(
cloud_storage_settings=CloudStorageSettings(
allow_box=settings.get("AllowBox", True),
allow_drop_box=settings.get("AllowDropBox", True),
allow_egnyte=settings.get("AllowEgnyte", True),
allow_google_drive=settings.get("AllowGoogleDrive", True),
allow_share_file=settings.get("AllowShareFile", True),
),
allow_email_into_channel=settings.get(
"AllowEmailIntoChannel", True
),
)
)
except Exception as error:
logger.error(
@@ -43,3 +48,4 @@ class CloudStorageSettings(BaseModel):
class TeamsSettings(BaseModel):
cloud_storage_settings: CloudStorageSettings
allow_email_into_channel: bool = True

View File

@@ -89,6 +89,7 @@ coverage = "7.6.12"
docker = "7.1.0"
flake8 = "7.1.2"
freezegun = "1.5.1"
marshmallow = ">=3.15.0,<4.0.0"
mock = "5.2.0"
moto = {extras = ["all"], version = "5.0.28"}
openapi-schema-validator = "0.6.3"

View File

@@ -297,7 +297,6 @@ config_aws = {
"insecure_key_algorithms": [
"RSA-1024",
"P-192",
"SHA-1",
],
"eks_required_log_types": [
"api",

View File

@@ -317,7 +317,6 @@ aws:
[
"RSA-1024",
"P-192",
"SHA-1",
]
# AWS EKS Configuration

View File

@@ -112,6 +112,53 @@ class TestPowerShellSession:
assert result == expected
session.close()
@patch("subprocess.Popen")
def test_json_parse_output_logging(self, mock_popen):
mock_process = MagicMock()
mock_popen.return_value = mock_process
session = PowerShellSession()
# Test warning for non-JSON output
with patch("prowler.lib.logger.logger.warning") as mock_warning:
result = session.json_parse_output("some text without json")
assert result == {}
mock_warning.assert_called_once_with(
"Could not parse PowerShell output as JSON.\nOriginal output: some text without json"
)
session.close()
@patch("subprocess.Popen")
def test_json_parse_output_with_text_around_json(self, mock_popen):
mock_process = MagicMock()
mock_popen.return_value = mock_process
session = PowerShellSession()
# Test JSON extraction from text with surrounding content
result = session.json_parse_output('some text {"key": "value"} more text')
assert result == {"key": "value"}
result = session.json_parse_output('prefix [{"key": "value"}] suffix')
assert result == [{"key": "value"}]
# Test non-JSON text returns empty dict
result = session.json_parse_output("just some text")
assert result == {}
session.close()
@patch("subprocess.Popen")
def test_json_parse_output_empty(self, mock_popen):
mock_process = MagicMock()
mock_popen.return_value = mock_process
session = PowerShellSession()
# Test empty string
result = session.json_parse_output("")
assert result == {}
session.close()
@patch("subprocess.Popen")
def test_close(self, mock_popen):
mock_process = MagicMock()

View File

@@ -1,5 +1,10 @@
from unittest.mock import MagicMock, patch
import pytest
from prowler.providers.m365.exceptions.exceptions import (
M365UserNotBelongingToTenantError,
)
from prowler.providers.m365.lib.powershell.m365_powershell import M365PowerShell
from prowler.providers.m365.models import M365Credentials
@@ -84,11 +89,12 @@ class Testm365PowerShell:
}
credentials = M365Credentials(
user="test@example.com",
user="test@contoso.onmicrosoft.com",
passwd="test_password",
client_id="test_client_id",
client_secret="test_client_secret",
tenant_id="test_tenant_id",
provider_id="contoso.onmicrosoft.com",
)
session = M365PowerShell(credentials)
@@ -115,7 +121,89 @@ class Testm365PowerShell:
authority="https://login.microsoftonline.com/test_tenant_id",
)
mock_msal_instance.acquire_token_by_username_password.assert_called_once_with(
username="test@example.com",
username="test@contoso.onmicrosoft.com",
password="decrypted_password",
scopes=["https://graph.microsoft.com/.default"],
)
session.close()
@patch("subprocess.Popen")
@patch("msal.ConfidentialClientApplication")
def test_test_credentials_user_not_belonging_to_tenant(self, mock_msal, mock_popen):
mock_process = MagicMock()
mock_popen.return_value = mock_process
mock_msal_instance = MagicMock()
mock_msal.return_value = mock_msal_instance
mock_msal_instance.acquire_token_by_username_password.return_value = {
"access_token": "test_token"
}
credentials = M365Credentials(
user="user@otherdomain.com",
passwd="test_password",
client_id="test_client_id",
client_secret="test_client_secret",
tenant_id="test_tenant_id",
provider_id="contoso.onmicrosoft.com",
)
session = M365PowerShell(credentials)
session.execute = MagicMock()
session.process.stdin.write = MagicMock()
session.read_output = MagicMock(return_value="decrypted_password")
with pytest.raises(M365UserNotBelongingToTenantError) as exception:
session.test_credentials(credentials)
assert exception.type == M365UserNotBelongingToTenantError
assert "The provided M365 User does not belong to the specified tenant." in str(
exception.value
)
mock_msal.assert_called_once_with(
client_id="test_client_id",
client_credential="test_client_secret",
authority="https://login.microsoftonline.com/test_tenant_id",
)
mock_msal_instance.acquire_token_by_username_password.assert_called_once_with(
username="user@otherdomain.com",
password="decrypted_password",
scopes=["https://graph.microsoft.com/.default"],
)
session.close()
@patch("subprocess.Popen")
@patch("msal.ConfidentialClientApplication")
def test_test_credentials_auth_failure(self, mock_msal, mock_popen):
mock_process = MagicMock()
mock_popen.return_value = mock_process
mock_msal_instance = MagicMock()
mock_msal.return_value = mock_msal_instance
mock_msal_instance.acquire_token_by_username_password.return_value = None
credentials = M365Credentials(
user="test@contoso.onmicrosoft.com",
passwd="test_password",
client_id="test_client_id",
client_secret="test_client_secret",
tenant_id="test_tenant_id",
provider_id="contoso.onmicrosoft.com",
)
session = M365PowerShell(credentials)
session.execute = MagicMock()
session.process.stdin.write = MagicMock()
session.read_output = MagicMock(return_value="decrypted_password")
assert session.test_credentials(credentials) is False
mock_msal.assert_called_once_with(
client_id="test_client_id",
client_credential="test_client_secret",
authority="https://login.microsoftonline.com/test_tenant_id",
)
mock_msal_instance.acquire_token_by_username_password.assert_called_once_with(
username="test@contoso.onmicrosoft.com",
password="decrypted_password",
scopes=["https://graph.microsoft.com/.default"],
)

View File

@@ -1,3 +1,4 @@
import os
from unittest.mock import patch
from uuid import uuid4
@@ -18,8 +19,14 @@ from prowler.config.config import (
from prowler.providers.common.models import Connection
from prowler.providers.m365.exceptions.exceptions import (
M365HTTPResponseError,
M365MissingEnvironmentUserCredentialsError,
M365MissingEnvironmentCredentialsError,
M365NoAuthenticationMethodError,
M365NotValidClientIdError,
M365NotValidClientSecretError,
M365NotValidEncryptedPasswordError,
M365NotValidTenantIdError,
M365NotValidUserError,
M365UserNotBelongingToTenantError,
)
from prowler.providers.m365.m365_provider import M365Provider
from prowler.providers.m365.models import (
@@ -333,37 +340,59 @@ class TestM365Provider:
assert test_connection.is_connected
assert test_connection.error is None
def test_test_connection_tenant_id_client_id_client_secret_user_encrypted_password(
def test_test_connection_tenant_id_client_id_client_secret_no_user_encrypted_password(
self,
):
with (
patch(
"prowler.providers.m365.m365_provider.M365Provider.setup_session"
) as mock_setup_session,
patch(
"prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials"
) as mock_validate_static_credentials,
):
# Mock setup_session to return a mocked session object
mock_session = MagicMock()
mock_setup_session.return_value = mock_session
# Mock ValidateStaticCredentials to avoid real API calls
mock_validate_static_credentials.return_value = None
test_connection = M365Provider.test_connection(
tenant_id=str(uuid4()),
region="M365Global",
raise_on_exception=False,
client_id=str(uuid4()),
client_secret=str(uuid4()),
user="user@user.com",
encrypted_password="AAAA1111",
with patch(
"prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials"
) as mock_validate_static_credentials:
mock_validate_static_credentials.side_effect = M365NotValidUserError(
file=os.path.basename(__file__),
message="The provided M365 User is not valid.",
)
assert isinstance(test_connection, Connection)
assert test_connection.is_connected
assert test_connection.error is None
with pytest.raises(M365NotValidUserError) as exception:
M365Provider.test_connection(
tenant_id=str(uuid4()),
region="M365Global",
raise_on_exception=True,
client_id=str(uuid4()),
client_secret=str(uuid4()),
user=None,
encrypted_password="test_password",
)
assert exception.type == M365NotValidUserError
assert "The provided M365 User is not valid." in str(exception.value)
def test_test_connection_tenant_id_client_id_client_secret_user_no_encrypted_password(
self,
):
with patch(
"prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials"
) as mock_validate_static_credentials:
mock_validate_static_credentials.side_effect = (
M365NotValidEncryptedPasswordError(
file=os.path.basename(__file__),
message="The provided M365 Encrypted Password is not valid.",
)
)
with pytest.raises(M365NotValidEncryptedPasswordError) as exception:
M365Provider.test_connection(
tenant_id=str(uuid4()),
region="M365Global",
raise_on_exception=True,
client_id=str(uuid4()),
client_secret=str(uuid4()),
user="test@example.com",
encrypted_password=None,
)
assert exception.type == M365NotValidEncryptedPasswordError
assert "The provided M365 Encrypted Password is not valid." in str(
exception.value
)
def test_test_connection_with_httpresponseerror(self):
with patch(
@@ -440,7 +469,7 @@ class TestM365Provider:
mock_session.test_credentials.return_value = False
mock_powershell.return_value = mock_session
with pytest.raises(M365MissingEnvironmentUserCredentialsError) as exc_info:
with pytest.raises(M365MissingEnvironmentCredentialsError) as exc_info:
M365Provider.setup_powershell(
env_auth=True, m365_credentials=credentials
)
@@ -450,3 +479,106 @@ class TestM365Provider:
in str(exc_info.value)
)
mock_session.test_credentials.assert_not_called()
def test_test_connection_user_not_belonging_to_tenant(
self,
):
with patch(
"prowler.providers.m365.m365_provider.M365Provider.validate_static_credentials"
) as mock_validate_static_credentials:
mock_validate_static_credentials.side_effect = M365UserNotBelongingToTenantError(
file=os.path.basename(__file__),
message="The provided M365 User does not belong to the specified tenant.",
)
with pytest.raises(M365UserNotBelongingToTenantError) as exception:
M365Provider.test_connection(
tenant_id="contoso.onmicrosoft.com",
region="M365Global",
raise_on_exception=True,
client_id=str(uuid4()),
client_secret=str(uuid4()),
user="user@otherdomain.com",
encrypted_password="test_password",
)
assert exception.type == M365UserNotBelongingToTenantError
assert (
"The provided M365 User does not belong to the specified tenant."
in str(exception.value)
)
def test_validate_static_credentials_invalid_tenant_id(self):
with pytest.raises(M365NotValidTenantIdError) as exception:
M365Provider.validate_static_credentials(
tenant_id="invalid-tenant-id",
client_id="12345678-1234-5678-1234-567812345678",
client_secret="test_secret",
user="test@example.com",
encrypted_password="test_password",
)
assert "The provided M365 Tenant ID is not valid." in str(exception.value)
def test_validate_static_credentials_missing_client_id(self):
with pytest.raises(M365NotValidClientIdError) as exception:
M365Provider.validate_static_credentials(
tenant_id="12345678-1234-5678-1234-567812345678",
client_id="",
client_secret="test_secret",
user="test@example.com",
encrypted_password="test_password",
)
assert "The provided M365 Client ID is not valid." in str(exception.value)
def test_validate_static_credentials_missing_client_secret(self):
with pytest.raises(M365NotValidClientSecretError) as exception:
M365Provider.validate_static_credentials(
tenant_id="12345678-1234-5678-1234-567812345678",
client_id="12345678-1234-5678-1234-567812345678",
client_secret="",
user="test@example.com",
encrypted_password="test_password",
)
assert "The provided M365 Client Secret is not valid." in str(exception.value)
def test_validate_static_credentials_missing_user(self):
with pytest.raises(M365NotValidUserError) as exception:
M365Provider.validate_static_credentials(
tenant_id="12345678-1234-5678-1234-567812345678",
client_id="12345678-1234-5678-1234-567812345678",
client_secret="test_secret",
user="",
encrypted_password="test_password",
)
assert "The provided M365 User is not valid." in str(exception.value)
def test_validate_static_credentials_missing_encrypted_password(self):
with pytest.raises(M365NotValidEncryptedPasswordError) as exception:
M365Provider.validate_static_credentials(
tenant_id="12345678-1234-5678-1234-567812345678",
client_id="12345678-1234-5678-1234-567812345678",
client_secret="test_secret",
user="test@example.com",
encrypted_password="",
)
assert "The provided M365 Encrypted Password is not valid." in str(
exception.value
)
def test_validate_arguments_missing_env_credentials(self):
with pytest.raises(M365MissingEnvironmentCredentialsError) as exception:
M365Provider.validate_arguments(
az_cli_auth=False,
sp_env_auth=False,
env_auth=True,
browser_auth=False,
tenant_id=None,
client_id="test_client_id",
client_secret="test_secret",
user=None,
encrypted_password=None,
)
assert (
"M365 provider requires AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, AZURE_TENANT_ID, M365_USER and M365_ENCRYPTED_PASSWORD environment variables to be set when using --env-auth"
in str(exception.value)
)

View File

@@ -0,0 +1,320 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defender_antiphishing_policy_configured:
def test_properly_configured_custom_policy(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured import (
defender_antiphishing_policy_configured,
)
from prowler.providers.m365.services.defender.defender_service import (
AntiphishingPolicy,
AntiphishingRule,
)
defender_client.antiphishing_policies = {
"Policy1": AntiphishingPolicy(
spoof_intelligence=True,
spoof_intelligence_action="Quarantine",
dmarc_reject_action="Quarantine",
dmarc_quarantine_action="Quarantine",
safety_tips=True,
unauthenticated_sender_action=True,
show_tag=True,
honor_dmarc_policy=True,
default=False,
)
}
defender_client.antiphising_rules = {
"Policy1": AntiphishingRule(state="Enabled")
}
check = defender_antiphishing_policy_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Anti-phishing policy Policy1 is properly configured and enabled."
)
assert (
result[0].resource
== defender_client.antiphishing_policies["Policy1"].dict()
)
assert result[0].resource_name == "Defender Anti-Phishing Policy"
assert result[0].resource_id == "Policy1"
assert result[0].location == "global"
def test_not_properly_configured_policy(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured import (
defender_antiphishing_policy_configured,
)
from prowler.providers.m365.services.defender.defender_service import (
AntiphishingPolicy,
AntiphishingRule,
)
defender_client.antiphishing_policies = {
"Policy2": AntiphishingPolicy(
spoof_intelligence=False,
spoof_intelligence_action="None",
dmarc_reject_action="None",
dmarc_quarantine_action="None",
safety_tips=False,
unauthenticated_sender_action=False,
show_tag=False,
honor_dmarc_policy=False,
default=False,
)
}
defender_client.antiphising_rules = {
"Policy2": AntiphishingRule(state="Enabled")
}
check = defender_antiphishing_policy_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Anti-phishing policy Policy2 is not properly configured."
)
assert (
result[0].resource
== defender_client.antiphishing_policies["Policy2"].dict()
)
assert result[0].resource_name == "Defender Anti-Phishing Policy"
assert result[0].resource_id == "Policy2"
assert result[0].location == "global"
def test_properly_configured_default_policy(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured import (
defender_antiphishing_policy_configured,
)
from prowler.providers.m365.services.defender.defender_service import (
AntiphishingPolicy,
)
defender_client.antiphishing_policies = {
"Default": AntiphishingPolicy(
spoof_intelligence=True,
spoof_intelligence_action="Quarantine",
dmarc_reject_action="Quarantine",
dmarc_quarantine_action="Quarantine",
safety_tips=True,
unauthenticated_sender_action=True,
show_tag=True,
honor_dmarc_policy=True,
default=True,
)
}
defender_client.antiphising_rules = {}
check = defender_antiphishing_policy_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Anti-phishing policy Default is properly configured and enabled."
)
assert (
result[0].resource
== defender_client.antiphishing_policies["Default"].dict()
)
assert result[0].resource_name == "Defender Anti-Phishing Policy"
assert result[0].resource_id == "Default"
assert result[0].location == "global"
def test_default_policy_not_properly_configured(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured import (
defender_antiphishing_policy_configured,
)
from prowler.providers.m365.services.defender.defender_service import (
AntiphishingPolicy,
)
defender_client.antiphishing_policies = {
"Default": AntiphishingPolicy(
spoof_intelligence=False,
spoof_intelligence_action="None",
dmarc_reject_action="None",
dmarc_quarantine_action="None",
safety_tips=False,
unauthenticated_sender_action=False,
show_tag=False,
honor_dmarc_policy=False,
default=True,
)
}
defender_client.antiphising_rules = {}
check = defender_antiphishing_policy_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Anti-phishing policy Default is not properly configured."
)
assert (
result[0].resource
== defender_client.antiphishing_policies["Default"].dict()
)
assert result[0].resource_name == "Defender Anti-Phishing Policy"
assert result[0].resource_id == "Default"
assert result[0].location == "global"
def test_no_antiphishing_policies(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured import (
defender_antiphishing_policy_configured,
)
defender_client.antiphishing_policies = {}
defender_client.antiphising_rules = {}
check = defender_antiphishing_policy_configured()
result = check.execute()
assert result == []
def test_custom_policy_without_rule(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_antiphishing_policy_configured.defender_antiphishing_policy_configured import (
defender_antiphishing_policy_configured,
)
from prowler.providers.m365.services.defender.defender_service import (
AntiphishingPolicy,
)
defender_client.antiphishing_policies = {
"PolicyX": AntiphishingPolicy(
spoof_intelligence=True,
spoof_intelligence_action="Quarantine",
dmarc_reject_action="Quarantine",
dmarc_quarantine_action="Quarantine",
safety_tips=True,
unauthenticated_sender_action=True,
show_tag=True,
honor_dmarc_policy=True,
default=False,
)
}
defender_client.antiphising_rules = {}
check = defender_antiphishing_policy_configured()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Anti-phishing policy PolicyX is not properly configured."
)
assert (
result[0].resource
== defender_client.antiphishing_policies["PolicyX"].dict()
)
assert result[0].resource_name == "Defender Anti-Phishing Policy"
assert result[0].resource_id == "PolicyX"
assert result[0].location == "global"

View File

@@ -0,0 +1,138 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defender_malware_policy_common_attachments_filter_enabled:
def test_enable_file_filter_disabled(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_malware_policy_common_attachments_filter_enabled.defender_malware_policy_common_attachments_filter_enabled.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_malware_policy_common_attachments_filter_enabled.defender_malware_policy_common_attachments_filter_enabled import (
defender_malware_policy_common_attachments_filter_enabled,
)
from prowler.providers.m365.services.defender.defender_service import (
DefenderMalwarePolicy,
)
defender_client.malware_policies = [
DefenderMalwarePolicy(
enable_file_filter=False,
identity="Policy1",
enable_internal_sender_admin_notifications=False,
internal_sender_admin_address="",
),
]
check = defender_malware_policy_common_attachments_filter_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Common Attachment Types Filter is not enabled in anti-malware policy Policy1."
)
assert result[0].resource == defender_client.malware_policies[0].dict()
assert result[0].resource_name == "Defender Malware Policy"
assert result[0].resource_id == "defenderMalwarePolicy"
assert result[0].location == "global"
def test_enable_file_filter_enabled(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_malware_policy_common_attachments_filter_enabled.defender_malware_policy_common_attachments_filter_enabled.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_malware_policy_common_attachments_filter_enabled.defender_malware_policy_common_attachments_filter_enabled import (
defender_malware_policy_common_attachments_filter_enabled,
)
from prowler.providers.m365.services.defender.defender_service import (
DefenderMalwarePolicy,
)
defender_client.malware_policies = [
DefenderMalwarePolicy(
enable_file_filter=True,
identity="Policy1",
enable_internal_sender_admin_notifications=False,
internal_sender_admin_address="",
),
]
check = defender_malware_policy_common_attachments_filter_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Common Attachment Types Filter is enabled in anti-malware policy Policy1."
)
assert result[0].resource == defender_client.malware_policies[0].dict()
assert result[0].resource_name == "Defender Malware Policy"
assert result[0].resource_id == "defenderMalwarePolicy"
assert result[0].location == "global"
def test_no_policy(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_malware_policy_common_attachments_filter_enabled.defender_malware_policy_common_attachments_filter_enabled.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_malware_policy_common_attachments_filter_enabled.defender_malware_policy_common_attachments_filter_enabled import (
defender_malware_policy_common_attachments_filter_enabled,
)
defender_client.malware_policies = []
check = defender_malware_policy_common_attachments_filter_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Common Attachment Types Filter is not enabled."
)
assert result[0].resource == {}
assert result[0].resource_name == "Defender Malware Policy"
assert result[0].resource_id == "defenderMalwarePolicy"
assert result[0].location == "global"

View File

@@ -0,0 +1,155 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_defender_malware_policy_notifications_internal_users_malware_enabled:
def test_notifications_disabled(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_malware_policy_notifications_internal_users_malware_enabled.defender_malware_policy_notifications_internal_users_malware_enabled.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_malware_policy_notifications_internal_users_malware_enabled.defender_malware_policy_notifications_internal_users_malware_enabled import (
defender_malware_policy_notifications_internal_users_malware_enabled,
)
from prowler.providers.m365.services.defender.defender_service import (
DefenderMalwarePolicy,
)
defender_client.malware_policies = [
DefenderMalwarePolicy(
enable_file_filter=True,
identity="Default",
enable_internal_sender_admin_notifications=False,
internal_sender_admin_address="",
)
]
check = (
defender_malware_policy_notifications_internal_users_malware_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Notifications for internal users sending malware are not enabled."
)
assert result[0].resource == defender_client.malware_policies[0].dict()
assert result[0].resource_name == "Defender Malware Policy"
assert result[0].resource_id == "defenderMalwarePolicy"
assert result[0].location == "global"
def test_notifications_enabled_without_email(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_malware_policy_notifications_internal_users_malware_enabled.defender_malware_policy_notifications_internal_users_malware_enabled.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_malware_policy_notifications_internal_users_malware_enabled.defender_malware_policy_notifications_internal_users_malware_enabled import (
defender_malware_policy_notifications_internal_users_malware_enabled,
)
from prowler.providers.m365.services.defender.defender_service import (
DefenderMalwarePolicy,
)
defender_client.malware_policies = [
DefenderMalwarePolicy(
enable_file_filter=True,
identity="Default",
enable_internal_sender_admin_notifications=True,
internal_sender_admin_address="",
)
]
check = (
defender_malware_policy_notifications_internal_users_malware_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Notifications for internal users sending malware are enabled, but no email addresses are configured."
)
assert result[0].resource == defender_client.malware_policies[0].dict()
assert result[0].resource_name == "Defender Malware Policy"
assert result[0].resource_id == "defenderMalwarePolicy"
assert result[0].location == "global"
def test_notifications_enabled_with_email(self):
defender_client = mock.MagicMock()
defender_client.audited_tenant = "audited_tenant"
defender_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.defender.defender_malware_policy_notifications_internal_users_malware_enabled.defender_malware_policy_notifications_internal_users_malware_enabled.defender_client",
new=defender_client,
),
):
from prowler.providers.m365.services.defender.defender_malware_policy_notifications_internal_users_malware_enabled.defender_malware_policy_notifications_internal_users_malware_enabled import (
defender_malware_policy_notifications_internal_users_malware_enabled,
)
from prowler.providers.m365.services.defender.defender_service import (
DefenderMalwarePolicy,
)
defender_client.malware_policies = [
DefenderMalwarePolicy(
enable_file_filter=True,
identity="Default",
enable_internal_sender_admin_notifications=True,
internal_sender_admin_address="security@example.com",
)
]
check = (
defender_malware_policy_notifications_internal_users_malware_enabled()
)
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Notifications for internal users sending malware are enabled."
)
assert result[0].resource == defender_client.malware_policies[0].dict()
assert result[0].resource_name == "Defender Malware Policy"
assert result[0].resource_id == "defenderMalwarePolicy"
assert result[0].location == "global"

View File

@@ -0,0 +1,179 @@
from unittest import mock
from unittest.mock import patch
from prowler.providers.m365.models import M365IdentityInfo
from prowler.providers.m365.services.defender.defender_service import (
AntiphishingPolicy,
AntiphishingRule,
Defender,
DefenderMalwarePolicy,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
def mock_defender_get_malware_filter_policy(_):
return [
DefenderMalwarePolicy(
enable_file_filter=False,
identity="Policy1",
enable_internal_sender_admin_notifications=False,
internal_sender_admin_address="",
),
DefenderMalwarePolicy(
enable_file_filter=True,
identity="Policy2",
enable_internal_sender_admin_notifications=True,
internal_sender_admin_address="security@example.com",
),
]
def mock_defender_get_antiphising_policy(_):
return {
"Policy1": AntiphishingPolicy(
spoof_intelligence=True,
spoof_intelligence_action="Quarantine",
dmarc_reject_action="Reject",
dmarc_quarantine_action="Quarantine",
safety_tips=True,
unauthenticated_sender_action=True,
show_tag=True,
honor_dmarc_policy=True,
default=False,
),
"Policy2": AntiphishingPolicy(
spoof_intelligence=False,
spoof_intelligence_action="None",
dmarc_reject_action="None",
dmarc_quarantine_action="None",
safety_tips=False,
unauthenticated_sender_action=False,
show_tag=False,
honor_dmarc_policy=False,
default=True,
),
}
def mock_defender_get_antiphising_rules(_):
return {
"Policy1": AntiphishingRule(
state="Enabled",
),
"Policy2": AntiphishingRule(
state="Disabled",
),
}
class Test_Defender_Service:
def test_get_client(self):
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
assert defender_client.client.__class__.__name__ == "GraphServiceClient"
assert defender_client.powershell.__class__.__name__ == "M365PowerShell"
defender_client.powershell.close()
@patch(
"prowler.providers.m365.services.defender.defender_service.Defender._get_malware_filter_policy",
new=mock_defender_get_malware_filter_policy,
)
def test__get_malware_filter_policy(self):
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
malware_policies = defender_client.malware_policies
assert (
malware_policies[0].enable_internal_sender_admin_notifications is False
)
assert malware_policies[0].internal_sender_admin_address == ""
assert malware_policies[1].enable_file_filter is True
assert malware_policies[1].identity == "Policy2"
assert (
malware_policies[1].enable_internal_sender_admin_notifications is True
)
assert (
malware_policies[1].internal_sender_admin_address
== "security@example.com"
)
defender_client.powershell.close()
@patch(
"prowler.providers.m365.services.defender.defender_service.Defender._get_antiphising_policy",
new=mock_defender_get_antiphising_policy,
)
def test_get_antiphising_policy(self):
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
antiphishing_policies = defender_client.antiphishing_policies
assert antiphishing_policies["Policy1"].spoof_intelligence is True
assert (
antiphishing_policies["Policy1"].spoof_intelligence_action
== "Quarantine"
)
assert antiphishing_policies["Policy1"].dmarc_reject_action == "Reject"
assert (
antiphishing_policies["Policy1"].dmarc_quarantine_action == "Quarantine"
)
assert antiphishing_policies["Policy1"].safety_tips is True
assert (
antiphishing_policies["Policy1"].unauthenticated_sender_action is True
)
assert antiphishing_policies["Policy1"].show_tag is True
assert antiphishing_policies["Policy1"].honor_dmarc_policy is True
assert antiphishing_policies["Policy1"].default is False
assert antiphishing_policies["Policy2"].spoof_intelligence is False
assert antiphishing_policies["Policy2"].spoof_intelligence_action == "None"
assert antiphishing_policies["Policy2"].dmarc_reject_action == "None"
assert antiphishing_policies["Policy2"].dmarc_quarantine_action == "None"
assert antiphishing_policies["Policy2"].safety_tips is False
assert (
antiphishing_policies["Policy2"].unauthenticated_sender_action is False
)
assert antiphishing_policies["Policy2"].show_tag is False
assert antiphishing_policies["Policy2"].honor_dmarc_policy is False
assert antiphishing_policies["Policy2"].default is True
defender_client.powershell.close()
@patch(
"prowler.providers.m365.services.defender.defender_service.Defender._get_antiphising_rules",
new=mock_defender_get_antiphising_rules,
)
def test_get_antiphising_rules(self):
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
):
defender_client = Defender(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
antiphishing_rules = defender_client.antiphising_rules
assert antiphishing_rules["Policy1"].state == "Enabled"
assert antiphishing_rules["Policy2"].state == "Disabled"
defender_client.powershell.close()

View File

@@ -0,0 +1,175 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_exchange_mailbox_audit_bypass_disabled:
def test_no_mailboxes(self):
exchange_client = mock.MagicMock()
exchange_client.audited_tenant = "audited_tenant"
exchange_client.audited_domain = DOMAIN
exchange_client.mailboxes_config = []
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.exchange.exchange_mailbox_audit_bypass_disabled.exchange_mailbox_audit_bypass_disabled.exchange_client",
new=exchange_client,
),
):
from prowler.providers.m365.services.exchange.exchange_mailbox_audit_bypass_disabled.exchange_mailbox_audit_bypass_disabled import (
exchange_mailbox_audit_bypass_disabled,
)
check = exchange_mailbox_audit_bypass_disabled()
result = check.execute()
assert len(result) == 0
def test_audit_bypass_disabled_and_enabled(self):
exchange_client = mock.MagicMock()
exchange_client.audited_tenant = "audited_tenant"
exchange_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.exchange.exchange_mailbox_audit_bypass_disabled.exchange_mailbox_audit_bypass_disabled.exchange_client",
new=exchange_client,
),
):
from prowler.providers.m365.services.exchange.exchange_mailbox_audit_bypass_disabled.exchange_mailbox_audit_bypass_disabled import (
exchange_mailbox_audit_bypass_disabled,
)
from prowler.providers.m365.services.exchange.exchange_service import (
MailboxAuditConfig,
)
exchange_client.mailboxes_config = [
MailboxAuditConfig(name="test", id="test", audit_bypass_enabled=True),
MailboxAuditConfig(
name="test2", id="test2", audit_bypass_enabled=False
),
]
check = exchange_mailbox_audit_bypass_disabled()
result = check.execute()
assert len(result) == 2
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Exchange mailbox auditing is bypassed and not enabled for mailbox: test."
)
assert result[0].resource == exchange_client.mailboxes_config[0].dict()
assert result[0].resource_name == "test"
assert result[0].resource_id == "test"
assert result[0].location == "global"
assert result[1].status == "PASS"
assert (
result[1].status_extended
== "Exchange mailbox auditing is enabled for mailbox: test2."
)
assert result[1].resource == exchange_client.mailboxes_config[1].dict()
assert result[1].resource_name == "test2"
assert result[1].resource_id == "test2"
assert result[1].location == "global"
def test_audit_bypass_enabled(self):
exchange_client = mock.MagicMock()
exchange_client.audited_tenant = "audited_tenant"
exchange_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.exchange.exchange_mailbox_audit_bypass_disabled.exchange_mailbox_audit_bypass_disabled.exchange_client",
new=exchange_client,
),
):
from prowler.providers.m365.services.exchange.exchange_mailbox_audit_bypass_disabled.exchange_mailbox_audit_bypass_disabled import (
exchange_mailbox_audit_bypass_disabled,
)
from prowler.providers.m365.services.exchange.exchange_service import (
MailboxAuditConfig,
)
exchange_client.mailboxes_config = [
MailboxAuditConfig(name="test", id="test", audit_bypass_enabled=True),
]
check = exchange_mailbox_audit_bypass_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Exchange mailbox auditing is bypassed and not enabled for mailbox: test."
)
assert result[0].resource == exchange_client.mailboxes_config[0].dict()
assert result[0].resource_name == "test"
assert result[0].resource_id == "test"
assert result[0].location == "global"
def test_audit_bypass_disabled(self):
exchange_client = mock.MagicMock()
exchange_client.audited_tenant = "audited_tenant"
exchange_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.exchange.exchange_mailbox_audit_bypass_disabled.exchange_mailbox_audit_bypass_disabled.exchange_client",
new=exchange_client,
),
):
from prowler.providers.m365.services.exchange.exchange_mailbox_audit_bypass_disabled.exchange_mailbox_audit_bypass_disabled import (
exchange_mailbox_audit_bypass_disabled,
)
from prowler.providers.m365.services.exchange.exchange_service import (
MailboxAuditConfig,
)
exchange_client.mailboxes_config = [
MailboxAuditConfig(name="test", id="test", audit_bypass_enabled=False),
]
check = exchange_mailbox_audit_bypass_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Exchange mailbox auditing is enabled for mailbox: test."
)
assert result[0].resource == exchange_client.mailboxes_config[0].dict()
assert result[0].resource_name == "test"
assert result[0].resource_id == "test"
assert result[0].location == "global"

View File

@@ -0,0 +1,116 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_exchange_organization_mailbox_auditing_enabled:
def test_no_organization(self):
exchange_client = mock.MagicMock()
exchange_client.audited_tenant = "audited_tenant"
exchange_client.audited_domain = DOMAIN
exchange_client.organization_config = None
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.exchange.exchange_organization_mailbox_auditing_enabled.exchange_organization_mailbox_auditing_enabled.exchange_client",
new=exchange_client,
),
):
from prowler.providers.m365.services.exchange.exchange_organization_mailbox_auditing_enabled.exchange_organization_mailbox_auditing_enabled import (
exchange_organization_mailbox_auditing_enabled,
)
check = exchange_organization_mailbox_auditing_enabled()
result = check.execute()
assert len(result) == 0
def test_audit_log_search_disabled(self):
exchange_client = mock.MagicMock()
exchange_client.audited_tenant = "audited_tenant"
exchange_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.exchange.exchange_organization_mailbox_auditing_enabled.exchange_organization_mailbox_auditing_enabled.exchange_client",
new=exchange_client,
),
):
from prowler.providers.m365.services.exchange.exchange_organization_mailbox_auditing_enabled.exchange_organization_mailbox_auditing_enabled import (
exchange_organization_mailbox_auditing_enabled,
)
from prowler.providers.m365.services.exchange.exchange_service import (
Organization,
)
exchange_client.organization_config = Organization(
audit_disabled=True, name="test", guid="test"
)
check = exchange_organization_mailbox_auditing_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Exchange mailbox auditing is not enabled on your organization."
)
assert result[0].resource == exchange_client.organization_config.dict()
assert result[0].resource_name == "test"
assert result[0].resource_id == "test"
assert result[0].location == "global"
def test_audit_log_search_enabled(self):
exchange_client = mock.MagicMock()
exchange_client.audited_tenant = "audited_tenant"
exchange_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
mock.patch(
"prowler.providers.m365.services.exchange.exchange_organization_mailbox_auditing_enabled.exchange_organization_mailbox_auditing_enabled.exchange_client",
new=exchange_client,
),
):
from prowler.providers.m365.services.exchange.exchange_organization_mailbox_auditing_enabled.exchange_organization_mailbox_auditing_enabled import (
exchange_organization_mailbox_auditing_enabled,
)
from prowler.providers.m365.services.exchange.exchange_service import (
Organization,
)
exchange_client.organization_config = Organization(
audit_disabled=False, name="test", guid="test"
)
check = exchange_organization_mailbox_auditing_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Exchange mailbox auditing is enabled on your organization."
)
assert result[0].resource == exchange_client.organization_config.dict()
assert result[0].resource_name == "test"
assert result[0].resource_id == "test"
assert result[0].location == "global"

View File

@@ -0,0 +1,86 @@
from unittest import mock
from unittest.mock import patch
from prowler.providers.m365.models import M365IdentityInfo
from prowler.providers.m365.services.exchange.exchange_service import (
Exchange,
MailboxAuditConfig,
Organization,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
def mock_exchange_get_organization_config(_):
return Organization(audit_disabled=True, name="test", guid="test")
def mock_exchange_get_mailbox_audit_config(_):
return [
MailboxAuditConfig(name="test", id="test", audit_bypass_enabled=False),
MailboxAuditConfig(name="test2", id="test2", audit_bypass_enabled=True),
]
class Test_Exchange_Service:
def test_get_client(self):
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
assert exchange_client.client.__class__.__name__ == "GraphServiceClient"
assert exchange_client.powershell.__class__.__name__ == "M365PowerShell"
exchange_client.powershell.close()
@patch(
"prowler.providers.m365.services.exchange.exchange_service.Exchange._get_organization_config",
new=mock_exchange_get_organization_config,
)
def test_get_organization_config(self):
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
organization_config = exchange_client.organization_config
assert organization_config.name == "test"
assert organization_config.guid == "test"
assert organization_config.audit_disabled is True
exchange_client.powershell.close()
@patch(
"prowler.providers.m365.services.exchange.exchange_service.Exchange._get_mailbox_audit_config",
new=mock_exchange_get_mailbox_audit_config,
)
def test_get_mailbox_audit_config(self):
with (
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_exchange_online"
),
):
exchange_client = Exchange(
set_mocked_m365_provider(
identity=M365IdentityInfo(tenant_domain=DOMAIN)
)
)
mailbox_audit_config = exchange_client.mailboxes_config
assert len(mailbox_audit_config) == 2
assert mailbox_audit_config[0].name == "test"
assert mailbox_audit_config[0].id == "test"
assert mailbox_audit_config[0].audit_bypass_enabled is False
assert mailbox_audit_config[1].name == "test2"
assert mailbox_audit_config[1].id == "test2"
assert mailbox_audit_config[1].audit_bypass_enabled is True
exchange_client.powershell.close()

View File

@@ -0,0 +1,105 @@
from unittest import mock
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
class Test_teams_email_sending_to_channel_disabled:
def test_email_sending_to_channel_no_restricted(self):
teams_client = mock.MagicMock()
teams_client.audited_tenant = "audited_tenant"
teams_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_microsoft_teams"
),
mock.patch(
"prowler.providers.m365.services.teams.teams_email_sending_to_channel_disabled.teams_email_sending_to_channel_disabled.teams_client",
new=teams_client,
),
):
from prowler.providers.m365.services.teams.teams_email_sending_to_channel_disabled.teams_email_sending_to_channel_disabled import (
teams_email_sending_to_channel_disabled,
)
from prowler.providers.m365.services.teams.teams_service import (
CloudStorageSettings,
TeamsSettings,
)
teams_client.teams_settings = TeamsSettings(
cloud_storage_settings=CloudStorageSettings(
allow_box=True,
allow_drop_box=True,
allow_egnyte=True,
allow_google_drive=True,
allow_share_file=True,
),
allow_email_into_channel=True,
)
check = teams_email_sending_to_channel_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Users can send emails to channel email addresses."
)
assert result[0].resource == teams_client.teams_settings.dict()
assert result[0].resource_name == "Teams Settings"
assert result[0].resource_id == "teamsSettings"
assert result[0].location == "global"
def test_email_sending_to_channel_restricted(self):
teams_client = mock.MagicMock()
teams_client.audited_tenant = "audited_tenant"
teams_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
"prowler.providers.m365.lib.powershell.m365_powershell.M365PowerShell.connect_microsoft_teams"
),
mock.patch(
"prowler.providers.m365.services.teams.teams_email_sending_to_channel_disabled.teams_email_sending_to_channel_disabled.teams_client",
new=teams_client,
),
):
from prowler.providers.m365.services.teams.teams_email_sending_to_channel_disabled.teams_email_sending_to_channel_disabled import (
teams_email_sending_to_channel_disabled,
)
from prowler.providers.m365.services.teams.teams_service import (
CloudStorageSettings,
TeamsSettings,
)
teams_client.teams_settings = TeamsSettings(
cloud_storage_settings=CloudStorageSettings(
allow_box=True,
allow_drop_box=True,
allow_egnyte=True,
allow_google_drive=True,
allow_share_file=True,
),
allow_email_into_channel=False,
)
check = teams_email_sending_to_channel_disabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "Users can not send emails to channel email addresses."
)
assert result[0].resource == teams_client.teams_settings.dict()
assert result[0].resource_name == "Teams Settings"
assert result[0].resource_id == "teamsSettings"
assert result[0].location == "global"