Compare commits

...

8 Commits

Author SHA1 Message Date
Hugo P.Brito
e0f109aacb feat(entra): add MT.1020 directory sync exclusion check 2026-04-08 16:49:46 +01:00
Pablo Fernandez Guerra (PFE)
406eedd68a chore(ui): unset GIT_WORK_TREE in pre-commit hook (#10574)
Co-authored-by: Pablo F.G <pablo.fernandez@prowler.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-08 14:27:12 +02:00
lydiavilchez
bc38104903 feat(googleworkspace): add calendar service checks using Cloud Identity Policy API (#10597) 2026-04-08 13:26:56 +02:00
Andoni Alonso
9290d7e105 feat(sdk): warn when sensitive CLI flags receive explicit values (#10532) 2026-04-08 13:15:05 +02:00
lydiavilchez
72e8f09c07 feat(googleworkspace): add directory check for CIS 1.1.3 - super admin only admin roles (#10488)
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
2026-04-08 12:05:15 +02:00
Pepe Fagoaga
1d43885230 docs: update architecture diagram (#10604) 2026-04-08 11:05:28 +02:00
Adrián Peña
e6aedcb207 feat(api): support sort by delta on finding-groups endpoints (#10606) 2026-04-08 11:04:57 +02:00
Kay Agahd
89fe867944 fix(aws): recognize service-specific condition keys as restrictive in is_policy_public (#10600)
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-08 10:55:55 +02:00
55 changed files with 3422 additions and 60 deletions

View File

@@ -317,7 +317,10 @@ python prowler-cli.py -v
- **Prowler SDK**: A Python SDK designed to extend the functionality of the Prowler CLI for advanced capabilities.
- **Prowler MCP Server**: A Model Context Protocol server that provides AI tools for Lighthouse, the AI-powered security assistant. This is a critical dependency for Lighthouse functionality.
![Prowler App Architecture](docs/products/img/prowler-app-architecture.png)
![Prowler App Architecture](docs/images/products/prowler-app-architecture.png)
<!-- Diagram source: docs/images/products/prowler-app-architecture.mmd — edit there, re-render at https://mermaid.live, and replace the PNG. -->
## Prowler CLI

View File

@@ -10,6 +10,7 @@ All notable changes to the **Prowler API** are documented in this file.
- Filter RBAC role lookup by `tenant_id` to prevent cross-tenant privilege leak [(#10491)](https://github.com/prowler-cloud/prowler/pull/10491)
- `VALKEY_SCHEME`, `VALKEY_USERNAME`, and `VALKEY_PASSWORD` environment variables to configure Celery broker TLS/auth connection details for Valkey/ElastiCache [(#10420)](https://github.com/prowler-cloud/prowler/pull/10420)
- `Vercel` provider support [(#10190)](https://github.com/prowler-cloud/prowler/pull/10190)
- Finding groups list and latest endpoints support `sort=delta`, ordering by `new_count` then `changed_count` so groups with the most new findings rank highest [(#10606)](https://github.com/prowler-cloud/prowler/pull/10606)
### 🔄 Changed

View File

@@ -16839,6 +16839,39 @@ class TestFindingGroupViewSet:
data = response.json()["data"]
assert len(data) > 0
@pytest.mark.parametrize(
"endpoint_name", ["finding-group-list", "finding-group-latest"]
)
def test_finding_groups_sort_by_delta(
self,
authenticated_client,
finding_groups_fixture,
endpoint_name,
):
"""Sort by delta orders by new_count then changed_count (lexicographic)."""
params = {"sort": "-delta"}
if endpoint_name == "finding-group-list":
params["filter[inserted_at]"] = TODAY
response = authenticated_client.get(reverse(endpoint_name), params)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert len(data) > 0
def delta_key(item):
attrs = item["attributes"]
return (attrs.get("new_count", 0), attrs.get("changed_count", 0))
desc_keys = [delta_key(item) for item in data]
assert desc_keys == sorted(desc_keys, reverse=True)
# Ascending order produces the inverse arrangement
params["sort"] = "delta"
response = authenticated_client.get(reverse(endpoint_name), params)
assert response.status_code == status.HTTP_200_OK
asc_keys = [delta_key(item) for item in response.json()["data"]]
assert asc_keys == sorted(asc_keys)
def test_finding_groups_latest_ignores_date_filters(
self, authenticated_client, finding_groups_fixture
):

View File

@@ -7219,6 +7219,7 @@ class FindingGroupViewSet(BaseRLSViewSet):
"check_id": "check_id",
"check_title": "check_title",
"severity": "severity_order",
"delta": "delta_order",
"fail_count": "fail_count",
"pass_count": "pass_count",
"muted_count": "muted_count",
@@ -7569,7 +7570,20 @@ class FindingGroupViewSet(BaseRLSViewSet):
sort_param, self._FINDING_GROUP_SORT_MAP
)
if ordering:
aggregated_queryset = aggregated_queryset.order_by(*ordering)
# delta_order is a virtual sort field: expand it to a
# lexicographic ordering by (new_count, changed_count) so groups
# with more new findings rank higher, with changed_count as the
# tie-breaker (preserves the "new > changed" priority used by
# the resources endpoint, but driven by the actual counters).
expanded_ordering = []
for field in ordering:
if field.lstrip("-") == "delta_order":
sign = "-" if field.startswith("-") else ""
expanded_ordering.append(f"{sign}new_count")
expanded_ordering.append(f"{sign}changed_count")
else:
expanded_ordering.append(field)
aggregated_queryset = aggregated_queryset.order_by(*expanded_ordering)
else:
aggregated_queryset = aggregated_queryset.order_by(
"-fail_count", "-severity_order", "check_id"

View File

@@ -750,6 +750,35 @@ def init_parser(self):
# More arguments for the provider.
```
##### Sensitive CLI Arguments
CLI flags that accept secrets (tokens, passwords, API keys) require special handling to protect credentials from leaking in HTML output and process listings:
1. **Use `nargs="?"` with `default=None`** so the flag works both with and without an inline value. This allows the provider to fall back to an environment variable when no value is passed.
2. **Add a `SENSITIVE_ARGUMENTS` frozenset** at the top of the `arguments.py` file listing every flag that accepts secret values:
```python
SENSITIVE_ARGUMENTS = frozenset({"--your-provider-password", "--your-provider-token"})
```
Prowler automatically discovers these frozensets and uses them to redact values in HTML output and warn users who pass secrets directly on the command line.
3. **Document the environment variable** in the `help` text so users know the recommended alternative:
```python
<provider_name>_parser.add_argument(
"--your-provider-password",
nargs="?",
default=None,
metavar="PASSWORD",
help="Password for authentication. We recommend using the YOUR_PROVIDER_PASSWORD environment variable instead.",
)
```
<Warning>
Do not add new arguments that require passing secrets as CLI values without an environment variable fallback. Prowler CLI warns users when sensitive flags receive explicit values on the command line.
</Warning>
#### Step 5: Implement Mutelist
**Explanation:**

Binary file not shown.

Before

Width:  |  Height:  |  Size: 192 KiB

View File

@@ -11,16 +11,19 @@ Prowler App is a web application that simplifies running Prowler. It provides:
## Components
Prowler App consists of three main components:
Prowler App consists of four main components:
- **Prowler UI**: User-friendly web interface for running Prowler and viewing results, powered by Next.js
- **Prowler API**: Backend API that executes Prowler scans and stores results, built with Django REST Framework
- **Prowler SDK**: Python SDK that integrates with Prowler CLI for advanced functionality
- **Prowler MCP Server**: Model Context Protocol server that exposes AI tools for Lighthouse, the AI-powered security assistant. Required dependency for Lighthouse.
Supporting infrastructure includes:
- **PostgreSQL**: Persistent storage of scan results
- **Celery Workers**: Asynchronous execution of Prowler scans
- **Celery Beat (API Scheduler)**: Schedules recurring scans and enqueues jobs on the broker
- **Valkey**: In-memory database serving as message broker for Celery workers
- **Neo4j**: Graph database used by the Attack Paths feature to combine cloud inventory with Prowler findings (currently populated by AWS scans)
![Prowler App Architecture](/images/products/prowler-app-architecture.png)

View File

@@ -0,0 +1,37 @@
flowchart TB
user([User / Security Team])
cli([Prowler CLI])
subgraph APP["Prowler App"]
ui["Prowler UI<br/>(Next.js)"]
api["Prowler API<br/>(Django REST Framework)"]
worker["API Worker<br/>(Celery)"]
beat["API Scheduler<br/>(Celery Beat)"]
mcp["Prowler MCP Server<br/>(Lighthouse AI tools)"]
end
sdk["Prowler SDK<br/>(Python)"]
subgraph DATA["Data Layer"]
pg[("PostgreSQL")]
valkey[("Valkey / Redis")]
neo4j[("Neo4j")]
end
providers["Providers"]
user --> ui
user --> cli
ui -->|REST| api
api --> pg
api --> valkey
beat -->|enqueue jobs| valkey
valkey -->|dispatch| worker
worker --> pg
worker -->|Attack Paths| neo4j
worker -->|invokes| sdk
cli --> sdk
api -. AI tools .-> mcp
mcp -. context .-> api
sdk --> providers

Binary file not shown.

Before

Width:  |  Height:  |  Size: 192 KiB

After

Width:  |  Height:  |  Size: 268 KiB

View File

@@ -66,22 +66,38 @@ prowler <provider> --categories internet-exposed
### Shodan
Prowler allows you check if any public IPs in your Cloud environments are exposed in Shodan with the `-N`/`--shodan <shodan_api_key>` option:
Prowler can check whether any public IPs in cloud environments are exposed in Shodan using the `-N`/`--shodan` option.
For example, you can check if any of your AWS Elastic Compute Cloud (EC2) instances has an elastic IP exposed in Shodan:
#### Using the Environment Variable (Recommended)
Set the `SHODAN_API_KEY` environment variable to avoid exposing the API key in process listings and shell history:
```console
prowler aws -N/--shodan <shodan_api_key> -c ec2_elastic_ip_shodan
export SHODAN_API_KEY=<shodan_api_key>
```
Also, you can check if any of your Azure Subscription has an public IP exposed in Shodan:
Then run Prowler with the `--shodan` flag (no value needed):
```console
prowler azure -N/--shodan <shodan_api_key> -c network_public_ip_shodan
prowler aws --shodan -c ec2_elastic_ip_shodan
```
And finally, you can check if any of your GCP projects has an public IP address exposed in Shodan:
```console
prowler gcp -N/--shodan <shodan_api_key> -c compute_public_address_shodan
prowler azure --shodan -c network_public_ip_shodan
```
```console
prowler gcp --shodan -c compute_public_address_shodan
```
#### Using the CLI Flag
Alternatively, pass the API key directly on the command line:
```console
prowler aws --shodan <shodan_api_key> -c ec2_elastic_ip_shodan
```
<Warning>
Passing secret values directly on the command line exposes them in process listings and shell history. Prowler CLI displays a warning when this pattern is detected. Use the `SHODAN_API_KEY` environment variable instead.
</Warning>

View File

@@ -6,17 +6,19 @@ import { VersionBadge } from "/snippets/version-badge.mdx"
<VersionBadge version="5.19.0" />
Prowler for Google Workspace uses a **Service Account with Domain-Wide Delegation** to authenticate to the Google Workspace Admin SDK. This allows Prowler to read directory data on behalf of a super administrator without requiring an interactive login.
Prowler for Google Workspace uses a **Service Account with Domain-Wide Delegation** to authenticate to the Google Workspace Admin SDK and the Cloud Identity Policy API. This allows Prowler to read directory data and domain-level application policies on behalf of a super administrator without requiring an interactive login.
## Required Open Authorization (OAuth) Scopes
Prowler requests the following read-only OAuth 2.0 scopes from the Google Workspace Admin SDK:
Prowler requests the following read-only OAuth 2.0 scopes:
| Scope | Description |
|-------|-------------|
| `https://www.googleapis.com/auth/admin.directory.user.readonly` | Read access to user accounts and their admin status |
| `https://www.googleapis.com/auth/admin.directory.domain.readonly` | Read access to domain information |
| `https://www.googleapis.com/auth/admin.directory.customer.readonly` | Read access to customer information (Customer ID) |
| `https://www.googleapis.com/auth/cloud-identity.policies.readonly` | Read access to domain-level application policies (required for Calendar service checks) |
| `https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly` | Read access to admin roles and role assignments |
<Warning>
The delegated user must be a **super administrator** in your Google Workspace organization. Using a non-admin account will result in permission errors when accessing the Admin SDK.
@@ -30,13 +32,24 @@ If no GCP project exists, create one at [https://console.cloud.google.com](https
The project is only used to host the Service Account — it does not need to have any Google Workspace data in it.
### Step 2: Enable the Admin SDK API
### Step 2: Enable Required APIs
1. Navigate to the [Google Cloud Console](https://console.cloud.google.com)
2. Select the target project
3. Navigate to **APIs & Services → Library**
4. Search for **Admin SDK API**
5. Click **Enable**
In the [Google Cloud Console](https://console.cloud.google.com), select the target project and navigate to **APIs & Services → Library**. Search for and enable each of the following APIs:
| API | Required For |
|-----|--------------|
| **Admin SDK API** | Directory service checks (users, roles, domains) |
| **Cloud Identity API** | Calendar service checks (domain-level sharing and invitation policies) |
For each API:
1. Search for the API name in the library
2. Click the API result
3. Click **Enable**
<Note>
Both APIs must be enabled in the same GCP project that hosts the Service Account. Calendar checks will return no findings if the Cloud Identity API is not enabled.
</Note>
### Step 3: Create a Service Account
@@ -73,7 +86,7 @@ This JSON key grants access to your Google Workspace organization. Never commit
6. In the **OAuth scopes** field, enter the following scopes as a comma-separated list:
```
https://www.googleapis.com/auth/admin.directory.user.readonly,https://www.googleapis.com/auth/admin.directory.domain.readonly,https://www.googleapis.com/auth/admin.directory.customer.readonly
https://www.googleapis.com/auth/admin.directory.user.readonly,https://www.googleapis.com/auth/admin.directory.domain.readonly,https://www.googleapis.com/auth/admin.directory.customer.readonly,https://www.googleapis.com/auth/cloud-identity.policies.readonly,https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly
```
7. Click **Authorize**
@@ -114,7 +127,7 @@ The delegated user must be provided via the `GOOGLEWORKSPACE_DELEGATED_USER` env
- **Use environment variables** — Never hardcode credentials in scripts or commands
- **Use a dedicated Service Account** — Create one specifically for Prowler, separate from other integrations
- **Use read-only scopes** — Prowler only requires the three read-only scopes listed above
- **Use read-only scopes** — Prowler only requires the read-only scopes listed above
- **Restrict key access** — Set file permissions to `600` on the JSON key file
- **Rotate keys regularly** — Delete and regenerate the JSON key periodically
- **Use a least-privilege super admin** — Consider using a dedicated super admin account for Prowler's delegated user rather than a personal admin account
@@ -151,7 +164,7 @@ python3 -c "import json; json.load(open('/path/to/key.json'))" && echo "Valid JS
The Service Account cannot impersonate the delegated user. This usually means Domain-Wide Delegation has not been configured, or the OAuth scopes are incorrect. Verify:
- The Service Account Client ID is correctly entered in the Admin Console
- All three required OAuth scopes are included
- All required OAuth scopes are included
- The delegated user is a super administrator
### Permission Denied on Admin SDK Calls
@@ -159,5 +172,14 @@ The Service Account cannot impersonate the delegated user. This usually means Do
If Prowler connects but returns empty results or permission errors for specific API calls:
- Confirm Domain-Wide Delegation is fully propagated (wait a few minutes after setup)
- Verify all three scopes are authorized in the Admin Console
- Verify all scopes are authorized in the Admin Console
- Ensure the delegated user is an active super administrator
### Calendar Checks Return No Findings
If the Directory checks run successfully but the Calendar checks (e.g., `calendar_external_sharing_primary_calendar`) return no findings, the Cloud Identity Policy API is not reachable for this Service Account. Verify:
- The **Cloud Identity API** is enabled in the GCP project hosting the Service Account (Step 2)
- The scope `https://www.googleapis.com/auth/cloud-identity.policies.readonly` is included in the Domain-Wide Delegation OAuth scopes list in the Admin Console (Step 5)
- The delegated user is a super administrator (the Policy API only returns data to super admins)
- Domain-Wide Delegation has had time to propagate after adding the new scope (a few minutes)

View File

@@ -78,7 +78,7 @@ The Service Account JSON is the full content of the key file downloaded when cre
![Check Connection](/images/providers/googleworkspace-check-connection.png)
<Note>
If the connection test fails, verify that Domain-Wide Delegation is properly configured and that all three OAuth scopes are authorized. It may take a few minutes for delegation changes to propagate. See the [Troubleshooting](/user-guide/providers/googleworkspace/authentication#troubleshooting) section for common errors.
If the connection test fails, verify that Domain-Wide Delegation is properly configured and that all required OAuth scopes are authorized. It may take a few minutes for delegation changes to propagate. See the [Troubleshooting](/user-guide/providers/googleworkspace/authentication#troubleshooting) section for common errors.
</Note>
### Step 5: Launch the Scan

View File

@@ -11,11 +11,13 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `glue_etl_jobs_no_secrets_in_arguments` check for plaintext secrets in AWS Glue ETL job arguments [(#10368)](https://github.com/prowler-cloud/prowler/pull/10368)
- `awslambda_function_no_dead_letter_queue`, `awslambda_function_using_cross_account_layers`, and `awslambda_function_env_vars_not_encrypted_with_cmk` checks for AWS Lambda [(#10381)](https://github.com/prowler-cloud/prowler/pull/10381)
- `entra_conditional_access_policy_mdm_compliant_device_required` check for M365 provider [(#10220)](https://github.com/prowler-cloud/prowler/pull/10220)
- `directory_super_admin_only_admin_roles` check for Google Workspace provider [(#10488)](https://github.com/prowler-cloud/prowler/pull/10488)
- `ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip` check for AWS provider using `ipaddress.is_global` for accurate public IP detection [(#10335)](https://github.com/prowler-cloud/prowler/pull/10335)
- `entra_conditional_access_policy_block_o365_elevated_insider_risk` check for M365 provider [(#10232)](https://github.com/prowler-cloud/prowler/pull/10232)
- `--resource-group` and `--list-resource-groups` CLI flags to filter checks by resource group across all providers [(#10479)](https://github.com/prowler-cloud/prowler/pull/10479)
- CISA SCuBA Google Workspace Baselines compliance [(#10466)](https://github.com/prowler-cloud/prowler/pull/10466)
- CIS Google Workspace Foundations Benchmark v1.3.0 compliance [(#10462)](https://github.com/prowler-cloud/prowler/pull/10462)
- `calendar_external_sharing_primary_calendar`, `calendar_external_sharing_secondary_calendar`, and `calendar_external_invitations_warning` checks for Google Workspace provider using the Cloud Identity Policy API [(#10597)](https://github.com/prowler-cloud/prowler/pull/10597)
- `entra_conditional_access_policy_device_registration_mfa_required` check and `entra_intune_enrollment_sign_in_frequency_every_time` enhancement for M365 provider [(#10222)](https://github.com/prowler-cloud/prowler/pull/10222)
- `entra_conditional_access_policy_block_elevated_insider_risk` check for M365 provider [(#10234)](https://github.com/prowler-cloud/prowler/pull/10234)
- `Vercel` provider support with 30 checks [(#10189)](https://github.com/prowler-cloud/prowler/pull/10189)
@@ -24,6 +26,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Added `internet-exposed` category to 13 AWS checks (CloudFront, CodeArtifact, EC2, EFS, RDS, SageMaker, Shield, VPC) [(#10502)](https://github.com/prowler-cloud/prowler/pull/10502)
- Minimum Python version from 3.9 to 3.10 and updated classifiers to reflect supported versions (3.10, 3.11, 3.12) [(#10464)](https://github.com/prowler-cloud/prowler/pull/10464)
- Sensitive CLI flags now warn when values are passed directly, recommending environment variables instead [(#10532)](https://github.com/prowler-cloud/prowler/pull/10532)
### 🐞 Fixed
@@ -33,6 +36,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `--list-checks` and `--list-checks-json` now include `threat-detection` category checks in their output [(#10578)](https://github.com/prowler-cloud/prowler/pull/10578)
- Missing `__init__.py` in `codebuild_project_uses_allowed_github_organizations` check preventing discovery by `--list-checks` [(#10584)](https://github.com/prowler-cloud/prowler/pull/10584)
- Azure Key Vault checks emitting incorrect findings for keys, secrets, and vault logging [(#10332)](https://github.com/prowler-cloud/prowler/pull/10332)
- `is_policy_public` now recognizes `kms:CallerAccount`, `kms:ViaService`, `aws:CalledVia`, `aws:CalledViaFirst`, and `aws:CalledViaLast` as restrictive condition keys, fixing false positives in `kms_key_policy_is_not_public` and other checks that use `is_condition_block_restrictive` [(#10600)](https://github.com/prowler-cloud/prowler/pull/10600)
- `_enabled_regions` empty-set bug in `AwsProvider.generate_regional_clients` creating boto3 clients for all 36 AWS regions instead of the audited ones, causing random CI timeouts and slow test runs [(#10598)](https://github.com/prowler-cloud/prowler/pull/10598)
- Retrieve only the latest version from a package in AWS CodeArtifact [(#10243)](https://github.com/prowler-cloud/prowler/pull/10243)

View File

@@ -54,7 +54,9 @@
{
"Id": "1.1.3",
"Description": "Ensure super admin accounts are used only for super admin activities",
"Checks": [],
"Checks": [
"directory_super_admin_only_admin_roles"
],
"Attributes": [
{
"Section": "1 Directory",
@@ -96,7 +98,9 @@
{
"Id": "3.1.1.1.1",
"Description": "Ensure external sharing options for primary calendars are configured",
"Checks": [],
"Checks": [
"calendar_external_sharing_primary_calendar"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -138,7 +142,9 @@
{
"Id": "3.1.1.1.3",
"Description": "Ensure external invitation warnings for Google Calendar are configured",
"Checks": [],
"Checks": [
"calendar_external_invitations_warning"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -159,7 +165,9 @@
{
"Id": "3.1.1.2.1",
"Description": "Ensure external sharing options for secondary calendars are configured",
"Checks": [],
"Checks": [
"calendar_external_sharing_secondary_calendar"
],
"Attributes": [
{
"Section": "3 Apps",

View File

@@ -1310,7 +1310,9 @@
{
"Id": "GWS.CALENDAR.1.1",
"Description": "External Sharing Options for Primary Calendars SHALL be configured to Only free/busy information (hide event details)",
"Checks": [],
"Checks": [
"calendar_external_sharing_primary_calendar"
],
"Attributes": [
{
"Section": "Calendar",
@@ -1323,7 +1325,9 @@
{
"Id": "GWS.CALENDAR.1.2",
"Description": "External sharing options for secondary calendars SHALL be configured to Only free/busy information (hide event details)",
"Checks": [],
"Checks": [
"calendar_external_sharing_secondary_calendar"
],
"Attributes": [
{
"Section": "Calendar",
@@ -1336,7 +1340,9 @@
{
"Id": "GWS.CALENDAR.2.1",
"Description": "External invitations warnings SHALL be enabled to prompt users before sending invitations",
"Checks": [],
"Checks": [
"calendar_external_invitations_warning"
],
"Attributes": [
{
"Section": "Calendar",

View File

@@ -207,6 +207,7 @@
"entra_admin_portals_access_restriction",
"entra_admin_users_phishing_resistant_mfa_enabled",
"entra_conditional_access_policy_block_o365_elevated_insider_risk",
"entra_conditional_access_policy_directory_sync_account_excluded",
"entra_policy_guest_users_access_restrictions",
"entra_seamless_sso_disabled"
]
@@ -685,6 +686,7 @@
"entra_conditional_access_policy_approved_client_app_required_for_mobile",
"entra_conditional_access_policy_app_enforced_restrictions",
"entra_conditional_access_policy_block_elevated_insider_risk",
"entra_conditional_access_policy_directory_sync_account_excluded",
"entra_conditional_access_policy_block_o365_elevated_insider_risk",
"entra_policy_guest_users_access_restrictions",
"sharepoint_external_sharing_restricted"
@@ -711,6 +713,7 @@
"entra_break_glass_account_fido2_security_key_registered",
"entra_conditional_access_policy_approved_client_app_required_for_mobile",
"entra_conditional_access_policy_device_code_flow_blocked",
"entra_conditional_access_policy_directory_sync_account_excluded",
"entra_identity_protection_sign_in_risk_enabled",
"entra_managed_device_required_for_authentication",
"entra_seamless_sso_disabled",

View File

@@ -12,6 +12,7 @@ from prowler.config.config import (
default_output_directory,
)
from prowler.lib.check.models import Severity
from prowler.lib.cli.redact import warn_sensitive_argument_values
from prowler.lib.outputs.common import Status
from prowler.providers.common.arguments import (
init_providers_parser,
@@ -19,8 +20,6 @@ from prowler.providers.common.arguments import (
validate_provider_arguments,
)
SENSITIVE_ARGUMENTS = frozenset({"--shodan"})
class ProwlerArgumentParser:
# Set the default parser
@@ -126,6 +125,10 @@ Detailed documentation at https://docs.prowler.com
elif sys.argv[1] == "oci":
sys.argv[1] = "oraclecloud"
# Warn about sensitive flags passed with explicit values
# Snapshot argv before parse_args() which may exit on errors
warn_sensitive_argument_values(list(sys.argv[1:]))
# Parse arguments
args = self.parser.parse_args()
@@ -434,7 +437,7 @@ Detailed documentation at https://docs.prowler.com
nargs="?",
default=None,
metavar="SHODAN_API_KEY",
help="Check if any public IPs in your Cloud environments are exposed in Shodan.",
help="Check if any public IPs in your Cloud environments are exposed in Shodan. We recommend to use the SHODAN_API_KEY environment variable to provide the API key.",
)
third_party_subparser.add_argument(
"--slack",

View File

@@ -1,6 +1,9 @@
from functools import lru_cache
from importlib import import_module
from colorama import Fore, Style
from prowler.lib.cli.sensitive import SENSITIVE_ARGUMENTS as COMMON_SENSITIVE_ARGUMENTS
from prowler.lib.logger import logger
from prowler.providers.common.provider import Provider, providers_path
@@ -13,11 +16,7 @@ def get_sensitive_arguments() -> frozenset:
sensitive: set[str] = set()
# Common parser sensitive arguments (e.g., --shodan)
try:
parser_module = import_module("prowler.lib.cli.parser")
sensitive.update(getattr(parser_module, "SENSITIVE_ARGUMENTS", frozenset()))
except Exception as error:
logger.debug(f"Could not load SENSITIVE_ARGUMENTS from parser: {error}")
sensitive.update(COMMON_SENSITIVE_ARGUMENTS)
# Provider-specific sensitive arguments
for provider in Provider.get_available_providers():
@@ -66,3 +65,49 @@ def redact_argv(argv: list[str]) -> str:
result.append(arg)
return " ".join(result)
def warn_sensitive_argument_values(argv: list[str]) -> None:
"""Log a warning for each sensitive CLI flag that was passed with an explicit value.
Scans the raw argv list (not parsed args) to detect when users pass
secret values directly on the command line instead of using environment
variables. Handles both ``--flag value`` and ``--flag=value`` syntax.
Args:
argv: The argument list to check (typically ``sys.argv[1:]``).
"""
sensitive = get_sensitive_arguments()
if not sensitive:
return
use_color = "--no-color" not in argv
flags_with_values: list[str] = []
for i, arg in enumerate(argv):
# --flag=value syntax
if "=" in arg:
flag = arg.split("=", 1)[0]
if flag in sensitive:
flags_with_values.append(flag)
continue
# --flag value syntax
if arg in sensitive:
if i + 1 < len(argv) and not argv[i + 1].startswith("-"):
flags_with_values.append(arg)
for flag in flags_with_values:
if use_color:
logger.warning(
f"{Fore.YELLOW}{Style.BRIGHT}WARNING:{Style.RESET_ALL}{Fore.YELLOW} "
f"Passing a value directly to {flag} is not recommended. "
f"Use the corresponding environment variable instead to avoid "
f"exposing secrets in process listings and shell history.{Style.RESET_ALL}"
)
else:
logger.warning(
f"Passing a value directly to {flag} is not recommended. "
f"Use the corresponding environment variable instead to avoid "
f"exposing secrets in process listings and shell history."
)

View File

@@ -0,0 +1,8 @@
"""Common parser sensitive arguments.
This module is kept dependency-free (no prowler-internal imports) so that
``prowler.lib.cli.redact`` and any provider argument module can import it
without circular-import risk.
"""
SENSITIVE_ARGUMENTS = frozenset({"--shodan"})

View File

@@ -617,6 +617,11 @@ def is_condition_block_restrictive(
"aws:sourceorgpaths",
"aws:userid",
"aws:username",
"aws:calledvia",
"aws:calledviafirst",
"aws:calledvialast",
"kms:calleraccount",
"kms:viaservice",
"s3:resourceaccount",
"lambda:eventsourcetoken", # For Alexa Home functions, a token that the invoker must supply.
],
@@ -635,6 +640,11 @@ def is_condition_block_restrictive(
"aws:sourceorgpaths",
"aws:userid",
"aws:username",
"aws:calledvia",
"aws:calledviafirst",
"aws:calledvialast",
"kms:calleraccount",
"kms:viaservice",
"s3:resourceaccount",
"lambda:eventsourcetoken",
],

View File

@@ -59,11 +59,14 @@ class GoogleworkspaceProvider(Provider):
_mutelist: GoogleWorkspaceMutelist
audit_metadata: Audit_Metadata
# Google Workspace Admin SDK OAuth2 scopes
DIRECTORY_SCOPES = [
# Google Workspace OAuth2 scopes
SCOPES = [
"https://www.googleapis.com/auth/admin.directory.user.readonly",
"https://www.googleapis.com/auth/admin.directory.domain.readonly",
"https://www.googleapis.com/auth/admin.directory.customer.readonly",
# Cloud Identity Policy API (calendar and other app policies)
"https://www.googleapis.com/auth/cloud-identity.policies.readonly",
"https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly",
]
def __init__(
@@ -214,7 +217,7 @@ class GoogleworkspaceProvider(Provider):
try:
credentials = service_account.Credentials.from_service_account_file(
credentials_file,
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
scopes=GoogleworkspaceProvider.SCOPES,
)
except FileNotFoundError as error:
raise GoogleWorkspaceInvalidCredentialsError(
@@ -241,7 +244,7 @@ class GoogleworkspaceProvider(Provider):
try:
credentials = service_account.Credentials.from_service_account_info(
credentials_data,
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
scopes=GoogleworkspaceProvider.SCOPES,
)
except ValueError as error:
raise GoogleWorkspaceInvalidCredentialsError(
@@ -264,7 +267,7 @@ class GoogleworkspaceProvider(Provider):
try:
credentials = service_account.Credentials.from_service_account_file(
env_file,
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
scopes=GoogleworkspaceProvider.SCOPES,
)
except FileNotFoundError as error:
raise GoogleWorkspaceInvalidCredentialsError(
@@ -293,7 +296,7 @@ class GoogleworkspaceProvider(Provider):
try:
credentials = service_account.Credentials.from_service_account_info(
credentials_data,
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
scopes=GoogleworkspaceProvider.SCOPES,
)
except ValueError as error:
raise GoogleWorkspaceInvalidCredentialsError(
@@ -414,7 +417,7 @@ class GoogleworkspaceProvider(Provider):
)
# Fetch all domains (primary + aliases) to support domain aliases
# The scope admin.directory.domain.readonly is already in DIRECTORY_SCOPES
# The scope admin.directory.domain.readonly is already in SCOPES above
try:
domains_response = service.domains().list(customer="my_customer").execute()
valid_domains = [

View File

@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar_client = Calendar(Provider.get_global_provider())

View File

@@ -0,0 +1,41 @@
{
"Provider": "googleworkspace",
"CheckID": "calendar_external_invitations_warning",
"CheckTitle": "External invitation warnings are enabled for Google Calendar",
"CheckType": [],
"ServiceName": "calendar",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "collaboration",
"Description": "Google Calendar **warns users** when they invite guests from outside the organization to an event. This prompt gives users a chance to reconsider before sharing meeting details with external parties, reducing the likelihood of **accidental information disclosure** through calendar invitations.",
"Risk": "Without external invitation warnings, users may unintentionally include **external guests** in internal meetings, exposing **confidential meeting details**, agendas, and internal attendee lists to unauthorized parties. This is a common vector for inadvertent data leakage through everyday calendar actions.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://support.google.com/a/answer/6329284",
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External invitations**, check **Warn users when inviting guests outside of the domain**\n5. Click **Save**",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable external invitation warnings so users are notified whenever a meeting invitation includes guests outside the organization. This simple prompt helps prevent accidental disclosure of meeting details to unintended recipients.",
"Url": "https://hub.prowler.com/check/calendar_external_invitations_warning"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"calendar_external_sharing_primary_calendar",
"calendar_external_sharing_secondary_calendar"
],
"Notes": ""
}

View File

@@ -0,0 +1,56 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
calendar_client,
)
class calendar_external_invitations_warning(Check):
"""Check that external invitation warnings are enabled for Google Calendar
This check verifies that the domain-level policy warns users when they
invite guests from outside the organization, reducing the risk of accidental
information disclosure through calendar events.
"""
def execute(self) -> List[CheckReportGoogleWorkspace]:
findings = []
if calendar_client.policies_fetched:
report = CheckReportGoogleWorkspace(
metadata=self.metadata(),
resource=calendar_client.provider.identity,
resource_name=calendar_client.provider.identity.domain,
resource_id=calendar_client.provider.identity.customer_id,
customer_id=calendar_client.provider.identity.customer_id,
location="global",
)
warning_enabled = calendar_client.policies.external_invitations_warning
if warning_enabled is True:
report.status = "PASS"
report.status_extended = (
f"External invitation warnings for Google Calendar are enabled "
f"in domain {calendar_client.provider.identity.domain}."
)
else:
report.status = "FAIL"
if warning_enabled is None:
report.status_extended = (
f"External invitation warnings for Google Calendar are not "
f"explicitly configured in domain "
f"{calendar_client.provider.identity.domain}. "
f"Users should be warned when inviting guests outside the organization."
)
else:
report.status_extended = (
f"External invitation warnings for Google Calendar are disabled "
f"in domain {calendar_client.provider.identity.domain}. "
f"Users should be warned when inviting guests outside the organization."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,41 @@
{
"Provider": "googleworkspace",
"CheckID": "calendar_external_sharing_primary_calendar",
"CheckTitle": "External sharing for primary calendars is restricted to free/busy only",
"CheckType": [],
"ServiceName": "calendar",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "collaboration",
"Description": "Primary calendars in the Google Workspace domain share **only free/busy information** with external users. When external sharing is set to share full event details, sensitive information such as meeting titles, attendees, locations, and descriptions is exposed to users outside the organization.",
"Risk": "Overly permissive external sharing of primary calendars exposes **sensitive meeting metadata** — titles, attendees, locations, and descriptions — to users outside the organization. This increases the risk of **information disclosure**, **social engineering**, and **targeted phishing** based on insights into organizational activities.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://support.google.com/a/answer/60765",
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External sharing options for primary calendars**, select **Only free/busy information (hide event details)**\n5. Click **Save**",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict external sharing of primary calendars to free/busy information only. This preserves scheduling functionality with external users while preventing exposure of sensitive meeting details.",
"Url": "https://hub.prowler.com/check/calendar_external_sharing_primary_calendar"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"calendar_external_sharing_secondary_calendar",
"calendar_external_invitations_warning"
],
"Notes": ""
}

View File

@@ -0,0 +1,56 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
calendar_client,
)
class calendar_external_sharing_primary_calendar(Check):
"""Check that external sharing for primary calendars is restricted to free/busy only
This check verifies that the domain-level policy for primary calendar external
sharing is set to share only free/busy information, preventing exposure of
event details to external users.
"""
def execute(self) -> List[CheckReportGoogleWorkspace]:
findings = []
if calendar_client.policies_fetched:
report = CheckReportGoogleWorkspace(
metadata=self.metadata(),
resource=calendar_client.provider.identity,
resource_name=calendar_client.provider.identity.domain,
resource_id=calendar_client.provider.identity.customer_id,
customer_id=calendar_client.provider.identity.customer_id,
location="global",
)
sharing = calendar_client.policies.primary_calendar_external_sharing
if sharing == "EXTERNAL_FREE_BUSY_ONLY":
report.status = "PASS"
report.status_extended = (
f"Primary calendar external sharing in domain "
f"{calendar_client.provider.identity.domain} is restricted to "
f"free/busy information only."
)
else:
report.status = "FAIL"
if sharing is None:
report.status_extended = (
f"Primary calendar external sharing is not explicitly configured "
f"in domain {calendar_client.provider.identity.domain}. "
f"External sharing should be restricted to free/busy information only."
)
else:
report.status_extended = (
f"Primary calendar external sharing in domain "
f"{calendar_client.provider.identity.domain} is set to {sharing}. "
f"External sharing should be restricted to free/busy information only."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,41 @@
{
"Provider": "googleworkspace",
"CheckID": "calendar_external_sharing_secondary_calendar",
"CheckTitle": "External sharing for secondary calendars is restricted to free/busy only",
"CheckType": [],
"ServiceName": "calendar",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "collaboration",
"Description": "Secondary calendars in the Google Workspace domain share **only free/busy information** with external users. Secondary calendars are additional calendars users create beyond their primary calendar (e.g., for projects, teams, or personal events), and are commonly used to organize sensitive or focused activities that should not be visible to external parties.",
"Risk": "Overly permissive external sharing of secondary calendars exposes **project-specific or team-specific event details** to users outside the organization. Because secondary calendars often hold more targeted activities (e.g., product launches, internal reviews), unrestricted external sharing increases the risk of **information disclosure** and **competitive intelligence leakage**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://support.google.com/a/answer/60765",
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External sharing options for secondary calendars**, select **Only free/busy information (hide event details)**\n5. Click **Save**",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict external sharing of secondary calendars to free/busy information only. This preserves scheduling interoperability with external collaborators while preventing exposure of sensitive event details in user-created calendars.",
"Url": "https://hub.prowler.com/check/calendar_external_sharing_secondary_calendar"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"calendar_external_sharing_primary_calendar",
"calendar_external_invitations_warning"
],
"Notes": ""
}

View File

@@ -0,0 +1,56 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
calendar_client,
)
class calendar_external_sharing_secondary_calendar(Check):
"""Check that external sharing for secondary calendars is restricted to free/busy only
This check verifies that the domain-level policy for secondary calendar external
sharing is set to share only free/busy information, preventing exposure of
event details in user-created calendars to external users.
"""
def execute(self) -> List[CheckReportGoogleWorkspace]:
findings = []
if calendar_client.policies_fetched:
report = CheckReportGoogleWorkspace(
metadata=self.metadata(),
resource=calendar_client.provider.identity,
resource_name=calendar_client.provider.identity.domain,
resource_id=calendar_client.provider.identity.customer_id,
customer_id=calendar_client.provider.identity.customer_id,
location="global",
)
sharing = calendar_client.policies.secondary_calendar_external_sharing
if sharing == "EXTERNAL_FREE_BUSY_ONLY":
report.status = "PASS"
report.status_extended = (
f"Secondary calendar external sharing in domain "
f"{calendar_client.provider.identity.domain} is restricted to "
f"free/busy information only."
)
else:
report.status = "FAIL"
if sharing is None:
report.status_extended = (
f"Secondary calendar external sharing is not explicitly configured "
f"in domain {calendar_client.provider.identity.domain}. "
f"External sharing should be restricted to free/busy information only."
)
else:
report.status_extended = (
f"Secondary calendar external sharing in domain "
f"{calendar_client.provider.identity.domain} is set to {sharing}. "
f"External sharing should be restricted to free/busy information only."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,112 @@
from typing import Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.googleworkspace.lib.service.service import GoogleWorkspaceService
class Calendar(GoogleWorkspaceService):
"""Google Workspace Calendar service for auditing domain-level calendar policies.
Uses the Cloud Identity Policy API v1 to read calendar sharing
and invitation settings configured in the Admin Console.
"""
def __init__(self, provider):
super().__init__(provider)
self.policies = CalendarPolicies()
self.policies_fetched = False
self._fetch_calendar_policies()
def _fetch_calendar_policies(self):
"""Fetch calendar policies from the Cloud Identity Policy API v1."""
logger.info("Calendar - Fetching calendar policies...")
try:
service = self._build_service("cloudidentity", "v1")
if not service:
logger.error("Failed to build Cloud Identity service")
return
request = service.policies().list(pageSize=100)
fetch_succeeded = True
while request is not None:
try:
response = request.execute()
for policy in response.get("policies", []):
setting = policy.get("setting", {})
setting_type = setting.get("type", "").removeprefix("settings/")
value = setting.get("value", {})
if (
setting_type
== "calendar.primary_calendar_max_allowed_external_sharing"
):
self.policies.primary_calendar_external_sharing = value.get(
"maxAllowedExternalSharing"
)
logger.debug(
"Primary calendar external sharing: "
f"{self.policies.primary_calendar_external_sharing}"
)
elif (
setting_type
== "calendar.secondary_calendar_max_allowed_external_sharing"
):
self.policies.secondary_calendar_external_sharing = (
value.get("maxAllowedExternalSharing")
)
logger.debug(
"Secondary calendar external sharing: "
f"{self.policies.secondary_calendar_external_sharing}"
)
elif setting_type == "calendar.external_invitations":
self.policies.external_invitations_warning = value.get(
"warnOnInvite"
)
logger.debug(
"External invitations warning: "
f"{self.policies.external_invitations_warning}"
)
request = service.policies().list_next(request, response)
except Exception as error:
self._handle_api_error(
error,
"fetching calendar policies",
self.provider.identity.customer_id,
)
fetch_succeeded = False
break
self.policies_fetched = fetch_succeeded
logger.info(
f"Calendar policies fetched - "
f"Primary sharing: {self.policies.primary_calendar_external_sharing}, "
f"Secondary sharing: {self.policies.secondary_calendar_external_sharing}, "
f"Invitation warnings: {self.policies.external_invitations_warning}"
)
except Exception as error:
self._handle_api_error(
error,
"fetching calendar policies",
self.provider.identity.customer_id,
)
self.policies_fetched = False
class CalendarPolicies(BaseModel):
"""Model for domain-level Calendar policy settings."""
primary_calendar_external_sharing: Optional[str] = None
secondary_calendar_external_sharing: Optional[str] = None
external_invitations_warning: Optional[bool] = None

View File

@@ -8,23 +8,21 @@ class Directory(GoogleWorkspaceService):
def __init__(self, provider):
super().__init__(provider)
self._service = self._build_service("admin", "directory_v1")
self.users = self._list_users()
self._roles = self._list_roles()
self._populate_role_assignments()
def _list_users(self):
logger.info("Directory - Listing Users...")
users = {}
try:
# Build the Admin SDK Directory service
service = self._build_service("admin", "directory_v1")
if not service:
if not self._service:
logger.error("Failed to build Directory service")
return users
# Fetch users using the Directory API
# Reference: https://developers.google.com/admin-sdk/directory/reference/rest/v1/users/list
request = service.users().list(
request = self._service.users().list(
customer=self.provider.identity.customer_id,
maxResults=500, # Max allowed by API
orderBy="email",
@@ -38,14 +36,11 @@ class Directory(GoogleWorkspaceService):
user = User(
id=user_data.get("id"),
email=user_data.get("primaryEmail"),
is_admin=user_data.get("isAdmin", False),
)
users[user.id] = user
logger.debug(
f"Processed user: {user.email} (Admin: {user.is_admin})"
)
logger.debug(f"Processed user: {user.email}")
request = service.users().list_next(request, response)
request = self._service.users().list_next(request, response)
except Exception as error:
self._handle_api_error(
@@ -62,9 +57,108 @@ class Directory(GoogleWorkspaceService):
return users
def _list_roles(self):
logger.info("Directory - Listing Roles...")
roles = {}
try:
if not self._service:
return roles
request = self._service.roles().list(
customer=self.provider.identity.customer_id,
)
while request is not None:
try:
response = request.execute()
for role_data in response.get("items", []):
role_id = str(role_data.get("roleId", ""))
role_name = role_data.get("roleName", "")
if role_id and role_name:
roles[role_id] = Role(
id=role_id,
name=role_name,
description=role_data.get("roleDescription", ""),
is_super_admin_role=role_data.get(
"isSuperAdminRole", False
),
)
request = self._service.roles().list_next(request, response)
except Exception as error:
self._handle_api_error(
error,
"listing roles",
self.provider.identity.customer_id,
)
break
logger.info(f"Found {len(roles)} roles in the domain")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return roles
def _populate_role_assignments(self):
logger.info("Directory - Fetching Role Assignments...")
if not self._service:
return
try:
request = self._service.roleAssignments().list(
customer=self.provider.identity.customer_id,
)
while request is not None:
try:
response = request.execute()
for assignment in response.get("items", []):
user_id = str(assignment.get("assignedTo", ""))
role_id = str(assignment.get("roleId", ""))
user = self.users.get(user_id)
role = self._roles.get(role_id)
if user and role:
user.role_assignments.append(role)
if role.is_super_admin_role:
user.is_admin = True
request = self._service.roleAssignments().list_next(
request, response
)
except Exception as error:
self._handle_api_error(
error,
"listing role assignments",
self.provider.identity.customer_id,
)
break
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Role(BaseModel):
id: str
name: str
description: str = ""
is_super_admin_role: bool = False
class User(BaseModel):
id: str
email: str
is_admin: bool = False
role_assignments: list[Role] = []

View File

@@ -0,0 +1,39 @@
{
"Provider": "googleworkspace",
"CheckID": "directory_super_admin_only_admin_roles",
"CheckTitle": "All super admin accounts are used only for super admin activities",
"CheckType": [],
"ServiceName": "directory",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Super admin accounts do not also hold **additional admin roles** such as Groups Admin, User Management Admin, etc. Each super administrator has a separate, non-admin account for daily activities, following the **principle of least privilege**.",
"Risk": "A super admin account that also holds additional admin roles increases the **attack surface** for phishing and credential theft. Compromising a single dual-role account grants full administrative access, bypassing **separation of duties** and enabling unauthorized changes to users, billing, and security settings.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://knowledge.workspace.google.com/admin/users/prebuilt-administrator-roles",
"https://support.google.com/a/answer/9011373"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Directory** > **Users**\n3. Click on the super admin user who also has additional admin roles\n4. Click **Admin roles and privileges**\n5. Remove the additional admin roles from the super admin account\n6. Create a separate account for daily admin tasks",
"Terraform": ""
},
"Recommendation": {
"Text": "Apply the principle of separation of duties by maintaining dedicated super admin accounts exclusively for privileged tasks. Daily administrative activities should be performed from separate accounts with only the delegated roles required.",
"Url": "https://hub.prowler.com/check/directory_super_admin_only_admin_roles"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [
"directory_super_admin_count"
],
"Notes": ""
}

View File

@@ -0,0 +1,60 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
from prowler.providers.googleworkspace.services.directory.directory_client import (
directory_client,
)
class directory_super_admin_only_admin_roles(Check):
"""Check that super admin accounts are used only for super admin activities
This check verifies that no super admin user has additional admin roles assigned
beyond the Super Admin role. Super admins should have separate accounts for daily
activities to follow least privilege.
"""
def execute(self) -> List[CheckReportGoogleWorkspace]:
findings = []
if directory_client.users:
dual_role_admins = {}
for user in directory_client.users.values():
if user.is_admin:
extra_roles = [
r.description or r.name
for r in user.role_assignments
if not r.is_super_admin_role
]
if extra_roles:
dual_role_admins[user.email] = extra_roles
report = CheckReportGoogleWorkspace(
metadata=self.metadata(),
resource=directory_client.provider.identity,
resource_name=directory_client.provider.identity.domain,
resource_id=directory_client.provider.identity.customer_id,
customer_id=directory_client.provider.identity.customer_id,
location="global",
)
if dual_role_admins:
details = ", ".join(
f"{email} ({', '.join(roles)})"
for email, roles in dual_role_admins.items()
)
report.status = "FAIL"
report.status_extended = (
f"Super admin accounts also holding additional admin roles: {details}. "
f"Super admin accounts should be used only for super admin activities."
)
else:
report.status = "PASS"
report.status_extended = (
f"All super admin accounts in domain {directory_client.provider.identity.domain} "
f"are used only for super admin activities."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,41 @@
{
"Provider": "m365",
"CheckID": "entra_conditional_access_policy_directory_sync_account_excluded",
"CheckTitle": "Conditional Access policy excludes Directory Synchronization Accounts to protect Entra Connect sync",
"CheckType": [],
"ServiceName": "entra",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Conditional Access policies scoped to **all users** and **all cloud applications** are evaluated to confirm the **Directory Synchronization Accounts** role is explicitly excluded. The Microsoft Entra Connect Sync Account does not support multifactor authentication, so it must be excluded from restrictive policies to maintain directory synchronization.",
"Risk": "If the Directory Synchronization Accounts role is not excluded from Conditional Access policies requiring MFA or blocking access, the Entra Connect Sync Account will be unable to authenticate. This breaks hybrid identity synchronization between on-premises Active Directory and Entra ID, potentially causing authentication failures and identity inconsistencies.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-all-users-mfa",
"https://learn.microsoft.com/en-us/entra/identity/hybrid/connect/reference-connect-accounts-permissions",
"https://maester.dev/docs/tests/MT.1020"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Navigate to the Microsoft Entra admin center at https://entra.microsoft.com.\n2. Expand **Protection** > **Conditional Access** and select **Policies**.\n3. Open each policy that targets **All users** and **All cloud apps**.\n4. Under **Users** > **Exclude**, select **Directory roles** and add the **Directory Synchronization Accounts** role.\n5. Save the policy.",
"Terraform": ""
},
"Recommendation": {
"Text": "Exclude the Directory Synchronization Accounts role from all Conditional Access policies that target all users and all cloud applications. This prevents breaking Entra Connect directory synchronization while maintaining security controls for interactive users.",
"Url": "https://hub.prowler.com/check/entra_conditional_access_policy_directory_sync_account_excluded"
}
},
"Categories": [
"identity-access",
"e3"
],
"DependsOn": [],
"RelatedTo": [
"entra_conditional_access_policy_require_mfa_for_management_api"
],
"Notes": "This check corresponds to Maester test MT.1020 (Test-MtCaExclusionForDirectorySyncAccount). The Directory Synchronization Accounts role template ID is d29b2b05-8046-44ba-8758-1e26182fcf32."
}

View File

@@ -0,0 +1,88 @@
"""Check if Conditional Access policies exclude the Directory Synchronization Account."""
from prowler.lib.check.models import Check, CheckReportM365
from prowler.providers.m365.services.entra.entra_client import entra_client
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicyState,
)
# The Directory Synchronization Accounts built-in role template ID in Entra ID.
# This role is assigned to the Microsoft Entra Connect Sync service account and
# does not support multifactor authentication.
DIRECTORY_SYNC_ROLE_TEMPLATE_ID = "d29b2b05-8046-44ba-8758-1e26182fcf32"
class entra_conditional_access_policy_directory_sync_account_excluded(Check):
"""Check that Conditional Access policies exclude the Directory Synchronization Account.
The Microsoft Entra Connect Sync Account cannot support MFA. Conditional
Access policies scoped to all users and all cloud apps must explicitly
exclude the Directory Synchronization Accounts role to prevent breaking
directory synchronization.
- PASS: The policy excludes the Directory Synchronization Accounts role.
- FAIL: The policy does not exclude the Directory Synchronization Accounts role.
"""
def execute(self) -> list[CheckReportM365]:
"""Execute the check for Directory Sync Account exclusion from Conditional Access policies.
Iterates through all enabled Conditional Access policies that target
all users and all cloud applications, verifying each one excludes the
Directory Synchronization Accounts role.
Returns:
A list of reports containing the result of the check.
"""
findings = []
for policy in entra_client.conditional_access_policies.values():
if policy.state == ConditionalAccessPolicyState.DISABLED:
continue
if not policy.conditions.user_conditions:
continue
if "All" not in policy.conditions.user_conditions.included_users:
continue
if not policy.conditions.application_conditions:
continue
if (
"All"
not in policy.conditions.application_conditions.included_applications
):
continue
report = CheckReportM365(
metadata=self.metadata(),
resource=policy,
resource_name=policy.display_name,
resource_id=policy.id,
)
if (
DIRECTORY_SYNC_ROLE_TEMPLATE_ID
in policy.conditions.user_conditions.excluded_roles
):
report.status = "PASS"
report.status_extended = f"Conditional Access Policy '{policy.display_name}' excludes the Directory Synchronization Accounts role."
else:
report.status = "FAIL"
report.status_extended = f"Conditional Access Policy '{policy.display_name}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
findings.append(report)
if not findings:
report = CheckReportM365(
metadata=self.metadata(),
resource={},
resource_name="Conditional Access Policies",
resource_id="conditionalAccessPolicies",
)
report.status = "PASS"
report.status_extended = "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
findings.append(report)
return findings

View File

@@ -13,7 +13,11 @@ def init_parser(self):
"--nhn-username", nargs="?", default=None, help="NHN API Username"
)
nhn_auth_subparser.add_argument(
"--nhn-password", nargs="?", default=None, help="NHN API Password"
"--nhn-password",
nargs="?",
default=None,
metavar="NHN_PASSWORD",
help="NHN API Password",
)
nhn_auth_subparser.add_argument(
"--nhn-tenant-id", nargs="?", default=None, help="NHN Tenant ID"

View File

@@ -46,6 +46,7 @@ def init_parser(self):
"--os-password",
nargs="?",
default=None,
metavar="OS_PASSWORD",
help="OpenStack password for authentication. Can also be set via OS_PASSWORD environment variable",
)
openstack_explicit_subparser.add_argument(

View File

@@ -45,6 +45,34 @@ prowler/providers/{provider}/
└── {check_name}.metadata.json
```
## Sensitive CLI Arguments
Flags that accept secrets (tokens, passwords, API keys) MUST follow these rules:
1. **Use `nargs="?"` with `default=None`** — the flag accepts an optional value for backward compatibility; the recommended path is environment variables.
2. **Set `metavar` to the environment variable name** users should use (e.g., `metavar="GITHUB_PERSONAL_ACCESS_TOKEN"`).
3. **Add the flag to the `SENSITIVE_ARGUMENTS` frozenset** at the top of the provider's `arguments.py`. This set is used to redact values in HTML output and warn users who pass secrets directly.
4. **Do not add new arguments that require passing secrets as CLI values** — secrets should come from environment variables. The flag accepts a value for backward compatibility, but CLI warns users to prefer env vars.
### Pattern
```python
# prowler/providers/{provider}/lib/arguments/arguments.py
SENSITIVE_ARGUMENTS = frozenset({"--my-api-key", "--my-password"})
def init_parser(self):
auth_subparser = parser.add_argument_group("Authentication Modes")
auth_subparser.add_argument(
"--my-api-key",
nargs="?",
default=None,
metavar="MY_API_KEY",
help="API key for authentication. Use MY_API_KEY env var instead of passing directly.",
)
```
## Provider Class Template
```python

View File

@@ -1,8 +1,14 @@
import logging
from unittest.mock import patch
import pytest
from prowler.lib.cli.redact import REDACTED_VALUE, get_sensitive_arguments, redact_argv
from prowler.lib.cli.redact import (
REDACTED_VALUE,
get_sensitive_arguments,
redact_argv,
warn_sensitive_argument_values,
)
@pytest.fixture
@@ -87,6 +93,62 @@ class TestRedactArgv:
assert redact_argv(argv) == "aws --region=us-east-1"
class TestWarnSensitiveArgumentValues:
def test_no_warning_without_sensitive_flags(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(["aws", "--region", "eu-west-1"])
assert caplog.text == ""
def test_no_warning_flag_without_value(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(["github", "--personal-access-token"])
assert caplog.text == ""
def test_no_warning_flag_followed_by_another_flag(
self, caplog, mock_sensitive_args
):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(
["github", "--personal-access-token", "--region", "eu-west-1"]
)
assert caplog.text == ""
def test_warning_flag_with_value(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(
["github", "--personal-access-token", "ghp_secret"]
)
assert "--personal-access-token" in caplog.text
assert "not recommended" in caplog.text
def test_warning_flag_with_equals_syntax(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(["aws", "--shodan=key123"])
assert "--shodan" in caplog.text
assert "not recommended" in caplog.text
def test_warning_multiple_flags(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(
[
"github",
"--personal-access-token",
"ghp_secret",
"--shodan",
"key",
]
)
assert "--personal-access-token" in caplog.text
assert "--shodan" in caplog.text
def test_no_color_output(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(["--no-color", "aws", "--shodan", "key123"])
assert "not recommended" in caplog.text
# Should not contain ANSI escape codes
assert "\033[" not in caplog.text
class TestGetSensitiveArguments:
def test_discovers_known_sensitive_arguments(self):
"""Integration test: verify the discovery mechanism finds flags from provider modules."""

View File

@@ -1413,6 +1413,115 @@ class Test_Policy:
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_aws_CalledVia_str(self):
condition_statement = {
"StringEquals": {"aws:CalledVia": "cloudformation.amazonaws.com"}
}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_equals_aws_CalledViaFirst_str(self):
condition_statement = {
"StringEquals": {"aws:CalledViaFirst": "cloudformation.amazonaws.com"}
}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_equals_aws_CalledViaLast_str(self):
condition_statement = {
"StringEquals": {"aws:CalledViaLast": "glue.amazonaws.com"}
}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_like_aws_CalledVia_str(self):
condition_statement = {"StringLike": {"aws:CalledVia": "*.amazonaws.com"}}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_equals_kms_CallerAccount_str(self):
condition_statement = {
"StringEquals": {"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER}
}
assert is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_kms_CallerAccount_str_not_valid(self):
condition_statement = {
"StringEquals": {"kms:CallerAccount": NON_TRUSTED_AWS_ACCOUNT_NUMBER}
}
assert not is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_kms_CallerAccount_list(self):
condition_statement = {
"StringEquals": {"kms:CallerAccount": [TRUSTED_AWS_ACCOUNT_NUMBER]}
}
assert is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_kms_CallerAccount_list_not_valid(self):
condition_statement = {
"StringEquals": {
"kms:CallerAccount": [
TRUSTED_AWS_ACCOUNT_NUMBER,
NON_TRUSTED_AWS_ACCOUNT_NUMBER,
]
}
}
assert not is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_kms_ViaService_str(self):
condition_statement = {
"StringEquals": {"kms:ViaService": "glue.eu-central-1.amazonaws.com"}
}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_like_kms_CallerAccount_str(self):
condition_statement = {
"StringLike": {"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER}
}
assert is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_like_kms_CallerAccount_str_not_valid(self):
condition_statement = {
"StringLike": {"kms:CallerAccount": NON_TRUSTED_AWS_ACCOUNT_NUMBER}
}
assert not is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_like_kms_ViaService_str(self):
condition_statement = {"StringLike": {"kms:ViaService": "glue.*.amazonaws.com"}}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_two_lists_unrestrictive(self):
condition_statement = {
"StringLike": {
@@ -2357,6 +2466,71 @@ class Test_Policy:
trusted_ips=["1.2.3.4", "5.6.7.8"],
)
def test_is_policy_public_kms_caller_account_and_via_service(self):
policy = {
"Version": "2008-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": [
"kms:Encrypt",
"kms:Decrypt",
"kms:ReEncrypt*",
"kms:GenerateDataKey*",
"kms:CreateGrant",
"kms:DescribeKey",
],
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:ViaService": "glue.eu-central-1.amazonaws.com",
"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER,
}
},
},
],
}
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
def test_is_policy_public_kms_caller_account_only(self):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["kms:Decrypt"],
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER,
}
},
},
],
}
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
def test_is_policy_public_kms_via_service_without_account_restriction(self):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["kms:Decrypt"],
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:ViaService": "glue.eu-central-1.amazonaws.com",
}
},
},
],
}
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
def test_check_admin_access(self):
policy = {
"Version": "2012-10-17",

View File

@@ -43,6 +43,39 @@ USER_3 = {
}
# Role data for Directory API role tests
SUPER_ADMIN_ROLE_ID = "13801188331880449"
SEED_ADMIN_ROLE_ID = "13801188331880451"
GROUPS_ADMIN_ROLE_ID = "13801188331880450"
ROLE_SUPER_ADMIN = {
"roleId": SUPER_ADMIN_ROLE_ID,
"roleName": "Super Admin",
"roleDescription": "Super Admin",
"isSystemRole": True,
"isSuperAdminRole": True,
}
# Google automatically assigns _SEED_ADMIN_ROLE to the first account that
# created the domain. It is a super-admin-capable system role with a
# different name, so it must also be excluded when counting "extra" roles.
ROLE_SEED_ADMIN = {
"roleId": SEED_ADMIN_ROLE_ID,
"roleName": "_SEED_ADMIN_ROLE",
"roleDescription": "Super Admin",
"isSystemRole": True,
"isSuperAdminRole": True,
}
ROLE_GROUPS_ADMIN = {
"roleId": GROUPS_ADMIN_ROLE_ID,
"roleName": "_GROUPS_ADMIN_ROLE",
"roleDescription": "Groups Administrator",
"isSystemRole": True,
"isSuperAdminRole": False,
}
def set_mocked_googleworkspace_provider(
identity: GoogleWorkspaceIdentityInfo = GoogleWorkspaceIdentityInfo(
domain=DOMAIN,

View File

@@ -0,0 +1,130 @@
from unittest.mock import patch
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
CalendarPolicies,
)
from tests.providers.googleworkspace.googleworkspace_fixtures import (
CUSTOMER_ID,
DOMAIN,
set_mocked_googleworkspace_provider,
)
class TestCalendarExternalInvitationsWarning:
def test_pass_warnings_enabled(self):
"""Test PASS when external invitation warnings are enabled"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
calendar_external_invitations_warning,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
external_invitations_warning=True
)
check = calendar_external_invitations_warning()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "enabled" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_fail_warnings_disabled(self):
"""Test FAIL when external invitation warnings are disabled"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
calendar_external_invitations_warning,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
external_invitations_warning=False
)
check = calendar_external_invitations_warning()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "disabled" in findings[0].status_extended
def test_fail_no_policy_set(self):
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
calendar_external_invitations_warning,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
external_invitations_warning=None
)
check = calendar_external_invitations_warning()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "not explicitly configured" in findings[0].status_extended
def test_no_findings_when_fetch_failed(self):
"""Test no findings returned when the API fetch failed"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
calendar_external_invitations_warning,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = False
mock_calendar_client.policies = CalendarPolicies()
check = calendar_external_invitations_warning()
findings = check.execute()
assert len(findings) == 0

View File

@@ -0,0 +1,161 @@
from unittest.mock import patch
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
CalendarPolicies,
)
from tests.providers.googleworkspace.googleworkspace_fixtures import (
CUSTOMER_ID,
DOMAIN,
set_mocked_googleworkspace_provider,
)
class TestCalendarExternalSharingPrimaryCalendar:
def test_pass_free_busy_only(self):
"""Test PASS when external sharing is restricted to free/busy only"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
primary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY"
)
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "free/busy information only" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_fail_read_only(self):
"""Test FAIL when external sharing allows read-only access"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
primary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_ONLY"
)
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "EXTERNAL_ALL_INFO_READ_ONLY" in findings[0].status_extended
assert "free/busy information only" in findings[0].status_extended
def test_fail_read_write(self):
"""Test FAIL when external sharing allows read-write access"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
primary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE"
)
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "EXTERNAL_ALL_INFO_READ_WRITE" in findings[0].status_extended
def test_fail_no_policy_set(self):
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
primary_calendar_external_sharing=None
)
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "not explicitly configured" in findings[0].status_extended
def test_no_findings_when_fetch_failed(self):
"""Test no findings returned when the API fetch failed"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = False
mock_calendar_client.policies = CalendarPolicies()
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 0

View File

@@ -0,0 +1,161 @@
from unittest.mock import patch
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
CalendarPolicies,
)
from tests.providers.googleworkspace.googleworkspace_fixtures import (
CUSTOMER_ID,
DOMAIN,
set_mocked_googleworkspace_provider,
)
class TestCalendarExternalSharingSecondaryCalendar:
def test_pass_free_busy_only(self):
"""Test PASS when external sharing is restricted to free/busy only"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
secondary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY"
)
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "free/busy information only" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_fail_read_only(self):
"""Test FAIL when external sharing allows read-only access"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_ONLY"
)
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "EXTERNAL_ALL_INFO_READ_ONLY" in findings[0].status_extended
assert "free/busy information only" in findings[0].status_extended
def test_fail_read_write_manage(self):
"""Test FAIL when external sharing allows read-write-manage access"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE_MANAGE"
)
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "EXTERNAL_ALL_INFO_READ_WRITE_MANAGE" in findings[0].status_extended
def test_fail_no_policy_set(self):
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
secondary_calendar_external_sharing=None
)
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "not explicitly configured" in findings[0].status_extended
def test_no_findings_when_fetch_failed(self):
"""Test no findings returned when the API fetch failed"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = False
mock_calendar_client.policies = CalendarPolicies()
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 0

View File

@@ -0,0 +1,231 @@
from unittest.mock import MagicMock, patch
from tests.providers.googleworkspace.googleworkspace_fixtures import (
set_mocked_googleworkspace_provider,
)
class TestCalendarService:
def test_calendar_fetch_policies_all_settings(self):
"""Test fetching all 3 calendar policy settings from Cloud Identity API"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_credentials = MagicMock()
mock_session = MagicMock()
mock_session.credentials = mock_credentials
mock_provider.session = mock_session
mock_service = MagicMock()
mock_policies_list = MagicMock()
# Mock the actual Cloud Identity Policy API v1 response shape:
# - "type" (not "name"), prefixed with "settings/"
# - inner value field names are camelCase
mock_policies_list.execute.return_value = {
"policies": [
{
"setting": {
"type": "settings/calendar.primary_calendar_max_allowed_external_sharing",
"value": {
"maxAllowedExternalSharing": "EXTERNAL_FREE_BUSY_ONLY"
},
}
},
{
"setting": {
"type": "settings/calendar.secondary_calendar_max_allowed_external_sharing",
"value": {
"maxAllowedExternalSharing": "EXTERNAL_ALL_INFO_READ_ONLY"
},
}
},
{
"setting": {
"type": "settings/calendar.external_invitations",
"value": {"warnOnInvite": True},
}
},
]
}
mock_service.policies().list.return_value = mock_policies_list
mock_service.policies().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is True
assert (
calendar.policies.primary_calendar_external_sharing
== "EXTERNAL_FREE_BUSY_ONLY"
)
assert (
calendar.policies.secondary_calendar_external_sharing
== "EXTERNAL_ALL_INFO_READ_ONLY"
)
assert calendar.policies.external_invitations_warning is True
def test_calendar_fetch_policies_empty_response(self):
"""Test handling empty policies response"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
mock_policies_list = MagicMock()
mock_policies_list.execute.return_value = {"policies": []}
mock_service.policies().list.return_value = mock_policies_list
mock_service.policies().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is True
assert calendar.policies.primary_calendar_external_sharing is None
assert calendar.policies.secondary_calendar_external_sharing is None
assert calendar.policies.external_invitations_warning is None
def test_calendar_fetch_policies_api_error(self):
"""Test handling of API errors during policy fetch"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
mock_service.policies().list.side_effect = Exception("API Error")
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is False
assert calendar.policies.primary_calendar_external_sharing is None
assert calendar.policies.secondary_calendar_external_sharing is None
assert calendar.policies.external_invitations_warning is None
def test_calendar_fetch_policies_build_service_returns_none(self):
"""Test early return when _build_service fails to construct the client"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=None,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is False
assert calendar.policies.primary_calendar_external_sharing is None
assert calendar.policies.secondary_calendar_external_sharing is None
assert calendar.policies.external_invitations_warning is None
def test_calendar_fetch_policies_execute_raises(self):
"""Test inner except handler when request.execute() raises during pagination"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
mock_request = MagicMock()
mock_request.execute.side_effect = Exception("Execute failed")
mock_service.policies().list.return_value = mock_request
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is False
assert calendar.policies.primary_calendar_external_sharing is None
assert calendar.policies.secondary_calendar_external_sharing is None
assert calendar.policies.external_invitations_warning is None
def test_calendar_policies_model(self):
"""Test CalendarPolicies Pydantic model"""
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
CalendarPolicies,
)
policies = CalendarPolicies(
primary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY",
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE",
external_invitations_warning=True,
)
assert policies.primary_calendar_external_sharing == "EXTERNAL_FREE_BUSY_ONLY"
assert (
policies.secondary_calendar_external_sharing
== "EXTERNAL_ALL_INFO_READ_WRITE"
)
assert policies.external_invitations_warning is True

View File

@@ -1,6 +1,12 @@
from unittest.mock import MagicMock, patch
from tests.providers.googleworkspace.googleworkspace_fixtures import (
GROUPS_ADMIN_ROLE_ID,
ROLE_GROUPS_ADMIN,
ROLE_SEED_ADMIN,
ROLE_SUPER_ADMIN,
SEED_ADMIN_ROLE_ID,
SUPER_ADMIN_ROLE_ID,
USER_1,
USER_2,
USER_3,
@@ -25,6 +31,24 @@ class TestDirectoryService:
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
# Mock roles response
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {
"items": [
{"assignedTo": "user1-id", "roleId": SUPER_ADMIN_ROLE_ID},
{"assignedTo": "user2-id", "roleId": SUPER_ADMIN_ROLE_ID},
]
}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
@@ -67,6 +91,17 @@ class TestDirectoryService:
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
# Mock roles response
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {"items": []}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {"items": []}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
@@ -97,6 +132,16 @@ class TestDirectoryService:
mock_service = MagicMock()
mock_service.users().list.side_effect = Exception("API Error")
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {"items": []}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {"items": []}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
@@ -130,3 +175,193 @@ class TestDirectoryService:
assert user.id == "test-id"
assert user.email == "test@test-company.com"
assert user.is_admin is True
assert user.role_assignments == []
def test_directory_list_roles(self):
"""Test that _list_roles correctly builds a roleId-to-roleName mapping"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
# Mock empty users
mock_users_list = MagicMock()
mock_users_list.execute.return_value = {"users": []}
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
# Mock roles response
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {"items": []}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.directory.directory_service import (
Directory,
)
directory = Directory(mock_provider)
super_admin_role = directory._roles[SUPER_ADMIN_ROLE_ID]
assert super_admin_role.name == "Super Admin"
assert super_admin_role.description == "Super Admin"
assert super_admin_role.is_super_admin_role is True
groups_admin_role = directory._roles[GROUPS_ADMIN_ROLE_ID]
assert groups_admin_role.name == "_GROUPS_ADMIN_ROLE"
assert groups_admin_role.description == "Groups Administrator"
assert groups_admin_role.is_super_admin_role is False
def test_directory_role_assignments_populated(self):
"""Test that role assignments are fetched and resolved for super admins"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
# Mock users - one super admin
mock_users_list = MagicMock()
mock_users_list.execute.return_value = {"users": [USER_1]}
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
# Mock roles
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {
"items": [
{"assignedTo": "user1-id", "roleId": SUPER_ADMIN_ROLE_ID},
{"assignedTo": "user1-id", "roleId": GROUPS_ADMIN_ROLE_ID},
]
}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.directory.directory_service import (
Directory,
)
directory = Directory(mock_provider)
user = directory.users["user1-id"]
role_names = [r.name for r in user.role_assignments]
role_descriptions = [r.description for r in user.role_assignments]
assert "Super Admin" in role_names
assert "_GROUPS_ADMIN_ROLE" in role_names
assert "Groups Administrator" in role_descriptions
assert len(user.role_assignments) == 2
assert user.is_admin is True
def test_directory_second_super_admin_detected_via_role_assignments(self):
"""Regression: a second super admin whose users.list().isAdmin still
reads False (e.g. API propagation lag, or only holding
_SEED_ADMIN_ROLE) must still be recognised as a super admin through
the Role Assignments API, AND any extra non-super-admin roles they
hold must be surfaced on their User object."""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
stale_user_1 = {
"id": "user1-id",
"primaryEmail": "admin1@test-company.com",
"isAdmin": False,
}
stale_user_2 = {
"id": "user2-id",
"primaryEmail": "admin2@test-company.com",
"isAdmin": False,
}
mock_users_list = MagicMock()
mock_users_list.execute.return_value = {"users": [stale_user_1, stale_user_2]}
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {
"items": [ROLE_SUPER_ADMIN, ROLE_SEED_ADMIN, ROLE_GROUPS_ADMIN]
}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {
"items": [
{"assignedTo": "user1-id", "roleId": SEED_ADMIN_ROLE_ID},
{"assignedTo": "user2-id", "roleId": SUPER_ADMIN_ROLE_ID},
{"assignedTo": "user2-id", "roleId": GROUPS_ADMIN_ROLE_ID},
]
}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.directory.directory_service import (
Directory,
)
directory = Directory(mock_provider)
user1 = directory.users["user1-id"]
user2 = directory.users["user2-id"]
assert user1.is_admin is True
assert user2.is_admin is True
assert [r.name for r in user1.role_assignments] == ["_SEED_ADMIN_ROLE"]
user2_role_names = {r.name for r in user2.role_assignments}
assert user2_role_names == {"Super Admin", "_GROUPS_ADMIN_ROLE"}

View File

@@ -0,0 +1,446 @@
from unittest.mock import patch
from prowler.providers.googleworkspace.services.directory.directory_service import (
Role,
User,
)
from tests.providers.googleworkspace.googleworkspace_fixtures import (
CUSTOMER_ID,
DOMAIN,
set_mocked_googleworkspace_provider,
)
SUPER_ADMIN_ROLE = Role(
id="13801188331880449",
name="Super Admin",
description="Super Admin",
is_super_admin_role=True,
)
SEED_ADMIN_ROLE = Role(
id="13801188331880451",
name="_SEED_ADMIN_ROLE",
description="Super Admin",
is_super_admin_role=True,
)
GROUPS_ADMIN_ROLE = Role(
id="13801188331880450",
name="_GROUPS_ADMIN_ROLE",
description="Groups Administrator",
is_super_admin_role=False,
)
USER_MANAGEMENT_ADMIN_ROLE = Role(
id="13801188331880452",
name="_USER_MANAGEMENT_ADMIN_ROLE",
description="User Management Administrator",
is_super_admin_role=False,
)
CUSTOM_ROLE_NO_DESCRIPTION = Role(
id="13801188331880453",
name="custom-helpdesk-role",
description="",
is_super_admin_role=False,
)
class TestDirectorySuperAdminOnlyAdminRoles:
def test_pass_super_admins_only_super_admin_role(self):
"""Test PASS when super admins have only the Super Admin role"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE],
),
"admin2-id": User(
id="admin2-id",
email="admin2@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE],
),
"user1-id": User(
id="user1-id",
email="user@test-company.com",
is_admin=False,
role_assignments=[],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "used only for super admin activities" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_pass_super_admin_with_seed_admin_role(self):
"""Test PASS when a super admin only holds _SEED_ADMIN_ROLE.
_SEED_ADMIN_ROLE is auto-assigned by Google to the original domain
creator and has isSuperAdminRole=True, so it must not count as an
"extra" role.
"""
users = {
"admin1-id": User(
id="admin1-id",
email="playground@prowler.cloud",
is_admin=True,
role_assignments=[SEED_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "_SEED_ADMIN_ROLE" not in findings[0].status_extended
def test_pass_super_admin_with_both_super_admin_and_seed_admin(self):
"""Test PASS when admin holds both Super Admin and _SEED_ADMIN_ROLE"""
users = {
"admin1-id": User(
id="admin1-id",
email="playground@prowler.cloud",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, SEED_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
def test_fail_super_admin_with_additional_roles(self):
"""Test FAIL when a super admin also has additional admin roles"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
),
"user1-id": User(
id="user1-id",
email="user@test-company.com",
is_admin=False,
role_assignments=[],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "admin1@test-company.com" in findings[0].status_extended
assert "Groups Administrator" in findings[0].status_extended
assert "_GROUPS_ADMIN_ROLE" not in findings[0].status_extended
assert "used only for super admin activities" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_fail_seed_admin_with_additional_roles(self):
"""Test FAIL when a _SEED_ADMIN_ROLE holder also has extra roles"""
users = {
"admin1-id": User(
id="admin1-id",
email="playground@prowler.cloud",
is_admin=True,
role_assignments=[SEED_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "playground@prowler.cloud" in findings[0].status_extended
assert "Groups Administrator" in findings[0].status_extended
assert "_GROUPS_ADMIN_ROLE" not in findings[0].status_extended
assert "_SEED_ADMIN_ROLE" not in findings[0].status_extended
def test_fail_multiple_super_admins_with_extra_roles(self):
"""Test FAIL lists all super admins that have additional roles"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
),
"admin2-id": User(
id="admin2-id",
email="admin2@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, USER_MANAGEMENT_ADMIN_ROLE],
),
"admin3-id": User(
id="admin3-id",
email="admin3@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "admin1@test-company.com" in findings[0].status_extended
assert "admin2@test-company.com" in findings[0].status_extended
assert "admin3@test-company.com" not in findings[0].status_extended
def test_no_findings_when_no_users(self):
"""Test no findings when there are no users"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = {}
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 0
def test_non_super_admin_with_roles_not_flagged(self):
"""Test that users who are not super admins are ignored even if they have roles"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE],
),
"delegated1-id": User(
id="delegated1-id",
email="delegated@test-company.com",
is_admin=False,
role_assignments=[GROUPS_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "delegated@test-company.com" not in findings[0].status_extended
def test_pass_super_admin_with_empty_role_assignments(self):
"""Test PASS when super admin has no role assignments (edge case)"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
def test_fail_custom_role_without_description_falls_back_to_name(self):
"""A custom role with an empty description should be displayed
using its name as a fall-back, so the FAIL message is never blank
for users that genuinely hold extra roles."""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, CUSTOM_ROLE_NO_DESCRIPTION],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "custom-helpdesk-role" in findings[0].status_extended

View File

@@ -0,0 +1,711 @@
"""Tests for the entra_conditional_access_policy_directory_sync_account_excluded check."""
from unittest import mock
from uuid import uuid4
from prowler.providers.m365.services.entra.entra_service import (
ApplicationEnforcedRestrictions,
ApplicationsConditions,
ConditionalAccessGrantControl,
ConditionalAccessPolicyState,
Conditions,
GrantControlOperator,
GrantControls,
PersistentBrowser,
SessionControls,
SignInFrequency,
SignInFrequencyInterval,
UsersConditions,
)
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
DIRECTORY_SYNC_ROLE_TEMPLATE_ID = "d29b2b05-8046-44ba-8758-1e26182fcf32"
CHECK_MODULE_PATH = "prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded"
def _default_session_controls():
"""Return default session controls for test policies."""
return SessionControls(
persistent_browser=PersistentBrowser(is_enabled=False, mode="always"),
sign_in_frequency=SignInFrequency(
is_enabled=False,
frequency=None,
type=None,
interval=SignInFrequencyInterval.EVERY_TIME,
),
application_enforced_restrictions=ApplicationEnforcedRestrictions(
is_enabled=False
),
)
def _default_grant_controls():
"""Return default grant controls requiring MFA for test policies."""
return GrantControls(
built_in_controls=[ConditionalAccessGrantControl.MFA],
operator=GrantControlOperator.AND,
authentication_strength=None,
)
class Test_entra_conditional_access_policy_directory_sync_account_excluded:
"""Test class for Directory Sync Account exclusion check."""
def test_no_conditional_access_policies(self):
"""Test PASS when no Conditional Access policies exist."""
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
entra_client.conditional_access_policies = {}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
)
assert result[0].resource == {}
assert result[0].resource_name == "Conditional Access Policies"
assert result[0].resource_id == "conditionalAccessPolicies"
assert result[0].location == "global"
def test_policy_disabled(self):
"""Test PASS when only a disabled policy exists targeting all users and apps."""
policy_id = str(uuid4())
display_name = "Require MFA for All Users"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id: ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=[],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.DISABLED,
)
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
)
assert result[0].resource == {}
assert result[0].resource_name == "Conditional Access Policies"
assert result[0].resource_id == "conditionalAccessPolicies"
assert result[0].location == "global"
def test_policy_targets_specific_users(self):
"""Test PASS when the policy targets specific users, not all users."""
policy_id = str(uuid4())
display_name = "Require MFA for Admins"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id: ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=["some-group-id"],
excluded_groups=[],
included_users=[],
excluded_users=[],
included_roles=[],
excluded_roles=[],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED,
)
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
)
def test_policy_targets_specific_apps(self):
"""Test PASS when the policy targets specific apps, not all apps."""
policy_id = str(uuid4())
display_name = "Require MFA for Office 365"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id: ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["some-app-id"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=[],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED,
)
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
)
def test_policy_enabled_without_sync_exclusion(self):
"""Test FAIL when an enabled policy targets all users and all apps but does not exclude the sync role."""
policy_id = str(uuid4())
display_name = "Require MFA for All Users"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id: ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=[],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED,
)
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Conditional Access Policy '{display_name}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
)
assert (
result[0].resource
== entra_client.conditional_access_policies[policy_id].dict()
)
assert result[0].resource_name == display_name
assert result[0].resource_id == policy_id
assert result[0].location == "global"
def test_policy_report_only_without_sync_exclusion(self):
"""Test FAIL when a report-only policy targets all users and apps without excluding the sync role."""
policy_id = str(uuid4())
display_name = "Report Only - Require MFA"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id: ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=[],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED_FOR_REPORTING,
)
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Conditional Access Policy '{display_name}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
)
assert (
result[0].resource
== entra_client.conditional_access_policies[policy_id].dict()
)
assert result[0].resource_name == display_name
assert result[0].resource_id == policy_id
assert result[0].location == "global"
def test_policy_enabled_with_sync_exclusion(self):
"""Test PASS when an enabled policy targets all users and apps and excludes the sync role."""
policy_id = str(uuid4())
display_name = "Require MFA for All Users"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id: ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=[
DIRECTORY_SYNC_ROLE_TEMPLATE_ID,
],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED,
)
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Conditional Access Policy '{display_name}' excludes the Directory Synchronization Accounts role."
)
assert (
result[0].resource
== entra_client.conditional_access_policies[policy_id].dict()
)
assert result[0].resource_name == display_name
assert result[0].resource_id == policy_id
assert result[0].location == "global"
def test_policy_with_sync_role_and_other_excluded_roles(self):
"""Test PASS when the sync role is excluded alongside other roles."""
policy_id = str(uuid4())
display_name = "Require MFA for All Users"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id: ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=[
"some-other-role-id",
DIRECTORY_SYNC_ROLE_TEMPLATE_ID,
],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED,
)
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Conditional Access Policy '{display_name}' excludes the Directory Synchronization Accounts role."
)
def test_multiple_policies_mixed_results(self):
"""Test multiple policies where one excludes sync role and another does not."""
policy_id_pass = str(uuid4())
policy_id_fail = str(uuid4())
display_name_pass = "MFA Policy - With Exclusion"
display_name_fail = "MFA Policy - Without Exclusion"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id_pass: ConditionalAccessPolicy(
id=policy_id_pass,
display_name=display_name_pass,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=[
DIRECTORY_SYNC_ROLE_TEMPLATE_ID,
],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED,
),
policy_id_fail: ConditionalAccessPolicy(
id=policy_id_fail,
display_name=display_name_fail,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=[],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED,
),
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 2
pass_results = [r for r in result if r.status == "PASS"]
fail_results = [r for r in result if r.status == "FAIL"]
assert len(pass_results) == 1
assert len(fail_results) == 1
assert pass_results[0].resource_name == display_name_pass
assert pass_results[0].resource_id == policy_id_pass
assert (
pass_results[0].status_extended
== f"Conditional Access Policy '{display_name_pass}' excludes the Directory Synchronization Accounts role."
)
assert fail_results[0].resource_name == display_name_fail
assert fail_results[0].resource_id == policy_id_fail
assert (
fail_results[0].status_extended
== f"Conditional Access Policy '{display_name_fail}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
)
def test_policy_with_wrong_excluded_role(self):
"""Test FAIL when the policy excludes a different role but not the sync role."""
policy_id = str(uuid4())
display_name = "Require MFA for All Users"
entra_client = mock.MagicMock
entra_client.audited_tenant = "audited_tenant"
entra_client.audited_domain = DOMAIN
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_m365_provider(),
),
mock.patch(
f"{CHECK_MODULE_PATH}.entra_client",
new=entra_client,
),
):
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
entra_conditional_access_policy_directory_sync_account_excluded,
)
from prowler.providers.m365.services.entra.entra_service import (
ConditionalAccessPolicy,
)
entra_client.conditional_access_policies = {
policy_id: ConditionalAccessPolicy(
id=policy_id,
display_name=display_name,
conditions=Conditions(
application_conditions=ApplicationsConditions(
included_applications=["All"],
excluded_applications=[],
included_user_actions=[],
),
user_conditions=UsersConditions(
included_groups=[],
excluded_groups=[],
included_users=["All"],
excluded_users=[],
included_roles=[],
excluded_roles=["some-other-role-id"],
),
client_app_types=[],
user_risk_levels=[],
),
grant_controls=_default_grant_controls(),
session_controls=_default_session_controls(),
state=ConditionalAccessPolicyState.ENABLED,
)
}
check = entra_conditional_access_policy_directory_sync_account_excluded()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Conditional Access Policy '{display_name}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
)
assert result[0].resource_name == display_name
assert result[0].resource_id == policy_id

View File

@@ -6,6 +6,12 @@
set -e
# The Python pre-commit framework (see .pre-commit-config.yaml, hook "ui-checks")
# exports GIT_WORK_TREE, GIT_DIR, and GIT_INDEX_FILE pointing to its temp staging
# area. Unset them so git commands below resolve against the real repo and index.
# See: https://github.com/prowler-cloud/prowler/pull/10574
unset GIT_WORK_TREE GIT_DIR GIT_INDEX_FILE GIT_PREFIX GIT_COMMON_DIR GIT_OBJECT_DIRECTORY
# Colors
RED='\033[0;31m'
GREEN='\033[0;32m'