Merge remote-tracking branch 'origin/master' into HEAD

This commit is contained in:
Hugo P.Brito
2026-04-08 12:38:40 +01:00
79 changed files with 2901 additions and 180 deletions
+2 -2
View File
@@ -216,11 +216,11 @@ jobs:
echo "AWS service_paths='${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}'"
if [ "${STEPS_AWS_SERVICES_OUTPUTS_RUN_ALL}" = "true" ]; then
poetry run pytest -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml tests/providers/aws
poetry run pytest -p no:randomly -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml tests/providers/aws
elif [ -z "${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}" ]; then
echo "No AWS service paths detected; skipping AWS tests."
else
poetry run pytest -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml ${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}
poetry run pytest -p no:randomly -n auto --cov=./prowler/providers/aws --cov-report=xml:aws_coverage.xml ${STEPS_AWS_SERVICES_OUTPUTS_SERVICE_PATHS}
fi
env:
STEPS_AWS_SERVICES_OUTPUTS_RUN_ALL: ${{ steps.aws-services.outputs.run_all }}
+4 -1
View File
@@ -317,7 +317,10 @@ python prowler-cli.py -v
- **Prowler SDK**: A Python SDK designed to extend the functionality of the Prowler CLI for advanced capabilities.
- **Prowler MCP Server**: A Model Context Protocol server that provides AI tools for Lighthouse, the AI-powered security assistant. This is a critical dependency for Lighthouse functionality.
![Prowler App Architecture](docs/products/img/prowler-app-architecture.png)
![Prowler App Architecture](docs/images/products/prowler-app-architecture.png)
<!-- Diagram source: docs/images/products/prowler-app-architecture.mmd — edit there, re-render at https://mermaid.live, and replace the PNG. -->
## Prowler CLI
+2
View File
@@ -10,6 +10,7 @@ All notable changes to the **Prowler API** are documented in this file.
- Filter RBAC role lookup by `tenant_id` to prevent cross-tenant privilege leak [(#10491)](https://github.com/prowler-cloud/prowler/pull/10491)
- `VALKEY_SCHEME`, `VALKEY_USERNAME`, and `VALKEY_PASSWORD` environment variables to configure Celery broker TLS/auth connection details for Valkey/ElastiCache [(#10420)](https://github.com/prowler-cloud/prowler/pull/10420)
- `Vercel` provider support [(#10190)](https://github.com/prowler-cloud/prowler/pull/10190)
- Finding groups list and latest endpoints support `sort=delta`, ordering by `new_count` then `changed_count` so groups with the most new findings rank highest [(#10606)](https://github.com/prowler-cloud/prowler/pull/10606)
### 🔄 Changed
@@ -28,6 +29,7 @@ All notable changes to the **Prowler API** are documented in this file.
- Membership `post_delete` signal using raw FK ids to avoid `DoesNotExist` during cascade deletions [(#10497)](https://github.com/prowler-cloud/prowler/pull/10497)
- Finding group resources endpoints returning false 404 when filters match no results, and `sort` parameter being ignored [(#10510)](https://github.com/prowler-cloud/prowler/pull/10510)
- Jira integration failing with `JiraInvalidIssueTypeError` on non-English Jira instances due to hardcoded `"Task"` issue type; now dynamically fetches available issue types per project [(#10534)](https://github.com/prowler-cloud/prowler/pull/10534)
- Finding group `first_seen_at` now reflects when a new finding appeared in the scan instead of the oldest carry-forward date across all unchanged findings [(#10595)](https://github.com/prowler-cloud/prowler/pull/10595)
- Attack Paths: Remove `clear_cache` call from read-only query endpoints; cache clearing belongs to the scan/ingestion flow, not API queries [(#10586)](https://github.com/prowler-cloud/prowler/pull/10586)
### 🔐 Security
+33
View File
@@ -16839,6 +16839,39 @@ class TestFindingGroupViewSet:
data = response.json()["data"]
assert len(data) > 0
@pytest.mark.parametrize(
"endpoint_name", ["finding-group-list", "finding-group-latest"]
)
def test_finding_groups_sort_by_delta(
self,
authenticated_client,
finding_groups_fixture,
endpoint_name,
):
"""Sort by delta orders by new_count then changed_count (lexicographic)."""
params = {"sort": "-delta"}
if endpoint_name == "finding-group-list":
params["filter[inserted_at]"] = TODAY
response = authenticated_client.get(reverse(endpoint_name), params)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert len(data) > 0
def delta_key(item):
attrs = item["attributes"]
return (attrs.get("new_count", 0), attrs.get("changed_count", 0))
desc_keys = [delta_key(item) for item in data]
assert desc_keys == sorted(desc_keys, reverse=True)
# Ascending order produces the inverse arrangement
params["sort"] = "delta"
response = authenticated_client.get(reverse(endpoint_name), params)
assert response.status_code == status.HTTP_200_OK
asc_keys = [delta_key(item) for item in response.json()["data"]]
assert asc_keys == sorted(asc_keys)
def test_finding_groups_latest_ignores_date_filters(
self, authenticated_client, finding_groups_fixture
):
+1
View File
@@ -4216,6 +4216,7 @@ class FindingGroupResourceSerializer(BaseSerializerV1):
provider = serializers.SerializerMethodField()
status = serializers.CharField()
severity = serializers.CharField()
delta = serializers.CharField(required=False, allow_null=True)
first_seen_at = serializers.DateTimeField(required=False, allow_null=True)
last_seen_at = serializers.DateTimeField(required=False, allow_null=True)
muted_reason = serializers.CharField(required=False, allow_null=True)
+57 -1
View File
@@ -7219,6 +7219,7 @@ class FindingGroupViewSet(BaseRLSViewSet):
"check_id": "check_id",
"check_title": "check_title",
"severity": "severity_order",
"delta": "delta_order",
"fail_count": "fail_count",
"pass_count": "pass_count",
"muted_count": "muted_count",
@@ -7234,6 +7235,7 @@ class FindingGroupViewSet(BaseRLSViewSet):
_RESOURCE_SORT_MAP = {
"status": "status_order",
"severity": "severity_order",
"delta": "delta_order",
"first_seen_at": "first_seen_at",
"last_seen_at": "last_seen_at",
"resource.uid": "resource_uid",
@@ -7370,6 +7372,22 @@ class FindingGroupViewSet(BaseRLSViewSet):
output_field=IntegerField(),
)
),
delta_order=Max(
Case(
When(
finding__delta="new",
finding__muted=False,
then=Value(2),
),
When(
finding__delta="changed",
finding__muted=False,
then=Value(1),
),
default=Value(0),
output_field=IntegerField(),
)
),
first_seen_at=Min("finding__first_seen_at"),
last_seen_at=Max("finding__inserted_at"),
# Max() on muted_reason / check_metadata is safe because
@@ -7402,6 +7420,22 @@ class FindingGroupViewSet(BaseRLSViewSet):
output_field=IntegerField(),
)
),
"delta_order": lambda: Max(
Case(
When(
finding__delta="new",
finding__muted=False,
then=Value(2),
),
When(
finding__delta="changed",
finding__muted=False,
then=Value(1),
),
default=Value(0),
output_field=IntegerField(),
)
),
"first_seen_at": lambda: Min("finding__first_seen_at"),
"last_seen_at": lambda: Max("finding__inserted_at"),
"resource_uid": lambda: Max("resource__uid"),
@@ -7448,6 +7482,14 @@ class FindingGroupViewSet(BaseRLSViewSet):
else:
status = "MUTED"
delta_order = row.get("delta_order", 0)
if delta_order == 2:
delta = "new"
elif delta_order == 1:
delta = "changed"
else:
delta = None
results.append(
{
"resource_id": row["resource_id"],
@@ -7463,6 +7505,7 @@ class FindingGroupViewSet(BaseRLSViewSet):
"severity": SEVERITY_ORDER_REVERSE.get(
severity_order, "informational"
),
"delta": delta,
"first_seen_at": row["first_seen_at"],
"last_seen_at": row["last_seen_at"],
"muted_reason": row.get("muted_reason"),
@@ -7527,7 +7570,20 @@ class FindingGroupViewSet(BaseRLSViewSet):
sort_param, self._FINDING_GROUP_SORT_MAP
)
if ordering:
aggregated_queryset = aggregated_queryset.order_by(*ordering)
# delta_order is a virtual sort field: expand it to a
# lexicographic ordering by (new_count, changed_count) so groups
# with more new findings rank higher, with changed_count as the
# tie-breaker (preserves the "new > changed" priority used by
# the resources endpoint, but driven by the actual counters).
expanded_ordering = []
for field in ordering:
if field.lstrip("-") == "delta_order":
sign = "-" if field.startswith("-") else ""
expanded_ordering.append(f"{sign}new_count")
expanded_ordering.append(f"{sign}changed_count")
else:
expanded_ordering.append(field)
aggregated_queryset = aggregated_queryset.order_by(*expanded_ordering)
else:
aggregated_queryset = aggregated_queryset.order_by(
"-fail_count", "-severity_order", "check_id"
+3 -1
View File
@@ -1824,7 +1824,9 @@ def aggregate_finding_group_summaries(tenant_id: str, scan_id: str):
filter=Q(status="FAIL", muted=False),
),
# Use prefixed names to avoid conflict with model field names
agg_first_seen_at=Min("first_seen_at"),
agg_first_seen_at=Min(
"first_seen_at", filter=Q(delta="new", muted=False)
),
agg_last_seen_at=Max("inserted_at"),
agg_failing_since=Min(
"first_seen_at", filter=Q(status="FAIL", muted=False)
+29
View File
@@ -750,6 +750,35 @@ def init_parser(self):
# More arguments for the provider.
```
##### Sensitive CLI Arguments
CLI flags that accept secrets (tokens, passwords, API keys) require special handling to protect credentials from leaking in HTML output and process listings:
1. **Use `nargs="?"` with `default=None`** so the flag works both with and without an inline value. This allows the provider to fall back to an environment variable when no value is passed.
2. **Add a `SENSITIVE_ARGUMENTS` frozenset** at the top of the `arguments.py` file listing every flag that accepts secret values:
```python
SENSITIVE_ARGUMENTS = frozenset({"--your-provider-password", "--your-provider-token"})
```
Prowler automatically discovers these frozensets and uses them to redact values in HTML output and warn users who pass secrets directly on the command line.
3. **Document the environment variable** in the `help` text so users know the recommended alternative:
```python
<provider_name>_parser.add_argument(
"--your-provider-password",
nargs="?",
default=None,
metavar="PASSWORD",
help="Password for authentication. We recommend using the YOUR_PROVIDER_PASSWORD environment variable instead.",
)
```
<Warning>
Do not add new arguments that require passing secrets as CLI values without an environment variable fallback. Prowler CLI warns users when sensitive flags receive explicit values on the command line.
</Warning>
#### Step 5: Implement Mutelist
**Explanation:**
Binary file not shown.

Before

Width:  |  Height:  |  Size: 192 KiB

@@ -11,16 +11,19 @@ Prowler App is a web application that simplifies running Prowler. It provides:
## Components
Prowler App consists of three main components:
Prowler App consists of four main components:
- **Prowler UI**: User-friendly web interface for running Prowler and viewing results, powered by Next.js
- **Prowler API**: Backend API that executes Prowler scans and stores results, built with Django REST Framework
- **Prowler SDK**: Python SDK that integrates with Prowler CLI for advanced functionality
- **Prowler MCP Server**: Model Context Protocol server that exposes AI tools for Lighthouse, the AI-powered security assistant. Required dependency for Lighthouse.
Supporting infrastructure includes:
- **PostgreSQL**: Persistent storage of scan results
- **Celery Workers**: Asynchronous execution of Prowler scans
- **Celery Beat (API Scheduler)**: Schedules recurring scans and enqueues jobs on the broker
- **Valkey**: In-memory database serving as message broker for Celery workers
- **Neo4j**: Graph database used by the Attack Paths feature to combine cloud inventory with Prowler findings (currently populated by AWS scans)
![Prowler App Architecture](/images/products/prowler-app-architecture.png)
@@ -0,0 +1,37 @@
flowchart TB
user([User / Security Team])
cli([Prowler CLI])
subgraph APP["Prowler App"]
ui["Prowler UI<br/>(Next.js)"]
api["Prowler API<br/>(Django REST Framework)"]
worker["API Worker<br/>(Celery)"]
beat["API Scheduler<br/>(Celery Beat)"]
mcp["Prowler MCP Server<br/>(Lighthouse AI tools)"]
end
sdk["Prowler SDK<br/>(Python)"]
subgraph DATA["Data Layer"]
pg[("PostgreSQL")]
valkey[("Valkey / Redis")]
neo4j[("Neo4j")]
end
providers["Providers"]
user --> ui
user --> cli
ui -->|REST| api
api --> pg
api --> valkey
beat -->|enqueue jobs| valkey
valkey -->|dispatch| worker
worker --> pg
worker -->|Attack Paths| neo4j
worker -->|invokes| sdk
cli --> sdk
api -. AI tools .-> mcp
mcp -. context .-> api
sdk --> providers
Binary file not shown.

Before

Width:  |  Height:  |  Size: 192 KiB

After

Width:  |  Height:  |  Size: 268 KiB

+24 -8
View File
@@ -66,22 +66,38 @@ prowler <provider> --categories internet-exposed
### Shodan
Prowler allows you check if any public IPs in your Cloud environments are exposed in Shodan with the `-N`/`--shodan <shodan_api_key>` option:
Prowler can check whether any public IPs in cloud environments are exposed in Shodan using the `-N`/`--shodan` option.
For example, you can check if any of your AWS Elastic Compute Cloud (EC2) instances has an elastic IP exposed in Shodan:
#### Using the Environment Variable (Recommended)
Set the `SHODAN_API_KEY` environment variable to avoid exposing the API key in process listings and shell history:
```console
prowler aws -N/--shodan <shodan_api_key> -c ec2_elastic_ip_shodan
export SHODAN_API_KEY=<shodan_api_key>
```
Also, you can check if any of your Azure Subscription has an public IP exposed in Shodan:
Then run Prowler with the `--shodan` flag (no value needed):
```console
prowler azure -N/--shodan <shodan_api_key> -c network_public_ip_shodan
prowler aws --shodan -c ec2_elastic_ip_shodan
```
And finally, you can check if any of your GCP projects has an public IP address exposed in Shodan:
```console
prowler gcp -N/--shodan <shodan_api_key> -c compute_public_address_shodan
prowler azure --shodan -c network_public_ip_shodan
```
```console
prowler gcp --shodan -c compute_public_address_shodan
```
#### Using the CLI Flag
Alternatively, pass the API key directly on the command line:
```console
prowler aws --shodan <shodan_api_key> -c ec2_elastic_ip_shodan
```
<Warning>
Passing secret values directly on the command line exposes them in process listings and shell history. Prowler CLI displays a warning when this pattern is detected. Use the `SHODAN_API_KEY` environment variable instead.
</Warning>
@@ -6,17 +6,19 @@ import { VersionBadge } from "/snippets/version-badge.mdx"
<VersionBadge version="5.19.0" />
Prowler for Google Workspace uses a **Service Account with Domain-Wide Delegation** to authenticate to the Google Workspace Admin SDK. This allows Prowler to read directory data on behalf of a super administrator without requiring an interactive login.
Prowler for Google Workspace uses a **Service Account with Domain-Wide Delegation** to authenticate to the Google Workspace Admin SDK and the Cloud Identity Policy API. This allows Prowler to read directory data and domain-level application policies on behalf of a super administrator without requiring an interactive login.
## Required Open Authorization (OAuth) Scopes
Prowler requests the following read-only OAuth 2.0 scopes from the Google Workspace Admin SDK:
Prowler requests the following read-only OAuth 2.0 scopes:
| Scope | Description |
|-------|-------------|
| `https://www.googleapis.com/auth/admin.directory.user.readonly` | Read access to user accounts and their admin status |
| `https://www.googleapis.com/auth/admin.directory.domain.readonly` | Read access to domain information |
| `https://www.googleapis.com/auth/admin.directory.customer.readonly` | Read access to customer information (Customer ID) |
| `https://www.googleapis.com/auth/cloud-identity.policies.readonly` | Read access to domain-level application policies (required for Calendar service checks) |
| `https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly` | Read access to admin roles and role assignments |
<Warning>
The delegated user must be a **super administrator** in your Google Workspace organization. Using a non-admin account will result in permission errors when accessing the Admin SDK.
@@ -30,13 +32,24 @@ If no GCP project exists, create one at [https://console.cloud.google.com](https
The project is only used to host the Service Account — it does not need to have any Google Workspace data in it.
### Step 2: Enable the Admin SDK API
### Step 2: Enable Required APIs
1. Navigate to the [Google Cloud Console](https://console.cloud.google.com)
2. Select the target project
3. Navigate to **APIs & Services → Library**
4. Search for **Admin SDK API**
5. Click **Enable**
In the [Google Cloud Console](https://console.cloud.google.com), select the target project and navigate to **APIs & Services → Library**. Search for and enable each of the following APIs:
| API | Required For |
|-----|--------------|
| **Admin SDK API** | Directory service checks (users, roles, domains) |
| **Cloud Identity API** | Calendar service checks (domain-level sharing and invitation policies) |
For each API:
1. Search for the API name in the library
2. Click the API result
3. Click **Enable**
<Note>
Both APIs must be enabled in the same GCP project that hosts the Service Account. Calendar checks will return no findings if the Cloud Identity API is not enabled.
</Note>
### Step 3: Create a Service Account
@@ -73,7 +86,7 @@ This JSON key grants access to your Google Workspace organization. Never commit
6. In the **OAuth scopes** field, enter the following scopes as a comma-separated list:
```
https://www.googleapis.com/auth/admin.directory.user.readonly,https://www.googleapis.com/auth/admin.directory.domain.readonly,https://www.googleapis.com/auth/admin.directory.customer.readonly
https://www.googleapis.com/auth/admin.directory.user.readonly,https://www.googleapis.com/auth/admin.directory.domain.readonly,https://www.googleapis.com/auth/admin.directory.customer.readonly,https://www.googleapis.com/auth/cloud-identity.policies.readonly,https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly
```
7. Click **Authorize**
@@ -114,7 +127,7 @@ The delegated user must be provided via the `GOOGLEWORKSPACE_DELEGATED_USER` env
- **Use environment variables** — Never hardcode credentials in scripts or commands
- **Use a dedicated Service Account** — Create one specifically for Prowler, separate from other integrations
- **Use read-only scopes** — Prowler only requires the three read-only scopes listed above
- **Use read-only scopes** — Prowler only requires the read-only scopes listed above
- **Restrict key access** — Set file permissions to `600` on the JSON key file
- **Rotate keys regularly** — Delete and regenerate the JSON key periodically
- **Use a least-privilege super admin** — Consider using a dedicated super admin account for Prowler's delegated user rather than a personal admin account
@@ -151,7 +164,7 @@ python3 -c "import json; json.load(open('/path/to/key.json'))" && echo "Valid JS
The Service Account cannot impersonate the delegated user. This usually means Domain-Wide Delegation has not been configured, or the OAuth scopes are incorrect. Verify:
- The Service Account Client ID is correctly entered in the Admin Console
- All three required OAuth scopes are included
- All required OAuth scopes are included
- The delegated user is a super administrator
### Permission Denied on Admin SDK Calls
@@ -159,5 +172,14 @@ The Service Account cannot impersonate the delegated user. This usually means Do
If Prowler connects but returns empty results or permission errors for specific API calls:
- Confirm Domain-Wide Delegation is fully propagated (wait a few minutes after setup)
- Verify all three scopes are authorized in the Admin Console
- Verify all scopes are authorized in the Admin Console
- Ensure the delegated user is an active super administrator
### Calendar Checks Return No Findings
If the Directory checks run successfully but the Calendar checks (e.g., `calendar_external_sharing_primary_calendar`) return no findings, the Cloud Identity Policy API is not reachable for this Service Account. Verify:
- The **Cloud Identity API** is enabled in the GCP project hosting the Service Account (Step 2)
- The scope `https://www.googleapis.com/auth/cloud-identity.policies.readonly` is included in the Domain-Wide Delegation OAuth scopes list in the Admin Console (Step 5)
- The delegated user is a super administrator (the Policy API only returns data to super admins)
- Domain-Wide Delegation has had time to propagate after adding the new scope (a few minutes)
@@ -78,7 +78,7 @@ The Service Account JSON is the full content of the key file downloaded when cre
![Check Connection](/images/providers/googleworkspace-check-connection.png)
<Note>
If the connection test fails, verify that Domain-Wide Delegation is properly configured and that all three OAuth scopes are authorized. It may take a few minutes for delegation changes to propagate. See the [Troubleshooting](/user-guide/providers/googleworkspace/authentication#troubleshooting) section for common errors.
If the connection test fails, verify that Domain-Wide Delegation is properly configured and that all required OAuth scopes are authorized. It may take a few minutes for delegation changes to propagate. See the [Troubleshooting](/user-guide/providers/googleworkspace/authentication#troubleshooting) section for common errors.
</Note>
### Step 5: Launch the Scan
+6
View File
@@ -11,11 +11,13 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `glue_etl_jobs_no_secrets_in_arguments` check for plaintext secrets in AWS Glue ETL job arguments [(#10368)](https://github.com/prowler-cloud/prowler/pull/10368)
- `awslambda_function_no_dead_letter_queue`, `awslambda_function_using_cross_account_layers`, and `awslambda_function_env_vars_not_encrypted_with_cmk` checks for AWS Lambda [(#10381)](https://github.com/prowler-cloud/prowler/pull/10381)
- `entra_conditional_access_policy_mdm_compliant_device_required` check for M365 provider [(#10220)](https://github.com/prowler-cloud/prowler/pull/10220)
- `directory_super_admin_only_admin_roles` check for Google Workspace provider [(#10488)](https://github.com/prowler-cloud/prowler/pull/10488)
- `ec2_securitygroup_allow_ingress_from_internet_to_any_port_from_ip` check for AWS provider using `ipaddress.is_global` for accurate public IP detection [(#10335)](https://github.com/prowler-cloud/prowler/pull/10335)
- `entra_conditional_access_policy_block_o365_elevated_insider_risk` check for M365 provider [(#10232)](https://github.com/prowler-cloud/prowler/pull/10232)
- `--resource-group` and `--list-resource-groups` CLI flags to filter checks by resource group across all providers [(#10479)](https://github.com/prowler-cloud/prowler/pull/10479)
- CISA SCuBA Google Workspace Baselines compliance [(#10466)](https://github.com/prowler-cloud/prowler/pull/10466)
- CIS Google Workspace Foundations Benchmark v1.3.0 compliance [(#10462)](https://github.com/prowler-cloud/prowler/pull/10462)
- `calendar_external_sharing_primary_calendar`, `calendar_external_sharing_secondary_calendar`, and `calendar_external_invitations_warning` checks for Google Workspace provider using the Cloud Identity Policy API [(#10597)](https://github.com/prowler-cloud/prowler/pull/10597)
- `entra_conditional_access_policy_device_registration_mfa_required` check and `entra_intune_enrollment_sign_in_frequency_every_time` enhancement for M365 provider [(#10222)](https://github.com/prowler-cloud/prowler/pull/10222)
- `entra_conditional_access_policy_block_elevated_insider_risk` check for M365 provider [(#10234)](https://github.com/prowler-cloud/prowler/pull/10234)
- `Vercel` provider support with 30 checks [(#10189)](https://github.com/prowler-cloud/prowler/pull/10189)
@@ -24,6 +26,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Added `internet-exposed` category to 13 AWS checks (CloudFront, CodeArtifact, EC2, EFS, RDS, SageMaker, Shield, VPC) [(#10502)](https://github.com/prowler-cloud/prowler/pull/10502)
- Minimum Python version from 3.9 to 3.10 and updated classifiers to reflect supported versions (3.10, 3.11, 3.12) [(#10464)](https://github.com/prowler-cloud/prowler/pull/10464)
- Sensitive CLI flags now warn when values are passed directly, recommending environment variables instead [(#10532)](https://github.com/prowler-cloud/prowler/pull/10532)
### 🐞 Fixed
@@ -33,6 +36,9 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `--list-checks` and `--list-checks-json` now include `threat-detection` category checks in their output [(#10578)](https://github.com/prowler-cloud/prowler/pull/10578)
- Missing `__init__.py` in `codebuild_project_uses_allowed_github_organizations` check preventing discovery by `--list-checks` [(#10584)](https://github.com/prowler-cloud/prowler/pull/10584)
- Azure Key Vault checks emitting incorrect findings for keys, secrets, and vault logging [(#10332)](https://github.com/prowler-cloud/prowler/pull/10332)
- `is_policy_public` now recognizes `kms:CallerAccount`, `kms:ViaService`, `aws:CalledVia`, `aws:CalledViaFirst`, and `aws:CalledViaLast` as restrictive condition keys, fixing false positives in `kms_key_policy_is_not_public` and other checks that use `is_condition_block_restrictive` [(#10600)](https://github.com/prowler-cloud/prowler/pull/10600)
- `_enabled_regions` empty-set bug in `AwsProvider.generate_regional_clients` creating boto3 clients for all 36 AWS regions instead of the audited ones, causing random CI timeouts and slow test runs [(#10598)](https://github.com/prowler-cloud/prowler/pull/10598)
- Retrieve only the latest version from a package in AWS CodeArtifact [(#10243)](https://github.com/prowler-cloud/prowler/pull/10243)
### 🔐 Security
@@ -54,7 +54,9 @@
{
"Id": "1.1.3",
"Description": "Ensure super admin accounts are used only for super admin activities",
"Checks": [],
"Checks": [
"directory_super_admin_only_admin_roles"
],
"Attributes": [
{
"Section": "1 Directory",
@@ -96,7 +98,9 @@
{
"Id": "3.1.1.1.1",
"Description": "Ensure external sharing options for primary calendars are configured",
"Checks": [],
"Checks": [
"calendar_external_sharing_primary_calendar"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -138,7 +142,9 @@
{
"Id": "3.1.1.1.3",
"Description": "Ensure external invitation warnings for Google Calendar are configured",
"Checks": [],
"Checks": [
"calendar_external_invitations_warning"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -159,7 +165,9 @@
{
"Id": "3.1.1.2.1",
"Description": "Ensure external sharing options for secondary calendars are configured",
"Checks": [],
"Checks": [
"calendar_external_sharing_secondary_calendar"
],
"Attributes": [
{
"Section": "3 Apps",
@@ -1310,7 +1310,9 @@
{
"Id": "GWS.CALENDAR.1.1",
"Description": "External Sharing Options for Primary Calendars SHALL be configured to Only free/busy information (hide event details)",
"Checks": [],
"Checks": [
"calendar_external_sharing_primary_calendar"
],
"Attributes": [
{
"Section": "Calendar",
@@ -1323,7 +1325,9 @@
{
"Id": "GWS.CALENDAR.1.2",
"Description": "External sharing options for secondary calendars SHALL be configured to Only free/busy information (hide event details)",
"Checks": [],
"Checks": [
"calendar_external_sharing_secondary_calendar"
],
"Attributes": [
{
"Section": "Calendar",
@@ -1336,7 +1340,9 @@
{
"Id": "GWS.CALENDAR.2.1",
"Description": "External invitations warnings SHALL be enabled to prompt users before sending invitations",
"Checks": [],
"Checks": [
"calendar_external_invitations_warning"
],
"Attributes": [
{
"Section": "Calendar",
+6 -3
View File
@@ -12,6 +12,7 @@ from prowler.config.config import (
default_output_directory,
)
from prowler.lib.check.models import Severity
from prowler.lib.cli.redact import warn_sensitive_argument_values
from prowler.lib.outputs.common import Status
from prowler.providers.common.arguments import (
init_providers_parser,
@@ -19,8 +20,6 @@ from prowler.providers.common.arguments import (
validate_provider_arguments,
)
SENSITIVE_ARGUMENTS = frozenset({"--shodan"})
class ProwlerArgumentParser:
# Set the default parser
@@ -126,6 +125,10 @@ Detailed documentation at https://docs.prowler.com
elif sys.argv[1] == "oci":
sys.argv[1] = "oraclecloud"
# Warn about sensitive flags passed with explicit values
# Snapshot argv before parse_args() which may exit on errors
warn_sensitive_argument_values(list(sys.argv[1:]))
# Parse arguments
args = self.parser.parse_args()
@@ -434,7 +437,7 @@ Detailed documentation at https://docs.prowler.com
nargs="?",
default=None,
metavar="SHODAN_API_KEY",
help="Check if any public IPs in your Cloud environments are exposed in Shodan.",
help="Check if any public IPs in your Cloud environments are exposed in Shodan. We recommend to use the SHODAN_API_KEY environment variable to provide the API key.",
)
third_party_subparser.add_argument(
"--slack",
+50 -5
View File
@@ -1,6 +1,9 @@
from functools import lru_cache
from importlib import import_module
from colorama import Fore, Style
from prowler.lib.cli.sensitive import SENSITIVE_ARGUMENTS as COMMON_SENSITIVE_ARGUMENTS
from prowler.lib.logger import logger
from prowler.providers.common.provider import Provider, providers_path
@@ -13,11 +16,7 @@ def get_sensitive_arguments() -> frozenset:
sensitive: set[str] = set()
# Common parser sensitive arguments (e.g., --shodan)
try:
parser_module = import_module("prowler.lib.cli.parser")
sensitive.update(getattr(parser_module, "SENSITIVE_ARGUMENTS", frozenset()))
except Exception as error:
logger.debug(f"Could not load SENSITIVE_ARGUMENTS from parser: {error}")
sensitive.update(COMMON_SENSITIVE_ARGUMENTS)
# Provider-specific sensitive arguments
for provider in Provider.get_available_providers():
@@ -66,3 +65,49 @@ def redact_argv(argv: list[str]) -> str:
result.append(arg)
return " ".join(result)
def warn_sensitive_argument_values(argv: list[str]) -> None:
"""Log a warning for each sensitive CLI flag that was passed with an explicit value.
Scans the raw argv list (not parsed args) to detect when users pass
secret values directly on the command line instead of using environment
variables. Handles both ``--flag value`` and ``--flag=value`` syntax.
Args:
argv: The argument list to check (typically ``sys.argv[1:]``).
"""
sensitive = get_sensitive_arguments()
if not sensitive:
return
use_color = "--no-color" not in argv
flags_with_values: list[str] = []
for i, arg in enumerate(argv):
# --flag=value syntax
if "=" in arg:
flag = arg.split("=", 1)[0]
if flag in sensitive:
flags_with_values.append(flag)
continue
# --flag value syntax
if arg in sensitive:
if i + 1 < len(argv) and not argv[i + 1].startswith("-"):
flags_with_values.append(arg)
for flag in flags_with_values:
if use_color:
logger.warning(
f"{Fore.YELLOW}{Style.BRIGHT}WARNING:{Style.RESET_ALL}{Fore.YELLOW} "
f"Passing a value directly to {flag} is not recommended. "
f"Use the corresponding environment variable instead to avoid "
f"exposing secrets in process listings and shell history.{Style.RESET_ALL}"
)
else:
logger.warning(
f"Passing a value directly to {flag} is not recommended. "
f"Use the corresponding environment variable instead to avoid "
f"exposing secrets in process listings and shell history."
)
+8
View File
@@ -0,0 +1,8 @@
"""Common parser sensitive arguments.
This module is kept dependency-free (no prowler-internal imports) so that
``prowler.lib.cli.redact`` and any provider argument module can import it
without circular-import risk.
"""
SENSITIVE_ARGUMENTS = frozenset({"--shodan"})
+6 -6
View File
@@ -96,7 +96,7 @@ class AwsProvider(Provider):
_audit_resources: list = []
_audit_config: dict
_scan_unused_services: bool = False
_enabled_regions: set = set()
_enabled_regions: set | None = None
_mutelist: AWSMutelist
# TODO: this is not optional, enforce for all providers
audit_metadata: Audit_Metadata
@@ -747,7 +747,7 @@ class AwsProvider(Provider):
)
# Get the regions enabled for the account and get the intersection with the service available regions
if self._enabled_regions:
if self._enabled_regions is not None:
enabled_regions = service_regions.intersection(self._enabled_regions)
else:
enabled_regions = service_regions
@@ -1104,14 +1104,14 @@ class AwsProvider(Provider):
file=pathlib.Path(__file__).name,
)
def get_aws_enabled_regions(self, current_session: Session) -> set:
"""get_aws_enabled_regions returns a set of enabled AWS regions
def get_aws_enabled_regions(self, current_session: Session) -> set | None:
"""get_aws_enabled_regions returns a set of enabled AWS regions, or None on failure.
Args:
- current_session: The AWS session object
Returns:
- set: set of strings representing the enabled AWS regions
- set | None: set of enabled AWS region strings, or None if regions could not be determined
"""
try:
# EC2 Client to check enabled regions
@@ -1131,7 +1131,7 @@ class AwsProvider(Provider):
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return set()
return None
# TODO: review this function
# Maybe this should be done within the AwsProvider and not in __main__.py
@@ -96,6 +96,7 @@ class CodeArtifact(AWSService):
namespace=package_namespace,
package=package_name,
sortBy="PUBLISHED_TIME",
maxResults=1,
)
)
else:
@@ -111,6 +112,7 @@ class CodeArtifact(AWSService):
format=package_format,
package=package_name,
sortBy="PUBLISHED_TIME",
maxResults=1,
)
)
latest_version = ""
@@ -617,6 +617,11 @@ def is_condition_block_restrictive(
"aws:sourceorgpaths",
"aws:userid",
"aws:username",
"aws:calledvia",
"aws:calledviafirst",
"aws:calledvialast",
"kms:calleraccount",
"kms:viaservice",
"s3:resourceaccount",
"lambda:eventsourcetoken", # For Alexa Home functions, a token that the invoker must supply.
],
@@ -635,6 +640,11 @@ def is_condition_block_restrictive(
"aws:sourceorgpaths",
"aws:userid",
"aws:username",
"aws:calledvia",
"aws:calledviafirst",
"aws:calledvialast",
"kms:calleraccount",
"kms:viaservice",
"s3:resourceaccount",
"lambda:eventsourcetoken",
],
@@ -95,8 +95,10 @@ class Route53(AWSService):
region, so we need to query all enabled regions to avoid false positives.
"""
logger.info("Route53 - Gathering Elastic IPs from all regions...")
all_regions = self.provider._enabled_regions or set(
self.provider._identity.audited_regions
all_regions = (
self.provider._enabled_regions
if self.provider._enabled_regions is not None
else set(self.provider._identity.audited_regions)
)
for region in all_regions:
@@ -59,11 +59,14 @@ class GoogleworkspaceProvider(Provider):
_mutelist: GoogleWorkspaceMutelist
audit_metadata: Audit_Metadata
# Google Workspace Admin SDK OAuth2 scopes
DIRECTORY_SCOPES = [
# Google Workspace OAuth2 scopes
SCOPES = [
"https://www.googleapis.com/auth/admin.directory.user.readonly",
"https://www.googleapis.com/auth/admin.directory.domain.readonly",
"https://www.googleapis.com/auth/admin.directory.customer.readonly",
# Cloud Identity Policy API (calendar and other app policies)
"https://www.googleapis.com/auth/cloud-identity.policies.readonly",
"https://www.googleapis.com/auth/admin.directory.rolemanagement.readonly",
]
def __init__(
@@ -214,7 +217,7 @@ class GoogleworkspaceProvider(Provider):
try:
credentials = service_account.Credentials.from_service_account_file(
credentials_file,
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
scopes=GoogleworkspaceProvider.SCOPES,
)
except FileNotFoundError as error:
raise GoogleWorkspaceInvalidCredentialsError(
@@ -241,7 +244,7 @@ class GoogleworkspaceProvider(Provider):
try:
credentials = service_account.Credentials.from_service_account_info(
credentials_data,
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
scopes=GoogleworkspaceProvider.SCOPES,
)
except ValueError as error:
raise GoogleWorkspaceInvalidCredentialsError(
@@ -264,7 +267,7 @@ class GoogleworkspaceProvider(Provider):
try:
credentials = service_account.Credentials.from_service_account_file(
env_file,
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
scopes=GoogleworkspaceProvider.SCOPES,
)
except FileNotFoundError as error:
raise GoogleWorkspaceInvalidCredentialsError(
@@ -293,7 +296,7 @@ class GoogleworkspaceProvider(Provider):
try:
credentials = service_account.Credentials.from_service_account_info(
credentials_data,
scopes=GoogleworkspaceProvider.DIRECTORY_SCOPES,
scopes=GoogleworkspaceProvider.SCOPES,
)
except ValueError as error:
raise GoogleWorkspaceInvalidCredentialsError(
@@ -414,7 +417,7 @@ class GoogleworkspaceProvider(Provider):
)
# Fetch all domains (primary + aliases) to support domain aliases
# The scope admin.directory.domain.readonly is already in DIRECTORY_SCOPES
# The scope admin.directory.domain.readonly is already in SCOPES above
try:
domains_response = service.domains().list(customer="my_customer").execute()
valid_domains = [
@@ -0,0 +1,6 @@
from prowler.providers.common.provider import Provider
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar_client = Calendar(Provider.get_global_provider())
@@ -0,0 +1,41 @@
{
"Provider": "googleworkspace",
"CheckID": "calendar_external_invitations_warning",
"CheckTitle": "External invitation warnings are enabled for Google Calendar",
"CheckType": [],
"ServiceName": "calendar",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "collaboration",
"Description": "Google Calendar **warns users** when they invite guests from outside the organization to an event. This prompt gives users a chance to reconsider before sharing meeting details with external parties, reducing the likelihood of **accidental information disclosure** through calendar invitations.",
"Risk": "Without external invitation warnings, users may unintentionally include **external guests** in internal meetings, exposing **confidential meeting details**, agendas, and internal attendee lists to unauthorized parties. This is a common vector for inadvertent data leakage through everyday calendar actions.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://support.google.com/a/answer/6329284",
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External invitations**, check **Warn users when inviting guests outside of the domain**\n5. Click **Save**",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable external invitation warnings so users are notified whenever a meeting invitation includes guests outside the organization. This simple prompt helps prevent accidental disclosure of meeting details to unintended recipients.",
"Url": "https://hub.prowler.com/check/calendar_external_invitations_warning"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"calendar_external_sharing_primary_calendar",
"calendar_external_sharing_secondary_calendar"
],
"Notes": ""
}
@@ -0,0 +1,56 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
calendar_client,
)
class calendar_external_invitations_warning(Check):
"""Check that external invitation warnings are enabled for Google Calendar
This check verifies that the domain-level policy warns users when they
invite guests from outside the organization, reducing the risk of accidental
information disclosure through calendar events.
"""
def execute(self) -> List[CheckReportGoogleWorkspace]:
findings = []
if calendar_client.policies_fetched:
report = CheckReportGoogleWorkspace(
metadata=self.metadata(),
resource=calendar_client.provider.identity,
resource_name=calendar_client.provider.identity.domain,
resource_id=calendar_client.provider.identity.customer_id,
customer_id=calendar_client.provider.identity.customer_id,
location="global",
)
warning_enabled = calendar_client.policies.external_invitations_warning
if warning_enabled is True:
report.status = "PASS"
report.status_extended = (
f"External invitation warnings for Google Calendar are enabled "
f"in domain {calendar_client.provider.identity.domain}."
)
else:
report.status = "FAIL"
if warning_enabled is None:
report.status_extended = (
f"External invitation warnings for Google Calendar are not "
f"explicitly configured in domain "
f"{calendar_client.provider.identity.domain}. "
f"Users should be warned when inviting guests outside the organization."
)
else:
report.status_extended = (
f"External invitation warnings for Google Calendar are disabled "
f"in domain {calendar_client.provider.identity.domain}. "
f"Users should be warned when inviting guests outside the organization."
)
findings.append(report)
return findings
@@ -0,0 +1,41 @@
{
"Provider": "googleworkspace",
"CheckID": "calendar_external_sharing_primary_calendar",
"CheckTitle": "External sharing for primary calendars is restricted to free/busy only",
"CheckType": [],
"ServiceName": "calendar",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "collaboration",
"Description": "Primary calendars in the Google Workspace domain share **only free/busy information** with external users. When external sharing is set to share full event details, sensitive information such as meeting titles, attendees, locations, and descriptions is exposed to users outside the organization.",
"Risk": "Overly permissive external sharing of primary calendars exposes **sensitive meeting metadata** — titles, attendees, locations, and descriptions — to users outside the organization. This increases the risk of **information disclosure**, **social engineering**, and **targeted phishing** based on insights into organizational activities.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://support.google.com/a/answer/60765",
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External sharing options for primary calendars**, select **Only free/busy information (hide event details)**\n5. Click **Save**",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict external sharing of primary calendars to free/busy information only. This preserves scheduling functionality with external users while preventing exposure of sensitive meeting details.",
"Url": "https://hub.prowler.com/check/calendar_external_sharing_primary_calendar"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"calendar_external_sharing_secondary_calendar",
"calendar_external_invitations_warning"
],
"Notes": ""
}
@@ -0,0 +1,56 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
calendar_client,
)
class calendar_external_sharing_primary_calendar(Check):
"""Check that external sharing for primary calendars is restricted to free/busy only
This check verifies that the domain-level policy for primary calendar external
sharing is set to share only free/busy information, preventing exposure of
event details to external users.
"""
def execute(self) -> List[CheckReportGoogleWorkspace]:
findings = []
if calendar_client.policies_fetched:
report = CheckReportGoogleWorkspace(
metadata=self.metadata(),
resource=calendar_client.provider.identity,
resource_name=calendar_client.provider.identity.domain,
resource_id=calendar_client.provider.identity.customer_id,
customer_id=calendar_client.provider.identity.customer_id,
location="global",
)
sharing = calendar_client.policies.primary_calendar_external_sharing
if sharing == "EXTERNAL_FREE_BUSY_ONLY":
report.status = "PASS"
report.status_extended = (
f"Primary calendar external sharing in domain "
f"{calendar_client.provider.identity.domain} is restricted to "
f"free/busy information only."
)
else:
report.status = "FAIL"
if sharing is None:
report.status_extended = (
f"Primary calendar external sharing is not explicitly configured "
f"in domain {calendar_client.provider.identity.domain}. "
f"External sharing should be restricted to free/busy information only."
)
else:
report.status_extended = (
f"Primary calendar external sharing in domain "
f"{calendar_client.provider.identity.domain} is set to {sharing}. "
f"External sharing should be restricted to free/busy information only."
)
findings.append(report)
return findings
@@ -0,0 +1,41 @@
{
"Provider": "googleworkspace",
"CheckID": "calendar_external_sharing_secondary_calendar",
"CheckTitle": "External sharing for secondary calendars is restricted to free/busy only",
"CheckType": [],
"ServiceName": "calendar",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "collaboration",
"Description": "Secondary calendars in the Google Workspace domain share **only free/busy information** with external users. Secondary calendars are additional calendars users create beyond their primary calendar (e.g., for projects, teams, or personal events), and are commonly used to organize sensitive or focused activities that should not be visible to external parties.",
"Risk": "Overly permissive external sharing of secondary calendars exposes **project-specific or team-specific event details** to users outside the organization. Because secondary calendars often hold more targeted activities (e.g., product launches, internal reviews), unrestricted external sharing increases the risk of **information disclosure** and **competitive intelligence leakage**.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://support.google.com/a/answer/60765",
"https://knowledge.workspace.google.com/admin/calendar/set-google-calendar-sharing-options",
"https://cloud.google.com/identity/docs/concepts/supported-policy-api-settings"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Apps** > **Google Workspace** > **Calendar**\n3. Click **Sharing settings**\n4. Under **External sharing options for secondary calendars**, select **Only free/busy information (hide event details)**\n5. Click **Save**",
"Terraform": ""
},
"Recommendation": {
"Text": "Restrict external sharing of secondary calendars to free/busy information only. This preserves scheduling interoperability with external collaborators while preventing exposure of sensitive event details in user-created calendars.",
"Url": "https://hub.prowler.com/check/calendar_external_sharing_secondary_calendar"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [
"calendar_external_sharing_primary_calendar",
"calendar_external_invitations_warning"
],
"Notes": ""
}
@@ -0,0 +1,56 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
from prowler.providers.googleworkspace.services.calendar.calendar_client import (
calendar_client,
)
class calendar_external_sharing_secondary_calendar(Check):
"""Check that external sharing for secondary calendars is restricted to free/busy only
This check verifies that the domain-level policy for secondary calendar external
sharing is set to share only free/busy information, preventing exposure of
event details in user-created calendars to external users.
"""
def execute(self) -> List[CheckReportGoogleWorkspace]:
findings = []
if calendar_client.policies_fetched:
report = CheckReportGoogleWorkspace(
metadata=self.metadata(),
resource=calendar_client.provider.identity,
resource_name=calendar_client.provider.identity.domain,
resource_id=calendar_client.provider.identity.customer_id,
customer_id=calendar_client.provider.identity.customer_id,
location="global",
)
sharing = calendar_client.policies.secondary_calendar_external_sharing
if sharing == "EXTERNAL_FREE_BUSY_ONLY":
report.status = "PASS"
report.status_extended = (
f"Secondary calendar external sharing in domain "
f"{calendar_client.provider.identity.domain} is restricted to "
f"free/busy information only."
)
else:
report.status = "FAIL"
if sharing is None:
report.status_extended = (
f"Secondary calendar external sharing is not explicitly configured "
f"in domain {calendar_client.provider.identity.domain}. "
f"External sharing should be restricted to free/busy information only."
)
else:
report.status_extended = (
f"Secondary calendar external sharing in domain "
f"{calendar_client.provider.identity.domain} is set to {sharing}. "
f"External sharing should be restricted to free/busy information only."
)
findings.append(report)
return findings
@@ -0,0 +1,112 @@
from typing import Optional
from pydantic import BaseModel
from prowler.lib.logger import logger
from prowler.providers.googleworkspace.lib.service.service import GoogleWorkspaceService
class Calendar(GoogleWorkspaceService):
"""Google Workspace Calendar service for auditing domain-level calendar policies.
Uses the Cloud Identity Policy API v1 to read calendar sharing
and invitation settings configured in the Admin Console.
"""
def __init__(self, provider):
super().__init__(provider)
self.policies = CalendarPolicies()
self.policies_fetched = False
self._fetch_calendar_policies()
def _fetch_calendar_policies(self):
"""Fetch calendar policies from the Cloud Identity Policy API v1."""
logger.info("Calendar - Fetching calendar policies...")
try:
service = self._build_service("cloudidentity", "v1")
if not service:
logger.error("Failed to build Cloud Identity service")
return
request = service.policies().list(pageSize=100)
fetch_succeeded = True
while request is not None:
try:
response = request.execute()
for policy in response.get("policies", []):
setting = policy.get("setting", {})
setting_type = setting.get("type", "").removeprefix("settings/")
value = setting.get("value", {})
if (
setting_type
== "calendar.primary_calendar_max_allowed_external_sharing"
):
self.policies.primary_calendar_external_sharing = value.get(
"maxAllowedExternalSharing"
)
logger.debug(
"Primary calendar external sharing: "
f"{self.policies.primary_calendar_external_sharing}"
)
elif (
setting_type
== "calendar.secondary_calendar_max_allowed_external_sharing"
):
self.policies.secondary_calendar_external_sharing = (
value.get("maxAllowedExternalSharing")
)
logger.debug(
"Secondary calendar external sharing: "
f"{self.policies.secondary_calendar_external_sharing}"
)
elif setting_type == "calendar.external_invitations":
self.policies.external_invitations_warning = value.get(
"warnOnInvite"
)
logger.debug(
"External invitations warning: "
f"{self.policies.external_invitations_warning}"
)
request = service.policies().list_next(request, response)
except Exception as error:
self._handle_api_error(
error,
"fetching calendar policies",
self.provider.identity.customer_id,
)
fetch_succeeded = False
break
self.policies_fetched = fetch_succeeded
logger.info(
f"Calendar policies fetched - "
f"Primary sharing: {self.policies.primary_calendar_external_sharing}, "
f"Secondary sharing: {self.policies.secondary_calendar_external_sharing}, "
f"Invitation warnings: {self.policies.external_invitations_warning}"
)
except Exception as error:
self._handle_api_error(
error,
"fetching calendar policies",
self.provider.identity.customer_id,
)
self.policies_fetched = False
class CalendarPolicies(BaseModel):
"""Model for domain-level Calendar policy settings."""
primary_calendar_external_sharing: Optional[str] = None
secondary_calendar_external_sharing: Optional[str] = None
external_invitations_warning: Optional[bool] = None
@@ -8,23 +8,21 @@ class Directory(GoogleWorkspaceService):
def __init__(self, provider):
super().__init__(provider)
self._service = self._build_service("admin", "directory_v1")
self.users = self._list_users()
self._roles = self._list_roles()
self._populate_role_assignments()
def _list_users(self):
logger.info("Directory - Listing Users...")
users = {}
try:
# Build the Admin SDK Directory service
service = self._build_service("admin", "directory_v1")
if not service:
if not self._service:
logger.error("Failed to build Directory service")
return users
# Fetch users using the Directory API
# Reference: https://developers.google.com/admin-sdk/directory/reference/rest/v1/users/list
request = service.users().list(
request = self._service.users().list(
customer=self.provider.identity.customer_id,
maxResults=500, # Max allowed by API
orderBy="email",
@@ -38,14 +36,11 @@ class Directory(GoogleWorkspaceService):
user = User(
id=user_data.get("id"),
email=user_data.get("primaryEmail"),
is_admin=user_data.get("isAdmin", False),
)
users[user.id] = user
logger.debug(
f"Processed user: {user.email} (Admin: {user.is_admin})"
)
logger.debug(f"Processed user: {user.email}")
request = service.users().list_next(request, response)
request = self._service.users().list_next(request, response)
except Exception as error:
self._handle_api_error(
@@ -62,9 +57,108 @@ class Directory(GoogleWorkspaceService):
return users
def _list_roles(self):
logger.info("Directory - Listing Roles...")
roles = {}
try:
if not self._service:
return roles
request = self._service.roles().list(
customer=self.provider.identity.customer_id,
)
while request is not None:
try:
response = request.execute()
for role_data in response.get("items", []):
role_id = str(role_data.get("roleId", ""))
role_name = role_data.get("roleName", "")
if role_id and role_name:
roles[role_id] = Role(
id=role_id,
name=role_name,
description=role_data.get("roleDescription", ""),
is_super_admin_role=role_data.get(
"isSuperAdminRole", False
),
)
request = self._service.roles().list_next(request, response)
except Exception as error:
self._handle_api_error(
error,
"listing roles",
self.provider.identity.customer_id,
)
break
logger.info(f"Found {len(roles)} roles in the domain")
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return roles
def _populate_role_assignments(self):
logger.info("Directory - Fetching Role Assignments...")
if not self._service:
return
try:
request = self._service.roleAssignments().list(
customer=self.provider.identity.customer_id,
)
while request is not None:
try:
response = request.execute()
for assignment in response.get("items", []):
user_id = str(assignment.get("assignedTo", ""))
role_id = str(assignment.get("roleId", ""))
user = self.users.get(user_id)
role = self._roles.get(role_id)
if user and role:
user.role_assignments.append(role)
if role.is_super_admin_role:
user.is_admin = True
request = self._service.roleAssignments().list_next(
request, response
)
except Exception as error:
self._handle_api_error(
error,
"listing role assignments",
self.provider.identity.customer_id,
)
break
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
class Role(BaseModel):
id: str
name: str
description: str = ""
is_super_admin_role: bool = False
class User(BaseModel):
id: str
email: str
is_admin: bool = False
role_assignments: list[Role] = []
@@ -0,0 +1,39 @@
{
"Provider": "googleworkspace",
"CheckID": "directory_super_admin_only_admin_roles",
"CheckTitle": "All super admin accounts are used only for super admin activities",
"CheckType": [],
"ServiceName": "directory",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "IAM",
"Description": "Super admin accounts do not also hold **additional admin roles** such as Groups Admin, User Management Admin, etc. Each super administrator has a separate, non-admin account for daily activities, following the **principle of least privilege**.",
"Risk": "A super admin account that also holds additional admin roles increases the **attack surface** for phishing and credential theft. Compromising a single dual-role account grants full administrative access, bypassing **separation of duties** and enabling unauthorized changes to users, billing, and security settings.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://knowledge.workspace.google.com/admin/users/prebuilt-administrator-roles",
"https://support.google.com/a/answer/9011373"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the Google **Admin console** at https://admin.google.com\n2. Navigate to **Directory** > **Users**\n3. Click on the super admin user who also has additional admin roles\n4. Click **Admin roles and privileges**\n5. Remove the additional admin roles from the super admin account\n6. Create a separate account for daily admin tasks",
"Terraform": ""
},
"Recommendation": {
"Text": "Apply the principle of separation of duties by maintaining dedicated super admin accounts exclusively for privileged tasks. Daily administrative activities should be performed from separate accounts with only the delegated roles required.",
"Url": "https://hub.prowler.com/check/directory_super_admin_only_admin_roles"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [
"directory_super_admin_count"
],
"Notes": ""
}
@@ -0,0 +1,60 @@
from typing import List
from prowler.lib.check.models import Check, CheckReportGoogleWorkspace
from prowler.providers.googleworkspace.services.directory.directory_client import (
directory_client,
)
class directory_super_admin_only_admin_roles(Check):
"""Check that super admin accounts are used only for super admin activities
This check verifies that no super admin user has additional admin roles assigned
beyond the Super Admin role. Super admins should have separate accounts for daily
activities to follow least privilege.
"""
def execute(self) -> List[CheckReportGoogleWorkspace]:
findings = []
if directory_client.users:
dual_role_admins = {}
for user in directory_client.users.values():
if user.is_admin:
extra_roles = [
r.description or r.name
for r in user.role_assignments
if not r.is_super_admin_role
]
if extra_roles:
dual_role_admins[user.email] = extra_roles
report = CheckReportGoogleWorkspace(
metadata=self.metadata(),
resource=directory_client.provider.identity,
resource_name=directory_client.provider.identity.domain,
resource_id=directory_client.provider.identity.customer_id,
customer_id=directory_client.provider.identity.customer_id,
location="global",
)
if dual_role_admins:
details = ", ".join(
f"{email} ({', '.join(roles)})"
for email, roles in dual_role_admins.items()
)
report.status = "FAIL"
report.status_extended = (
f"Super admin accounts also holding additional admin roles: {details}. "
f"Super admin accounts should be used only for super admin activities."
)
else:
report.status = "PASS"
report.status_extended = (
f"All super admin accounts in domain {directory_client.provider.identity.domain} "
f"are used only for super admin activities."
)
findings.append(report)
return findings
@@ -13,7 +13,11 @@ def init_parser(self):
"--nhn-username", nargs="?", default=None, help="NHN API Username"
)
nhn_auth_subparser.add_argument(
"--nhn-password", nargs="?", default=None, help="NHN API Password"
"--nhn-password",
nargs="?",
default=None,
metavar="NHN_PASSWORD",
help="NHN API Password",
)
nhn_auth_subparser.add_argument(
"--nhn-tenant-id", nargs="?", default=None, help="NHN Tenant ID"
@@ -46,6 +46,7 @@ def init_parser(self):
"--os-password",
nargs="?",
default=None,
metavar="OS_PASSWORD",
help="OpenStack password for authentication. Can also be set via OS_PASSWORD environment variable",
)
openstack_explicit_subparser.add_argument(
+28
View File
@@ -45,6 +45,34 @@ prowler/providers/{provider}/
└── {check_name}.metadata.json
```
## Sensitive CLI Arguments
Flags that accept secrets (tokens, passwords, API keys) MUST follow these rules:
1. **Use `nargs="?"` with `default=None`** — the flag accepts an optional value for backward compatibility; the recommended path is environment variables.
2. **Set `metavar` to the environment variable name** users should use (e.g., `metavar="GITHUB_PERSONAL_ACCESS_TOKEN"`).
3. **Add the flag to the `SENSITIVE_ARGUMENTS` frozenset** at the top of the provider's `arguments.py`. This set is used to redact values in HTML output and warn users who pass secrets directly.
4. **Do not add new arguments that require passing secrets as CLI values** — secrets should come from environment variables. The flag accepts a value for backward compatibility, but CLI warns users to prefer env vars.
### Pattern
```python
# prowler/providers/{provider}/lib/arguments/arguments.py
SENSITIVE_ARGUMENTS = frozenset({"--my-api-key", "--my-password"})
def init_parser(self):
auth_subparser = parser.add_argument_group("Authentication Modes")
auth_subparser.add_argument(
"--my-api-key",
nargs="?",
default=None,
metavar="MY_API_KEY",
help="API key for authentication. Use MY_API_KEY env var instead of passing directly.",
)
```
## Provider Class Template
```python
+63 -1
View File
@@ -1,8 +1,14 @@
import logging
from unittest.mock import patch
import pytest
from prowler.lib.cli.redact import REDACTED_VALUE, get_sensitive_arguments, redact_argv
from prowler.lib.cli.redact import (
REDACTED_VALUE,
get_sensitive_arguments,
redact_argv,
warn_sensitive_argument_values,
)
@pytest.fixture
@@ -87,6 +93,62 @@ class TestRedactArgv:
assert redact_argv(argv) == "aws --region=us-east-1"
class TestWarnSensitiveArgumentValues:
def test_no_warning_without_sensitive_flags(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(["aws", "--region", "eu-west-1"])
assert caplog.text == ""
def test_no_warning_flag_without_value(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(["github", "--personal-access-token"])
assert caplog.text == ""
def test_no_warning_flag_followed_by_another_flag(
self, caplog, mock_sensitive_args
):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(
["github", "--personal-access-token", "--region", "eu-west-1"]
)
assert caplog.text == ""
def test_warning_flag_with_value(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(
["github", "--personal-access-token", "ghp_secret"]
)
assert "--personal-access-token" in caplog.text
assert "not recommended" in caplog.text
def test_warning_flag_with_equals_syntax(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(["aws", "--shodan=key123"])
assert "--shodan" in caplog.text
assert "not recommended" in caplog.text
def test_warning_multiple_flags(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(
[
"github",
"--personal-access-token",
"ghp_secret",
"--shodan",
"key",
]
)
assert "--personal-access-token" in caplog.text
assert "--shodan" in caplog.text
def test_no_color_output(self, caplog, mock_sensitive_args):
with caplog.at_level(logging.WARNING):
warn_sensitive_argument_values(["--no-color", "aws", "--shodan", "key123"])
assert "not recommended" in caplog.text
# Should not contain ANSI escape codes
assert "\033[" not in caplog.text
class TestGetSensitiveArguments:
def test_discovers_known_sensitive_arguments(self):
"""Integration test: verify the discovery mechanism finds flags from provider modules."""
@@ -78,7 +78,9 @@ class TestAWSService:
def test_AWSService_non_global_service_uses_profile_region(self):
"""Non-global services should use the profile region when available."""
service_name = "s3"
provider = set_mocked_aws_provider(profile_region=AWS_REGION_EU_WEST_1)
provider = set_mocked_aws_provider(
audited_regions=[], profile_region=AWS_REGION_EU_WEST_1
)
service = AWSService(service_name, provider)
assert service.region == AWS_REGION_EU_WEST_1
@@ -312,7 +312,9 @@ class Test_awslambda_function_not_publicly_accessible:
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider(),
return_value=set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1]
),
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
@@ -552,7 +554,9 @@ class Test_awslambda_function_not_publicly_accessible:
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider(),
return_value=set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1]
),
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
@@ -615,7 +619,9 @@ class Test_awslambda_function_not_publicly_accessible:
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider(),
return_value=set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1]
),
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
@@ -690,7 +696,9 @@ class Test_awslambda_function_not_publicly_accessible:
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider(),
return_value=set_mocked_aws_provider(
audited_regions=[AWS_REGION_EU_WEST_1]
),
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client",
@@ -54,6 +54,9 @@ def mock_make_api_call(self, operation_name, kwarg):
}
if operation_name == "ListPackageVersions":
assert (
kwarg.get("maxResults") == 1
), "list_package_versions must pass maxResults=1 to avoid fetching all versions"
return {
"defaultDisplayVersion": "latest",
"format": "pypi",
@@ -204,3 +207,102 @@ class Test_CodeArtifact_Service:
.latest_version.origin.origin_type
== OriginInformationValues.INTERNAL
)
def mock_make_api_call_no_namespace(self, operation_name, kwarg):
"""Mock for packages without a namespace to exercise the else branch"""
if operation_name == "ListRepositories":
return {
"repositories": [
{
"name": "test-repository",
"administratorAccount": AWS_ACCOUNT_NUMBER,
"domainName": "test-domain",
"domainOwner": AWS_ACCOUNT_NUMBER,
"arn": TEST_REPOSITORY_ARN,
"description": "test description",
},
]
}
if operation_name == "ListPackages":
return {
"packages": [
{
"format": "pypi",
"package": "test-package-no-ns",
"originConfiguration": {
"restrictions": {
"publish": "ALLOW",
"upstream": "BLOCK",
}
},
},
],
}
if operation_name == "ListPackageVersions":
assert (
kwarg.get("maxResults") == 1
), "list_package_versions must pass maxResults=1 to avoid fetching all versions"
assert (
"namespace" not in kwarg
), "namespace should not be passed when package has no namespace"
return {
"defaultDisplayVersion": "1.0.0",
"format": "pypi",
"package": "test-package-no-ns",
"versions": [
{
"version": "1.0.0",
"revision": "abc123",
"status": "Published",
"origin": {
"domainEntryPoint": {
"repositoryName": "test-repository",
"externalConnectionName": "",
},
"originType": "EXTERNAL",
},
},
],
}
if operation_name == "ListTagsForResource":
return {"tags": []}
return make_api_call(self, operation_name, kwarg)
@patch(
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_no_namespace,
)
@patch(
"prowler.providers.aws.aws_provider.AwsProvider.generate_regional_clients",
new=mock_generate_regional_clients,
)
class Test_CodeArtifact_Service_No_Namespace:
def test_list_packages_no_namespace(self):
codeartifact = CodeArtifact(
set_mocked_aws_provider([AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1])
)
assert len(codeartifact.repositories[TEST_REPOSITORY_ARN].packages) == 1
package = codeartifact.repositories[TEST_REPOSITORY_ARN].packages[0]
assert package.name == "test-package-no-ns"
assert package.namespace is None
assert package.format == "pypi"
assert (
package.origin_configuration.restrictions.publish == RestrictionValues.ALLOW
)
assert (
package.origin_configuration.restrictions.upstream
== RestrictionValues.BLOCK
)
assert package.latest_version.version == "1.0.0"
assert package.latest_version.status == LatestPackageVersionStatus.Published
assert (
package.latest_version.origin.origin_type
== OriginInformationValues.EXTERNAL
)
@@ -139,7 +139,7 @@ class Test_Codebuild_Service:
)
@mock_aws
def test_codebuild_service(self):
codebuild = Codebuild(set_mocked_aws_provider())
codebuild = Codebuild(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert codebuild.session.__class__.__name__ == "Session"
assert codebuild.service == "codebuild"
@@ -76,7 +76,7 @@ class Test_CodePipeline_Service:
)
@mock_aws
def test_codepipeline_service(self):
codepipeline = CodePipeline(set_mocked_aws_provider())
codepipeline = CodePipeline(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert codepipeline.session.__class__.__name__ == "Session"
assert codepipeline.service == "codepipeline"
@@ -106,27 +106,27 @@ def mock_generate_regional_clients(provider, service):
class Test_DataSync_Service:
# Test DataSync Service initialization
def test_service(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
datasync = DataSync(aws_provider)
assert datasync.service == "datasync"
# Test DataSync clients creation
def test_client(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
datasync = DataSync(aws_provider)
for reg_client in datasync.regional_clients.values():
assert reg_client.__class__.__name__ == "DataSync"
# Test DataSync session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
datasync = DataSync(aws_provider)
assert datasync.session.__class__.__name__ == "Session"
# Test listing DataSync tasks
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_list_tasks(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
datasync = DataSync(aws_provider)
task_arn = "arn:aws:datasync:eu-west-1:123456789012:task/task-12345678901234567"
@@ -142,7 +142,7 @@ class Test_DataSync_Service:
# Test generic exception in list_tasks
def test_list_tasks_generic_exception(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
# Mock the regional client's list_tasks method specifically
mock_client = MagicMock()
@@ -155,7 +155,7 @@ class Test_DataSync_Service:
# Test describing DataSync tasks with various exceptions
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_describe_tasks_with_exceptions(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
datasync = DataSync(aws_provider)
# Check all tasks were processed despite exceptions
@@ -183,7 +183,7 @@ class Test_DataSync_Service:
# Test listing task tags with various exceptions
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_list_task_tags_with_exceptions(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
datasync = DataSync(aws_provider)
tasks_by_name = {task.name: task for task in datasync.tasks.values()}
@@ -170,20 +170,20 @@ def mock_generate_regional_clients(provider, service):
class Test_ECR_Service:
# Test ECR Service
def test_service(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecr = ECR(aws_provider)
assert ecr.service == "ecr"
# Test ECR client
def test_client(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecr = ECR(aws_provider)
for regional_client in ecr.regional_clients.values():
assert regional_client.__class__.__name__ == "ECR"
# Test ECR session
def test_get_session(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecr = ECR(aws_provider)
assert ecr.session.__class__.__name__ == "Session"
@@ -198,7 +198,7 @@ class Test_ECR_Service:
{"Key": "test", "Value": "test"},
],
)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecr = ECR(aws_provider)
assert len(ecr.registries) == 1
@@ -226,7 +226,7 @@ class Test_ECR_Service:
imageScanningConfiguration={"scanOnPush": True},
imageTagMutability="IMMUTABLE",
)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecr = ECR(aws_provider)
assert len(ecr.registries) == 1
assert len(ecr.registries[AWS_REGION_EU_WEST_1].repositories) == 1
@@ -255,7 +255,7 @@ class Test_ECR_Service:
imageScanningConfiguration={"scanOnPush": True},
imageTagMutability="IMMUTABLE",
)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecr = ECR(aws_provider)
assert len(ecr.registries) == 1
assert len(ecr.registries[AWS_REGION_EU_WEST_1].repositories) == 1
@@ -273,7 +273,7 @@ class Test_ECR_Service:
repositoryName=repo_name,
imageScanningConfiguration={"scanOnPush": True},
)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecr = ECR(aws_provider)
assert len(ecr.registries) == 1
@@ -366,7 +366,7 @@ class Test_ECR_Service:
# Test get ECR Registries Scanning Configuration
@mock_aws
def test_get_registry_scanning_configuration(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecr = ECR(aws_provider)
assert len(ecr.registries) == 1
assert ecr.registries[AWS_REGION_EU_WEST_1].id == AWS_ACCOUNT_NUMBER
@@ -122,27 +122,27 @@ def mock_generate_regional_clients(provider, service):
class Test_ECS_Service:
# Test ECS Service
def test_service(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
assert ecs.service == "ecs"
# Test ECS client
def test_client(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
for reg_client in ecs.regional_clients.values():
assert reg_client.__class__.__name__ == "ECS"
# Test ECS session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
assert ecs.session.__class__.__name__ == "Session"
# Test list ECS task definitions
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_list_task_definitions(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
task_arn = "arn:aws:ecs:eu-west-1:123456789012:task-definition/test_cluster_1/test_ecs_task:1"
@@ -156,7 +156,7 @@ class Test_ECS_Service:
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
# Test describe ECS task definitions
def test_describe_task_definitions(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
task_arn = "arn:aws:ecs:eu-west-1:123456789012:task-definition/test_cluster_1/test_ecs_task:1"
@@ -204,7 +204,7 @@ class Test_ECS_Service:
# Test list ECS clusters
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_list_clusters(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
cluster_arn1 = "arn:aws:ecs:eu-west-1:123456789012:cluster/test_cluster_1"
@@ -217,7 +217,7 @@ class Test_ECS_Service:
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
# Test describe ECS clusters
def test_describe_clusters(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
cluster_arn1 = "arn:aws:ecs:eu-west-1:123456789012:cluster/test_cluster_1"
@@ -237,7 +237,7 @@ class Test_ECS_Service:
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
# Test describe ECS services
def test_describe_services(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
service_arn = (
@@ -93,18 +93,18 @@ def mock_generate_regional_clients(provider, service):
class Test_EFS:
# Test EFS Session
def test__get_session__(self):
access_analyzer = EFS(set_mocked_aws_provider())
access_analyzer = EFS(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert access_analyzer.session.__class__.__name__ == "Session"
# Test EFS Service
def test__get_service__(self):
access_analyzer = EFS(set_mocked_aws_provider())
access_analyzer = EFS(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert access_analyzer.service == "efs"
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
# Test EFS describe file systems
def test_describe_file_systems(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
efs = EFS(aws_provider)
efs_arn = f"arn:aws:elasticfilesystem:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:file-system/{FILE_SYSTEM_ID}"
assert len(efs.filesystems) == 1
@@ -119,7 +119,7 @@ class Test_EFS:
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
# Test EFS describe file systems policies
def test_describe_file_system_policies(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
efs = EFS(aws_provider)
efs_arn = f"arn:aws:elasticfilesystem:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:file-system/{FILE_SYSTEM_ID}"
assert len(efs.filesystems) == 1
@@ -131,7 +131,7 @@ class Test_EFS:
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
# Test EFS describe mount targets
def test_describe_mount_targets(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
efs = EFS(aws_provider)
assert len(efs.filesystems) == 1
efs_arn = f"arn:aws:elasticfilesystem:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:file-system/{FILE_SYSTEM_ID}"
@@ -144,7 +144,7 @@ class Test_EFS:
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
# Test EFS describe access points
def test_describe_access_points(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
efs = EFS(aws_provider)
assert len(efs.filesystems) == 1
efs_arn = f"arn:aws:elasticfilesystem:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:file-system/{FILE_SYSTEM_ID}"
@@ -31,20 +31,20 @@ def mock_generate_regional_clients(provider, service):
class Test_EKS_Service:
# Test EKS Service
def test_service(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
eks = EKS(aws_provider)
assert eks.service == "eks"
# Test EKS client
def test_client(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
eks = EKS(aws_provider)
for reg_client in eks.regional_clients.values():
assert reg_client.__class__.__name__ == "EKS"
# Test EKS session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
eks = EKS(aws_provider)
assert eks.session.__class__.__name__ == "Session"
@@ -73,7 +73,7 @@ class Test_EKS_Service:
roleArn=f"arn:aws:iam::{AWS_ACCOUNT_NUMBER}:role/eks-service-role-AWSServiceRoleForAmazonEKS-J7ONKE3BQ4PI",
tags={"test": "test"},
)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
eks = EKS(aws_provider)
assert len(eks.clusters) == 1
assert eks.clusters[0].name == cluster_name
@@ -126,7 +126,7 @@ class Test_EKS_Service:
},
],
)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
eks = EKS(aws_provider)
assert len(eks.clusters) == 1
assert eks.clusters[0].name == cluster_name
@@ -59,7 +59,9 @@ class Test_ElasticBeanstalk_Service:
# Test ElasticBeanstalk Client
@mock_aws
def test_get_client(self):
elasticbeanstalk = ElasticBeanstalk(set_mocked_aws_provider())
elasticbeanstalk = ElasticBeanstalk(
set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
)
assert (
elasticbeanstalk.regional_clients[AWS_REGION_EU_WEST_1].__class__.__name__
== "ElasticBeanstalk"
@@ -68,13 +70,17 @@ class Test_ElasticBeanstalk_Service:
# Test ElasticBeanstalk Session
@mock_aws
def test__get_session__(self):
elasticbeanstalk = ElasticBeanstalk(set_mocked_aws_provider())
elasticbeanstalk = ElasticBeanstalk(
set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
)
assert elasticbeanstalk.session.__class__.__name__ == "Session"
# Test ElasticBeanstalk Service
@mock_aws
def test__get_service__(self):
elasticbeanstalk = ElasticBeanstalk(set_mocked_aws_provider())
elasticbeanstalk = ElasticBeanstalk(
set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
)
assert elasticbeanstalk.service == "elasticbeanstalk"
# Test _describe_environments
@@ -90,7 +96,9 @@ class Test_ElasticBeanstalk_Service:
EnvironmentName="test-env",
)
# ElasticBeanstalk Class
elasticbeanstalk = ElasticBeanstalk(set_mocked_aws_provider())
elasticbeanstalk = ElasticBeanstalk(
set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
)
assert len(elasticbeanstalk.environments) == 1
assert (
@@ -125,7 +133,9 @@ class Test_ElasticBeanstalk_Service:
EnvironmentName="test-env",
)
# ElasticBeanstalk Class
elasticbeanstalk = ElasticBeanstalk(set_mocked_aws_provider())
elasticbeanstalk = ElasticBeanstalk(
set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
)
assert (
elasticbeanstalk.environments[
environment["EnvironmentArn"]
@@ -158,7 +168,9 @@ class Test_ElasticBeanstalk_Service:
Tags=[{"Key": "test-key", "Value": "test-value"}],
)
# ElasticBeanstalk Class
elasticbeanstalk = ElasticBeanstalk(set_mocked_aws_provider())
elasticbeanstalk = ElasticBeanstalk(
set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
)
assert elasticbeanstalk.environments[environment["EnvironmentArn"]].tags == [
{"Key": "test-key", "Value": "test-value"}
]
@@ -91,7 +91,11 @@ class Test_emr_cluster_publicly_accesible:
),
mock.patch(
"prowler.providers.aws.services.emr.emr_cluster_publicly_accesible.emr_cluster_publicly_accesible.ec2_client",
new=EC2(set_mocked_aws_provider(create_default_organization=False)),
new=EC2(
set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1], create_default_organization=False
)
),
),
):
# Test Check
@@ -161,7 +165,11 @@ class Test_emr_cluster_publicly_accesible:
),
mock.patch(
"prowler.providers.aws.services.emr.emr_cluster_publicly_accesible.emr_cluster_publicly_accesible.ec2_client",
new=EC2(set_mocked_aws_provider(create_default_organization=False)),
new=EC2(
set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1], create_default_organization=False
)
),
),
):
# Test Check
@@ -248,7 +256,11 @@ class Test_emr_cluster_publicly_accesible:
),
mock.patch(
"prowler.providers.aws.services.emr.emr_cluster_publicly_accesible.emr_cluster_publicly_accesible.ec2_client",
new=EC2(set_mocked_aws_provider(create_default_organization=False)),
new=EC2(
set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1], create_default_organization=False
)
),
),
):
# Test Check
@@ -338,7 +350,11 @@ class Test_emr_cluster_publicly_accesible:
),
mock.patch(
"prowler.providers.aws.services.emr.emr_cluster_publicly_accesible.emr_cluster_publicly_accesible.ec2_client",
new=EC2(set_mocked_aws_provider(create_default_organization=False)),
new=EC2(
set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1], create_default_organization=False
)
),
),
):
# Test Check
@@ -425,7 +441,11 @@ class Test_emr_cluster_publicly_accesible:
),
mock.patch(
"prowler.providers.aws.services.emr.emr_cluster_publicly_accesible.emr_cluster_publicly_accesible.ec2_client",
new=EC2(set_mocked_aws_provider(create_default_organization=False)),
new=EC2(
set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1], create_default_organization=False
)
),
),
):
# Test Check
@@ -53,19 +53,19 @@ class Test_EMR_Service:
# Test EMR Client
@mock_aws
def test_get_client(self):
emr = EMR(set_mocked_aws_provider())
emr = EMR(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert emr.regional_clients[AWS_REGION_EU_WEST_1].__class__.__name__ == "EMR"
# Test EMR Session
@mock_aws
def test__get_session__(self):
emr = EMR(set_mocked_aws_provider())
emr = EMR(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert emr.session.__class__.__name__ == "Session"
# Test EMR Service
@mock_aws
def test__get_service__(self):
emr = EMR(set_mocked_aws_provider())
emr = EMR(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert emr.service == "emr"
# Test _list_clusters and _describe_cluster
@@ -93,7 +93,7 @@ class Test_EMR_Service:
)
cluster_id = emr_client.run_job_flow(**run_job_flow_args)["JobFlowId"]
# EMR Class
emr = EMR(set_mocked_aws_provider())
emr = EMR(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert len(emr.clusters) == 1
assert emr.clusters[cluster_id].id == cluster_id
@@ -115,7 +115,7 @@ class Test_EMR_Service:
@mock_aws
def test_get_block_public_access_configuration(self):
emr = EMR(set_mocked_aws_provider())
emr = EMR(set_mocked_aws_provider([AWS_REGION_EU_WEST_1]))
assert len(emr.block_public_access_configuration) == 1
assert emr.block_public_access_configuration[
@@ -55,27 +55,27 @@ class Test_GlobalAccelerator_Service:
# Test GlobalAccelerator Service
def test_service(self):
# GlobalAccelerator client for this test class
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
globalaccelerator = GlobalAccelerator(aws_provider)
assert globalaccelerator.service == "globalaccelerator"
# Test GlobalAccelerator Client
def test_client(self):
# GlobalAccelerator client for this test class
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
globalaccelerator = GlobalAccelerator(aws_provider)
assert globalaccelerator.client.__class__.__name__ == "GlobalAccelerator"
# Test GlobalAccelerator Session
def test__get_session__(self):
# GlobalAccelerator client for this test class
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
globalaccelerator = GlobalAccelerator(aws_provider)
assert globalaccelerator.session.__class__.__name__ == "Session"
def test_list_accelerators(self):
# GlobalAccelerator client for this test class
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
globalaccelerator = GlobalAccelerator(aws_provider)
accelerator_name = "TestAccelerator"
@@ -99,7 +99,7 @@ class Test_GlobalAccelerator_Service:
def test_list_tags(self):
# GlobalAccelerator client for this test class
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_US_WEST_2])
globalaccelerator = GlobalAccelerator(aws_provider)
assert len(globalaccelerator.accelerators) == 1
@@ -39,7 +39,7 @@ def mock_make_api_call_members_managers(self, operation_name, api_params):
class Test_guardduty_centrally_managed:
@mock_aws
def test_no_detectors(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -67,7 +67,7 @@ class Test_guardduty_centrally_managed:
detector_id = guardduty_client.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -112,7 +112,7 @@ class Test_guardduty_centrally_managed:
detector_id = guardduty_client.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -156,7 +156,7 @@ class Test_guardduty_centrally_managed:
detector_id = guardduty_client.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -64,7 +64,7 @@ class Test_guardduty_delegated_admin_enabled_all_regions:
@mock_aws
def test_no_detectors(self):
"""Test when no GuardDuty detectors exist."""
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -102,7 +102,7 @@ class Test_guardduty_delegated_admin_enabled_all_regions:
guardduty_client_boto = client("guardduty", region_name=AWS_REGION_EU_WEST_1)
detector_id = guardduty_client_boto.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -148,7 +148,7 @@ class Test_guardduty_delegated_admin_enabled_all_regions:
guardduty_client_boto = client("guardduty", region_name=AWS_REGION_EU_WEST_1)
detector_id = guardduty_client_boto.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -194,7 +194,7 @@ class Test_guardduty_delegated_admin_enabled_all_regions:
guardduty_client_boto = client("guardduty", region_name=AWS_REGION_EU_WEST_1)
detector_id = guardduty_client_boto.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -44,7 +44,7 @@ def mock_make_api_call(self, operation_name, kwarg):
class Test_guardduty_ec2_malware_protection_enabled:
def test_no_detectors(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -74,7 +74,7 @@ class Test_guardduty_ec2_malware_protection_enabled:
guardduty_client.create_detector(Enable=False)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -112,7 +112,7 @@ class Test_guardduty_ec2_malware_protection_enabled:
},
)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -161,7 +161,7 @@ class Test_guardduty_ec2_malware_protection_enabled:
},
)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -12,7 +12,7 @@ from tests.providers.aws.utils import (
class Test_guardduty_eks_audit_log_enabled:
def test_no_detectors(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -42,7 +42,7 @@ class Test_guardduty_eks_audit_log_enabled:
guardduty_client.create_detector(Enable=False)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -74,7 +74,7 @@ class Test_guardduty_eks_audit_log_enabled:
Enable=True, DataSources={"Kubernetes": {"AuditLogs": {"Enable": True}}}
)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -118,7 +118,7 @@ class Test_guardduty_eks_audit_log_enabled:
Enable=True, DataSources={"Kubernetes": {"AuditLogs": {"Enable": False}}}
)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -6,6 +6,7 @@ from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)
@@ -13,7 +14,7 @@ from tests.providers.aws.utils import (
class Test_guardduty_is_enabled:
@mock_aws
def test_no_detectors(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -43,7 +44,7 @@ class Test_guardduty_is_enabled:
detector_id = guardduty_client.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -85,7 +86,7 @@ class Test_guardduty_is_enabled:
detector_id = guardduty_client.create_detector(Enable=False)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -131,7 +132,7 @@ class Test_guardduty_is_enabled:
detector_id = guardduty_client.create_detector(Enable=False)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -177,7 +178,9 @@ class Test_guardduty_is_enabled:
detector_id = guardduty_client.create_detector(Enable=False)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1, AWS_REGION_EU_WEST_1]
)
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -15,7 +15,7 @@ orig = botocore.client.BaseClient._make_api_call
class Test_guardduty_lambda_protection_enabled:
def test_no_detectors(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -45,7 +45,7 @@ class Test_guardduty_lambda_protection_enabled:
guardduty_client.create_detector(Enable=False)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -78,7 +78,7 @@ class Test_guardduty_lambda_protection_enabled:
Features=[{"Name": "LAMBDA_NETWORK_LOGS", "Status": "ENABLED"}],
)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -123,7 +123,7 @@ class Test_guardduty_lambda_protection_enabled:
Features=[{"Name": "LAMBDA_NETWORK_LOGS", "Status": "DISABLED"}],
)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -28,7 +28,7 @@ def mock_make_api_call(self, operation_name, kwarg):
class Test_guardduty_no_high_severity_findings:
@mock_aws
def test_no_detectors(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -56,7 +56,7 @@ class Test_guardduty_no_high_severity_findings:
detector_id = guardduty_client.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -97,7 +97,7 @@ class Test_guardduty_no_high_severity_findings:
detector_id = guardduty_client.create_detector(Enable=True)["DetectorId"]
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
from prowler.providers.aws.services.guardduty.guardduty_service import GuardDuty
@@ -66,20 +66,20 @@ def mock_generate_regional_clients(provider, service):
class Test_GuardDuty_Service:
# Test GuardDuty Service
def test_service(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
guardduty = GuardDuty(aws_provider)
assert guardduty.service == "guardduty"
# Test GuardDuty client
def test_client(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
guardduty = GuardDuty(aws_provider)
for reg_client in guardduty.regional_clients.values():
assert reg_client.__class__.__name__ == "GuardDuty"
# Test GuardDuty session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
guardduty = GuardDuty(aws_provider)
assert guardduty.session.__class__.__name__ == "Session"
@@ -89,7 +89,7 @@ class Test_GuardDuty_Service:
guardduty_client = client("guardduty", region_name=AWS_REGION_EU_WEST_1)
response = guardduty_client.create_detector(Enable=True, Tags={"test": "test"})
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
guardduty = GuardDuty(aws_provider)
assert len(guardduty.detectors) == 1
@@ -121,7 +121,7 @@ class Test_GuardDuty_Service:
],
)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
guardduty = GuardDuty(aws_provider)
assert len(guardduty.detectors) == 1
@@ -149,7 +149,7 @@ class Test_GuardDuty_Service:
guardduty_client = client("guardduty", region_name=AWS_REGION_EU_WEST_1)
response = guardduty_client.create_detector(Enable=True)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
guardduty = GuardDuty(aws_provider)
assert len(guardduty.detectors) == 1
@@ -170,7 +170,7 @@ class Test_GuardDuty_Service:
guardduty_client = client("guardduty", region_name=AWS_REGION_EU_WEST_1)
response = guardduty_client.create_detector(Enable=True)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
guardduty = GuardDuty(aws_provider)
assert len(guardduty.detectors) == 1
@@ -192,7 +192,7 @@ class Test_GuardDuty_Service:
guardduty_client = client("guardduty", region_name=AWS_REGION_EU_WEST_1)
response = guardduty_client.create_detector(Enable=True)
aws_provider = set_mocked_aws_provider()
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
guardduty = GuardDuty(aws_provider)
assert len(guardduty.detectors) == 1
@@ -1413,6 +1413,115 @@ class Test_Policy:
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_aws_CalledVia_str(self):
condition_statement = {
"StringEquals": {"aws:CalledVia": "cloudformation.amazonaws.com"}
}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_equals_aws_CalledViaFirst_str(self):
condition_statement = {
"StringEquals": {"aws:CalledViaFirst": "cloudformation.amazonaws.com"}
}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_equals_aws_CalledViaLast_str(self):
condition_statement = {
"StringEquals": {"aws:CalledViaLast": "glue.amazonaws.com"}
}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_like_aws_CalledVia_str(self):
condition_statement = {"StringLike": {"aws:CalledVia": "*.amazonaws.com"}}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_equals_kms_CallerAccount_str(self):
condition_statement = {
"StringEquals": {"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER}
}
assert is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_kms_CallerAccount_str_not_valid(self):
condition_statement = {
"StringEquals": {"kms:CallerAccount": NON_TRUSTED_AWS_ACCOUNT_NUMBER}
}
assert not is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_kms_CallerAccount_list(self):
condition_statement = {
"StringEquals": {"kms:CallerAccount": [TRUSTED_AWS_ACCOUNT_NUMBER]}
}
assert is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_kms_CallerAccount_list_not_valid(self):
condition_statement = {
"StringEquals": {
"kms:CallerAccount": [
TRUSTED_AWS_ACCOUNT_NUMBER,
NON_TRUSTED_AWS_ACCOUNT_NUMBER,
]
}
}
assert not is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_equals_kms_ViaService_str(self):
condition_statement = {
"StringEquals": {"kms:ViaService": "glue.eu-central-1.amazonaws.com"}
}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_string_like_kms_CallerAccount_str(self):
condition_statement = {
"StringLike": {"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER}
}
assert is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_like_kms_CallerAccount_str_not_valid(self):
condition_statement = {
"StringLike": {"kms:CallerAccount": NON_TRUSTED_AWS_ACCOUNT_NUMBER}
}
assert not is_condition_block_restrictive(
condition_statement, TRUSTED_AWS_ACCOUNT_NUMBER
)
def test_condition_parser_string_like_kms_ViaService_str(self):
condition_statement = {"StringLike": {"kms:ViaService": "glue.*.amazonaws.com"}}
assert is_condition_block_restrictive(
condition_statement,
TRUSTED_AWS_ACCOUNT_NUMBER,
is_cross_account_allowed=True,
)
def test_condition_parser_two_lists_unrestrictive(self):
condition_statement = {
"StringLike": {
@@ -2357,6 +2466,71 @@ class Test_Policy:
trusted_ips=["1.2.3.4", "5.6.7.8"],
)
def test_is_policy_public_kms_caller_account_and_via_service(self):
policy = {
"Version": "2008-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": [
"kms:Encrypt",
"kms:Decrypt",
"kms:ReEncrypt*",
"kms:GenerateDataKey*",
"kms:CreateGrant",
"kms:DescribeKey",
],
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:ViaService": "glue.eu-central-1.amazonaws.com",
"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER,
}
},
},
],
}
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
def test_is_policy_public_kms_caller_account_only(self):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["kms:Decrypt"],
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:CallerAccount": TRUSTED_AWS_ACCOUNT_NUMBER,
}
},
},
],
}
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
def test_is_policy_public_kms_via_service_without_account_restriction(self):
policy = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": ["kms:Decrypt"],
"Resource": "*",
"Condition": {
"StringEquals": {
"kms:ViaService": "glue.eu-central-1.amazonaws.com",
}
},
},
],
}
assert not is_policy_public(policy, TRUSTED_AWS_ACCOUNT_NUMBER)
def test_check_admin_access(self):
policy = {
"Version": "2012-10-17",
@@ -104,26 +104,26 @@ def mock_generate_regional_clients(provider, service):
class TestOpenSearchServiceService:
# Test OpenSearchService Service
def test_service(self):
aws_provider = set_mocked_aws_provider([])
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
opensearch = OpenSearchService(aws_provider)
assert opensearch.service == "opensearch"
# Test OpenSearchService_ client
def test_client(self):
aws_provider = set_mocked_aws_provider([])
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
opensearch = OpenSearchService(aws_provider)
for reg_client in opensearch.regional_clients.values():
assert reg_client.__class__.__name__ == "OpenSearchService"
# Test OpenSearchService session
def test__get_session__(self):
aws_provider = set_mocked_aws_provider([])
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
opensearch = OpenSearchService(aws_provider)
assert opensearch.session.__class__.__name__ == "Session"
# Test OpenSearchService list domains names
def test_list_domain_names(self):
aws_provider = set_mocked_aws_provider([])
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
opensearch = OpenSearchService(aws_provider)
assert len(opensearch.opensearch_domains) == 1
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
@@ -132,7 +132,7 @@ class TestOpenSearchServiceService:
# Test OpenSearchService describe domain
@mock_aws
def test_describe_domain(self):
aws_provider = set_mocked_aws_provider([])
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
opensearch = OpenSearchService(aws_provider)
assert len(opensearch.opensearch_domains) == 1
assert opensearch.opensearch_domains[domain_arn].name == test_domain_name
@@ -237,7 +237,7 @@ class TestOpenSearchServiceService:
"botocore.client.BaseClient._make_api_call",
new=mock_make_api_call_missing_fields,
):
aws_provider = set_mocked_aws_provider([])
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
opensearch = OpenSearchService(aws_provider)
# Should not crash even with missing optional fields
@@ -248,6 +248,7 @@ class Test_rds_instance_no_public_access:
PubliclyAccessible=True,
VpcSecurityGroupIds=[default_sg_id],
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.rds.rds_service import RDS
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
@@ -256,9 +257,15 @@ class Test_rds_instance_no_public_access:
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client",
new=RDS(aws_provider),
with (
mock.patch(
"prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.rds_client",
new=RDS(aws_provider),
),
mock.patch(
"prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access.ec2_client",
new=EC2(aws_provider),
),
):
# Test Check
from prowler.providers.aws.services.rds.rds_instance_no_public_access.rds_instance_no_public_access import (
+4 -2
View File
@@ -96,7 +96,7 @@ ADMINISTRATOR_ROLE_ASSUME_ROLE_POLICY = {
# This here causes to call this function mocking the AWS calls
@mock_aws
def set_mocked_aws_provider(
audited_regions: list[str] = [],
audited_regions: list[str] = [AWS_REGION_US_EAST_1],
audited_account: str = AWS_ACCOUNT_NUMBER,
audited_account_arn: str = AWS_ACCOUNT_ARN,
audited_partition: str = AWS_COMMERCIAL_PARTITION,
@@ -143,7 +143,9 @@ def set_mocked_aws_provider(
# Mock Configiration
provider._scan_unused_services = scan_unused_services
provider._enabled_regions = (
enabled_regions if enabled_regions else set(audited_regions)
enabled_regions
if enabled_regions is not None
else (set(audited_regions) if audited_regions else None)
)
# TODO: we can create the organizations metadata here with moto
provider._organizations_metadata = None
@@ -43,6 +43,39 @@ USER_3 = {
}
# Role data for Directory API role tests
SUPER_ADMIN_ROLE_ID = "13801188331880449"
SEED_ADMIN_ROLE_ID = "13801188331880451"
GROUPS_ADMIN_ROLE_ID = "13801188331880450"
ROLE_SUPER_ADMIN = {
"roleId": SUPER_ADMIN_ROLE_ID,
"roleName": "Super Admin",
"roleDescription": "Super Admin",
"isSystemRole": True,
"isSuperAdminRole": True,
}
# Google automatically assigns _SEED_ADMIN_ROLE to the first account that
# created the domain. It is a super-admin-capable system role with a
# different name, so it must also be excluded when counting "extra" roles.
ROLE_SEED_ADMIN = {
"roleId": SEED_ADMIN_ROLE_ID,
"roleName": "_SEED_ADMIN_ROLE",
"roleDescription": "Super Admin",
"isSystemRole": True,
"isSuperAdminRole": True,
}
ROLE_GROUPS_ADMIN = {
"roleId": GROUPS_ADMIN_ROLE_ID,
"roleName": "_GROUPS_ADMIN_ROLE",
"roleDescription": "Groups Administrator",
"isSystemRole": True,
"isSuperAdminRole": False,
}
def set_mocked_googleworkspace_provider(
identity: GoogleWorkspaceIdentityInfo = GoogleWorkspaceIdentityInfo(
domain=DOMAIN,
@@ -0,0 +1,130 @@
from unittest.mock import patch
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
CalendarPolicies,
)
from tests.providers.googleworkspace.googleworkspace_fixtures import (
CUSTOMER_ID,
DOMAIN,
set_mocked_googleworkspace_provider,
)
class TestCalendarExternalInvitationsWarning:
def test_pass_warnings_enabled(self):
"""Test PASS when external invitation warnings are enabled"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
calendar_external_invitations_warning,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
external_invitations_warning=True
)
check = calendar_external_invitations_warning()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "enabled" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_fail_warnings_disabled(self):
"""Test FAIL when external invitation warnings are disabled"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
calendar_external_invitations_warning,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
external_invitations_warning=False
)
check = calendar_external_invitations_warning()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "disabled" in findings[0].status_extended
def test_fail_no_policy_set(self):
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
calendar_external_invitations_warning,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
external_invitations_warning=None
)
check = calendar_external_invitations_warning()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "not explicitly configured" in findings[0].status_extended
def test_no_findings_when_fetch_failed(self):
"""Test no findings returned when the API fetch failed"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_invitations_warning.calendar_external_invitations_warning import (
calendar_external_invitations_warning,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = False
mock_calendar_client.policies = CalendarPolicies()
check = calendar_external_invitations_warning()
findings = check.execute()
assert len(findings) == 0
@@ -0,0 +1,161 @@
from unittest.mock import patch
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
CalendarPolicies,
)
from tests.providers.googleworkspace.googleworkspace_fixtures import (
CUSTOMER_ID,
DOMAIN,
set_mocked_googleworkspace_provider,
)
class TestCalendarExternalSharingPrimaryCalendar:
def test_pass_free_busy_only(self):
"""Test PASS when external sharing is restricted to free/busy only"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
primary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY"
)
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "free/busy information only" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_fail_read_only(self):
"""Test FAIL when external sharing allows read-only access"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
primary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_ONLY"
)
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "EXTERNAL_ALL_INFO_READ_ONLY" in findings[0].status_extended
assert "free/busy information only" in findings[0].status_extended
def test_fail_read_write(self):
"""Test FAIL when external sharing allows read-write access"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
primary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE"
)
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "EXTERNAL_ALL_INFO_READ_WRITE" in findings[0].status_extended
def test_fail_no_policy_set(self):
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
primary_calendar_external_sharing=None
)
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "not explicitly configured" in findings[0].status_extended
def test_no_findings_when_fetch_failed(self):
"""Test no findings returned when the API fetch failed"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_primary_calendar.calendar_external_sharing_primary_calendar import (
calendar_external_sharing_primary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = False
mock_calendar_client.policies = CalendarPolicies()
check = calendar_external_sharing_primary_calendar()
findings = check.execute()
assert len(findings) == 0
@@ -0,0 +1,161 @@
from unittest.mock import patch
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
CalendarPolicies,
)
from tests.providers.googleworkspace.googleworkspace_fixtures import (
CUSTOMER_ID,
DOMAIN,
set_mocked_googleworkspace_provider,
)
class TestCalendarExternalSharingSecondaryCalendar:
def test_pass_free_busy_only(self):
"""Test PASS when external sharing is restricted to free/busy only"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
secondary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY"
)
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "free/busy information only" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_fail_read_only(self):
"""Test FAIL when external sharing allows read-only access"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_ONLY"
)
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "EXTERNAL_ALL_INFO_READ_ONLY" in findings[0].status_extended
assert "free/busy information only" in findings[0].status_extended
def test_fail_read_write_manage(self):
"""Test FAIL when external sharing allows read-write-manage access"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE_MANAGE"
)
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "EXTERNAL_ALL_INFO_READ_WRITE_MANAGE" in findings[0].status_extended
def test_fail_no_policy_set(self):
"""Test FAIL when no explicit policy is set (None) but fetch succeeded"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = True
mock_calendar_client.policies = CalendarPolicies(
secondary_calendar_external_sharing=None
)
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "not explicitly configured" in findings[0].status_extended
def test_no_findings_when_fetch_failed(self):
"""Test no findings returned when the API fetch failed"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar.calendar_client"
) as mock_calendar_client,
):
from prowler.providers.googleworkspace.services.calendar.calendar_external_sharing_secondary_calendar.calendar_external_sharing_secondary_calendar import (
calendar_external_sharing_secondary_calendar,
)
mock_calendar_client.provider = mock_provider
mock_calendar_client.policies_fetched = False
mock_calendar_client.policies = CalendarPolicies()
check = calendar_external_sharing_secondary_calendar()
findings = check.execute()
assert len(findings) == 0
@@ -0,0 +1,231 @@
from unittest.mock import MagicMock, patch
from tests.providers.googleworkspace.googleworkspace_fixtures import (
set_mocked_googleworkspace_provider,
)
class TestCalendarService:
def test_calendar_fetch_policies_all_settings(self):
"""Test fetching all 3 calendar policy settings from Cloud Identity API"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_credentials = MagicMock()
mock_session = MagicMock()
mock_session.credentials = mock_credentials
mock_provider.session = mock_session
mock_service = MagicMock()
mock_policies_list = MagicMock()
# Mock the actual Cloud Identity Policy API v1 response shape:
# - "type" (not "name"), prefixed with "settings/"
# - inner value field names are camelCase
mock_policies_list.execute.return_value = {
"policies": [
{
"setting": {
"type": "settings/calendar.primary_calendar_max_allowed_external_sharing",
"value": {
"maxAllowedExternalSharing": "EXTERNAL_FREE_BUSY_ONLY"
},
}
},
{
"setting": {
"type": "settings/calendar.secondary_calendar_max_allowed_external_sharing",
"value": {
"maxAllowedExternalSharing": "EXTERNAL_ALL_INFO_READ_ONLY"
},
}
},
{
"setting": {
"type": "settings/calendar.external_invitations",
"value": {"warnOnInvite": True},
}
},
]
}
mock_service.policies().list.return_value = mock_policies_list
mock_service.policies().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is True
assert (
calendar.policies.primary_calendar_external_sharing
== "EXTERNAL_FREE_BUSY_ONLY"
)
assert (
calendar.policies.secondary_calendar_external_sharing
== "EXTERNAL_ALL_INFO_READ_ONLY"
)
assert calendar.policies.external_invitations_warning is True
def test_calendar_fetch_policies_empty_response(self):
"""Test handling empty policies response"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
mock_policies_list = MagicMock()
mock_policies_list.execute.return_value = {"policies": []}
mock_service.policies().list.return_value = mock_policies_list
mock_service.policies().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is True
assert calendar.policies.primary_calendar_external_sharing is None
assert calendar.policies.secondary_calendar_external_sharing is None
assert calendar.policies.external_invitations_warning is None
def test_calendar_fetch_policies_api_error(self):
"""Test handling of API errors during policy fetch"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
mock_service.policies().list.side_effect = Exception("API Error")
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is False
assert calendar.policies.primary_calendar_external_sharing is None
assert calendar.policies.secondary_calendar_external_sharing is None
assert calendar.policies.external_invitations_warning is None
def test_calendar_fetch_policies_build_service_returns_none(self):
"""Test early return when _build_service fails to construct the client"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=None,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is False
assert calendar.policies.primary_calendar_external_sharing is None
assert calendar.policies.secondary_calendar_external_sharing is None
assert calendar.policies.external_invitations_warning is None
def test_calendar_fetch_policies_execute_raises(self):
"""Test inner except handler when request.execute() raises during pagination"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
mock_request = MagicMock()
mock_request.execute.side_effect = Exception("Execute failed")
mock_service.policies().list.return_value = mock_request
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.calendar.calendar_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
Calendar,
)
calendar = Calendar(mock_provider)
assert calendar.policies_fetched is False
assert calendar.policies.primary_calendar_external_sharing is None
assert calendar.policies.secondary_calendar_external_sharing is None
assert calendar.policies.external_invitations_warning is None
def test_calendar_policies_model(self):
"""Test CalendarPolicies Pydantic model"""
from prowler.providers.googleworkspace.services.calendar.calendar_service import (
CalendarPolicies,
)
policies = CalendarPolicies(
primary_calendar_external_sharing="EXTERNAL_FREE_BUSY_ONLY",
secondary_calendar_external_sharing="EXTERNAL_ALL_INFO_READ_WRITE",
external_invitations_warning=True,
)
assert policies.primary_calendar_external_sharing == "EXTERNAL_FREE_BUSY_ONLY"
assert (
policies.secondary_calendar_external_sharing
== "EXTERNAL_ALL_INFO_READ_WRITE"
)
assert policies.external_invitations_warning is True
@@ -1,6 +1,12 @@
from unittest.mock import MagicMock, patch
from tests.providers.googleworkspace.googleworkspace_fixtures import (
GROUPS_ADMIN_ROLE_ID,
ROLE_GROUPS_ADMIN,
ROLE_SEED_ADMIN,
ROLE_SUPER_ADMIN,
SEED_ADMIN_ROLE_ID,
SUPER_ADMIN_ROLE_ID,
USER_1,
USER_2,
USER_3,
@@ -25,6 +31,24 @@ class TestDirectoryService:
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
# Mock roles response
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {
"items": [
{"assignedTo": "user1-id", "roleId": SUPER_ADMIN_ROLE_ID},
{"assignedTo": "user2-id", "roleId": SUPER_ADMIN_ROLE_ID},
]
}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
@@ -67,6 +91,17 @@ class TestDirectoryService:
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
# Mock roles response
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {"items": []}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {"items": []}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
@@ -97,6 +132,16 @@ class TestDirectoryService:
mock_service = MagicMock()
mock_service.users().list.side_effect = Exception("API Error")
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {"items": []}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {"items": []}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
@@ -130,3 +175,193 @@ class TestDirectoryService:
assert user.id == "test-id"
assert user.email == "test@test-company.com"
assert user.is_admin is True
assert user.role_assignments == []
def test_directory_list_roles(self):
"""Test that _list_roles correctly builds a roleId-to-roleName mapping"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
# Mock empty users
mock_users_list = MagicMock()
mock_users_list.execute.return_value = {"users": []}
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
# Mock roles response
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {"items": []}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.directory.directory_service import (
Directory,
)
directory = Directory(mock_provider)
super_admin_role = directory._roles[SUPER_ADMIN_ROLE_ID]
assert super_admin_role.name == "Super Admin"
assert super_admin_role.description == "Super Admin"
assert super_admin_role.is_super_admin_role is True
groups_admin_role = directory._roles[GROUPS_ADMIN_ROLE_ID]
assert groups_admin_role.name == "_GROUPS_ADMIN_ROLE"
assert groups_admin_role.description == "Groups Administrator"
assert groups_admin_role.is_super_admin_role is False
def test_directory_role_assignments_populated(self):
"""Test that role assignments are fetched and resolved for super admins"""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
# Mock users - one super admin
mock_users_list = MagicMock()
mock_users_list.execute.return_value = {"users": [USER_1]}
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
# Mock roles
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {
"items": [ROLE_SUPER_ADMIN, ROLE_GROUPS_ADMIN]
}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {
"items": [
{"assignedTo": "user1-id", "roleId": SUPER_ADMIN_ROLE_ID},
{"assignedTo": "user1-id", "roleId": GROUPS_ADMIN_ROLE_ID},
]
}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.directory.directory_service import (
Directory,
)
directory = Directory(mock_provider)
user = directory.users["user1-id"]
role_names = [r.name for r in user.role_assignments]
role_descriptions = [r.description for r in user.role_assignments]
assert "Super Admin" in role_names
assert "_GROUPS_ADMIN_ROLE" in role_names
assert "Groups Administrator" in role_descriptions
assert len(user.role_assignments) == 2
assert user.is_admin is True
def test_directory_second_super_admin_detected_via_role_assignments(self):
"""Regression: a second super admin whose users.list().isAdmin still
reads False (e.g. API propagation lag, or only holding
_SEED_ADMIN_ROLE) must still be recognised as a super admin through
the Role Assignments API, AND any extra non-super-admin roles they
hold must be surfaced on their User object."""
mock_provider = set_mocked_googleworkspace_provider()
mock_provider.audit_config = {}
mock_provider.fixer_config = {}
mock_session = MagicMock()
mock_session.credentials = MagicMock()
mock_provider.session = mock_session
mock_service = MagicMock()
stale_user_1 = {
"id": "user1-id",
"primaryEmail": "admin1@test-company.com",
"isAdmin": False,
}
stale_user_2 = {
"id": "user2-id",
"primaryEmail": "admin2@test-company.com",
"isAdmin": False,
}
mock_users_list = MagicMock()
mock_users_list.execute.return_value = {"users": [stale_user_1, stale_user_2]}
mock_service.users().list.return_value = mock_users_list
mock_service.users().list_next.return_value = None
mock_roles_list = MagicMock()
mock_roles_list.execute.return_value = {
"items": [ROLE_SUPER_ADMIN, ROLE_SEED_ADMIN, ROLE_GROUPS_ADMIN]
}
mock_service.roles().list.return_value = mock_roles_list
mock_service.roles().list_next.return_value = None
mock_ra = MagicMock()
mock_ra.execute.return_value = {
"items": [
{"assignedTo": "user1-id", "roleId": SEED_ADMIN_ROLE_ID},
{"assignedTo": "user2-id", "roleId": SUPER_ADMIN_ROLE_ID},
{"assignedTo": "user2-id", "roleId": GROUPS_ADMIN_ROLE_ID},
]
}
mock_service.roleAssignments().list.return_value = mock_ra
mock_service.roleAssignments().list_next.return_value = None
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_service.GoogleWorkspaceService._build_service",
return_value=mock_service,
),
):
from prowler.providers.googleworkspace.services.directory.directory_service import (
Directory,
)
directory = Directory(mock_provider)
user1 = directory.users["user1-id"]
user2 = directory.users["user2-id"]
assert user1.is_admin is True
assert user2.is_admin is True
assert [r.name for r in user1.role_assignments] == ["_SEED_ADMIN_ROLE"]
user2_role_names = {r.name for r in user2.role_assignments}
assert user2_role_names == {"Super Admin", "_GROUPS_ADMIN_ROLE"}
@@ -0,0 +1,446 @@
from unittest.mock import patch
from prowler.providers.googleworkspace.services.directory.directory_service import (
Role,
User,
)
from tests.providers.googleworkspace.googleworkspace_fixtures import (
CUSTOMER_ID,
DOMAIN,
set_mocked_googleworkspace_provider,
)
SUPER_ADMIN_ROLE = Role(
id="13801188331880449",
name="Super Admin",
description="Super Admin",
is_super_admin_role=True,
)
SEED_ADMIN_ROLE = Role(
id="13801188331880451",
name="_SEED_ADMIN_ROLE",
description="Super Admin",
is_super_admin_role=True,
)
GROUPS_ADMIN_ROLE = Role(
id="13801188331880450",
name="_GROUPS_ADMIN_ROLE",
description="Groups Administrator",
is_super_admin_role=False,
)
USER_MANAGEMENT_ADMIN_ROLE = Role(
id="13801188331880452",
name="_USER_MANAGEMENT_ADMIN_ROLE",
description="User Management Administrator",
is_super_admin_role=False,
)
CUSTOM_ROLE_NO_DESCRIPTION = Role(
id="13801188331880453",
name="custom-helpdesk-role",
description="",
is_super_admin_role=False,
)
class TestDirectorySuperAdminOnlyAdminRoles:
def test_pass_super_admins_only_super_admin_role(self):
"""Test PASS when super admins have only the Super Admin role"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE],
),
"admin2-id": User(
id="admin2-id",
email="admin2@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE],
),
"user1-id": User(
id="user1-id",
email="user@test-company.com",
is_admin=False,
role_assignments=[],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "used only for super admin activities" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_pass_super_admin_with_seed_admin_role(self):
"""Test PASS when a super admin only holds _SEED_ADMIN_ROLE.
_SEED_ADMIN_ROLE is auto-assigned by Google to the original domain
creator and has isSuperAdminRole=True, so it must not count as an
"extra" role.
"""
users = {
"admin1-id": User(
id="admin1-id",
email="playground@prowler.cloud",
is_admin=True,
role_assignments=[SEED_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "_SEED_ADMIN_ROLE" not in findings[0].status_extended
def test_pass_super_admin_with_both_super_admin_and_seed_admin(self):
"""Test PASS when admin holds both Super Admin and _SEED_ADMIN_ROLE"""
users = {
"admin1-id": User(
id="admin1-id",
email="playground@prowler.cloud",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, SEED_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
def test_fail_super_admin_with_additional_roles(self):
"""Test FAIL when a super admin also has additional admin roles"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
),
"user1-id": User(
id="user1-id",
email="user@test-company.com",
is_admin=False,
role_assignments=[],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "admin1@test-company.com" in findings[0].status_extended
assert "Groups Administrator" in findings[0].status_extended
assert "_GROUPS_ADMIN_ROLE" not in findings[0].status_extended
assert "used only for super admin activities" in findings[0].status_extended
assert findings[0].resource_name == DOMAIN
assert findings[0].customer_id == CUSTOMER_ID
def test_fail_seed_admin_with_additional_roles(self):
"""Test FAIL when a _SEED_ADMIN_ROLE holder also has extra roles"""
users = {
"admin1-id": User(
id="admin1-id",
email="playground@prowler.cloud",
is_admin=True,
role_assignments=[SEED_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "playground@prowler.cloud" in findings[0].status_extended
assert "Groups Administrator" in findings[0].status_extended
assert "_GROUPS_ADMIN_ROLE" not in findings[0].status_extended
assert "_SEED_ADMIN_ROLE" not in findings[0].status_extended
def test_fail_multiple_super_admins_with_extra_roles(self):
"""Test FAIL lists all super admins that have additional roles"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, GROUPS_ADMIN_ROLE],
),
"admin2-id": User(
id="admin2-id",
email="admin2@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, USER_MANAGEMENT_ADMIN_ROLE],
),
"admin3-id": User(
id="admin3-id",
email="admin3@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "admin1@test-company.com" in findings[0].status_extended
assert "admin2@test-company.com" in findings[0].status_extended
assert "admin3@test-company.com" not in findings[0].status_extended
def test_no_findings_when_no_users(self):
"""Test no findings when there are no users"""
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = {}
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 0
def test_non_super_admin_with_roles_not_flagged(self):
"""Test that users who are not super admins are ignored even if they have roles"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE],
),
"delegated1-id": User(
id="delegated1-id",
email="delegated@test-company.com",
is_admin=False,
role_assignments=[GROUPS_ADMIN_ROLE],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
assert "delegated@test-company.com" not in findings[0].status_extended
def test_pass_super_admin_with_empty_role_assignments(self):
"""Test PASS when super admin has no role assignments (edge case)"""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "PASS"
def test_fail_custom_role_without_description_falls_back_to_name(self):
"""A custom role with an empty description should be displayed
using its name as a fall-back, so the FAIL message is never blank
for users that genuinely hold extra roles."""
users = {
"admin1-id": User(
id="admin1-id",
email="admin1@test-company.com",
is_admin=True,
role_assignments=[SUPER_ADMIN_ROLE, CUSTOM_ROLE_NO_DESCRIPTION],
),
}
mock_provider = set_mocked_googleworkspace_provider()
with (
patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=mock_provider,
),
patch(
"prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles.directory_client"
) as mock_directory_client,
):
from prowler.providers.googleworkspace.services.directory.directory_super_admin_only_admin_roles.directory_super_admin_only_admin_roles import (
directory_super_admin_only_admin_roles,
)
mock_directory_client.users = users
mock_directory_client.provider = mock_provider
check = directory_super_admin_only_admin_roles()
findings = check.execute()
assert len(findings) == 1
assert findings[0].status == "FAIL"
assert "custom-helpdesk-role" in findings[0].status_extended