mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-04-09 11:17:08 +00:00
Compare commits
8 Commits
fix/sdk-aw
...
feat/prowl
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0f109aacb | ||
|
|
406eedd68a | ||
|
|
bc38104903 | ||
|
|
9290d7e105 | ||
|
|
72e8f09c07 | ||
|
|
1d43885230 | ||
|
|
e6aedcb207 | ||
|
|
89fe867944 |
@@ -317,7 +317,10 @@ python prowler-cli.py -v
|
||||
- **Prowler SDK**: A Python SDK designed to extend the functionality of the Prowler CLI for advanced capabilities.
|
||||
- **Prowler MCP Server**: A Model Context Protocol server that provides AI tools for Lighthouse, the AI-powered security assistant. This is a critical dependency for Lighthouse functionality.
|
||||
|
||||

|
||||

|
||||
|
||||
<!-- Diagram source: docs/images/products/prowler-app-architecture.mmd — edit there, re-render at https://mermaid.live, and replace the PNG. -->
|
||||
|
||||
|
||||
## Prowler CLI
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
- Filter RBAC role lookup by `tenant_id` to prevent cross-tenant privilege leak [(#10491)](https://github.com/prowler-cloud/prowler/pull/10491)
|
||||
- `VALKEY_SCHEME`, `VALKEY_USERNAME`, and `VALKEY_PASSWORD` environment variables to configure Celery broker TLS/auth connection details for Valkey/ElastiCache [(#10420)](https://github.com/prowler-cloud/prowler/pull/10420)
|
||||
- `Vercel` provider support [(#10190)](https://github.com/prowler-cloud/prowler/pull/10190)
|
||||
- Finding groups list and latest endpoints support `sort=delta`, ordering by `new_count` then `changed_count` so groups with the most new findings rank highest [(#10606)](https://github.com/prowler-cloud/prowler/pull/10606)
|
||||
|
||||
### 🔄 Changed
|
||||
|
||||
|
||||
@@ -16839,6 +16839,39 @@ class TestFindingGroupViewSet:
|
||||
data = response.json()["data"]
|
||||
assert len(data) > 0
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"endpoint_name", ["finding-group-list", "finding-group-latest"]
|
||||
)
|
||||
def test_finding_groups_sort_by_delta(
|
||||
self,
|
||||
authenticated_client,
|
||||
finding_groups_fixture,
|
||||
endpoint_name,
|
||||
):
|
||||
"""Sort by delta orders by new_count then changed_count (lexicographic)."""
|
||||
params = {"sort": "-delta"}
|
||||
if endpoint_name == "finding-group-list":
|
||||
params["filter[inserted_at]"] = TODAY
|
||||
|
||||
response = authenticated_client.get(reverse(endpoint_name), params)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
data = response.json()["data"]
|
||||
assert len(data) > 0
|
||||
|
||||
def delta_key(item):
|
||||
attrs = item["attributes"]
|
||||
return (attrs.get("new_count", 0), attrs.get("changed_count", 0))
|
||||
|
||||
desc_keys = [delta_key(item) for item in data]
|
||||
assert desc_keys == sorted(desc_keys, reverse=True)
|
||||
|
||||
# Ascending order produces the inverse arrangement
|
||||
params["sort"] = "delta"
|
||||
response = authenticated_client.get(reverse(endpoint_name), params)
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
asc_keys = [delta_key(item) for item in response.json()["data"]]
|
||||
assert asc_keys == sorted(asc_keys)
|
||||
|
||||
def test_finding_groups_latest_ignores_date_filters(
|
||||
self, authenticated_client, finding_groups_fixture
|
||||
):
|
||||
|
||||
@@ -7219,6 +7219,7 @@ class FindingGroupViewSet(BaseRLSViewSet):
|
||||
"check_id": "check_id",
|
||||
"check_title": "check_title",
|
||||
"severity": "severity_order",
|
||||
"delta": "delta_order",
|
||||
"fail_count": "fail_count",
|
||||
"pass_count": "pass_count",
|
||||
"muted_count": "muted_count",
|
||||
@@ -7569,7 +7570,20 @@ class FindingGroupViewSet(BaseRLSViewSet):
|
||||
sort_param, self._FINDING_GROUP_SORT_MAP
|
||||
)
|
||||
if ordering:
|
||||
aggregated_queryset = aggregated_queryset.order_by(*ordering)
|
||||
# delta_order is a virtual sort field: expand it to a
|
||||
# lexicographic ordering by (new_count, changed_count) so groups
|
||||
# with more new findings rank higher, with changed_count as the
|
||||
# tie-breaker (preserves the "new > changed" priority used by
|
||||
# the resources endpoint, but driven by the actual counters).
|
||||
expanded_ordering = []
|
||||
for field in ordering:
|
||||
if field.lstrip("-") == "delta_order":
|
||||
sign = "-" if field.startswith("-") else ""
|
||||
expanded_ordering.append(f"{sign}new_count")
|
||||
expanded_ordering.append(f"{sign}changed_count")
|
||||
else:
|
||||
expanded_ordering.append(field)
|
||||
aggregated_queryset = aggregated_queryset.order_by(*expanded_ordering)
|
||||
else:
|
||||
aggregated_queryset = aggregated_queryset.order_by(
|
||||
"-fail_count", "-severity_order", "check_id"
|
||||
|
||||
@@ -750,6 +750,35 @@ def init_parser(self):
|
||||
# More arguments for the provider.
|
||||
```
|
||||
|
||||
##### Sensitive CLI Arguments
|
||||
|
||||
CLI flags that accept secrets (tokens, passwords, API keys) require special handling to protect credentials from leaking in HTML output and process listings:
|
||||
|
||||
1. **Use `nargs="?"` with `default=None`** so the flag works both with and without an inline value. This allows the provider to fall back to an environment variable when no value is passed.
|
||||
2. **Add a `SENSITIVE_ARGUMENTS` frozenset** at the top of the `arguments.py` file listing every flag that accepts secret values:
|
||||
|
||||
```python
|
||||
SENSITIVE_ARGUMENTS = frozenset({"--your-provider-password", "--your-provider-token"})
|
||||
```
|
||||
|
||||
Prowler automatically discovers these frozensets and uses them to redact values in HTML output and warn users who pass secrets directly on the command line.
|
||||
|
||||
3. **Document the environment variable** in the `help` text so users know the recommended alternative:
|
||||
|
||||
```python
|
||||
<provider_name>_parser.add_argument(
|
||||
"--your-provider-password",
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="PASSWORD",
|
||||
help="Password for authentication. We recommend using the YOUR_PROVIDER_PASSWORD environment variable instead.",
|
||||
)
|
||||
```
|
||||
|
||||
<Warning>
|
||||
Do not add new arguments that require passing secrets as CLI values without an environment variable fallback. Prowler CLI warns users when sensitive flags receive explicit values on the command line.
|
||||
</Warning>
|
||||
|
||||
#### Step 5: Implement Mutelist
|
||||
|
||||
**Explanation:**
|
||||
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 192 KiB |
@@ -11,16 +11,19 @@ Prowler App is a web application that simplifies running Prowler. It provides:
|
||||
|
||||
## Components
|
||||
|
||||
Prowler App consists of three main components:
|
||||
Prowler App consists of four main components:
|
||||
|
||||
- **Prowler UI**: User-friendly web interface for running Prowler and viewing results, powered by Next.js
|
||||
- **Prowler API**: Backend API that executes Prowler scans and stores results, built with Django REST Framework
|
||||
- **Prowler SDK**: Python SDK that integrates with Prowler CLI for advanced functionality
|
||||
- **Prowler MCP Server**: Model Context Protocol server that exposes AI tools for Lighthouse, the AI-powered security assistant. Required dependency for Lighthouse.
|
||||
|
||||
Supporting infrastructure includes:
|
||||
|
||||
- **PostgreSQL**: Persistent storage of scan results
|
||||
- **Celery Workers**: Asynchronous execution of Prowler scans
|
||||
- **Celery Beat (API Scheduler)**: Schedules recurring scans and enqueues jobs on the broker
|
||||
- **Valkey**: In-memory database serving as message broker for Celery workers
|
||||
- **Neo4j**: Graph database used by the Attack Paths feature to combine cloud inventory with Prowler findings (currently populated by AWS scans)
|
||||
|
||||

|
||||
|
||||
37
docs/images/products/prowler-app-architecture.mmd
Normal file
37
docs/images/products/prowler-app-architecture.mmd
Normal file
@@ -0,0 +1,37 @@
|
||||
flowchart TB
|
||||
user([User / Security Team])
|
||||
cli([Prowler CLI])
|
||||
|
||||
subgraph APP["Prowler App"]
|
||||
ui["Prowler UI<br/>(Next.js)"]
|
||||
api["Prowler API<br/>(Django REST Framework)"]
|
||||
worker["API Worker<br/>(Celery)"]
|
||||
beat["API Scheduler<br/>(Celery Beat)"]
|
||||
mcp["Prowler MCP Server<br/>(Lighthouse AI tools)"]
|
||||
end
|
||||
|
||||
sdk["Prowler SDK<br/>(Python)"]
|
||||
|
||||
subgraph DATA["Data Layer"]
|
||||
pg[("PostgreSQL")]
|
||||
valkey[("Valkey / Redis")]
|
||||
neo4j[("Neo4j")]
|
||||
end
|
||||
|
||||
providers["Providers"]
|
||||
|
||||
user --> ui
|
||||
user --> cli
|
||||
ui -->|REST| api
|
||||
api --> pg
|
||||
api --> valkey
|
||||
beat -->|enqueue jobs| valkey
|
||||
valkey -->|dispatch| worker
|
||||
worker --> pg
|
||||
worker -->|Attack Paths| neo4j
|
||||
worker -->|invokes| sdk
|
||||
cli --> sdk
|
||||
api -. AI tools .-> mcp
|
||||
mcp -. context .-> api
|
||||
|
||||
sdk --> providers
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 192 KiB After Width: | Height: | Size: 268 KiB |
@@ -66,22 +66,38 @@ prowler <provider> --categories internet-exposed
|
||||
|
||||
### Shodan
|
||||
|
||||
Prowler allows you check if any public IPs in your Cloud environments are exposed in Shodan with the `-N`/`--shodan <shodan_api_key>` option:
|
||||
Prowler can check whether any public IPs in cloud environments are exposed in Shodan using the `-N`/`--shodan` option.
|
||||
|
||||
For example, you can check if any of your AWS Elastic Compute Cloud (EC2) instances has an elastic IP exposed in Shodan:
|
||||
#### Using the Environment Variable (Recommended)
|
||||
|
||||
Set the `SHODAN_API_KEY` environment variable to avoid exposing the API key in process listings and shell history:
|
||||
|
||||
```console
|
||||
prowler aws -N/--shodan <shodan_api_key> -c ec2_elastic_ip_shodan
|
||||
export SHODAN_API_KEY=<shodan_api_key>
|
||||
```
|
||||
|
||||
Also, you can check if any of your Azure Subscription has an public IP exposed in Shodan:
|
||||
Then run Prowler with the `--shodan` flag (no value needed):
|
||||
|
||||
```console
|
||||
prowler azure -N/--shodan <shodan_api_key> -c network_public_ip_shodan
|
||||
prowler aws --shodan -c ec2_elastic_ip_shodan
|
||||
```
|
||||
|
||||
And finally, you can check if any of your GCP projects has an public IP address exposed in Shodan:
|
||||
|
||||
```console
|
||||
prowler gcp -N/--shodan <shodan_api_key> -c compute_public_address_shodan
|
||||
prowler azure --shodan -c network_public_ip_shodan
|
||||
```
|
||||
|
||||
```console
|
||||
prowler gcp --shodan -c compute_public_address_shodan
|
||||
```
|
||||
|
||||
#### Using the CLI Flag
|
||||
|
||||
Alternatively, pass the API key directly on the command line:
|
||||
|
||||
```console
|
||||
prowler aws --shodan <shodan_api_key> -c ec2_elastic_ip_shodan
|
||||
```
|
||||
|
||||
<Warning>
|
||||
Passing secret values directly on the command line exposes them in process listings and shell history. Prowler CLI displays a warning when this pattern is detected. Use the `SHODAN_API_KEY` environment variable instead.
|
||||
</Warning>
|
||||
|
||||
@@ -6,17 +6,19 @@ import { VersionBadge } from "/snippets/version-badge.mdx"
|
||||
|
||||
<VersionBadge version="5.19.0" />
|
||||
|
||||
Prowler for Google Workspace uses a **Service Account with Domain-Wide Delegation** to authenticate to the Google Workspace Admin SDK. This allows Prowler to read directory data on behalf of a super administrator without requiring an interactive login.
|
||||
Prowler for Google Workspace uses a **Service Account with Domain-Wide Delegation** to authenticate to the Google Workspace Admin SDK and the Cloud Identity Policy API. This allows Prowler to read directory data and domain-level application policies on behalf of a super administrator without requiring an interactive login.
|
||||
|
||||
## Required Open Authorization (OAuth) Scopes
|
||||
|
||||
Prowler requests the following read-only OAuth 2.0 scopes from the Google Workspace Admin SDK:
|
||||
Prowler requests the following read-only OAuth 2.0 scopes:
|
||||
|
||||
| Scope | Description |
|
||||
|-------|-------------|
|
||||
| `https://www.googleapis.com/auth/admin.directory.user.readonly` | Read access to user accounts and their admin status |
|
||||
| `https://www.googleapis.com/auth/admin.directory.domain.readonly` | Read access to domain information |
|
||||
| `https://www.googleapis.com/auth/admin.directory.customer.readonly` | Read access to customer information (Customer ID) |
|
||||
| `https://www.googleapis.com/auth/cloud-identity.policies.readonly` | Read access to domain-level application policies (required for Calendar service checks) |
|
||||
| `https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly` | Read access to admin roles and role assignments |
|
||||
|
||||
<Warning>
|
||||
The delegated user must be a **super administrator** in your Google Workspace organization. Using a non-admin account will result in permission errors when accessing the Admin SDK.
|
||||
@@ -30,13 +32,24 @@ If no GCP project exists, create one at [https://console.cloud.google.com](https
|
||||
|
||||
The project is only used to host the Service Account — it does not need to have any Google Workspace data in it.
|
||||
|
||||
### Step 2: Enable the Admin SDK API
|
||||
### Step 2: Enable Required APIs
|
||||
|
||||
1. Navigate to the [Google Cloud Console](https://console.cloud.google.com)
|
||||
2. Select the target project
|
||||
3. Navigate to **APIs & Services → Library**
|
||||
4. Search for **Admin SDK API**
|
||||
5. Click **Enable**
|
||||
In the [Google Cloud Console](https://console.cloud.google.com), select the target project and navigate to **APIs & Services → Library**. Search for and enable each of the following APIs:
|
||||
|
||||
| API | Required For |
|
||||
|-----|--------------|
|
||||
| **Admin SDK API** | Directory service checks (users, roles, domains) |
|
||||
| **Cloud Identity API** | Calendar service checks (domain-level sharing and invitation policies) |
|
||||
|
||||
For each API:
|
||||
|
||||
1. Search for the API name in the library
|
||||
2. Click the API result
|
||||
3. Click **Enable**
|
||||
|
||||
<Note>
|
||||
Both APIs must be enabled in the same GCP project that hosts the Service Account. Calendar checks will return no findings if the Cloud Identity API is not enabled.
|
||||
</Note>
|
||||
|
||||
### Step 3: Create a Service Account
|
||||
|
||||
@@ -73,7 +86,7 @@ This JSON key grants access to your Google Workspace organization. Never commit
|
||||
6. In the **OAuth scopes** field, enter the following scopes as a comma-separated list:
|
||||
|
||||
```
|
||||
https://www.googleapis.com/auth/admin.directory.user.readonly,https://www.googleapis.com/auth/admin.directory.domain.readonly,https://www.googleapis.com/auth/admin.directory.customer.readonly
|
||||
https://www.googleapis.com/auth/admin.directory.user.readonly,https://www.googleapis.com/auth/admin.directory.domain.readonly,https://www.googleapis.com/auth/admin.directory.customer.readonly,https://www.googleapis.com/auth/cloud-identity.policies.readonly,https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly
|
||||
```
|
||||
|
||||
7. Click **Authorize**
|
||||
@@ -114,7 +127,7 @@ The delegated user must be provided via the `GOOGLEWORKSPACE_DELEGATED_USER` env
|
||||
|
||||
- **Use environment variables** — Never hardcode credentials in scripts or commands
|
||||
- **Use a dedicated Service Account** — Create one specifically for Prowler, separate from other integrations
|
||||
- **Use read-only scopes** — Prowler only requires the three read-only scopes listed above
|
||||
- **Use read-only scopes** — Prowler only requires the read-only scopes listed above
|
||||
- **Restrict key access** — Set file permissions to `600` on the JSON key file
|
||||
- **Rotate keys regularly** — Delete and regenerate the JSON key periodically
|
||||
- **Use a least-privilege super admin** — Consider using a dedicated super admin account for Prowler's delegated user rather than a personal admin account
|
||||
@@ -151,7 +164,7 @@ python3 -c "import json; json.load(open('/path/to/key.json'))" && echo "Valid JS
|
||||
The Service Account cannot impersonate the delegated user. This usually means Domain-Wide Delegation has not been configured, or the OAuth scopes are incorrect. Verify:
|
||||
|
||||
- The Service Account Client ID is correctly entered in the Admin Console
|
||||
- All three required OAuth scopes are included
|
||||
- All required OAuth scopes are included
|
||||
- The delegated user is a super administrator
|
||||
|
||||
### Permission Denied on Admin SDK Calls
|
||||
@@ -159,5 +172,14 @@ The Service Account cannot impersonate the delegated user. This usually means Do
|
||||
If Prowler connects but returns empty results or permission errors for specific API calls:
|
||||
|
||||
- Confirm Domain-Wide Delegation is fully propagated (wait a few minutes after setup)
|
||||
- Verify all three scopes are authorized in the Admin Console
|
||||
- Verify all scopes are authorized in the Admin Console
|
||||
- Ensure the delegated user is an active super administrator
|
||||
|
||||
### Calendar Checks Return No Findings
|
||||
|
||||
If the Directory checks run successfully but the Calendar checks (e.g., `calendar_external_sharing_primary_calendar`) return no findings, the Cloud Identity Policy API is not reachable for this Service Account. Verify:
|
||||
|
||||
- The **Cloud Identity API** is enabled in the GCP project hosting the Service Account (Step 2)
|
||||
- The scope `https://www.googleapis.com/auth/cloud-identity.policies.readonly` is included in the Domain-Wide Delegation OAuth scopes list in the Admin Console (Step 5)
|
||||
- The delegated user is a super administrator (the Policy API only returns data to super admins)
|
||||
- Domain-Wide Delegation has had time to propagate after adding the new scope (a few minutes)
|
||||
|
||||
@@ -78,7 +78,7 @@ The Service Account JSON is the full content of the key file downloaded when cre
|
||||

|
||||
|
||||
<Note>
|
||||
If the connection test fails, verify that Domain-Wide Delegation is properly configured and that all three OAuth scopes are authorized. It may take a few minutes for delegation changes to propagate. See the [Troubleshooting](/user-guide/providers/googleworkspace/authentication#troubleshooting) section for common errors.
|
||||
If the connection test fails, verify that Domain-Wide Delegation is properly configured and that all required OAuth scopes are authorized. It may take a few minutes for delegation changes to propagate. See the [Troubleshooting](/user-guide/providers/googleworkspace/authentication#troubleshooting) section for common errors.
|
||||
</Note>
|
||||
|
||||
### Step 5: Launch the Scan
|
||||
|
||||
@@ -11,11 +11,13 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- `glue_etl_jobs_no_secrets_in_arguments` check for plaintext secrets in AWS Glue ETL job arguments [(#10368)](https://github.com/prowler-cloud/prowler/pull/10368)
|
||||
- `awslambda_function_no_dead_letter_queue`, `awslambda_function_using_cross_account_layers`, and `awslambda_function_env_vars_not_encrypted_with_cmk` checks for AWS Lambda [(#10381)](https://github.com/prowler-cloud/prowler/pull/10381)
|
||||
- `entra_conditional_access_policy_mdm_compliant_device_required` check for M365 provider [(#10220)](https://github.com/prowler-cloud/prowler/pull/10220)
|
||||
- `directory_super_admin_only_admin_roles` check for Google Workspace provider [(#10488)](https://github.com/prowler-cloud/prowler/pull/10488)
|
||||
- `ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip` check for AWS provider using `ipaddress.is_global` for accurate public IP detection [(#10335)](https://github.com/prowler-cloud/prowler/pull/10335)
|
||||
- `entra_conditional_access_policy_block_o365_elevated_insider_risk` check for M365 provider [(#10232)](https://github.com/prowler-cloud/prowler/pull/10232)
|
||||
- `--resource-group` and `--list-resource-groups` CLI flags to filter checks by resource group across all providers [(#10479)](https://github.com/prowler-cloud/prowler/pull/10479)
|
||||
- CISA SCuBA Google Workspace Baselines compliance [(#10466)](https://github.com/prowler-cloud/prowler/pull/10466)
|
||||
- CIS Google Workspace Foundations Benchmark v1.3.0 compliance [(#10462)](https://github.com/prowler-cloud/prowler/pull/10462)
|
||||
- `calendar_external_sharing_primary_calendar`, `calendar_external_sharing_secondary_calendar`, and `calendar_external_invitations_warning` checks for Google Workspace provider using the Cloud Identity Policy API [(#10597)](https://github.com/prowler-cloud/prowler/pull/10597)
|
||||
- `entra_conditional_access_policy_device_registration_mfa_required` check and `entra_intune_enrollment_sign_in_frequency_every_time` enhancement for M365 provider [(#10222)](https://github.com/prowler-cloud/prowler/pull/10222)
|
||||
- `entra_conditional_access_policy_block_elevated_insider_risk` check for M365 provider [(#10234)](https://github.com/prowler-cloud/prowler/pull/10234)
|
||||
- `Vercel` provider support with 30 checks [(#10189)](https://github.com/prowler-cloud/prowler/pull/10189)
|
||||
@@ -24,6 +26,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
|
||||
- Added `internet-exposed` category to 13 AWS checks (CloudFront, CodeArtifact, EC2, EFS, RDS, SageMaker, Shield, VPC) [(#10502)](https://github.com/prowler-cloud/prowler/pull/10502)
|
||||
- Minimum Python version from 3.9 to 3.10 and updated classifiers to reflect supported versions (3.10, 3.11, 3.12) [(#10464)](https://github.com/prowler-cloud/prowler/pull/10464)
|
||||
- Sensitive CLI flags now warn when values are passed directly, recommending environment variables instead [(#10532)](https://github.com/prowler-cloud/prowler/pull/10532)
|
||||
|
||||
### 🐞 Fixed
|
||||
|
||||
@@ -33,6 +36,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- `--list-checks` and `--list-checks-json` now include `threat-detection` category checks in their output [(#10578)](https://github.com/prowler-cloud/prowler/pull/10578)
|
||||
- Missing `__init__.py` in `codebuild_project_uses_allowed_github_organizations` check preventing discovery by `--list-checks` [(#10584)](https://github.com/prowler-cloud/prowler/pull/10584)
|
||||
- Azure Key Vault checks emitting incorrect findings for keys, secrets, and vault logging [(#10332)](https://github.com/prowler-cloud/prowler/pull/10332)
|
||||
- `is_policy_public` now recognizes `kms:CallerAccount`, `kms:ViaService`, `aws:CalledVia`, `aws:CalledViaFirst`, and `aws:CalledViaLast` as restrictive condition keys, fixing false positives in `kms_key_policy_is_not_public` and other checks that use `is_condition_block_restrictive` [(#10600)](https://github.com/prowler-cloud/prowler/pull/10600)
|
||||
- `_enabled_regions` empty-set bug in `AwsProvider.generate_regional_clients` creating boto3 clients for all 36 AWS regions instead of the audited ones, causing random CI timeouts and slow test runs [(#10598)](https://github.com/prowler-cloud/prowler/pull/10598)
|
||||
- Retrieve only the latest version from a package in AWS CodeArtifact [(#10243)](https://github.com/prowler-cloud/prowler/pull/10243)
|
||||
|
||||
|
||||
@@ -54,7 +54,9 @@
|
||||
{
|
||||
"Id": "1.1.3",
|
||||
"Description": "Ensure super admin accounts are used only for super admin activities",
|
||||
"Checks": [],
|
||||
"Checks": [
|
||||
"directory_super_admin_only_admin_roles"
|
||||
],
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "1 Directory",
|
||||
@@ -96,7 +98,9 @@
|
||||
{
|
||||
"Id": "3.1.1.1.1",
|
||||
"Description": "Ensure external sharing options for primary calendars are configured",
|
||||
"Checks": [],
|
||||
"Checks": [
|
||||
"calendar_external_sharing_primary_calendar"
|
||||
],
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "3 Apps",
|
||||
@@ -138,7 +142,9 @@
|
||||
{
|
||||
"Id": "3.1.1.1.3",
|
||||
"Description": "Ensure external invitation warnings for Google Calendar are configured",
|
||||
"Checks": [],
|
||||
"Checks": [
|
||||
"calendar_external_invitations_warning"
|
||||
],
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "3 Apps",
|
||||
@@ -159,7 +165,9 @@
|
||||
{
|
||||
"Id": "3.1.1.2.1",
|
||||
"Description": "Ensure external sharing options for secondary calendars are configured",
|
||||
"Checks": [],
|
||||
"Checks": [
|
||||
"calendar_external_sharing_secondary_calendar"
|
||||
],
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "3 Apps",
|
||||
|
||||
@@ -1310,7 +1310,9 @@
|
||||
{
|
||||
"Id": "GWS.CALENDAR.1.1",
|
||||
"Description": "External Sharing Options for Primary Calendars SHALL be configured to Only free/busy information (hide event details)",
|
||||
"Checks": [],
|
||||
"Checks": [
|
||||
"calendar_external_sharing_primary_calendar"
|
||||
],
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "Calendar",
|
||||
@@ -1323,7 +1325,9 @@
|
||||
{
|
||||
"Id": "GWS.CALENDAR.1.2",
|
||||
"Description": "External sharing options for secondary calendars SHALL be configured to Only free/busy information (hide event details)",
|
||||
"Checks": [],
|
||||
"Checks": [
|
||||
"calendar_external_sharing_secondary_calendar"
|
||||
],
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "Calendar",
|
||||
@@ -1336,7 +1340,9 @@
|
||||
{
|
||||
"Id": "GWS.CALENDAR.2.1",
|
||||
"Description": "External invitations warnings SHALL be enabled to prompt users before sending invitations",
|
||||
"Checks": [],
|
||||
"Checks": [
|
||||
"calendar_external_invitations_warning"
|
||||
],
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "Calendar",
|
||||
|
||||
@@ -207,6 +207,7 @@
|
||||
"entra_admin_portals_access_restriction",
|
||||
"entra_admin_users_phishing_resistant_mfa_enabled",
|
||||
"entra_conditional_access_policy_block_o365_elevated_insider_risk",
|
||||
"entra_conditional_access_policy_directory_sync_account_excluded",
|
||||
"entra_policy_guest_users_access_restrictions",
|
||||
"entra_seamless_sso_disabled"
|
||||
]
|
||||
@@ -685,6 +686,7 @@
|
||||
"entra_conditional_access_policy_approved_client_app_required_for_mobile",
|
||||
"entra_conditional_access_policy_app_enforced_restrictions",
|
||||
"entra_conditional_access_policy_block_elevated_insider_risk",
|
||||
"entra_conditional_access_policy_directory_sync_account_excluded",
|
||||
"entra_conditional_access_policy_block_o365_elevated_insider_risk",
|
||||
"entra_policy_guest_users_access_restrictions",
|
||||
"sharepoint_external_sharing_restricted"
|
||||
@@ -711,6 +713,7 @@
|
||||
"entra_break_glass_account_fido2_security_key_registered",
|
||||
"entra_conditional_access_policy_approved_client_app_required_for_mobile",
|
||||
"entra_conditional_access_policy_device_code_flow_blocked",
|
||||
"entra_conditional_access_policy_directory_sync_account_excluded",
|
||||
"entra_identity_protection_sign_in_risk_enabled",
|
||||
"entra_managed_device_required_for_authentication",
|
||||
"entra_seamless_sso_disabled",
|
||||
|
||||
@@ -12,6 +12,7 @@ from prowler.config.config import (
|
||||
default_output_directory,
|
||||
)
|
||||
from prowler.lib.check.models import Severity
|
||||
from prowler.lib.cli.redact import warn_sensitive_argument_values
|
||||
from prowler.lib.outputs.common import Status
|
||||
from prowler.providers.common.arguments import (
|
||||
init_providers_parser,
|
||||
@@ -19,8 +20,6 @@ from prowler.providers.common.arguments import (
|
||||
validate_provider_arguments,
|
||||
)
|
||||
|
||||
SENSITIVE_ARGUMENTS = frozenset({"--shodan"})
|
||||
|
||||
|
||||
class ProwlerArgumentParser:
|
||||
# Set the default parser
|
||||
@@ -126,6 +125,10 @@ Detailed documentation at https://docs.prowler.com
|
||||
elif sys.argv[1] == "oci":
|
||||
sys.argv[1] = "oraclecloud"
|
||||
|
||||
# Warn about sensitive flags passed with explicit values
|
||||
# Snapshot argv before parse_args() which may exit on errors
|
||||
warn_sensitive_argument_values(list(sys.argv[1:]))
|
||||
|
||||
# Parse arguments
|
||||
args = self.parser.parse_args()
|
||||
|
||||
@@ -434,7 +437,7 @@ Detailed documentation at https://docs.prowler.com
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="SHODAN_API_KEY",
|
||||
help="Check if any public IPs in your Cloud environments are exposed in Shodan.",
|
||||
help="Check if any public IPs in your Cloud environments are exposed in Shodan. We recommend to use the SHODAN_API_KEY environment variable to provide the API key.",
|
||||
)
|
||||
third_party_subparser.add_argument(
|
||||
"--slack",
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
from functools import lru_cache
|
||||
from importlib import import_module
|
||||
|
||||
from colorama import Fore, Style
|
||||
|
||||
from prowler.lib.cli.sensitive import SENSITIVE_ARGUMENTS as COMMON_SENSITIVE_ARGUMENTS
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.common.provider import Provider, providers_path
|
||||
|
||||
@@ -13,11 +16,7 @@ def get_sensitive_arguments() -> frozenset:
|
||||
sensitive: set[str] = set()
|
||||
|
||||
# Common parser sensitive arguments (e.g., --shodan)
|
||||
try:
|
||||
parser_module = import_module("prowler.lib.cli.parser")
|
||||
sensitive.update(getattr(parser_module, "SENSITIVE_ARGUMENTS", frozenset()))
|
||||
except Exception as error:
|
||||
logger.debug(f"Could not load SENSITIVE_ARGUMENTS from parser: {error}")
|
||||
sensitive.update(COMMON_SENSITIVE_ARGUMENTS)
|
||||
|
||||
# Provider-specific sensitive arguments
|
||||
for provider in Provider.get_available_providers():
|
||||
@@ -66,3 +65,49 @@ def redact_argv(argv: list[str]) -> str:
|
||||
result.append(arg)
|
||||
|
||||
return " ".join(result)
|
||||
|
||||
|
||||
def warn_sensitive_argument_values(argv: list[str]) -> None:
|
||||
"""Log a warning for each sensitive CLI flag that was passed with an explicit value.
|
||||
|
||||
Scans the raw argv list (not parsed args) to detect when users pass
|
||||
secret values directly on the command line instead of using environment
|
||||
variables. Handles both ``--flag value`` and ``--flag=value`` syntax.
|
||||
|
||||
Args:
|
||||
argv: The argument list to check (typically ``sys.argv[1:]``).
|
||||
"""
|
||||
sensitive = get_sensitive_arguments()
|
||||
if not sensitive:
|
||||
return
|
||||
|
||||
use_color = "--no-color" not in argv
|
||||
flags_with_values: list[str] = []
|
||||
|
||||
for i, arg in enumerate(argv):
|
||||
# --flag=value syntax
|
||||
if "=" in arg:
|
||||
flag = arg.split("=", 1)[0]
|
||||
if flag in sensitive:
|
||||
flags_with_values.append(flag)
|
||||
continue
|
||||
|
||||
# --flag value syntax
|
||||
if arg in sensitive:
|
||||
if i + 1 < len(argv) and not argv[i + 1].startswith("-"):
|
||||
flags_with_values.append(arg)
|
||||
|
||||
for flag in flags_with_values:
|
||||
if use_color:
|
||||
logger.warning(
|
||||
f"{Fore.YELLOW}{Style.BRIGHT}WARNING:{Style.RESET_ALL}{Fore.YELLOW} "
|
||||
f"Passing a value directly to {flag} is not recommended. "
|
||||
f"Use the corresponding environment variable instead to avoid "
|
||||
f"exposing secrets in process listings and shell history.{Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Passing a value directly to {flag} is not recommended. "
|
||||
f"Use the corresponding environment variable instead to avoid "
|
||||
f"exposing secrets in process listings and shell history."
|
||||
)
|
||||
|
||||
8
prowler/lib/cli/sensitive.py
Normal file
8
prowler/lib/cli/sensitive.py
Normal file
@@ -0,0 +1,8 @@
|
||||
"""Common parser sensitive arguments.
|
||||
|
||||
This module is kept dependency-free (no prowler-internal imports) so that
|
||||
``prowler.lib.cli.redact`` and any provider argument module can import it
|
||||
without circular-import risk.
|
||||
"""
|
||||
|
||||
SENSITIVE_ARGUMENTS = frozenset({"--shodan"})
|
||||
@@ -617,6 +617,11 @@ def is_condition_block_restrictive(
|
||||
"aws:sourceorgpaths",
|
||||
"aws:userid",
|
||||
"aws:username",
|
||||
"aws:calledvia",
|
||||
"aws:calledviafirst",
|
||||
"aws:calledvialast",
|
||||
"kms:calleraccount",
|
||||
"kms:viaservice",
|
||||
"s3:resourceaccount",
|
||||
"lambda:eventsourcetoken", # For Alexa Home functions, a token that the invoker must supply.
|
||||
],
|
||||
@@ -635,6 +640,11 @@ def is_condition_block_restrictive(
|
||||
"aws:sourceorgpaths",
|
||||
"aws:userid",
|
||||
"aws:username",
|
||||
"aws:calledvia",
|
||||
"aws:calledviafirst",
|
||||
"aws:calledvialast",
|
||||
"kms:calleraccount",
|
||||
"kms:viaservice",
|
||||
"s3:resourceaccount",
|
||||
"lambda:eventsourcetoken",
|
||||
],
|
||||
|
||||
@@ -59,11 +59,14 @@ class GoogleworkspaceProvider(Provider):
|
||||
_mutelist: GoogleWorkspaceMutelist
|
||||
audit_metadata: Audit_Metadata
|
||||
|
||||
# Google Workspace Admin SDK OAuth2 scopes
|
||||
DIRECTORY_SCOPES = [
|
||||
# Google Workspace OAuth2 scopes
|
||||
SCOPES = [
|
||||
"https://www.googleapis.com/auth/admin.directory.user.readonly",
|
||||
"https://www.googleapis.com/auth/admin.directory.domain.readonly",
|
||||
"https://www.googleapis.com/auth/admin.directory.customer.readonly",
|
||||
# Cloud Identity Policy API (calendar and other app policies)
|
||||
"https://www.googleapis.com/auth/cloud-identity.policies.readonly",
|
||||
"https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly",
|
||||
]
|
||||
|
||||
def __init__(
|
||||
@@ -214,7 +217,7 @@ class GoogleworkspaceProvider(Provider):
|
||||
try:
|
||||
credentials = service_account.Credentials.from_service_account_file(
|
||||
credentials_file,
|
||||
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
|
||||
scopes=GoogleworkspaceProvider.SCOPES,
|
||||
)
|
||||
except FileNotFoundError as error:
|
||||
raise GoogleWorkspaceInvalidCredentialsError(
|
||||
@@ -241,7 +244,7 @@ class GoogleworkspaceProvider(Provider):
|
||||
try:
|
||||
credentials = service_account.Credentials.from_service_account_info(
|
||||
credentials_data,
|
||||
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
|
||||
scopes=GoogleworkspaceProvider.SCOPES,
|
||||
)
|
||||
except ValueError as error:
|
||||
raise GoogleWorkspaceInvalidCredentialsError(
|
||||
@@ -264,7 +267,7 @@ class GoogleworkspaceProvider(Provider):
|
||||
try:
|
||||
credentials = service_account.Credentials.from_service_account_file(
|
||||
env_file,
|
||||
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
|
||||
scopes=GoogleworkspaceProvider.SCOPES,
|
||||
)
|
||||
except FileNotFoundError as error:
|
||||
raise GoogleWorkspaceInvalidCredentialsError(
|
||||
@@ -293,7 +296,7 @@ class GoogleworkspaceProvider(Provider):
|
||||
try:
|
||||
credentials = service_account.Credentials.from_service_account_info(
|
||||
credentials_data,
|
||||
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
|
||||
scopes=GoogleworkspaceProvider.SCOPES,
|
||||
)
|
||||
except ValueError as error:
|
||||
raise GoogleWorkspaceInvalidCredentialsError(
|
||||
@@ -414,7 +417,7 @@ class GoogleworkspaceProvider(Provider):
|
||||
)
|
||||
|
||||
# Fetch all domains (primary + aliases) to support domain aliases
|
||||
# The scope admin.directory.domain.readonly is already in DIRECTORY_SCOPES
|
||||
# The scope admin.directory.domain.readonly is already in SCOPES above
|
||||
try:
|
||||
domains_response = service.domains().list(customer="my_customer").execute()
|
||||
valid_domains = [
|
||||
|
||||
@@ -0,0 +1,6 @@
|
||||
from prowler.providers.common.provider import Provider
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
Calendar,
|
||||
)
|
||||
|
||||
calendar_client = Calendar(Provider.get_global_provider())
|
||||
@@ -0,0 +1,41 @@
|
||||
{
|
||||
"Provider": "googleworkspace",
|
||||
"CheckID": "calendar_external_invitations_warning",
|
||||
"CheckTitle": "External invitation warnings are enabled for Google Calendar",
|
||||
"CheckType": [],
|
||||
"ServiceName": "calendar",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "NotDefined",
|
||||
"ResourceGroup": "collaboration",
|
||||
"Description": "Google Calendar **warns users** when they invite guests from outside the organization to an event. This prompt gives users a chance to reconsider before sharing meeting details with external parties, reducing the likelihood of **accidental information disclosure** through calendar invitations.",
|
||||
"Risk": "Without external invitation warnings, users may unintentionally include **external guests** in internal meetings, exposing **confidential meeting details**, agendas, and internal attendee lists to unauthorized parties. This is a common vector for inadvertent data leakage through everyday calendar actions.",
|
||||
"RelatedUrl": "",
|
||||
"AdditionalURLs": [
|
||||
"https://support.google.com/a/answer/6329284",
|
||||
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
|
||||
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
|
||||
],
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External invitations**, check **Warn users when inviting guests outside of the domain**\n5. Click **Save**",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable external invitation warnings so users are notified whenever a meeting invitation includes guests outside the organization. This simple prompt helps prevent accidental disclosure of meeting details to unintended recipients.",
|
||||
"Url": "https://hub.prowler.com/check/calendar_external_invitations_warning"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"internet-exposed"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [
|
||||
"calendar_external_sharing_primary_calendar",
|
||||
"calendar_external_sharing_secondary_calendar"
|
||||
],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
from typing import List
|
||||
|
||||
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
|
||||
calendar_client,
|
||||
)
|
||||
|
||||
|
||||
class calendar_external_invitations_warning(Check):
|
||||
"""Check that external invitation warnings are enabled for Google Calendar
|
||||
|
||||
This check verifies that the domain-level policy warns users when they
|
||||
invite guests from outside the organization, reducing the risk of accidental
|
||||
information disclosure through calendar events.
|
||||
"""
|
||||
|
||||
def execute(self) -> List[CheckReportGoogleWorkspace]:
|
||||
findings = []
|
||||
|
||||
if calendar_client.policies_fetched:
|
||||
report = CheckReportGoogleWorkspace(
|
||||
metadata=self.metadata(),
|
||||
resource=calendar_client.provider.identity,
|
||||
resource_name=calendar_client.provider.identity.domain,
|
||||
resource_id=calendar_client.provider.identity.customer_id,
|
||||
customer_id=calendar_client.provider.identity.customer_id,
|
||||
location="global",
|
||||
)
|
||||
|
||||
warning_enabled = calendar_client.policies.external_invitations_warning
|
||||
|
||||
if warning_enabled is True:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"External invitation warnings for Google Calendar are enabled "
|
||||
f"in domain {calendar_client.provider.identity.domain}."
|
||||
)
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
if warning_enabled is None:
|
||||
report.status_extended = (
|
||||
f"External invitation warnings for Google Calendar are not "
|
||||
f"explicitly configured in domain "
|
||||
f"{calendar_client.provider.identity.domain}. "
|
||||
f"Users should be warned when inviting guests outside the organization."
|
||||
)
|
||||
else:
|
||||
report.status_extended = (
|
||||
f"External invitation warnings for Google Calendar are disabled "
|
||||
f"in domain {calendar_client.provider.identity.domain}. "
|
||||
f"Users should be warned when inviting guests outside the organization."
|
||||
)
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,41 @@
|
||||
{
|
||||
"Provider": "googleworkspace",
|
||||
"CheckID": "calendar_external_sharing_primary_calendar",
|
||||
"CheckTitle": "External sharing for primary calendars is restricted to free/busy only",
|
||||
"CheckType": [],
|
||||
"ServiceName": "calendar",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "NotDefined",
|
||||
"ResourceGroup": "collaboration",
|
||||
"Description": "Primary calendars in the Google Workspace domain share **only free/busy information** with external users. When external sharing is set to share full event details, sensitive information such as meeting titles, attendees, locations, and descriptions is exposed to users outside the organization.",
|
||||
"Risk": "Overly permissive external sharing of primary calendars exposes **sensitive meeting metadata** — titles, attendees, locations, and descriptions — to users outside the organization. This increases the risk of **information disclosure**, **social engineering**, and **targeted phishing** based on insights into organizational activities.",
|
||||
"RelatedUrl": "",
|
||||
"AdditionalURLs": [
|
||||
"https://support.google.com/a/answer/60765",
|
||||
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
|
||||
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
|
||||
],
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External sharing options for primary calendars**, select **Only free/busy information (hide event details)**\n5. Click **Save**",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Restrict external sharing of primary calendars to free/busy information only. This preserves scheduling functionality with external users while preventing exposure of sensitive meeting details.",
|
||||
"Url": "https://hub.prowler.com/check/calendar_external_sharing_primary_calendar"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"internet-exposed"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [
|
||||
"calendar_external_sharing_secondary_calendar",
|
||||
"calendar_external_invitations_warning"
|
||||
],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
from typing import List
|
||||
|
||||
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
|
||||
calendar_client,
|
||||
)
|
||||
|
||||
|
||||
class calendar_external_sharing_primary_calendar(Check):
|
||||
"""Check that external sharing for primary calendars is restricted to free/busy only
|
||||
|
||||
This check verifies that the domain-level policy for primary calendar external
|
||||
sharing is set to share only free/busy information, preventing exposure of
|
||||
event details to external users.
|
||||
"""
|
||||
|
||||
def execute(self) -> List[CheckReportGoogleWorkspace]:
|
||||
findings = []
|
||||
|
||||
if calendar_client.policies_fetched:
|
||||
report = CheckReportGoogleWorkspace(
|
||||
metadata=self.metadata(),
|
||||
resource=calendar_client.provider.identity,
|
||||
resource_name=calendar_client.provider.identity.domain,
|
||||
resource_id=calendar_client.provider.identity.customer_id,
|
||||
customer_id=calendar_client.provider.identity.customer_id,
|
||||
location="global",
|
||||
)
|
||||
|
||||
sharing = calendar_client.policies.primary_calendar_external_sharing
|
||||
|
||||
if sharing == "EXTERNAL_FREE_BUSY_ONLY":
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"Primary calendar external sharing in domain "
|
||||
f"{calendar_client.provider.identity.domain} is restricted to "
|
||||
f"free/busy information only."
|
||||
)
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
if sharing is None:
|
||||
report.status_extended = (
|
||||
f"Primary calendar external sharing is not explicitly configured "
|
||||
f"in domain {calendar_client.provider.identity.domain}. "
|
||||
f"External sharing should be restricted to free/busy information only."
|
||||
)
|
||||
else:
|
||||
report.status_extended = (
|
||||
f"Primary calendar external sharing in domain "
|
||||
f"{calendar_client.provider.identity.domain} is set to {sharing}. "
|
||||
f"External sharing should be restricted to free/busy information only."
|
||||
)
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,41 @@
|
||||
{
|
||||
"Provider": "googleworkspace",
|
||||
"CheckID": "calendar_external_sharing_secondary_calendar",
|
||||
"CheckTitle": "External sharing for secondary calendars is restricted to free/busy only",
|
||||
"CheckType": [],
|
||||
"ServiceName": "calendar",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "NotDefined",
|
||||
"ResourceGroup": "collaboration",
|
||||
"Description": "Secondary calendars in the Google Workspace domain share **only free/busy information** with external users. Secondary calendars are additional calendars users create beyond their primary calendar (e.g., for projects, teams, or personal events), and are commonly used to organize sensitive or focused activities that should not be visible to external parties.",
|
||||
"Risk": "Overly permissive external sharing of secondary calendars exposes **project-specific or team-specific event details** to users outside the organization. Because secondary calendars often hold more targeted activities (e.g., product launches, internal reviews), unrestricted external sharing increases the risk of **information disclosure** and **competitive intelligence leakage**.",
|
||||
"RelatedUrl": "",
|
||||
"AdditionalURLs": [
|
||||
"https://support.google.com/a/answer/60765",
|
||||
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
|
||||
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
|
||||
],
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External sharing options for secondary calendars**, select **Only free/busy information (hide event details)**\n5. Click **Save**",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Restrict external sharing of secondary calendars to free/busy information only. This preserves scheduling interoperability with external collaborators while preventing exposure of sensitive event details in user-created calendars.",
|
||||
"Url": "https://hub.prowler.com/check/calendar_external_sharing_secondary_calendar"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"internet-exposed"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [
|
||||
"calendar_external_sharing_primary_calendar",
|
||||
"calendar_external_invitations_warning"
|
||||
],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
from typing import List
|
||||
|
||||
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
|
||||
calendar_client,
|
||||
)
|
||||
|
||||
|
||||
class calendar_external_sharing_secondary_calendar(Check):
|
||||
"""Check that external sharing for secondary calendars is restricted to free/busy only
|
||||
|
||||
This check verifies that the domain-level policy for secondary calendar external
|
||||
sharing is set to share only free/busy information, preventing exposure of
|
||||
event details in user-created calendars to external users.
|
||||
"""
|
||||
|
||||
def execute(self) -> List[CheckReportGoogleWorkspace]:
|
||||
findings = []
|
||||
|
||||
if calendar_client.policies_fetched:
|
||||
report = CheckReportGoogleWorkspace(
|
||||
metadata=self.metadata(),
|
||||
resource=calendar_client.provider.identity,
|
||||
resource_name=calendar_client.provider.identity.domain,
|
||||
resource_id=calendar_client.provider.identity.customer_id,
|
||||
customer_id=calendar_client.provider.identity.customer_id,
|
||||
location="global",
|
||||
)
|
||||
|
||||
sharing = calendar_client.policies.secondary_calendar_external_sharing
|
||||
|
||||
if sharing == "EXTERNAL_FREE_BUSY_ONLY":
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"Secondary calendar external sharing in domain "
|
||||
f"{calendar_client.provider.identity.domain} is restricted to "
|
||||
f"free/busy information only."
|
||||
)
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
if sharing is None:
|
||||
report.status_extended = (
|
||||
f"Secondary calendar external sharing is not explicitly configured "
|
||||
f"in domain {calendar_client.provider.identity.domain}. "
|
||||
f"External sharing should be restricted to free/busy information only."
|
||||
)
|
||||
else:
|
||||
report.status_extended = (
|
||||
f"Secondary calendar external sharing in domain "
|
||||
f"{calendar_client.provider.identity.domain} is set to {sharing}. "
|
||||
f"External sharing should be restricted to free/busy information only."
|
||||
)
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,112 @@
|
||||
from typing import Optional
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.googleworkspace.lib.service.service import GoogleWorkspaceService
|
||||
|
||||
|
||||
class Calendar(GoogleWorkspaceService):
|
||||
"""Google Workspace Calendar service for auditing domain-level calendar policies.
|
||||
|
||||
Uses the Cloud Identity Policy API v1 to read calendar sharing
|
||||
and invitation settings configured in the Admin Console.
|
||||
"""
|
||||
|
||||
def __init__(self, provider):
|
||||
super().__init__(provider)
|
||||
self.policies = CalendarPolicies()
|
||||
self.policies_fetched = False
|
||||
self._fetch_calendar_policies()
|
||||
|
||||
def _fetch_calendar_policies(self):
|
||||
"""Fetch calendar policies from the Cloud Identity Policy API v1."""
|
||||
logger.info("Calendar - Fetching calendar policies...")
|
||||
|
||||
try:
|
||||
service = self._build_service("cloudidentity", "v1")
|
||||
|
||||
if not service:
|
||||
logger.error("Failed to build Cloud Identity service")
|
||||
return
|
||||
|
||||
request = service.policies().list(pageSize=100)
|
||||
fetch_succeeded = True
|
||||
|
||||
while request is not None:
|
||||
try:
|
||||
response = request.execute()
|
||||
|
||||
for policy in response.get("policies", []):
|
||||
setting = policy.get("setting", {})
|
||||
setting_type = setting.get("type", "").removeprefix("settings/")
|
||||
value = setting.get("value", {})
|
||||
|
||||
if (
|
||||
setting_type
|
||||
== "calendar.primary_calendar_max_allowed_external_sharing"
|
||||
):
|
||||
self.policies.primary_calendar_external_sharing = value.get(
|
||||
"maxAllowedExternalSharing"
|
||||
)
|
||||
logger.debug(
|
||||
"Primary calendar external sharing: "
|
||||
f"{self.policies.primary_calendar_external_sharing}"
|
||||
)
|
||||
|
||||
elif (
|
||||
setting_type
|
||||
== "calendar.secondary_calendar_max_allowed_external_sharing"
|
||||
):
|
||||
self.policies.secondary_calendar_external_sharing = (
|
||||
value.get("maxAllowedExternalSharing")
|
||||
)
|
||||
logger.debug(
|
||||
"Secondary calendar external sharing: "
|
||||
f"{self.policies.secondary_calendar_external_sharing}"
|
||||
)
|
||||
|
||||
elif setting_type == "calendar.external_invitations":
|
||||
self.policies.external_invitations_warning = value.get(
|
||||
"warnOnInvite"
|
||||
)
|
||||
logger.debug(
|
||||
"External invitations warning: "
|
||||
f"{self.policies.external_invitations_warning}"
|
||||
)
|
||||
|
||||
request = service.policies().list_next(request, response)
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(
|
||||
error,
|
||||
"fetching calendar policies",
|
||||
self.provider.identity.customer_id,
|
||||
)
|
||||
fetch_succeeded = False
|
||||
break
|
||||
|
||||
self.policies_fetched = fetch_succeeded
|
||||
|
||||
logger.info(
|
||||
f"Calendar policies fetched - "
|
||||
f"Primary sharing: {self.policies.primary_calendar_external_sharing}, "
|
||||
f"Secondary sharing: {self.policies.secondary_calendar_external_sharing}, "
|
||||
f"Invitation warnings: {self.policies.external_invitations_warning}"
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(
|
||||
error,
|
||||
"fetching calendar policies",
|
||||
self.provider.identity.customer_id,
|
||||
)
|
||||
self.policies_fetched = False
|
||||
|
||||
|
||||
class CalendarPolicies(BaseModel):
|
||||
"""Model for domain-level Calendar policy settings."""
|
||||
|
||||
primary_calendar_external_sharing: Optional[str] = None
|
||||
secondary_calendar_external_sharing: Optional[str] = None
|
||||
external_invitations_warning: Optional[bool] = None
|
||||
@@ -8,23 +8,21 @@ class Directory(GoogleWorkspaceService):
|
||||
|
||||
def __init__(self, provider):
|
||||
super().__init__(provider)
|
||||
self._service = self._build_service("admin", "directory_v1")
|
||||
self.users = self._list_users()
|
||||
self._roles = self._list_roles()
|
||||
self._populate_role_assignments()
|
||||
|
||||
def _list_users(self):
|
||||
logger.info("Directory - Listing Users...")
|
||||
users = {}
|
||||
|
||||
try:
|
||||
# Build the Admin SDK Directory service
|
||||
service = self._build_service("admin", "directory_v1")
|
||||
|
||||
if not service:
|
||||
if not self._service:
|
||||
logger.error("Failed to build Directory service")
|
||||
return users
|
||||
|
||||
# Fetch users using the Directory API
|
||||
# Reference: https://developers.google.com/admin-sdk/directory/reference/rest/v1/users/list
|
||||
request = service.users().list(
|
||||
request = self._service.users().list(
|
||||
customer=self.provider.identity.customer_id,
|
||||
maxResults=500, # Max allowed by API
|
||||
orderBy="email",
|
||||
@@ -38,14 +36,11 @@ class Directory(GoogleWorkspaceService):
|
||||
user = User(
|
||||
id=user_data.get("id"),
|
||||
email=user_data.get("primaryEmail"),
|
||||
is_admin=user_data.get("isAdmin", False),
|
||||
)
|
||||
users[user.id] = user
|
||||
logger.debug(
|
||||
f"Processed user: {user.email} (Admin: {user.is_admin})"
|
||||
)
|
||||
logger.debug(f"Processed user: {user.email}")
|
||||
|
||||
request = service.users().list_next(request, response)
|
||||
request = self._service.users().list_next(request, response)
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(
|
||||
@@ -62,9 +57,108 @@ class Directory(GoogleWorkspaceService):
|
||||
|
||||
return users
|
||||
|
||||
def _list_roles(self):
|
||||
logger.info("Directory - Listing Roles...")
|
||||
roles = {}
|
||||
|
||||
try:
|
||||
if not self._service:
|
||||
return roles
|
||||
|
||||
request = self._service.roles().list(
|
||||
customer=self.provider.identity.customer_id,
|
||||
)
|
||||
|
||||
while request is not None:
|
||||
try:
|
||||
response = request.execute()
|
||||
|
||||
for role_data in response.get("items", []):
|
||||
role_id = str(role_data.get("roleId", ""))
|
||||
role_name = role_data.get("roleName", "")
|
||||
if role_id and role_name:
|
||||
roles[role_id] = Role(
|
||||
id=role_id,
|
||||
name=role_name,
|
||||
description=role_data.get("roleDescription", ""),
|
||||
is_super_admin_role=role_data.get(
|
||||
"isSuperAdminRole", False
|
||||
),
|
||||
)
|
||||
|
||||
request = self._service.roles().list_next(request, response)
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(
|
||||
error,
|
||||
"listing roles",
|
||||
self.provider.identity.customer_id,
|
||||
)
|
||||
break
|
||||
|
||||
logger.info(f"Found {len(roles)} roles in the domain")
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
return roles
|
||||
|
||||
def _populate_role_assignments(self):
|
||||
logger.info("Directory - Fetching Role Assignments...")
|
||||
|
||||
if not self._service:
|
||||
return
|
||||
|
||||
try:
|
||||
request = self._service.roleAssignments().list(
|
||||
customer=self.provider.identity.customer_id,
|
||||
)
|
||||
|
||||
while request is not None:
|
||||
try:
|
||||
response = request.execute()
|
||||
|
||||
for assignment in response.get("items", []):
|
||||
user_id = str(assignment.get("assignedTo", ""))
|
||||
role_id = str(assignment.get("roleId", ""))
|
||||
user = self.users.get(user_id)
|
||||
role = self._roles.get(role_id)
|
||||
if user and role:
|
||||
user.role_assignments.append(role)
|
||||
if role.is_super_admin_role:
|
||||
user.is_admin = True
|
||||
|
||||
request = self._service.roleAssignments().list_next(
|
||||
request, response
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(
|
||||
error,
|
||||
"listing role assignments",
|
||||
self.provider.identity.customer_id,
|
||||
)
|
||||
break
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
|
||||
class Role(BaseModel):
|
||||
|
||||
id: str
|
||||
name: str
|
||||
description: str = ""
|
||||
is_super_admin_role: bool = False
|
||||
|
||||
|
||||
class User(BaseModel):
|
||||
|
||||
id: str
|
||||
email: str
|
||||
is_admin: bool = False
|
||||
role_assignments: list[Role] = []
|
||||
|
||||
@@ -0,0 +1,39 @@
|
||||
{
|
||||
"Provider": "googleworkspace",
|
||||
"CheckID": "directory_super_admin_only_admin_roles",
|
||||
"CheckTitle": "All super admin accounts are used only for super admin activities",
|
||||
"CheckType": [],
|
||||
"ServiceName": "directory",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "",
|
||||
"Severity": "high",
|
||||
"ResourceType": "NotDefined",
|
||||
"ResourceGroup": "IAM",
|
||||
"Description": "Super admin accounts do not also hold **additional admin roles** such as Groups Admin, User Management Admin, etc. Each super administrator has a separate, non-admin account for daily activities, following the **principle of least privilege**.",
|
||||
"Risk": "A super admin account that also holds additional admin roles increases the **attack surface** for phishing and credential theft. Compromising a single dual-role account grants full administrative access, bypassing **separation of duties** and enabling unauthorized changes to users, billing, and security settings.",
|
||||
"RelatedUrl": "",
|
||||
"AdditionalURLs": [
|
||||
"https://knowledge.workspace.google.com/admin/users/prebuilt-administrator-roles",
|
||||
"https://support.google.com/a/answer/9011373"
|
||||
],
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Directory** > **Users**\n3. Click on the super admin user who also has additional admin roles\n4. Click **Admin roles and privileges**\n5. Remove the additional admin roles from the super admin account\n6. Create a separate account for daily admin tasks",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Apply the principle of separation of duties by maintaining dedicated super admin accounts exclusively for privileged tasks. Daily administrative activities should be performed from separate accounts with only the delegated roles required.",
|
||||
"Url": "https://hub.prowler.com/check/directory_super_admin_only_admin_roles"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"identity-access"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [
|
||||
"directory_super_admin_count"
|
||||
],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
from typing import List
|
||||
|
||||
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
|
||||
from prowler.providers.googleworkspace.services.directory.directory_client import (
|
||||
directory_client,
|
||||
)
|
||||
|
||||
|
||||
class directory_super_admin_only_admin_roles(Check):
|
||||
"""Check that super admin accounts are used only for super admin activities
|
||||
|
||||
This check verifies that no super admin user has additional admin roles assigned
|
||||
beyond the Super Admin role. Super admins should have separate accounts for daily
|
||||
activities to follow least privilege.
|
||||
"""
|
||||
|
||||
def execute(self) -> List[CheckReportGoogleWorkspace]:
|
||||
findings = []
|
||||
|
||||
if directory_client.users:
|
||||
dual_role_admins = {}
|
||||
for user in directory_client.users.values():
|
||||
if user.is_admin:
|
||||
extra_roles = [
|
||||
r.description or r.name
|
||||
for r in user.role_assignments
|
||||
if not r.is_super_admin_role
|
||||
]
|
||||
if extra_roles:
|
||||
dual_role_admins[user.email] = extra_roles
|
||||
|
||||
report = CheckReportGoogleWorkspace(
|
||||
metadata=self.metadata(),
|
||||
resource=directory_client.provider.identity,
|
||||
resource_name=directory_client.provider.identity.domain,
|
||||
resource_id=directory_client.provider.identity.customer_id,
|
||||
customer_id=directory_client.provider.identity.customer_id,
|
||||
location="global",
|
||||
)
|
||||
|
||||
if dual_role_admins:
|
||||
details = ", ".join(
|
||||
f"{email} ({', '.join(roles)})"
|
||||
for email, roles in dual_role_admins.items()
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"Super admin accounts also holding additional admin roles: {details}. "
|
||||
f"Super admin accounts should be used only for super admin activities."
|
||||
)
|
||||
else:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"All super admin accounts in domain {directory_client.provider.identity.domain} "
|
||||
f"are used only for super admin activities."
|
||||
)
|
||||
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -0,0 +1,41 @@
|
||||
{
|
||||
"Provider": "m365",
|
||||
"CheckID": "entra_conditional_access_policy_directory_sync_account_excluded",
|
||||
"CheckTitle": "Conditional Access policy excludes Directory Synchronization Accounts to protect Entra Connect sync",
|
||||
"CheckType": [],
|
||||
"ServiceName": "entra",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "NotDefined",
|
||||
"ResourceGroup": "IAM",
|
||||
"Description": "Conditional Access policies scoped to **all users** and **all cloud applications** are evaluated to confirm the **Directory Synchronization Accounts** role is explicitly excluded. The Microsoft Entra Connect Sync Account does not support multifactor authentication, so it must be excluded from restrictive policies to maintain directory synchronization.",
|
||||
"Risk": "If the Directory Synchronization Accounts role is not excluded from Conditional Access policies requiring MFA or blocking access, the Entra Connect Sync Account will be unable to authenticate. This breaks hybrid identity synchronization between on-premises Active Directory and Entra ID, potentially causing authentication failures and identity inconsistencies.",
|
||||
"RelatedUrl": "",
|
||||
"AdditionalURLs": [
|
||||
"https://learn.microsoft.com/en-us/entra/identity/conditional-access/howto-conditional-access-policy-all-users-mfa",
|
||||
"https://learn.microsoft.com/en-us/entra/identity/hybrid/connect/reference-connect-accounts-permissions",
|
||||
"https://maester.dev/docs/tests/MT.1020"
|
||||
],
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "1. Navigate to the Microsoft Entra admin center at https://entra.microsoft.com.\n2. Expand **Protection** > **Conditional Access** and select **Policies**.\n3. Open each policy that targets **All users** and **All cloud apps**.\n4. Under **Users** > **Exclude**, select **Directory roles** and add the **Directory Synchronization Accounts** role.\n5. Save the policy.",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Exclude the Directory Synchronization Accounts role from all Conditional Access policies that target all users and all cloud applications. This prevents breaking Entra Connect directory synchronization while maintaining security controls for interactive users.",
|
||||
"Url": "https://hub.prowler.com/check/entra_conditional_access_policy_directory_sync_account_excluded"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"identity-access",
|
||||
"e3"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [
|
||||
"entra_conditional_access_policy_require_mfa_for_management_api"
|
||||
],
|
||||
"Notes": "This check corresponds to Maester test MT.1020 (Test-MtCaExclusionForDirectorySyncAccount). The Directory Synchronization Accounts role template ID is d29b2b05-8046-44ba-8758-1e26182fcf32."
|
||||
}
|
||||
@@ -0,0 +1,88 @@
|
||||
"""Check if Conditional Access policies exclude the Directory Synchronization Account."""
|
||||
|
||||
from prowler.lib.check.models import Check, CheckReportM365
|
||||
from prowler.providers.m365.services.entra.entra_client import entra_client
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicyState,
|
||||
)
|
||||
|
||||
# The Directory Synchronization Accounts built-in role template ID in Entra ID.
|
||||
# This role is assigned to the Microsoft Entra Connect Sync service account and
|
||||
# does not support multifactor authentication.
|
||||
DIRECTORY_SYNC_ROLE_TEMPLATE_ID = "d29b2b05-8046-44ba-8758-1e26182fcf32"
|
||||
|
||||
|
||||
class entra_conditional_access_policy_directory_sync_account_excluded(Check):
|
||||
"""Check that Conditional Access policies exclude the Directory Synchronization Account.
|
||||
|
||||
The Microsoft Entra Connect Sync Account cannot support MFA. Conditional
|
||||
Access policies scoped to all users and all cloud apps must explicitly
|
||||
exclude the Directory Synchronization Accounts role to prevent breaking
|
||||
directory synchronization.
|
||||
|
||||
- PASS: The policy excludes the Directory Synchronization Accounts role.
|
||||
- FAIL: The policy does not exclude the Directory Synchronization Accounts role.
|
||||
"""
|
||||
|
||||
def execute(self) -> list[CheckReportM365]:
|
||||
"""Execute the check for Directory Sync Account exclusion from Conditional Access policies.
|
||||
|
||||
Iterates through all enabled Conditional Access policies that target
|
||||
all users and all cloud applications, verifying each one excludes the
|
||||
Directory Synchronization Accounts role.
|
||||
|
||||
Returns:
|
||||
A list of reports containing the result of the check.
|
||||
"""
|
||||
findings = []
|
||||
|
||||
for policy in entra_client.conditional_access_policies.values():
|
||||
if policy.state == ConditionalAccessPolicyState.DISABLED:
|
||||
continue
|
||||
|
||||
if not policy.conditions.user_conditions:
|
||||
continue
|
||||
|
||||
if "All" not in policy.conditions.user_conditions.included_users:
|
||||
continue
|
||||
|
||||
if not policy.conditions.application_conditions:
|
||||
continue
|
||||
|
||||
if (
|
||||
"All"
|
||||
not in policy.conditions.application_conditions.included_applications
|
||||
):
|
||||
continue
|
||||
|
||||
report = CheckReportM365(
|
||||
metadata=self.metadata(),
|
||||
resource=policy,
|
||||
resource_name=policy.display_name,
|
||||
resource_id=policy.id,
|
||||
)
|
||||
|
||||
if (
|
||||
DIRECTORY_SYNC_ROLE_TEMPLATE_ID
|
||||
in policy.conditions.user_conditions.excluded_roles
|
||||
):
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"Conditional Access Policy '{policy.display_name}' excludes the Directory Synchronization Accounts role."
|
||||
else:
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Conditional Access Policy '{policy.display_name}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
|
||||
|
||||
findings.append(report)
|
||||
|
||||
if not findings:
|
||||
report = CheckReportM365(
|
||||
metadata=self.metadata(),
|
||||
resource={},
|
||||
resource_name="Conditional Access Policies",
|
||||
resource_id="conditionalAccessPolicies",
|
||||
)
|
||||
report.status = "PASS"
|
||||
report.status_extended = "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
|
||||
findings.append(report)
|
||||
|
||||
return findings
|
||||
@@ -13,7 +13,11 @@ def init_parser(self):
|
||||
"--nhn-username", nargs="?", default=None, help="NHN API Username"
|
||||
)
|
||||
nhn_auth_subparser.add_argument(
|
||||
"--nhn-password", nargs="?", default=None, help="NHN API Password"
|
||||
"--nhn-password",
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="NHN_PASSWORD",
|
||||
help="NHN API Password",
|
||||
)
|
||||
nhn_auth_subparser.add_argument(
|
||||
"--nhn-tenant-id", nargs="?", default=None, help="NHN Tenant ID"
|
||||
|
||||
@@ -46,6 +46,7 @@ def init_parser(self):
|
||||
"--os-password",
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="OS_PASSWORD",
|
||||
help="OpenStack password for authentication. Can also be set via OS_PASSWORD environment variable",
|
||||
)
|
||||
openstack_explicit_subparser.add_argument(
|
||||
|
||||
@@ -45,6 +45,34 @@ prowler/providers/{provider}/
|
||||
└── {check_name}.metadata.json
|
||||
```
|
||||
|
||||
## Sensitive CLI Arguments
|
||||
|
||||
Flags that accept secrets (tokens, passwords, API keys) MUST follow these rules:
|
||||
|
||||
1. **Use `nargs="?"` with `default=None`** — the flag accepts an optional value for backward compatibility; the recommended path is environment variables.
|
||||
2. **Set `metavar` to the environment variable name** users should use (e.g., `metavar="GITHUB_PERSONAL_ACCESS_TOKEN"`).
|
||||
3. **Add the flag to the `SENSITIVE_ARGUMENTS` frozenset** at the top of the provider's `arguments.py`. This set is used to redact values in HTML output and warn users who pass secrets directly.
|
||||
4. **Do not add new arguments that require passing secrets as CLI values** — secrets should come from environment variables. The flag accepts a value for backward compatibility, but CLI warns users to prefer env vars.
|
||||
|
||||
### Pattern
|
||||
|
||||
```python
|
||||
# prowler/providers/{provider}/lib/arguments/arguments.py
|
||||
|
||||
SENSITIVE_ARGUMENTS = frozenset({"--my-api-key", "--my-password"})
|
||||
|
||||
|
||||
def init_parser(self):
|
||||
auth_subparser = parser.add_argument_group("Authentication Modes")
|
||||
auth_subparser.add_argument(
|
||||
"--my-api-key",
|
||||
nargs="?",
|
||||
default=None,
|
||||
metavar="MY_API_KEY",
|
||||
help="API key for authentication. Use MY_API_KEY env var instead of passing directly.",
|
||||
)
|
||||
```
|
||||
|
||||
## Provider Class Template
|
||||
|
||||
```python
|
||||
|
||||
@@ -1,8 +1,14 @@
|
||||
import logging
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from prowler.lib.cli.redact import REDACTED_VALUE, get_sensitive_arguments, redact_argv
|
||||
from prowler.lib.cli.redact import (
|
||||
REDACTED_VALUE,
|
||||
get_sensitive_arguments,
|
||||
redact_argv,
|
||||
warn_sensitive_argument_values,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
@@ -87,6 +93,62 @@ class TestRedactArgv:
|
||||
assert redact_argv(argv) == "aws --region=us-east-1"
|
||||
|
||||
|
||||
class TestWarnSensitiveArgumentValues:
|
||||
def test_no_warning_without_sensitive_flags(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(["aws", "--region", "eu-west-1"])
|
||||
assert caplog.text == ""
|
||||
|
||||
def test_no_warning_flag_without_value(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(["github", "--personal-access-token"])
|
||||
assert caplog.text == ""
|
||||
|
||||
def test_no_warning_flag_followed_by_another_flag(
|
||||
self, caplog, mock_sensitive_args
|
||||
):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(
|
||||
["github", "--personal-access-token", "--region", "eu-west-1"]
|
||||
)
|
||||
assert caplog.text == ""
|
||||
|
||||
def test_warning_flag_with_value(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(
|
||||
["github", "--personal-access-token", "ghp_secret"]
|
||||
)
|
||||
assert "--personal-access-token" in caplog.text
|
||||
assert "not recommended" in caplog.text
|
||||
|
||||
def test_warning_flag_with_equals_syntax(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(["aws", "--shodan=key123"])
|
||||
assert "--shodan" in caplog.text
|
||||
assert "not recommended" in caplog.text
|
||||
|
||||
def test_warning_multiple_flags(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(
|
||||
[
|
||||
"github",
|
||||
"--personal-access-token",
|
||||
"ghp_secret",
|
||||
"--shodan",
|
||||
"key",
|
||||
]
|
||||
)
|
||||
assert "--personal-access-token" in caplog.text
|
||||
assert "--shodan" in caplog.text
|
||||
|
||||
def test_no_color_output(self, caplog, mock_sensitive_args):
|
||||
with caplog.at_level(logging.WARNING):
|
||||
warn_sensitive_argument_values(["--no-color", "aws", "--shodan", "key123"])
|
||||
assert "not recommended" in caplog.text
|
||||
# Should not contain ANSI escape codes
|
||||
assert "\033[" not in caplog.text
|
||||
|
||||
|
||||
class TestGetSensitiveArguments:
|
||||
def test_discovers_known_sensitive_arguments(self):
|
||||
"""Integration test: verify the discovery mechanism finds flags from provider modules."""
|
||||
|
||||
@@ -1413,6 +1413,115 @@ class Test_Policy:
|
||||
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
|
||||
)
|
||||
|
||||
def test_condition_parser_string_equals_aws_CalledVia_str(self):
|
||||
condition_statement = {
|
||||
"StringEquals": {"aws:CalledVia": "cloudformation.amazonaws.com"}
|
||||
}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement,
|
||||
TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
is_cross_account_allowed=True,
|
||||
)
|
||||
|
||||
def test_condition_parser_string_equals_aws_CalledViaFirst_str(self):
|
||||
condition_statement = {
|
||||
"StringEquals": {"aws:CalledViaFirst": "cloudformation.amazonaws.com"}
|
||||
}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement,
|
||||
TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
is_cross_account_allowed=True,
|
||||
)
|
||||
|
||||
def test_condition_parser_string_equals_aws_CalledViaLast_str(self):
|
||||
condition_statement = {
|
||||
"StringEquals": {"aws:CalledViaLast": "glue.amazonaws.com"}
|
||||
}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement,
|
||||
TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
is_cross_account_allowed=True,
|
||||
)
|
||||
|
||||
def test_condition_parser_string_like_aws_CalledVia_str(self):
|
||||
condition_statement = {"StringLike": {"aws:CalledVia": "*.amazonaws.com"}}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement,
|
||||
TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
is_cross_account_allowed=True,
|
||||
)
|
||||
|
||||
def test_condition_parser_string_equals_kms_CallerAccount_str(self):
|
||||
condition_statement = {
|
||||
"StringEquals": {"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER}
|
||||
}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
|
||||
)
|
||||
|
||||
def test_condition_parser_string_equals_kms_CallerAccount_str_not_valid(self):
|
||||
condition_statement = {
|
||||
"StringEquals": {"kms:CallerAccount": NON_TRUSTED_AWS_ACCOUNT_NUMBER}
|
||||
}
|
||||
assert not is_condition_block_restrictive(
|
||||
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
|
||||
)
|
||||
|
||||
def test_condition_parser_string_equals_kms_CallerAccount_list(self):
|
||||
condition_statement = {
|
||||
"StringEquals": {"kms:CallerAccount": [TRUSTED_AWS_ACCOUNT_NUMBER]}
|
||||
}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
|
||||
)
|
||||
|
||||
def test_condition_parser_string_equals_kms_CallerAccount_list_not_valid(self):
|
||||
condition_statement = {
|
||||
"StringEquals": {
|
||||
"kms:CallerAccount": [
|
||||
TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
NON_TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
]
|
||||
}
|
||||
}
|
||||
assert not is_condition_block_restrictive(
|
||||
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
|
||||
)
|
||||
|
||||
def test_condition_parser_string_equals_kms_ViaService_str(self):
|
||||
condition_statement = {
|
||||
"StringEquals": {"kms:ViaService": "glue.eu-central-1.amazonaws.com"}
|
||||
}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement,
|
||||
TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
is_cross_account_allowed=True,
|
||||
)
|
||||
|
||||
def test_condition_parser_string_like_kms_CallerAccount_str(self):
|
||||
condition_statement = {
|
||||
"StringLike": {"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER}
|
||||
}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
|
||||
)
|
||||
|
||||
def test_condition_parser_string_like_kms_CallerAccount_str_not_valid(self):
|
||||
condition_statement = {
|
||||
"StringLike": {"kms:CallerAccount": NON_TRUSTED_AWS_ACCOUNT_NUMBER}
|
||||
}
|
||||
assert not is_condition_block_restrictive(
|
||||
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
|
||||
)
|
||||
|
||||
def test_condition_parser_string_like_kms_ViaService_str(self):
|
||||
condition_statement = {"StringLike": {"kms:ViaService": "glue.*.amazonaws.com"}}
|
||||
assert is_condition_block_restrictive(
|
||||
condition_statement,
|
||||
TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
is_cross_account_allowed=True,
|
||||
)
|
||||
|
||||
def test_condition_parser_two_lists_unrestrictive(self):
|
||||
condition_statement = {
|
||||
"StringLike": {
|
||||
@@ -2357,6 +2466,71 @@ class Test_Policy:
|
||||
trusted_ips=["1.2.3.4", "5.6.7.8"],
|
||||
)
|
||||
|
||||
def test_is_policy_public_kms_caller_account_and_via_service(self):
|
||||
policy = {
|
||||
"Version": "2008-10-17",
|
||||
"Statement": [
|
||||
{
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": [
|
||||
"kms:Encrypt",
|
||||
"kms:Decrypt",
|
||||
"kms:ReEncrypt*",
|
||||
"kms:GenerateDataKey*",
|
||||
"kms:CreateGrant",
|
||||
"kms:DescribeKey",
|
||||
],
|
||||
"Resource": "*",
|
||||
"Condition": {
|
||||
"StringEquals": {
|
||||
"kms:ViaService": "glue.eu-central-1.amazonaws.com",
|
||||
"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
}
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
|
||||
|
||||
def test_is_policy_public_kms_caller_account_only(self):
|
||||
policy = {
|
||||
"Version": "2012-10-17",
|
||||
"Statement": [
|
||||
{
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": ["kms:Decrypt"],
|
||||
"Resource": "*",
|
||||
"Condition": {
|
||||
"StringEquals": {
|
||||
"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER,
|
||||
}
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
|
||||
|
||||
def test_is_policy_public_kms_via_service_without_account_restriction(self):
|
||||
policy = {
|
||||
"Version": "2012-10-17",
|
||||
"Statement": [
|
||||
{
|
||||
"Effect": "Allow",
|
||||
"Principal": {"AWS": "*"},
|
||||
"Action": ["kms:Decrypt"],
|
||||
"Resource": "*",
|
||||
"Condition": {
|
||||
"StringEquals": {
|
||||
"kms:ViaService": "glue.eu-central-1.amazonaws.com",
|
||||
}
|
||||
},
|
||||
},
|
||||
],
|
||||
}
|
||||
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
|
||||
|
||||
def test_check_admin_access(self):
|
||||
policy = {
|
||||
"Version": "2012-10-17",
|
||||
|
||||
@@ -43,6 +43,39 @@ USER_3 = {
|
||||
}
|
||||
|
||||
|
||||
# Role data for Directory API role tests
|
||||
SUPER_ADMIN_ROLE_ID = "13801188331880449"
|
||||
SEED_ADMIN_ROLE_ID = "13801188331880451"
|
||||
GROUPS_ADMIN_ROLE_ID = "13801188331880450"
|
||||
|
||||
ROLE_SUPER_ADMIN = {
|
||||
"roleId": SUPER_ADMIN_ROLE_ID,
|
||||
"roleName": "Super Admin",
|
||||
"roleDescription": "Super Admin",
|
||||
"isSystemRole": True,
|
||||
"isSuperAdminRole": True,
|
||||
}
|
||||
|
||||
# Google automatically assigns _SEED_ADMIN_ROLE to the first account that
|
||||
# created the domain. It is a super-admin-capable system role with a
|
||||
# different name, so it must also be excluded when counting "extra" roles.
|
||||
ROLE_SEED_ADMIN = {
|
||||
"roleId": SEED_ADMIN_ROLE_ID,
|
||||
"roleName": "_SEED_ADMIN_ROLE",
|
||||
"roleDescription": "Super Admin",
|
||||
"isSystemRole": True,
|
||||
"isSuperAdminRole": True,
|
||||
}
|
||||
|
||||
ROLE_GROUPS_ADMIN = {
|
||||
"roleId": GROUPS_ADMIN_ROLE_ID,
|
||||
"roleName": "_GROUPS_ADMIN_ROLE",
|
||||
"roleDescription": "Groups Administrator",
|
||||
"isSystemRole": True,
|
||||
"isSuperAdminRole": False,
|
||||
}
|
||||
|
||||
|
||||
def set_mocked_googleworkspace_provider(
|
||||
identity: GoogleWorkspaceIdentityInfo = GoogleWorkspaceIdentityInfo(
|
||||
domain=DOMAIN,
|
||||
|
||||
@@ -0,0 +1,130 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
CalendarPolicies,
|
||||
)
|
||||
from tests.providers.googleworkspace.googleworkspace_fixtures import (
|
||||
CUSTOMER_ID,
|
||||
DOMAIN,
|
||||
set_mocked_googleworkspace_provider,
|
||||
)
|
||||
|
||||
|
||||
class TestCalendarExternalInvitationsWarning:
|
||||
def test_pass_warnings_enabled(self):
|
||||
"""Test PASS when external invitation warnings are enabled"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
|
||||
calendar_external_invitations_warning,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
external_invitations_warning=True
|
||||
)
|
||||
|
||||
check = calendar_external_invitations_warning()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "PASS"
|
||||
assert "enabled" in findings[0].status_extended
|
||||
assert findings[0].resource_name == DOMAIN
|
||||
assert findings[0].customer_id == CUSTOMER_ID
|
||||
|
||||
def test_fail_warnings_disabled(self):
|
||||
"""Test FAIL when external invitation warnings are disabled"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
|
||||
calendar_external_invitations_warning,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
external_invitations_warning=False
|
||||
)
|
||||
|
||||
check = calendar_external_invitations_warning()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "disabled" in findings[0].status_extended
|
||||
|
||||
def test_fail_no_policy_set(self):
|
||||
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
|
||||
calendar_external_invitations_warning,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
external_invitations_warning=None
|
||||
)
|
||||
|
||||
check = calendar_external_invitations_warning()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "not explicitly configured" in findings[0].status_extended
|
||||
|
||||
def test_no_findings_when_fetch_failed(self):
|
||||
"""Test no findings returned when the API fetch failed"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
|
||||
calendar_external_invitations_warning,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = False
|
||||
mock_calendar_client.policies = CalendarPolicies()
|
||||
|
||||
check = calendar_external_invitations_warning()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 0
|
||||
@@ -0,0 +1,161 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
CalendarPolicies,
|
||||
)
|
||||
from tests.providers.googleworkspace.googleworkspace_fixtures import (
|
||||
CUSTOMER_ID,
|
||||
DOMAIN,
|
||||
set_mocked_googleworkspace_provider,
|
||||
)
|
||||
|
||||
|
||||
class TestCalendarExternalSharingPrimaryCalendar:
|
||||
def test_pass_free_busy_only(self):
|
||||
"""Test PASS when external sharing is restricted to free/busy only"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
|
||||
calendar_external_sharing_primary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
primary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY"
|
||||
)
|
||||
|
||||
check = calendar_external_sharing_primary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "PASS"
|
||||
assert "free/busy information only" in findings[0].status_extended
|
||||
assert findings[0].resource_name == DOMAIN
|
||||
assert findings[0].customer_id == CUSTOMER_ID
|
||||
|
||||
def test_fail_read_only(self):
|
||||
"""Test FAIL when external sharing allows read-only access"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
|
||||
calendar_external_sharing_primary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
primary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_ONLY"
|
||||
)
|
||||
|
||||
check = calendar_external_sharing_primary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "EXTERNAL_ALL_INFO_READ_ONLY" in findings[0].status_extended
|
||||
assert "free/busy information only" in findings[0].status_extended
|
||||
|
||||
def test_fail_read_write(self):
|
||||
"""Test FAIL when external sharing allows read-write access"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
|
||||
calendar_external_sharing_primary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
primary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE"
|
||||
)
|
||||
|
||||
check = calendar_external_sharing_primary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "EXTERNAL_ALL_INFO_READ_WRITE" in findings[0].status_extended
|
||||
|
||||
def test_fail_no_policy_set(self):
|
||||
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
|
||||
calendar_external_sharing_primary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
primary_calendar_external_sharing=None
|
||||
)
|
||||
|
||||
check = calendar_external_sharing_primary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "not explicitly configured" in findings[0].status_extended
|
||||
|
||||
def test_no_findings_when_fetch_failed(self):
|
||||
"""Test no findings returned when the API fetch failed"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
|
||||
calendar_external_sharing_primary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = False
|
||||
mock_calendar_client.policies = CalendarPolicies()
|
||||
|
||||
check = calendar_external_sharing_primary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 0
|
||||
@@ -0,0 +1,161 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
CalendarPolicies,
|
||||
)
|
||||
from tests.providers.googleworkspace.googleworkspace_fixtures import (
|
||||
CUSTOMER_ID,
|
||||
DOMAIN,
|
||||
set_mocked_googleworkspace_provider,
|
||||
)
|
||||
|
||||
|
||||
class TestCalendarExternalSharingSecondaryCalendar:
|
||||
def test_pass_free_busy_only(self):
|
||||
"""Test PASS when external sharing is restricted to free/busy only"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
|
||||
calendar_external_sharing_secondary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
secondary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY"
|
||||
)
|
||||
|
||||
check = calendar_external_sharing_secondary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "PASS"
|
||||
assert "free/busy information only" in findings[0].status_extended
|
||||
assert findings[0].resource_name == DOMAIN
|
||||
assert findings[0].customer_id == CUSTOMER_ID
|
||||
|
||||
def test_fail_read_only(self):
|
||||
"""Test FAIL when external sharing allows read-only access"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
|
||||
calendar_external_sharing_secondary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_ONLY"
|
||||
)
|
||||
|
||||
check = calendar_external_sharing_secondary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "EXTERNAL_ALL_INFO_READ_ONLY" in findings[0].status_extended
|
||||
assert "free/busy information only" in findings[0].status_extended
|
||||
|
||||
def test_fail_read_write_manage(self):
|
||||
"""Test FAIL when external sharing allows read-write-manage access"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
|
||||
calendar_external_sharing_secondary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE_MANAGE"
|
||||
)
|
||||
|
||||
check = calendar_external_sharing_secondary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "EXTERNAL_ALL_INFO_READ_WRITE_MANAGE" in findings[0].status_extended
|
||||
|
||||
def test_fail_no_policy_set(self):
|
||||
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
|
||||
calendar_external_sharing_secondary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = True
|
||||
mock_calendar_client.policies = CalendarPolicies(
|
||||
secondary_calendar_external_sharing=None
|
||||
)
|
||||
|
||||
check = calendar_external_sharing_secondary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "not explicitly configured" in findings[0].status_extended
|
||||
|
||||
def test_no_findings_when_fetch_failed(self):
|
||||
"""Test no findings returned when the API fetch failed"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
|
||||
) as mock_calendar_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
|
||||
calendar_external_sharing_secondary_calendar,
|
||||
)
|
||||
|
||||
mock_calendar_client.provider = mock_provider
|
||||
mock_calendar_client.policies_fetched = False
|
||||
mock_calendar_client.policies = CalendarPolicies()
|
||||
|
||||
check = calendar_external_sharing_secondary_calendar()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 0
|
||||
@@ -0,0 +1,231 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from tests.providers.googleworkspace.googleworkspace_fixtures import (
|
||||
set_mocked_googleworkspace_provider,
|
||||
)
|
||||
|
||||
|
||||
class TestCalendarService:
|
||||
def test_calendar_fetch_policies_all_settings(self):
|
||||
"""Test fetching all 3 calendar policy settings from Cloud Identity API"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_credentials = MagicMock()
|
||||
mock_session = MagicMock()
|
||||
mock_session.credentials = mock_credentials
|
||||
mock_provider.session = mock_session
|
||||
|
||||
mock_service = MagicMock()
|
||||
mock_policies_list = MagicMock()
|
||||
# Mock the actual Cloud Identity Policy API v1 response shape:
|
||||
# - "type" (not "name"), prefixed with "settings/"
|
||||
# - inner value field names are camelCase
|
||||
mock_policies_list.execute.return_value = {
|
||||
"policies": [
|
||||
{
|
||||
"setting": {
|
||||
"type": "settings/calendar.primary_calendar_max_allowed_external_sharing",
|
||||
"value": {
|
||||
"maxAllowedExternalSharing": "EXTERNAL_FREE_BUSY_ONLY"
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"setting": {
|
||||
"type": "settings/calendar.secondary_calendar_max_allowed_external_sharing",
|
||||
"value": {
|
||||
"maxAllowedExternalSharing": "EXTERNAL_ALL_INFO_READ_ONLY"
|
||||
},
|
||||
}
|
||||
},
|
||||
{
|
||||
"setting": {
|
||||
"type": "settings/calendar.external_invitations",
|
||||
"value": {"warnOnInvite": True},
|
||||
}
|
||||
},
|
||||
]
|
||||
}
|
||||
mock_service.policies().list.return_value = mock_policies_list
|
||||
mock_service.policies().list_next.return_value = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
|
||||
return_value=mock_service,
|
||||
),
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
Calendar,
|
||||
)
|
||||
|
||||
calendar = Calendar(mock_provider)
|
||||
|
||||
assert calendar.policies_fetched is True
|
||||
assert (
|
||||
calendar.policies.primary_calendar_external_sharing
|
||||
== "EXTERNAL_FREE_BUSY_ONLY"
|
||||
)
|
||||
assert (
|
||||
calendar.policies.secondary_calendar_external_sharing
|
||||
== "EXTERNAL_ALL_INFO_READ_ONLY"
|
||||
)
|
||||
assert calendar.policies.external_invitations_warning is True
|
||||
|
||||
def test_calendar_fetch_policies_empty_response(self):
|
||||
"""Test handling empty policies response"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_session = MagicMock()
|
||||
mock_session.credentials = MagicMock()
|
||||
mock_provider.session = mock_session
|
||||
|
||||
mock_service = MagicMock()
|
||||
mock_policies_list = MagicMock()
|
||||
mock_policies_list.execute.return_value = {"policies": []}
|
||||
mock_service.policies().list.return_value = mock_policies_list
|
||||
mock_service.policies().list_next.return_value = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
|
||||
return_value=mock_service,
|
||||
),
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
Calendar,
|
||||
)
|
||||
|
||||
calendar = Calendar(mock_provider)
|
||||
|
||||
assert calendar.policies_fetched is True
|
||||
assert calendar.policies.primary_calendar_external_sharing is None
|
||||
assert calendar.policies.secondary_calendar_external_sharing is None
|
||||
assert calendar.policies.external_invitations_warning is None
|
||||
|
||||
def test_calendar_fetch_policies_api_error(self):
|
||||
"""Test handling of API errors during policy fetch"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_session = MagicMock()
|
||||
mock_session.credentials = MagicMock()
|
||||
mock_provider.session = mock_session
|
||||
|
||||
mock_service = MagicMock()
|
||||
mock_service.policies().list.side_effect = Exception("API Error")
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
|
||||
return_value=mock_service,
|
||||
),
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
Calendar,
|
||||
)
|
||||
|
||||
calendar = Calendar(mock_provider)
|
||||
|
||||
assert calendar.policies_fetched is False
|
||||
assert calendar.policies.primary_calendar_external_sharing is None
|
||||
assert calendar.policies.secondary_calendar_external_sharing is None
|
||||
assert calendar.policies.external_invitations_warning is None
|
||||
|
||||
def test_calendar_fetch_policies_build_service_returns_none(self):
|
||||
"""Test early return when _build_service fails to construct the client"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_session = MagicMock()
|
||||
mock_session.credentials = MagicMock()
|
||||
mock_provider.session = mock_session
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
|
||||
return_value=None,
|
||||
),
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
Calendar,
|
||||
)
|
||||
|
||||
calendar = Calendar(mock_provider)
|
||||
|
||||
assert calendar.policies_fetched is False
|
||||
assert calendar.policies.primary_calendar_external_sharing is None
|
||||
assert calendar.policies.secondary_calendar_external_sharing is None
|
||||
assert calendar.policies.external_invitations_warning is None
|
||||
|
||||
def test_calendar_fetch_policies_execute_raises(self):
|
||||
"""Test inner except handler when request.execute() raises during pagination"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_session = MagicMock()
|
||||
mock_session.credentials = MagicMock()
|
||||
mock_provider.session = mock_session
|
||||
|
||||
mock_service = MagicMock()
|
||||
mock_request = MagicMock()
|
||||
mock_request.execute.side_effect = Exception("Execute failed")
|
||||
mock_service.policies().list.return_value = mock_request
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
|
||||
return_value=mock_service,
|
||||
),
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
Calendar,
|
||||
)
|
||||
|
||||
calendar = Calendar(mock_provider)
|
||||
|
||||
assert calendar.policies_fetched is False
|
||||
assert calendar.policies.primary_calendar_external_sharing is None
|
||||
assert calendar.policies.secondary_calendar_external_sharing is None
|
||||
assert calendar.policies.external_invitations_warning is None
|
||||
|
||||
def test_calendar_policies_model(self):
|
||||
"""Test CalendarPolicies Pydantic model"""
|
||||
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
|
||||
CalendarPolicies,
|
||||
)
|
||||
|
||||
policies = CalendarPolicies(
|
||||
primary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY",
|
||||
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE",
|
||||
external_invitations_warning=True,
|
||||
)
|
||||
|
||||
assert policies.primary_calendar_external_sharing == "EXTERNAL_FREE_BUSY_ONLY"
|
||||
assert (
|
||||
policies.secondary_calendar_external_sharing
|
||||
== "EXTERNAL_ALL_INFO_READ_WRITE"
|
||||
)
|
||||
assert policies.external_invitations_warning is True
|
||||
@@ -1,6 +1,12 @@
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from tests.providers.googleworkspace.googleworkspace_fixtures import (
|
||||
GROUPS_ADMIN_ROLE_ID,
|
||||
ROLE_GROUPS_ADMIN,
|
||||
ROLE_SEED_ADMIN,
|
||||
ROLE_SUPER_ADMIN,
|
||||
SEED_ADMIN_ROLE_ID,
|
||||
SUPER_ADMIN_ROLE_ID,
|
||||
USER_1,
|
||||
USER_2,
|
||||
USER_3,
|
||||
@@ -25,6 +31,24 @@ class TestDirectoryService:
|
||||
mock_service.users().list.return_value = mock_users_list
|
||||
mock_service.users().list_next.return_value = None
|
||||
|
||||
# Mock roles response
|
||||
mock_roles_list = MagicMock()
|
||||
mock_roles_list.execute.return_value = {
|
||||
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
|
||||
}
|
||||
mock_service.roles().list.return_value = mock_roles_list
|
||||
mock_service.roles().list_next.return_value = None
|
||||
|
||||
mock_ra = MagicMock()
|
||||
mock_ra.execute.return_value = {
|
||||
"items": [
|
||||
{"assignedTo": "user1-id", "roleId": SUPER_ADMIN_ROLE_ID},
|
||||
{"assignedTo": "user2-id", "roleId": SUPER_ADMIN_ROLE_ID},
|
||||
]
|
||||
}
|
||||
mock_service.roleAssignments().list.return_value = mock_ra
|
||||
mock_service.roleAssignments().list_next.return_value = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
@@ -67,6 +91,17 @@ class TestDirectoryService:
|
||||
mock_service.users().list.return_value = mock_users_list
|
||||
mock_service.users().list_next.return_value = None
|
||||
|
||||
# Mock roles response
|
||||
mock_roles_list = MagicMock()
|
||||
mock_roles_list.execute.return_value = {"items": []}
|
||||
mock_service.roles().list.return_value = mock_roles_list
|
||||
mock_service.roles().list_next.return_value = None
|
||||
|
||||
mock_ra = MagicMock()
|
||||
mock_ra.execute.return_value = {"items": []}
|
||||
mock_service.roleAssignments().list.return_value = mock_ra
|
||||
mock_service.roleAssignments().list_next.return_value = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
@@ -97,6 +132,16 @@ class TestDirectoryService:
|
||||
mock_service = MagicMock()
|
||||
mock_service.users().list.side_effect = Exception("API Error")
|
||||
|
||||
mock_roles_list = MagicMock()
|
||||
mock_roles_list.execute.return_value = {"items": []}
|
||||
mock_service.roles().list.return_value = mock_roles_list
|
||||
mock_service.roles().list_next.return_value = None
|
||||
|
||||
mock_ra = MagicMock()
|
||||
mock_ra.execute.return_value = {"items": []}
|
||||
mock_service.roleAssignments().list.return_value = mock_ra
|
||||
mock_service.roleAssignments().list_next.return_value = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
@@ -130,3 +175,193 @@ class TestDirectoryService:
|
||||
assert user.id == "test-id"
|
||||
assert user.email == "test@test-company.com"
|
||||
assert user.is_admin is True
|
||||
assert user.role_assignments == []
|
||||
|
||||
def test_directory_list_roles(self):
|
||||
"""Test that _list_roles correctly builds a roleId-to-roleName mapping"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_session = MagicMock()
|
||||
mock_session.credentials = MagicMock()
|
||||
mock_provider.session = mock_session
|
||||
|
||||
mock_service = MagicMock()
|
||||
|
||||
# Mock empty users
|
||||
mock_users_list = MagicMock()
|
||||
mock_users_list.execute.return_value = {"users": []}
|
||||
mock_service.users().list.return_value = mock_users_list
|
||||
mock_service.users().list_next.return_value = None
|
||||
|
||||
# Mock roles response
|
||||
mock_roles_list = MagicMock()
|
||||
mock_roles_list.execute.return_value = {
|
||||
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
|
||||
}
|
||||
mock_service.roles().list.return_value = mock_roles_list
|
||||
mock_service.roles().list_next.return_value = None
|
||||
|
||||
mock_ra = MagicMock()
|
||||
mock_ra.execute.return_value = {"items": []}
|
||||
mock_service.roleAssignments().list.return_value = mock_ra
|
||||
mock_service.roleAssignments().list_next.return_value = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
|
||||
return_value=mock_service,
|
||||
),
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_service import (
|
||||
Directory,
|
||||
)
|
||||
|
||||
directory = Directory(mock_provider)
|
||||
|
||||
super_admin_role = directory._roles[SUPER_ADMIN_ROLE_ID]
|
||||
assert super_admin_role.name == "Super Admin"
|
||||
assert super_admin_role.description == "Super Admin"
|
||||
assert super_admin_role.is_super_admin_role is True
|
||||
|
||||
groups_admin_role = directory._roles[GROUPS_ADMIN_ROLE_ID]
|
||||
assert groups_admin_role.name == "_GROUPS_ADMIN_ROLE"
|
||||
assert groups_admin_role.description == "Groups Administrator"
|
||||
assert groups_admin_role.is_super_admin_role is False
|
||||
|
||||
def test_directory_role_assignments_populated(self):
|
||||
"""Test that role assignments are fetched and resolved for super admins"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_session = MagicMock()
|
||||
mock_session.credentials = MagicMock()
|
||||
mock_provider.session = mock_session
|
||||
|
||||
mock_service = MagicMock()
|
||||
|
||||
# Mock users - one super admin
|
||||
mock_users_list = MagicMock()
|
||||
mock_users_list.execute.return_value = {"users": [USER_1]}
|
||||
mock_service.users().list.return_value = mock_users_list
|
||||
mock_service.users().list_next.return_value = None
|
||||
|
||||
# Mock roles
|
||||
mock_roles_list = MagicMock()
|
||||
mock_roles_list.execute.return_value = {
|
||||
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
|
||||
}
|
||||
mock_service.roles().list.return_value = mock_roles_list
|
||||
mock_service.roles().list_next.return_value = None
|
||||
|
||||
mock_ra = MagicMock()
|
||||
mock_ra.execute.return_value = {
|
||||
"items": [
|
||||
{"assignedTo": "user1-id", "roleId": SUPER_ADMIN_ROLE_ID},
|
||||
{"assignedTo": "user1-id", "roleId": GROUPS_ADMIN_ROLE_ID},
|
||||
]
|
||||
}
|
||||
mock_service.roleAssignments().list.return_value = mock_ra
|
||||
mock_service.roleAssignments().list_next.return_value = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
|
||||
return_value=mock_service,
|
||||
),
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_service import (
|
||||
Directory,
|
||||
)
|
||||
|
||||
directory = Directory(mock_provider)
|
||||
|
||||
user = directory.users["user1-id"]
|
||||
role_names = [r.name for r in user.role_assignments]
|
||||
role_descriptions = [r.description for r in user.role_assignments]
|
||||
assert "Super Admin" in role_names
|
||||
assert "_GROUPS_ADMIN_ROLE" in role_names
|
||||
assert "Groups Administrator" in role_descriptions
|
||||
assert len(user.role_assignments) == 2
|
||||
assert user.is_admin is True
|
||||
|
||||
def test_directory_second_super_admin_detected_via_role_assignments(self):
|
||||
"""Regression: a second super admin whose users.list().isAdmin still
|
||||
reads False (e.g. API propagation lag, or only holding
|
||||
_SEED_ADMIN_ROLE) must still be recognised as a super admin through
|
||||
the Role Assignments API, AND any extra non-super-admin roles they
|
||||
hold must be surfaced on their User object."""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
mock_provider.audit_config = {}
|
||||
mock_provider.fixer_config = {}
|
||||
mock_session = MagicMock()
|
||||
mock_session.credentials = MagicMock()
|
||||
mock_provider.session = mock_session
|
||||
|
||||
mock_service = MagicMock()
|
||||
|
||||
stale_user_1 = {
|
||||
"id": "user1-id",
|
||||
"primaryEmail": "admin1@test-company.com",
|
||||
"isAdmin": False,
|
||||
}
|
||||
stale_user_2 = {
|
||||
"id": "user2-id",
|
||||
"primaryEmail": "admin2@test-company.com",
|
||||
"isAdmin": False,
|
||||
}
|
||||
mock_users_list = MagicMock()
|
||||
mock_users_list.execute.return_value = {"users": [stale_user_1, stale_user_2]}
|
||||
mock_service.users().list.return_value = mock_users_list
|
||||
mock_service.users().list_next.return_value = None
|
||||
|
||||
mock_roles_list = MagicMock()
|
||||
mock_roles_list.execute.return_value = {
|
||||
"items": [ROLE_SUPER_ADMIN, ROLE_SEED_ADMIN, ROLE_GROUPS_ADMIN]
|
||||
}
|
||||
mock_service.roles().list.return_value = mock_roles_list
|
||||
mock_service.roles().list_next.return_value = None
|
||||
|
||||
mock_ra = MagicMock()
|
||||
mock_ra.execute.return_value = {
|
||||
"items": [
|
||||
{"assignedTo": "user1-id", "roleId": SEED_ADMIN_ROLE_ID},
|
||||
{"assignedTo": "user2-id", "roleId": SUPER_ADMIN_ROLE_ID},
|
||||
{"assignedTo": "user2-id", "roleId": GROUPS_ADMIN_ROLE_ID},
|
||||
]
|
||||
}
|
||||
mock_service.roleAssignments().list.return_value = mock_ra
|
||||
mock_service.roleAssignments().list_next.return_value = None
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
|
||||
return_value=mock_service,
|
||||
),
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_service import (
|
||||
Directory,
|
||||
)
|
||||
|
||||
directory = Directory(mock_provider)
|
||||
|
||||
user1 = directory.users["user1-id"]
|
||||
user2 = directory.users["user2-id"]
|
||||
assert user1.is_admin is True
|
||||
assert user2.is_admin is True
|
||||
|
||||
assert [r.name for r in user1.role_assignments] == ["_SEED_ADMIN_ROLE"]
|
||||
user2_role_names = {r.name for r in user2.role_assignments}
|
||||
assert user2_role_names == {"Super Admin", "_GROUPS_ADMIN_ROLE"}
|
||||
|
||||
@@ -0,0 +1,446 @@
|
||||
from unittest.mock import patch
|
||||
|
||||
from prowler.providers.googleworkspace.services.directory.directory_service import (
|
||||
Role,
|
||||
User,
|
||||
)
|
||||
from tests.providers.googleworkspace.googleworkspace_fixtures import (
|
||||
CUSTOMER_ID,
|
||||
DOMAIN,
|
||||
set_mocked_googleworkspace_provider,
|
||||
)
|
||||
|
||||
SUPER_ADMIN_ROLE = Role(
|
||||
id="13801188331880449",
|
||||
name="Super Admin",
|
||||
description="Super Admin",
|
||||
is_super_admin_role=True,
|
||||
)
|
||||
SEED_ADMIN_ROLE = Role(
|
||||
id="13801188331880451",
|
||||
name="_SEED_ADMIN_ROLE",
|
||||
description="Super Admin",
|
||||
is_super_admin_role=True,
|
||||
)
|
||||
GROUPS_ADMIN_ROLE = Role(
|
||||
id="13801188331880450",
|
||||
name="_GROUPS_ADMIN_ROLE",
|
||||
description="Groups Administrator",
|
||||
is_super_admin_role=False,
|
||||
)
|
||||
USER_MANAGEMENT_ADMIN_ROLE = Role(
|
||||
id="13801188331880452",
|
||||
name="_USER_MANAGEMENT_ADMIN_ROLE",
|
||||
description="User Management Administrator",
|
||||
is_super_admin_role=False,
|
||||
)
|
||||
CUSTOM_ROLE_NO_DESCRIPTION = Role(
|
||||
id="13801188331880453",
|
||||
name="custom-helpdesk-role",
|
||||
description="",
|
||||
is_super_admin_role=False,
|
||||
)
|
||||
|
||||
|
||||
class TestDirectorySuperAdminOnlyAdminRoles:
|
||||
def test_pass_super_admins_only_super_admin_role(self):
|
||||
"""Test PASS when super admins have only the Super Admin role"""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="admin1@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE],
|
||||
),
|
||||
"admin2-id": User(
|
||||
id="admin2-id",
|
||||
email="admin2@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE],
|
||||
),
|
||||
"user1-id": User(
|
||||
id="user1-id",
|
||||
email="user@test-company.com",
|
||||
is_admin=False,
|
||||
role_assignments=[],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "PASS"
|
||||
assert "used only for super admin activities" in findings[0].status_extended
|
||||
assert findings[0].resource_name == DOMAIN
|
||||
assert findings[0].customer_id == CUSTOMER_ID
|
||||
|
||||
def test_pass_super_admin_with_seed_admin_role(self):
|
||||
"""Test PASS when a super admin only holds _SEED_ADMIN_ROLE.
|
||||
|
||||
_SEED_ADMIN_ROLE is auto-assigned by Google to the original domain
|
||||
creator and has isSuperAdminRole=True, so it must not count as an
|
||||
"extra" role.
|
||||
"""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="playground@prowler.cloud",
|
||||
is_admin=True,
|
||||
role_assignments=[SEED_ADMIN_ROLE],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "PASS"
|
||||
assert "_SEED_ADMIN_ROLE" not in findings[0].status_extended
|
||||
|
||||
def test_pass_super_admin_with_both_super_admin_and_seed_admin(self):
|
||||
"""Test PASS when admin holds both Super Admin and _SEED_ADMIN_ROLE"""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="playground@prowler.cloud",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE, SEED_ADMIN_ROLE],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "PASS"
|
||||
|
||||
def test_fail_super_admin_with_additional_roles(self):
|
||||
"""Test FAIL when a super admin also has additional admin roles"""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="admin1@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
|
||||
),
|
||||
"user1-id": User(
|
||||
id="user1-id",
|
||||
email="user@test-company.com",
|
||||
is_admin=False,
|
||||
role_assignments=[],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "admin1@test-company.com" in findings[0].status_extended
|
||||
assert "Groups Administrator" in findings[0].status_extended
|
||||
assert "_GROUPS_ADMIN_ROLE" not in findings[0].status_extended
|
||||
assert "used only for super admin activities" in findings[0].status_extended
|
||||
assert findings[0].resource_name == DOMAIN
|
||||
assert findings[0].customer_id == CUSTOMER_ID
|
||||
|
||||
def test_fail_seed_admin_with_additional_roles(self):
|
||||
"""Test FAIL when a _SEED_ADMIN_ROLE holder also has extra roles"""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="playground@prowler.cloud",
|
||||
is_admin=True,
|
||||
role_assignments=[SEED_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "playground@prowler.cloud" in findings[0].status_extended
|
||||
assert "Groups Administrator" in findings[0].status_extended
|
||||
assert "_GROUPS_ADMIN_ROLE" not in findings[0].status_extended
|
||||
assert "_SEED_ADMIN_ROLE" not in findings[0].status_extended
|
||||
|
||||
def test_fail_multiple_super_admins_with_extra_roles(self):
|
||||
"""Test FAIL lists all super admins that have additional roles"""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="admin1@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
|
||||
),
|
||||
"admin2-id": User(
|
||||
id="admin2-id",
|
||||
email="admin2@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE, USER_MANAGEMENT_ADMIN_ROLE],
|
||||
),
|
||||
"admin3-id": User(
|
||||
id="admin3-id",
|
||||
email="admin3@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "admin1@test-company.com" in findings[0].status_extended
|
||||
assert "admin2@test-company.com" in findings[0].status_extended
|
||||
assert "admin3@test-company.com" not in findings[0].status_extended
|
||||
|
||||
def test_no_findings_when_no_users(self):
|
||||
"""Test no findings when there are no users"""
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = {}
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 0
|
||||
|
||||
def test_non_super_admin_with_roles_not_flagged(self):
|
||||
"""Test that users who are not super admins are ignored even if they have roles"""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="admin1@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE],
|
||||
),
|
||||
"delegated1-id": User(
|
||||
id="delegated1-id",
|
||||
email="delegated@test-company.com",
|
||||
is_admin=False,
|
||||
role_assignments=[GROUPS_ADMIN_ROLE],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "PASS"
|
||||
assert "delegated@test-company.com" not in findings[0].status_extended
|
||||
|
||||
def test_pass_super_admin_with_empty_role_assignments(self):
|
||||
"""Test PASS when super admin has no role assignments (edge case)"""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="admin1@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "PASS"
|
||||
|
||||
def test_fail_custom_role_without_description_falls_back_to_name(self):
|
||||
"""A custom role with an empty description should be displayed
|
||||
using its name as a fall-back, so the FAIL message is never blank
|
||||
for users that genuinely hold extra roles."""
|
||||
users = {
|
||||
"admin1-id": User(
|
||||
id="admin1-id",
|
||||
email="admin1@test-company.com",
|
||||
is_admin=True,
|
||||
role_assignments=[SUPER_ADMIN_ROLE, CUSTOM_ROLE_NO_DESCRIPTION],
|
||||
),
|
||||
}
|
||||
|
||||
mock_provider = set_mocked_googleworkspace_provider()
|
||||
|
||||
with (
|
||||
patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=mock_provider,
|
||||
),
|
||||
patch(
|
||||
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
|
||||
) as mock_directory_client,
|
||||
):
|
||||
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
|
||||
directory_super_admin_only_admin_roles,
|
||||
)
|
||||
|
||||
mock_directory_client.users = users
|
||||
mock_directory_client.provider = mock_provider
|
||||
|
||||
check = directory_super_admin_only_admin_roles()
|
||||
findings = check.execute()
|
||||
|
||||
assert len(findings) == 1
|
||||
assert findings[0].status == "FAIL"
|
||||
assert "custom-helpdesk-role" in findings[0].status_extended
|
||||
@@ -0,0 +1,711 @@
|
||||
"""Tests for the entra_conditional_access_policy_directory_sync_account_excluded check."""
|
||||
|
||||
from unittest import mock
|
||||
from uuid import uuid4
|
||||
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ApplicationEnforcedRestrictions,
|
||||
ApplicationsConditions,
|
||||
ConditionalAccessGrantControl,
|
||||
ConditionalAccessPolicyState,
|
||||
Conditions,
|
||||
GrantControlOperator,
|
||||
GrantControls,
|
||||
PersistentBrowser,
|
||||
SessionControls,
|
||||
SignInFrequency,
|
||||
SignInFrequencyInterval,
|
||||
UsersConditions,
|
||||
)
|
||||
from tests.providers.m365.m365_fixtures import DOMAIN, set_mocked_m365_provider
|
||||
|
||||
DIRECTORY_SYNC_ROLE_TEMPLATE_ID = "d29b2b05-8046-44ba-8758-1e26182fcf32"
|
||||
|
||||
CHECK_MODULE_PATH = "prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded"
|
||||
|
||||
|
||||
def _default_session_controls():
|
||||
"""Return default session controls for test policies."""
|
||||
return SessionControls(
|
||||
persistent_browser=PersistentBrowser(is_enabled=False, mode="always"),
|
||||
sign_in_frequency=SignInFrequency(
|
||||
is_enabled=False,
|
||||
frequency=None,
|
||||
type=None,
|
||||
interval=SignInFrequencyInterval.EVERY_TIME,
|
||||
),
|
||||
application_enforced_restrictions=ApplicationEnforcedRestrictions(
|
||||
is_enabled=False
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def _default_grant_controls():
|
||||
"""Return default grant controls requiring MFA for test policies."""
|
||||
return GrantControls(
|
||||
built_in_controls=[ConditionalAccessGrantControl.MFA],
|
||||
operator=GrantControlOperator.AND,
|
||||
authentication_strength=None,
|
||||
)
|
||||
|
||||
|
||||
class Test_entra_conditional_access_policy_directory_sync_account_excluded:
|
||||
"""Test class for Directory Sync Account exclusion check."""
|
||||
|
||||
def test_no_conditional_access_policies(self):
|
||||
"""Test PASS when no Conditional Access policies exist."""
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
|
||||
)
|
||||
assert result[0].resource == {}
|
||||
assert result[0].resource_name == "Conditional Access Policies"
|
||||
assert result[0].resource_id == "conditionalAccessPolicies"
|
||||
assert result[0].location == "global"
|
||||
|
||||
def test_policy_disabled(self):
|
||||
"""Test PASS when only a disabled policy exists targeting all users and apps."""
|
||||
policy_id = str(uuid4())
|
||||
display_name = "Require MFA for All Users"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id: ConditionalAccessPolicy(
|
||||
id=policy_id,
|
||||
display_name=display_name,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.DISABLED,
|
||||
)
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
|
||||
)
|
||||
assert result[0].resource == {}
|
||||
assert result[0].resource_name == "Conditional Access Policies"
|
||||
assert result[0].resource_id == "conditionalAccessPolicies"
|
||||
assert result[0].location == "global"
|
||||
|
||||
def test_policy_targets_specific_users(self):
|
||||
"""Test PASS when the policy targets specific users, not all users."""
|
||||
policy_id = str(uuid4())
|
||||
display_name = "Require MFA for Admins"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id: ConditionalAccessPolicy(
|
||||
id=policy_id,
|
||||
display_name=display_name,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=["some-group-id"],
|
||||
excluded_groups=[],
|
||||
included_users=[],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED,
|
||||
)
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
|
||||
)
|
||||
|
||||
def test_policy_targets_specific_apps(self):
|
||||
"""Test PASS when the policy targets specific apps, not all apps."""
|
||||
policy_id = str(uuid4())
|
||||
display_name = "Require MFA for Office 365"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id: ConditionalAccessPolicy(
|
||||
id=policy_id,
|
||||
display_name=display_name,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["some-app-id"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED,
|
||||
)
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== "No Conditional Access Policy targets all users and all cloud apps, so no Directory Synchronization Accounts exclusion is needed."
|
||||
)
|
||||
|
||||
def test_policy_enabled_without_sync_exclusion(self):
|
||||
"""Test FAIL when an enabled policy targets all users and all apps but does not exclude the sync role."""
|
||||
policy_id = str(uuid4())
|
||||
display_name = "Require MFA for All Users"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id: ConditionalAccessPolicy(
|
||||
id=policy_id,
|
||||
display_name=display_name,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED,
|
||||
)
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Conditional Access Policy '{display_name}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
|
||||
)
|
||||
assert (
|
||||
result[0].resource
|
||||
== entra_client.conditional_access_policies[policy_id].dict()
|
||||
)
|
||||
assert result[0].resource_name == display_name
|
||||
assert result[0].resource_id == policy_id
|
||||
assert result[0].location == "global"
|
||||
|
||||
def test_policy_report_only_without_sync_exclusion(self):
|
||||
"""Test FAIL when a report-only policy targets all users and apps without excluding the sync role."""
|
||||
policy_id = str(uuid4())
|
||||
display_name = "Report Only - Require MFA"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id: ConditionalAccessPolicy(
|
||||
id=policy_id,
|
||||
display_name=display_name,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED_FOR_REPORTING,
|
||||
)
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Conditional Access Policy '{display_name}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
|
||||
)
|
||||
assert (
|
||||
result[0].resource
|
||||
== entra_client.conditional_access_policies[policy_id].dict()
|
||||
)
|
||||
assert result[0].resource_name == display_name
|
||||
assert result[0].resource_id == policy_id
|
||||
assert result[0].location == "global"
|
||||
|
||||
def test_policy_enabled_with_sync_exclusion(self):
|
||||
"""Test PASS when an enabled policy targets all users and apps and excludes the sync role."""
|
||||
policy_id = str(uuid4())
|
||||
display_name = "Require MFA for All Users"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id: ConditionalAccessPolicy(
|
||||
id=policy_id,
|
||||
display_name=display_name,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[
|
||||
DIRECTORY_SYNC_ROLE_TEMPLATE_ID,
|
||||
],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED,
|
||||
)
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Conditional Access Policy '{display_name}' excludes the Directory Synchronization Accounts role."
|
||||
)
|
||||
assert (
|
||||
result[0].resource
|
||||
== entra_client.conditional_access_policies[policy_id].dict()
|
||||
)
|
||||
assert result[0].resource_name == display_name
|
||||
assert result[0].resource_id == policy_id
|
||||
assert result[0].location == "global"
|
||||
|
||||
def test_policy_with_sync_role_and_other_excluded_roles(self):
|
||||
"""Test PASS when the sync role is excluded alongside other roles."""
|
||||
policy_id = str(uuid4())
|
||||
display_name = "Require MFA for All Users"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id: ConditionalAccessPolicy(
|
||||
id=policy_id,
|
||||
display_name=display_name,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[
|
||||
"some-other-role-id",
|
||||
DIRECTORY_SYNC_ROLE_TEMPLATE_ID,
|
||||
],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED,
|
||||
)
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "PASS"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Conditional Access Policy '{display_name}' excludes the Directory Synchronization Accounts role."
|
||||
)
|
||||
|
||||
def test_multiple_policies_mixed_results(self):
|
||||
"""Test multiple policies where one excludes sync role and another does not."""
|
||||
policy_id_pass = str(uuid4())
|
||||
policy_id_fail = str(uuid4())
|
||||
display_name_pass = "MFA Policy - With Exclusion"
|
||||
display_name_fail = "MFA Policy - Without Exclusion"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id_pass: ConditionalAccessPolicy(
|
||||
id=policy_id_pass,
|
||||
display_name=display_name_pass,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[
|
||||
DIRECTORY_SYNC_ROLE_TEMPLATE_ID,
|
||||
],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED,
|
||||
),
|
||||
policy_id_fail: ConditionalAccessPolicy(
|
||||
id=policy_id_fail,
|
||||
display_name=display_name_fail,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=[],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED,
|
||||
),
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 2
|
||||
|
||||
pass_results = [r for r in result if r.status == "PASS"]
|
||||
fail_results = [r for r in result if r.status == "FAIL"]
|
||||
|
||||
assert len(pass_results) == 1
|
||||
assert len(fail_results) == 1
|
||||
|
||||
assert pass_results[0].resource_name == display_name_pass
|
||||
assert pass_results[0].resource_id == policy_id_pass
|
||||
assert (
|
||||
pass_results[0].status_extended
|
||||
== f"Conditional Access Policy '{display_name_pass}' excludes the Directory Synchronization Accounts role."
|
||||
)
|
||||
|
||||
assert fail_results[0].resource_name == display_name_fail
|
||||
assert fail_results[0].resource_id == policy_id_fail
|
||||
assert (
|
||||
fail_results[0].status_extended
|
||||
== f"Conditional Access Policy '{display_name_fail}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
|
||||
)
|
||||
|
||||
def test_policy_with_wrong_excluded_role(self):
|
||||
"""Test FAIL when the policy excludes a different role but not the sync role."""
|
||||
policy_id = str(uuid4())
|
||||
display_name = "Require MFA for All Users"
|
||||
entra_client = mock.MagicMock
|
||||
entra_client.audited_tenant = "audited_tenant"
|
||||
entra_client.audited_domain = DOMAIN
|
||||
|
||||
with (
|
||||
mock.patch(
|
||||
"prowler.providers.common.provider.Provider.get_global_provider",
|
||||
return_value=set_mocked_m365_provider(),
|
||||
),
|
||||
mock.patch(
|
||||
f"{CHECK_MODULE_PATH}.entra_client",
|
||||
new=entra_client,
|
||||
),
|
||||
):
|
||||
from prowler.providers.m365.services.entra.entra_conditional_access_policy_directory_sync_account_excluded.entra_conditional_access_policy_directory_sync_account_excluded import (
|
||||
entra_conditional_access_policy_directory_sync_account_excluded,
|
||||
)
|
||||
from prowler.providers.m365.services.entra.entra_service import (
|
||||
ConditionalAccessPolicy,
|
||||
)
|
||||
|
||||
entra_client.conditional_access_policies = {
|
||||
policy_id: ConditionalAccessPolicy(
|
||||
id=policy_id,
|
||||
display_name=display_name,
|
||||
conditions=Conditions(
|
||||
application_conditions=ApplicationsConditions(
|
||||
included_applications=["All"],
|
||||
excluded_applications=[],
|
||||
included_user_actions=[],
|
||||
),
|
||||
user_conditions=UsersConditions(
|
||||
included_groups=[],
|
||||
excluded_groups=[],
|
||||
included_users=["All"],
|
||||
excluded_users=[],
|
||||
included_roles=[],
|
||||
excluded_roles=["some-other-role-id"],
|
||||
),
|
||||
client_app_types=[],
|
||||
user_risk_levels=[],
|
||||
),
|
||||
grant_controls=_default_grant_controls(),
|
||||
session_controls=_default_session_controls(),
|
||||
state=ConditionalAccessPolicyState.ENABLED,
|
||||
)
|
||||
}
|
||||
|
||||
check = entra_conditional_access_policy_directory_sync_account_excluded()
|
||||
result = check.execute()
|
||||
assert len(result) == 1
|
||||
assert result[0].status == "FAIL"
|
||||
assert (
|
||||
result[0].status_extended
|
||||
== f"Conditional Access Policy '{display_name}' does not exclude the Directory Synchronization Accounts role, which may break Entra Connect sync."
|
||||
)
|
||||
assert result[0].resource_name == display_name
|
||||
assert result[0].resource_id == policy_id
|
||||
@@ -6,6 +6,12 @@
|
||||
|
||||
set -e
|
||||
|
||||
# The Python pre-commit framework (see .pre-commit-config.yaml, hook "ui-checks")
|
||||
# exports GIT_WORK_TREE, GIT_DIR, and GIT_INDEX_FILE pointing to its temp staging
|
||||
# area. Unset them so git commands below resolve against the real repo and index.
|
||||
# See: https://github.com/prowler-cloud/prowler/pull/10574
|
||||
unset GIT_WORK_TREE GIT_DIR GIT_INDEX_FILE GIT_PREFIX GIT_COMMON_DIR GIT_OBJECT_DIRECTORY
|
||||
|
||||
# Colors
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
|
||||
Reference in New Issue
Block a user