fix(alibabacloud): fix CS service SDK compatibility and harden Alibaba provider (#10871)

This commit is contained in:
Daniel Barranquero
2026-04-24 09:26:09 +02:00
committed by GitHub
parent 0df24eeff6
commit 80d62f355f
22 changed files with 1119 additions and 187 deletions
@@ -203,10 +203,10 @@ For detailed authentication configuration, see the [Authentication documentation
## Regions
Alibaba Cloud has multiple regions across the globe. By default, Prowler audits all available regions. You can specify specific regions using the `--regions` CLI argument:
Alibaba Cloud has multiple regions across the globe. By default, Prowler audits all available regions. You can specify specific regions using the `--region` CLI argument:
```bash
prowler alibabacloud --regions cn-hangzhou cn-shanghai
prowler alibabacloud --region cn-hangzhou cn-shanghai
```
The list of supported regions is maintained in [`prowler/providers/alibabacloud/config.py`](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/alibabacloud/config.py).
@@ -2,7 +2,7 @@
title: 'Alibaba Cloud Authentication in Prowler'
---
Prowler requires Alibaba Cloud credentials to perform security checks. Authentication is supported via multiple methods, prioritized as follows:
Prowler supports multiple Alibaba Cloud authentication flows. If more than one is configured at the same time, the provider resolves them in this order:
1. **Credentials URI**
2. **OIDC Role Authentication**
@@ -12,119 +12,325 @@ Prowler requires Alibaba Cloud credentials to perform security checks. Authentic
6. **Permanent Access Keys**
7. **Default Credential Chain**
## Authentication Methods
<Warning>
Do not use the AccessKey pair of the main Alibaba Cloud account for Prowler. Use a RAM user, a RAM role, or another temporary credential flow instead.
</Warning>
### Credentials URI (Recommended for Centralized Services)
## Choose The Right Method
Prowler can retrieve credentials from an external URI endpoint. Provide the URI via the `--credentials-uri` flag or the `ALIBABA_CLOUD_CREDENTIALS_URI` environment variable. The URI must return credentials in the standard JSON format.
| Where Prowler runs | What you need to create | Recommended method |
| --- | --- | --- |
| Local workstation | RAM user + AccessKey pair | [RAM User And AccessKey](#ram-user-and-accesskey) |
| CI runner outside Alibaba Cloud | RAM user + AccessKey pair, optionally a target RAM role | [RAM Role Assumption](#ram-role-assumption-recommended) |
| ECS instance | ECS RAM role attached to the instance | [ECS RAM Role](#ecs-ram-role) |
| ACK / Kubernetes | OIDC IdP + RAM role + OIDC token file | [OIDC Role Authentication](#oidc-role-authentication) |
| Internal credential broker | An HTTP endpoint that returns STS credentials | [Credentials URI](#credentials-uri) |
## RAM User And AccessKey
This is the simplest setup for a workstation or a basic CI runner.
### Create The RAM User
1. Open the [RAM console](https://ram.console.alibabacloud.com/).
2. Go to `Identities` > `Users`.
3. Click `Create User`.
4. Enter a logon name and display name.
5. In `Access Configuration`, select `Permanent AccessKey`.
![Create a RAM user and enable Permanent AccessKey](./img/create_user.png)
6. Save the generated `AccessKey ID` and `AccessKey Secret` immediately. Alibaba Cloud only shows the secret once.
7. Grant the user the read permissions required for the Alibaba Cloud services you want Prowler to scan.
![Grant permissions to the RAM user](./img/grant_permissions.png)
Alibaba Cloud walkthroughs with current console screenshots:
- [Create a RAM user](https://www.alibabacloud.com/help/en/ram/user-guide/create-a-ram-user)
- [Create an AccessKey pair](https://www.alibabacloud.com/help/en/ram/user-guide/create-an-accesskey-pair)
- [Grant permissions to a RAM user](https://www.alibabacloud.com/help/en/ram/user-guide/grant-permissions-to-the-ram-user)
### Use The AccessKey With Prowler
```bash
# Using CLI flag
prowler alibabacloud --credentials-uri http://localhost:8080/credentials
# Or using environment variable
export ALIBABA_CLOUD_CREDENTIALS_URI="http://localhost:8080/credentials"
prowler alibabacloud
```
### OIDC Role Authentication (Recommended for ACK/Kubernetes)
OIDC authentication assumes the specified role using an OIDC token. This is the most secure method for containerized applications running in ACK (Alibaba Container Service for Kubernetes) with RRSA enabled.
The role ARN can be provided via the `--oidc-role-arn` flag or the `ALIBABA_CLOUD_ROLE_ARN` environment variable. The OIDC provider ARN and token file must be set via environment variables:
- `ALIBABA_CLOUD_OIDC_PROVIDER_ARN`
- `ALIBABA_CLOUD_OIDC_TOKEN_FILE`
```bash
# Using CLI flag for role ARN
export ALIBABA_CLOUD_OIDC_PROVIDER_ARN="acs:ram::123456789012:oidc-provider/ack-rrsa-provider"
export ALIBABA_CLOUD_OIDC_TOKEN_FILE="/var/run/secrets/tokens/oidc-token"
prowler alibabacloud --oidc-role-arn acs:ram::123456789012:role/YourRole
# Or using all environment variables
export ALIBABA_CLOUD_ROLE_ARN="acs:ram::123456789012:role/YourRole"
export ALIBABA_CLOUD_OIDC_PROVIDER_ARN="acs:ram::123456789012:oidc-provider/ack-rrsa-provider"
export ALIBABA_CLOUD_OIDC_TOKEN_FILE="/var/run/secrets/tokens/oidc-token"
prowler alibabacloud
```
### ECS RAM Role (Recommended for ECS Instances)
When running on an ECS instance with an attached RAM role, Prowler can obtain credentials from the ECS instance metadata service.
```bash
# Using CLI argument
prowler alibabacloud --ecs-ram-role RoleName
# Or using environment variable
export ALIBABA_CLOUD_ECS_METADATA="RoleName"
prowler alibabacloud
```
### RAM Role Assumption (Recommended for Cross-Account)
For cross-account access, use RAM role assumption. Provide the initial credentials (access keys) via environment variables and the target role ARN via the `--role-arn` flag or the `ALIBABA_CLOUD_ROLE_ARN` environment variable.
The `--role-session-name` flag customizes the session identifier (defaults to `ProwlerAssessmentSession`).
```bash
# Using CLI flags
export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"
prowler alibabacloud --role-arn acs:ram::123456789012:role/ProwlerAuditRole --role-session-name MyAuditSession
# Or using all environment variables
export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"
export ALIBABA_CLOUD_ROLE_ARN="acs:ram::123456789012:role/ProwlerAuditRole"
prowler alibabacloud
```
### STS Temporary Credentials
Prowler also accepts `ALIYUN_ACCESS_KEY_ID` and `ALIYUN_ACCESS_KEY_SECRET` for compatibility, but `ALIBABA_CLOUD_*` is the preferred naming.
If you already have temporary STS credentials, you can provide them via environment variables.
### Use The Default Credential Chain
If you prefer not to export credentials in every shell, you can store them with the Alibaba Cloud CLI and let Prowler reuse the default credential chain from `~/.aliyun/config.json`.
```bash
aliyun configure --mode AK
prowler alibabacloud
```
For profile management details, see Alibaba Cloud's [CLI credential management guide](https://www.alibabacloud.com/help/en/cli/other-configure-command-operations).
## RAM Role Assumption (Recommended)
Use this when:
- you want short-lived credentials instead of long-lived AccessKeys in Prowler,
- you are scanning another Alibaba Cloud account, or
- you are configuring Alibaba Cloud in Prowler Cloud and want to provide a `Role ARN`.
This flow has two parts:
1. A source identity that can call `sts:AssumeRole`.
2. A target RAM role that has the scan permissions.
### Create The Source Identity
Create a RAM user with an AccessKey pair by following the steps in [RAM User And AccessKey](#ram-user-and-accesskey), or reuse an existing automation identity.
### Create The Target Role
1. Open the [RAM console](https://ram.console.alibabacloud.com/).
2. Go to `Identities` > `Roles`.
3. Click `Create Role`.
4. Set `Principal Type` to `Cloud Account`.
5. Choose:
- `Current Account` if the RAM user and the role are in the same account.
- `Other Account` if the RAM user belongs to a different Alibaba Cloud account.
6. Give the role a name such as `ProwlerAuditRole`.
7. Attach the scan permissions to the role.
8. Copy the role ARN in the format `acs:ram::<account-id>:role/<role-name>`.
If you want to restrict the role so that only one RAM user or one RAM role can assume it, edit the trust policy accordingly.
Helpful references:
- [Create a RAM role for a trusted Alibaba Cloud account](https://www.alibabacloud.com/help/en/ram/user-guide/create-a-ram-role-for-a-trusted-alibaba-cloud-account)
- [Assume a RAM role](https://www.alibabacloud.com/help/doc-detail/116820.html)
### Allow The Source Identity To Assume The Role
The source RAM user must be able to call `sts:AssumeRole`.
The easiest starting point is to attach Alibaba Cloud's `AliyunSTSAssumeRoleAccess` policy to that RAM user. If you want tighter scope, attach a custom policy limited to the target role ARN.
### Run Prowler
```bash
export ALIBABA_CLOUD_ACCESS_KEY_ID="source-user-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="source-user-access-key-secret"
prowler alibabacloud \
--role-arn acs:ram::123456789012:role/ProwlerAuditRole \
--role-session-name ProwlerAssessmentSession
```
You can also set the role ARN with `ALIBABA_CLOUD_ROLE_ARN`, but the source AccessKey pair is still required for this flow.
## STS Temporary Credentials
Use this if another tool already gives you a temporary `AccessKey ID`, `AccessKey Secret`, and `SecurityToken`.
This is common when:
- a CI platform brokers Alibaba credentials for the job,
- your internal tooling already calls `AssumeRole`, or
- you want to test with a short-lived session before switching to a RAM role flow.
```bash
export ALIBABA_CLOUD_ACCESS_KEY_ID="your-sts-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-sts-access-key-secret"
export ALIBABA_CLOUD_SECURITY_TOKEN="your-sts-security-token"
prowler alibabacloud
```
### Permanent Access Keys
You can use standard permanent access keys via environment variables.
You can also store the session in the Alibaba CLI configuration:
```bash
export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"
aliyun configure --mode StsToken
prowler alibabacloud
```
## Required Permissions
<Note>
Prowler does not mint standalone STS sessions for you. If you use this method, you must provide all three STS values from your external workflow.
</Note>
The credentials used by Prowler should have the minimum required permissions to audit the resources. At a minimum, the following permissions are recommended:
## ECS RAM Role
- `ram:GetUser`
- `ram:ListUsers`
- `ram:GetPasswordPolicy`
- `ram:GetAccountSummary`
- `ram:ListVirtualMFADevices`
- `ram:ListGroups`
- `ram:ListPolicies`
- `ram:ListAccessKeys`
- `ram:GetLoginProfile`
- `ram:ListPoliciesForUser`
- `ram:ListGroupsForUser`
- `actiontrail:DescribeTrails`
- `oss:GetBucketLogging`
- `oss:GetBucketAcl`
- `rds:DescribeDBInstances`
- `rds:DescribeDBInstanceAttribute`
- `ecs:DescribeInstances`
- `vpc:DescribeVpcs`
- `sls:ListProject`
- `sls:ListAlerts`
- `sls:ListLogStores`
- `sls:GetLogStore`
Use this when Prowler runs on an ECS instance and you do not want to store any AccessKeys on disk.
### Create And Attach The Role
1. Open the [RAM console](https://ram.console.alibabacloud.com/).
2. Go to `Identities` > `Roles`.
3. Click `Create Role`.
4. Set the trusted entity to `Alibaba Cloud Service`.
5. Select `ECS` as the trusted service.
6. Attach the read permissions required for the scan.
7. Attach that RAM role to the ECS instance that runs Prowler.
Alibaba Cloud guide:
- [Instance RAM roles](https://www.alibabacloud.com/help/en/doc-detail/54579.html)
### Run Prowler
```bash
prowler alibabacloud --ecs-ram-role ProwlerEcsRole
```
Or:
```bash
export ALIBABA_CLOUD_ECS_METADATA="ProwlerEcsRole"
prowler alibabacloud
```
## OIDC Role Authentication
Use this when Prowler runs in ACK or another Kubernetes environment that provides an OIDC token file.
### Create The OIDC Identity Provider
1. Open the [RAM console](https://ram.console.alibabacloud.com/).
2. Go to `Integrations` > `SSO`.
3. Select `Role-based SSO`, then the `OIDC` tab.
4. Click `Create IdP`.
5. Fill in:
- `IdP Name`
- `Issuer URL`
- `Fingerprint`
- `Client ID`
6. Create the IdP and note its ARN.
Alibaba Cloud guides:
- [Manage an OIDC IdP](https://www.alibabacloud.com/help/en/ram/manage-an-oidc-idp)
- [Overview of role-based OIDC SSO](https://www.alibabacloud.com/help/en/ram/overview-of-oidc-based-sso)
### Create The RAM Role Trusted By That IdP
Create a RAM role whose trusted entity is the OIDC IdP, then attach the scan permissions to that role.
If you are running in ACK with RRSA, this is typically the role bound to the service account that runs Prowler.
### Provide The OIDC Variables To Prowler
Prowler currently expects:
- `--oidc-role-arn` for the RAM role ARN,
- `ALIBABA_CLOUD_OIDC_PROVIDER_ARN` for the OIDC provider ARN,
- `ALIBABA_CLOUD_OIDC_TOKEN_FILE` for the token file path.
Example:
```bash
export ALIBABA_CLOUD_OIDC_PROVIDER_ARN="acs:ram::123456789012:oidc-provider/ack-rrsa-provider"
export ALIBABA_CLOUD_OIDC_TOKEN_FILE="/var/run/secrets/ack.alibabacloud.com/rrsa-tokens/token"
prowler alibabacloud --oidc-role-arn acs:ram::123456789012:role/ProwlerAckRole
```
If you use ACK RRSA, Alibaba's `ack-pod-identity-webhook` can inject the three required environment variables and mount the token file into the pod automatically:
- [ack-pod-identity-webhook](https://www.alibabacloud.com/help/en/cs/user-guide/ack-pod-identity-webhook)
- [Use RRSA to authorize different pods to access different cloud services](https://www.alibabacloud.com/help/doc-detail/356611.html)
<Note>
Even if your pod already exposes `ALIBABA_CLOUD_ROLE_ARN`, use `--oidc-role-arn` with Prowler. The provider currently reads the role ARN for OIDC from the CLI argument.
</Note>
## Credentials URI
Use this only if you already operate an internal credential broker that returns temporary Alibaba Cloud credentials over HTTP.
The endpoint must return a JSON body with this structure:
```json
{
"Code": "Success",
"AccessKeyId": "STS.xxxxx",
"AccessKeySecret": "xxxxx",
"SecurityToken": "xxxxx",
"Expiration": "2026-04-23T10:00:00Z"
}
```
Run Prowler with:
```bash
prowler alibabacloud --credentials-uri http://localhost:8080/credentials
```
Or:
```bash
export ALIBABA_CLOUD_CREDENTIALS_URI="http://localhost:8080/credentials"
prowler alibabacloud
```
For the expected response format, see Alibaba Cloud's SDK guide for [URI credentials](https://www.alibabacloud.com/help/en/sdk/developer-reference/v2-manage-access-credentials).
## Permissions Guidance
The exact minimum policy depends on the checks and services you enable.
If you are using the RAM console's `Grant Permission` screen, search for the **system policy names** below. Alibaba Cloud often uses product policy names that differ from the service name shown in Prowler.
### System Policies In The RAM Console
| Prowler use case | Policy name in RAM console | Notes |
| --- | --- | --- |
| Source user for `--role-arn` | `AliyunSTSAssumeRoleAccess` | Grants `sts:AssumeRole` so the source identity can assume the scan role. |
| RAM checks | `AliyunRAMReadOnlyAccess` | Covers RAM read APIs such as users, groups, policies, MFA devices, and account alias. |
| ECS checks | `AliyunECSReadOnlyAccess` | Read-only ECS access. |
| VPC checks | `AliyunVPCReadOnlyAccess` | Read-only VPC access. |
| OSS checks | `AliyunOSSReadOnlyAccess` | Read-only OSS access. |
| ActionTrail checks | `AliyunActionTrailReadOnlyAccess` | Read-only ActionTrail access. |
| SLS checks | `AliyunLogReadOnlyAccess` | In the RAM console, Simple Log Service appears as `Log`. |
| RDS checks | `AliyunRDSReadOnlyAccess` | Read-only RDS access. |
| ACK / Container Service checks | `AliyunCSReadOnlyAccess` | In the RAM console, ACK permissions appear under `CS`. |
| Security Center checks | `AliyunYundunSASReadOnlyAccess` | In the RAM console, Security Center appears under `Yundun SAS`. |
### Recommended Starting Point
For a broad Alibaba Cloud scan, the identity used by Prowler usually needs read access to the services Prowler currently audits, including:
- `RAM`
- `ECS`
- `VPC`
- `OSS`
- `ActionTrail`
- `Simple Log Service (SLS)`
- `RDS`
- `Container Service / ACK`
- `Security Center`
Use the following setup as a practical starting point:
- If you use **static AccessKeys**, attach the read-only policies above directly to the RAM user used by Prowler.
- If you use **RAM role assumption**, attach `AliyunSTSAssumeRoleAccess` to the source RAM user and attach the read-only policies above to the target scan role.
- If you use **ECS RAM role** or **OIDC/RRSA**, attach the read-only policies above to the role assumed by Prowler.
If you prefer a tighter custom policy instead of system policies, the current provider relies on read APIs such as:
- `ram:Get*`, `ram:List*`
- `ecs:Describe*`
- `vpc:Describe*`
- `oss:Get*`, `oss:List*`
- `actiontrail:Describe*`
- `log:Get*`, `log:List*`, `log:Query*`
- `rds:Describe*`
- `cs:Get*`, `cs:List*`, `cs:Describe*`
- `yundun-sas:Get*`, `yundun-sas:Describe*`, `yundun-sas:List*`
<Note>
If a service is denied, Prowler can still start, but checks for that service may fail or return incomplete results.
</Note>
@@ -12,9 +12,9 @@ Before you begin, make sure you have:
1. An **Alibaba Cloud Account ID** (visible in the Alibaba Cloud Console under your profile).
2. **Credentials** with appropriate permissions:
- **RAM User with Access Keys**: For static credential authentication.
- **RAM Role**: For cross-account access using role assumption (recommended).
3. The required permissions for Prowler to audit your resources. See the [Alibaba Cloud Authentication](/user-guide/providers/alibabacloud/authentication) guide for the full list of required permissions.
- **RAM User with Access Keys**: For local CLI usage or simple CI setups. See [RAM User and AccessKey](/user-guide/providers/alibabacloud/authentication#ram-user-and-accesskey).
- **RAM Role**: For role assumption and Prowler Cloud onboarding. See [RAM Role Assumption](/user-guide/providers/alibabacloud/authentication#ram-role-assumption-recommended).
3. The required permissions for Prowler to audit your resources. See the [Alibaba Cloud Authentication](/user-guide/providers/alibabacloud/authentication) guide for setup steps and permission guidance.
<CardGroup cols={2}>
<Card title="Prowler Cloud" icon="cloud" href="#prowler-cloud">
@@ -64,7 +64,7 @@ After the Account ID is in place, select the authentication method that matches
#### RAM Role Assumption (Recommended)
Use this method for secure cross-account access. For detailed instructions on how to create the RAM role, see the [Authentication guide](/user-guide/providers/alibabacloud/authentication#ram-role-assumption-recommended-for-cross-account).
Use this method for secure cross-account access. For detailed instructions on how to create the RAM role, see the [Authentication guide](/user-guide/providers/alibabacloud/authentication#ram-role-assumption-recommended).
1. Enter the **Role ARN** (format: `acs:ram::<account-id>:role/<role-name>`)
2. Enter the **Access Key ID** and **Access Key Secret** of the RAM user that will assume the role
@@ -77,7 +77,7 @@ The RAM user whose credentials you provide must have permission to assume the ta
#### Credentials (Static Access Keys)
Use static credentials for quick scans (not recommended for production). For detailed setup, see the [Authentication guide](/user-guide/providers/alibabacloud/authentication#permanent-access-keys).
Use static credentials for quick scans (not recommended for production). For detailed setup, see the [Authentication guide](/user-guide/providers/alibabacloud/authentication#ram-user-and-accesskey).
1. Enter the **Access Key ID** and **Access Key Secret**
@@ -104,7 +104,7 @@ You can also run Alibaba Cloud assessments directly from the CLI. Both command-l
### Step 1: Select an Authentication Method
Choose one of the following authentication methods. For the complete list and detailed configuration, see the [Authentication guide](/user-guide/providers/alibabacloud/authentication).
Choose one of the following authentication methods. For step-by-step credential creation and the full list of supported authentication modes, see the [Authentication guide](/user-guide/providers/alibabacloud/authentication).
#### Environment Variables
@@ -114,6 +114,13 @@ export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"
prowler alibabacloud
```
#### Default Credential Chain
```bash
aliyun configure --mode AK
prowler alibabacloud
```
#### RAM Role Assumption
```bash
@@ -146,7 +153,7 @@ prowler alibabacloud
#### Scan specific regions
```bash
prowler alibabacloud --regions cn-hangzhou cn-shanghai
prowler alibabacloud --region cn-hangzhou cn-shanghai
```
#### Run specific checks
Binary file not shown.

After

Width:  |  Height:  |  Size: 134 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 282 KiB

+4
View File
@@ -10,6 +10,10 @@ All notable changes to the **Prowler SDK** are documented in this file.
- `repository_default_branch_dismisses_stale_reviews` check for GitHub provider to ensure stale pull request approvals are dismissed when new commits are pushed [(#10569)](https://github.com/prowler-cloud/prowler/pull/10569)
- Official Prowler GitHub Action (`prowler-cloud/prowler@5.25`) for running scans in GitHub workflows with optional `--push-to-cloud` and SARIF upload to GitHub Code Scanning [(#10872)](https://github.com/prowler-cloud/prowler/pull/10872)
### 🐞 Fixed
- Alibaba Cloud CS service SDK compatibility, harden other services and improve documentation [(#10871)](https://github.com/prowler-cloud/prowler/pull/10871)
---
## [5.24.3] (Prowler v5.24.3)
@@ -68,6 +68,45 @@ class AlibabaCloudService:
return self.regional_clients[region]
return self.client
@staticmethod
def _is_retriable_error(error: Exception) -> bool:
"""Return True when an Alibaba API error is worth retrying once."""
error_code = getattr(error, "code", "")
status_code = getattr(error, "statusCode", None) or getattr(
error, "status_code", None
)
message = str(error)
retriable_codes = {"ServiceUnavailable", "Throttling", "Throttling.User"}
retriable_substrings = (
"Connection reset by peer",
"Connection aborted",
"ConnectTimeoutError",
"ReadTimeout",
"timed out",
"temporarily unavailable",
)
return (
error_code in retriable_codes
or status_code in {429, 500, 502, 503, 504}
or any(fragment in message for fragment in retriable_substrings)
)
def _call_with_retries(self, func, *args, retries: int = 1, **kwargs):
"""Call a function and retry once for transient Alibaba API failures."""
last_error = None
for attempt in range(retries + 1):
try:
return func(*args, **kwargs)
except Exception as error: # pragma: no cover - exercised via services
last_error = error
if attempt >= retries or not self._is_retriable_error(error):
raise
raise last_error
def __threading_call__(self, call, iterator=None):
"""
Execute a function across multiple regions or items using threads.
+31 -9
View File
@@ -150,6 +150,34 @@ class AlibabaCloudSession:
)
return self._credentials
@staticmethod
def _get_securitycenter_endpoint(region: str) -> str:
"""Return the public Security Center OpenAPI endpoint for a region."""
securitycenter_region = region or ALIBABACLOUD_DEFAULT_REGION
if securitycenter_region.startswith("cn-"):
return "tds.cn-shanghai.aliyuncs.com"
return "tds.ap-southeast-1.aliyuncs.com"
@staticmethod
def _get_rds_endpoint(region: str) -> str:
"""Return the public RDS OpenAPI endpoint for a region."""
rds_region = region or ALIBABACLOUD_DEFAULT_REGION
shared_rds_regions = {
"cn-qingdao",
"cn-beijing",
"cn-hangzhou",
"cn-shanghai",
"cn-shenzhen",
"cn-heyuan",
"cn-hongkong",
"cn-beijing-finance-1",
"cn-hangzhou-finance",
"cn-shanghai-finance-1",
}
if rds_region in shared_rds_regions:
return "rds.aliyuncs.com"
return f"rds.{rds_region}.aliyuncs.com"
def client(self, service: str, region: str = None):
"""
Create a service client for the given service and region.
@@ -196,11 +224,8 @@ class AlibabaCloudSession:
config.endpoint = f"ecs.{ALIBABACLOUD_DEFAULT_REGION}.aliyuncs.com"
return EcsClient(config)
elif service == "sas" or service == "securitycenter":
# SAS (Security Center) endpoint is regional: sas.{region}.aliyuncs.com
if region:
config.endpoint = f"sas.{region}.aliyuncs.com"
else:
config.endpoint = f"sas.{ALIBABACLOUD_DEFAULT_REGION}.aliyuncs.com"
# Security Center uses regional groups of shared TDS endpoints.
config.endpoint = self._get_securitycenter_endpoint(region)
return SasClient(config)
elif service == "oss":
if region:
@@ -226,10 +251,7 @@ class AlibabaCloudSession:
config.endpoint = f"cs.{ALIBABACLOUD_DEFAULT_REGION}.aliyuncs.com"
return CSClient(config)
elif service == "rds":
if region:
config.endpoint = f"rds.{region}.aliyuncs.com"
else:
config.endpoint = f"rds.{ALIBABACLOUD_DEFAULT_REGION}.aliyuncs.com"
config.endpoint = self._get_rds_endpoint(region)
return RdsClient(config)
elif service == "sls":
if region:
@@ -33,7 +33,7 @@ class ActionTrail(AlibabaCloudService):
try:
# Use Tea SDK client (ActionTrail is regional service)
request = actiontrail_models.DescribeTrailsRequest()
response = regional_client.describe_trails(request)
response = self._call_with_retries(regional_client.describe_trails, request)
if response and response.body and response.body.trail_list:
# trail_list is already a list, not an object with a trail attribute
@@ -1,4 +1,6 @@
import json
from datetime import datetime
from threading import Lock
from typing import Optional
from alibabacloud_cs20151215 import models as cs_models
@@ -23,6 +25,8 @@ class CS(AlibabaCloudService):
# Fetch CS resources
self.clusters = []
self._cluster_ids_lock = Lock()
self._seen_cluster_ids = set()
self.__threading_call__(self._describe_clusters)
def _describe_clusters(self, regional_client):
@@ -33,18 +37,30 @@ class CS(AlibabaCloudService):
try:
# DescribeClustersV1 returns cluster list
request = cs_models.DescribeClustersV1Request()
response = regional_client.describe_clusters_v1(request)
response = self._call_with_retries(
regional_client.describe_clusters_v1, request
)
if response and response.body and response.body.clusters:
for cluster_data in response.body.clusters:
cluster_id = getattr(cluster_data, "cluster_id", "")
cluster_region = getattr(cluster_data, "region_id", "") or region
if (
cluster_region != region
and cluster_region in self.regional_clients
):
continue
if not self.audit_resources or is_resource_filtered(
cluster_id, self.audit_resources
):
cluster_client = self.regional_clients.get(
cluster_region, regional_client
)
# Get detailed information for each cluster
cluster_detail = self._get_cluster_detail(
regional_client, cluster_id
cluster_client, cluster_id
)
if cluster_detail:
@@ -60,12 +76,12 @@ class CS(AlibabaCloudService):
# Get node pools to check CloudMonitor
cloudmonitor_enabled = self._check_cloudmonitor_enabled(
regional_client, cluster_id
cluster_client, cluster_id
)
# Check if cluster checks have been run in the last week
last_check_time = self._get_last_cluster_check(
regional_client, cluster_id
cluster_client, cluster_id
)
# Check addons for dashboard, network policy, etc.
@@ -78,33 +94,33 @@ class CS(AlibabaCloudService):
cluster_detail, region
)
self.clusters.append(
Cluster(
id=cluster_id,
name=getattr(cluster_data, "name", cluster_id),
region=region,
cluster_type=getattr(
cluster_data, "cluster_type", ""
),
state=getattr(cluster_data, "state", ""),
audit_project_name=audit_project_name,
log_service_enabled=bool(audit_project_name),
cloudmonitor_enabled=cloudmonitor_enabled,
rbac_enabled=rbac_enabled,
last_check_time=last_check_time,
dashboard_enabled=addons_status[
"dashboard_enabled"
],
network_policy_enabled=addons_status[
"network_policy_enabled"
],
eni_multiple_ip_enabled=addons_status[
"eni_multiple_ip_enabled"
],
private_cluster_enabled=not public_access_enabled,
)
cluster = Cluster(
id=cluster_id,
name=getattr(cluster_data, "name", cluster_id),
region=cluster_region,
cluster_type=getattr(cluster_data, "cluster_type", ""),
state=getattr(cluster_data, "state", ""),
audit_project_name=audit_project_name,
log_service_enabled=bool(audit_project_name),
cloudmonitor_enabled=cloudmonitor_enabled,
rbac_enabled=rbac_enabled,
last_check_time=last_check_time,
dashboard_enabled=addons_status["dashboard_enabled"],
network_policy_enabled=addons_status[
"network_policy_enabled"
],
eni_multiple_ip_enabled=addons_status[
"eni_multiple_ip_enabled"
],
private_cluster_enabled=not public_access_enabled,
)
with self._cluster_ids_lock:
if cluster_id in self._seen_cluster_ids:
continue
self._seen_cluster_ids.add(cluster_id)
self.clusters.append(cluster)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
@@ -114,19 +130,43 @@ class CS(AlibabaCloudService):
"""Get detailed information for a specific cluster."""
try:
# DescribeClusterDetail returns detailed cluster information
request = cs_models.DescribeClusterDetailRequest()
response = regional_client.describe_cluster_detail(cluster_id, request)
if hasattr(cs_models, "DescribeClusterDetailRequest"):
request = cs_models.DescribeClusterDetailRequest()
response = self._call_with_retries(
regional_client.describe_cluster_detail,
cluster_id,
request,
)
else:
response = self._call_with_retries(
regional_client.describe_cluster_detail, cluster_id
)
if response and response.body:
# Convert response body to dict
body = response.body
result = {"meta_data": {}}
result = {"meta_data": {}, "parameters": {}, "master_url": ""}
# Check if meta_data exists in the response
# The ACK SDK exposes meta_data as a JSON string in recent versions.
if hasattr(body, "meta_data"):
meta_data = body.meta_data
if meta_data:
result["meta_data"] = dict(meta_data)
if isinstance(meta_data, dict):
result["meta_data"] = meta_data
elif isinstance(meta_data, str):
try:
parsed_meta_data = json.loads(meta_data)
except (TypeError, ValueError):
parsed_meta_data = {}
if isinstance(parsed_meta_data, dict):
result["meta_data"] = parsed_meta_data
if hasattr(body, "parameters") and body.parameters:
result["parameters"] = body.parameters
if hasattr(body, "master_url") and body.master_url:
result["master_url"] = body.master_url
return result
@@ -143,7 +183,9 @@ class CS(AlibabaCloudService):
try:
# DescribeClusterNodePools returns node pool information
request = cs_models.DescribeClusterNodePoolsRequest()
response = regional_client.describe_cluster_node_pools(cluster_id, request)
response = self._call_with_retries(
regional_client.describe_cluster_node_pools, cluster_id, request
)
if response and response.body and response.body.nodepools:
nodepools = response.body.nodepools
@@ -214,9 +256,19 @@ class CS(AlibabaCloudService):
or None if no successful checks found.
"""
try:
# DescribeClusterChecks returns cluster check history
request = cs_models.DescribeClusterChecksRequest()
response = regional_client.describe_cluster_checks(cluster_id, request)
# Newer ACK SDKs expose ListClusterChecks; older ones used DescribeClusterChecks.
if hasattr(cs_models, "ListClusterChecksRequest") and hasattr(
regional_client, "list_cluster_checks"
):
request = cs_models.ListClusterChecksRequest()
response = self._call_with_retries(
regional_client.list_cluster_checks, cluster_id, request
)
else:
request = cs_models.DescribeClusterChecksRequest()
response = self._call_with_retries(
regional_client.describe_cluster_checks, cluster_id, request
)
if response and response.body and response.body.checks:
checks = response.body.checks
@@ -267,18 +319,20 @@ class CS(AlibabaCloudService):
# Note: Addons structure from API is typically a string representation of JSON or a list
# Based on sample: "Addons": [{"name": "gateway-api", ...}, ...]
addons = meta_data.get("Addons", [])
if addons is None:
addons = []
# If addons is string, try to parse it?
# The SDK typically handles this conversion, but let's be safe
if isinstance(addons, str):
import json
try:
addons = json.loads(addons)
except Exception:
addons = []
for addon in addons:
if not isinstance(addon, dict):
continue
name = addon.get("name", "")
disabled = addon.get("disabled", False)
@@ -317,7 +371,13 @@ class CS(AlibabaCloudService):
parameters = cluster_detail.get("parameters", {})
endpoint_public = parameters.get("endpoint_public", "")
if endpoint_public:
if isinstance(endpoint_public, str):
normalized_public = endpoint_public.strip().lower()
if normalized_public in {"true", "1", "yes"}:
return True
if normalized_public in {"false", "0", "no", ""}:
return False
elif endpoint_public:
return True
# If we can't find explicit indicator, check if master_url is present
@@ -29,6 +29,8 @@ class OSS(AlibabaCloudService):
# Treat as regional for client generation consistency with other services
super().__init__(__class__.__name__, provider, global_service=False)
self._buckets_lock = Lock()
self._bucket_inventory_lock = Lock()
self._bucket_inventory_loaded = False
# Fetch OSS resources
self.buckets = {}
@@ -40,6 +42,11 @@ class OSS(AlibabaCloudService):
def _list_buckets(self, regional_client=None):
region = "unknown"
try:
with self._bucket_inventory_lock:
if self._bucket_inventory_loaded:
return
self._bucket_inventory_loaded = True
regional_client = regional_client or self.client
region = getattr(regional_client, "region", self.region)
endpoint = f"oss-{region}.aliyuncs.com"
@@ -75,11 +82,20 @@ class OSS(AlibabaCloudService):
headers["Authorization"] = f"OSS {credentials.access_key_id}:{signature}"
url = f"https://{endpoint}/"
response = requests.get(url, headers=headers, timeout=10)
response = self._call_with_retries(
requests.get, url, headers=headers, timeout=10
)
if response.status_code != 200:
logger.error(
f"OSS - HTTP listing {endpoint_label} returned {response.status_code}: {response.text}"
)
if response.status_code == 403 and "UserDisable" in (
response.text or ""
):
logger.info(
f"OSS - HTTP listing {endpoint_label} skipped because OSS is disabled for this account."
)
else:
logger.error(
f"OSS - HTTP listing {endpoint_label} returned {response.status_code}: {response.text}"
)
return
try:
@@ -22,6 +22,18 @@ class RDS(AlibabaCloudService):
self.instances = []
self.__threading_call__(self._describe_instances)
@staticmethod
def _set_region_id(request, regional_client) -> None:
"""Populate RegionId on RDS requests when the SDK model exposes it."""
region = getattr(regional_client, "region", "")
if not region:
return
if hasattr(request, "region_id"):
request.region_id = region
elif hasattr(request, "RegionId"):
request.RegionId = region
def _describe_instances(self, regional_client):
"""List all RDS instances and fetch their details in a specific region."""
region = getattr(regional_client, "region", "unknown")
@@ -30,7 +42,10 @@ class RDS(AlibabaCloudService):
try:
# DescribeDBInstances returns instance list
request = rds_models.DescribeDBInstancesRequest()
response = regional_client.describe_dbinstances(request)
self._set_region_id(request, regional_client)
response = self._call_with_retries(
regional_client.describe_dbinstances, request
)
if response and response.body and response.body.items:
for instance_data in response.body.items.dbinstance:
@@ -123,7 +138,10 @@ class RDS(AlibabaCloudService):
try:
request = rds_models.DescribeDBInstanceAttributeRequest()
request.dbinstance_id = instance_id
response = regional_client.describe_dbinstance_attribute(request)
self._set_region_id(request, regional_client)
response = self._call_with_retries(
regional_client.describe_dbinstance_attribute, request
)
if (
response
@@ -146,7 +164,10 @@ class RDS(AlibabaCloudService):
try:
request = rds_models.DescribeDBInstanceSSLRequest()
request.dbinstance_id = instance_id
response = regional_client.describe_dbinstance_ssl(request)
self._set_region_id(request, regional_client)
response = self._call_with_retries(
regional_client.describe_dbinstance_ssl, request
)
if response and response.body:
# response.body is a DescribeDBInstanceSSLResponseBody model object, use getattr
@@ -169,7 +190,10 @@ class RDS(AlibabaCloudService):
try:
request = rds_models.DescribeDBInstanceTDERequest()
request.dbinstance_id = instance_id
response = regional_client.describe_dbinstance_tde(request)
self._set_region_id(request, regional_client)
response = self._call_with_retries(
regional_client.describe_dbinstance_tde, request
)
if response and response.body:
return {
@@ -187,7 +211,10 @@ class RDS(AlibabaCloudService):
try:
request = rds_models.DescribeDBInstanceIPArrayListRequest()
request.dbinstance_id = instance_id
response = regional_client.describe_dbinstance_iparray_list(request)
self._set_region_id(request, regional_client)
response = self._call_with_retries(
regional_client.describe_dbinstance_iparray_list, request
)
ips = []
if response and response.body and response.body.items:
@@ -205,12 +232,12 @@ class RDS(AlibabaCloudService):
def _describe_sql_collector_policy(self, regional_client, instance_id: str) -> dict:
"""Check SQL audit status."""
try:
request = rds_models.DescribeSQLLogRecordsRequest()
request.dbinstance_id = instance_id
policy_request = rds_models.DescribeSQLCollectorPolicyRequest()
policy_request.dbinstance_id = instance_id
response = regional_client.describe_sqlcollector_policy(policy_request)
self._set_region_id(policy_request, regional_client)
response = self._call_with_retries(
regional_client.describe_sqlcollector_policy, policy_request
)
if response and response.body:
status = getattr(response.body, "sqlcollector_status", "")
@@ -232,7 +259,10 @@ class RDS(AlibabaCloudService):
try:
request = rds_models.DescribeParametersRequest()
request.dbinstance_id = instance_id
response = regional_client.describe_parameters(request)
self._set_region_id(request, regional_client)
response = self._call_with_retries(
regional_client.describe_parameters, request
)
params = {}
if response and response.body and response.body.running_parameters:
@@ -50,7 +50,9 @@ class SecurityCenter(AlibabaCloudService):
request.page_size = 100
while True:
response = self.client.describe_vul_list(request)
response = self._call_with_retries(
self.client.describe_vul_list, request
)
if response and response.body and response.body.vul_records:
vul_records = response.body.vul_records
@@ -112,7 +114,9 @@ class SecurityCenter(AlibabaCloudService):
request.page_size = 100
while True:
response = self.client.describe_cloud_center_instances(request)
response = self._call_with_retries(
self.client.describe_cloud_center_instances, request
)
if response and response.body and response.body.instances:
instances = response.body.instances
@@ -174,7 +178,9 @@ class SecurityCenter(AlibabaCloudService):
request.page_size = 100
while True:
response = self.client.list_uninstall_aegis_machines(request)
response = self._call_with_retries(
self.client.list_uninstall_aegis_machines, request
)
if response and response.body and response.body.machine_list:
machines = response.body.machine_list
@@ -221,7 +227,9 @@ class SecurityCenter(AlibabaCloudService):
try:
# Get notification configurations
request = sas_models.DescribeNoticeConfigRequest()
response = self.client.describe_notice_config(request)
response = self._call_with_retries(
self.client.describe_notice_config, request
)
if response and response.body and response.body.notice_config_list:
notice_configs = response.body.notice_config_list
@@ -253,7 +261,7 @@ class SecurityCenter(AlibabaCloudService):
try:
# Get vulnerability scan configuration
request = sas_models.DescribeVulConfigRequest()
response = self.client.describe_vul_config(request)
response = self._call_with_retries(self.client.describe_vul_config, request)
if response and response.body and response.body.target_configs:
target_configs = response.body.target_configs
@@ -281,7 +289,9 @@ class SecurityCenter(AlibabaCloudService):
try:
# Get vulnerability scan level priorities
request = sas_models.DescribeConcernNecessityRequest()
response = self.client.describe_concern_necessity(request)
response = self._call_with_retries(
self.client.describe_concern_necessity, request
)
if response and response.body:
concern_necessity = getattr(response.body, "concern_necessity", [])
@@ -314,7 +324,9 @@ class SecurityCenter(AlibabaCloudService):
try:
# Get Security Center edition
request = sas_models.DescribeVersionConfigRequest()
response = self.client.describe_version_config(request)
response = self._call_with_retries(
self.client.describe_version_config, request
)
if response and response.body:
# Get Version field from response
@@ -39,7 +39,9 @@ class Sls(AlibabaCloudService):
try:
# List Projects
list_project_request = sls_models.ListProjectRequest(offset=0, size=500)
projects_resp = client.list_project(list_project_request)
projects_resp = self._call_with_retries(
client.list_project, list_project_request
)
if projects_resp.body and projects_resp.body.projects:
for project in projects_resp.body.projects:
@@ -50,8 +52,10 @@ class Sls(AlibabaCloudService):
offset=0, size=500
)
try:
alerts_resp = client.list_alerts(
project_name, list_alert_request
alerts_resp = self._call_with_retries(
client.list_alerts,
project_name,
list_alert_request,
)
if alerts_resp.body and alerts_resp.body.results:
for alert in alerts_resp.body.results:
@@ -90,7 +94,9 @@ class Sls(AlibabaCloudService):
try:
# List Projects
list_project_request = sls_models.ListProjectRequest(offset=0, size=500)
projects_resp = client.list_project(list_project_request)
projects_resp = self._call_with_retries(
client.list_project, list_project_request
)
if projects_resp.body and projects_resp.body.projects:
for project in projects_resp.body.projects:
@@ -101,14 +107,18 @@ class Sls(AlibabaCloudService):
offset=0, size=500
)
try:
logstores_resp = client.list_log_stores(
project_name, list_logstores_request
logstores_resp = self._call_with_retries(
client.list_log_stores,
project_name,
list_logstores_request,
)
if logstores_resp.body and logstores_resp.body.logstores:
for logstore_name in logstores_resp.body.logstores:
try:
logstore_resp = client.get_log_store(
project_name, logstore_name
logstore_resp = self._call_with_retries(
client.get_log_store,
project_name,
logstore_name,
)
if logstore_resp.body:
self.log_stores.append(
@@ -33,7 +33,7 @@ class VPC(AlibabaCloudService):
try:
request = vpc_models.DescribeVpcsRequest()
response = regional_client.describe_vpcs(request)
response = self._call_with_retries(regional_client.describe_vpcs, request)
if response and response.body and response.body.vpcs:
for vpc_data in response.body.vpcs.vpc:
@@ -70,7 +70,9 @@ class VPC(AlibabaCloudService):
request = vpc_models.DescribeFlowLogsRequest()
request.resource_id = vpc_id
request.resource_type = "VPC"
response = regional_client.describe_flow_logs(request)
response = self._call_with_retries(
regional_client.describe_flow_logs, request
)
if response and response.body and response.body.flow_logs:
flow_logs = response.body.flow_logs.flow_log
@@ -0,0 +1,64 @@
from unittest.mock import patch
import pytest
from prowler.providers.alibabacloud.models import (
AlibabaCloudCredentials,
AlibabaCloudSession,
)
def _build_session():
session = AlibabaCloudSession(cred_client=object())
session._credentials = AlibabaCloudCredentials(
access_key_id="test-access-key-id",
access_key_secret="test-access-key-secret",
)
return session
def test_securitycenter_client_uses_outside_china_endpoint():
session = _build_session()
with patch(
"prowler.providers.alibabacloud.models.SasClient",
side_effect=lambda config: config,
):
config = session.client("sas", "ap-northeast-1")
assert config.endpoint == "tds.ap-southeast-1.aliyuncs.com"
def test_securitycenter_client_uses_china_endpoint():
session = _build_session()
with patch(
"prowler.providers.alibabacloud.models.SasClient",
side_effect=lambda config: config,
):
config = session.client("securitycenter", "cn-hangzhou")
assert config.endpoint == "tds.cn-shanghai.aliyuncs.com"
@pytest.mark.parametrize(
("region", "expected_endpoint"),
[
("cn-beijing", "rds.aliyuncs.com"),
("cn-shanghai", "rds.aliyuncs.com"),
("cn-heyuan", "rds.aliyuncs.com"),
("cn-hongkong", "rds.aliyuncs.com"),
("ap-northeast-1", "rds.ap-northeast-1.aliyuncs.com"),
("cn-guangzhou", "rds.cn-guangzhou.aliyuncs.com"),
],
)
def test_rds_client_uses_documented_public_endpoints(region, expected_endpoint):
session = _build_session()
with patch(
"prowler.providers.alibabacloud.models.RdsClient",
side_effect=lambda config: config,
):
config = session.client("rds", region)
assert config.endpoint == expected_endpoint
@@ -1,4 +1,5 @@
from unittest.mock import patch
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from tests.providers.alibabacloud.alibabacloud_fixtures import (
set_mocked_alibabacloud_provider,
@@ -24,3 +25,53 @@ class TestActionTrailService:
assert actiontrail_client.service == "actiontrail"
assert actiontrail_client.provider == alibabacloud_provider
def test_describe_trails_retries_transient_connection_reset(self):
from prowler.providers.alibabacloud.services.actiontrail import (
actiontrail_service as actiontrail_service_module,
)
class ConnectionResetError(Exception):
pass
service = actiontrail_service_module.ActionTrail.__new__(
actiontrail_service_module.ActionTrail
)
service.audited_account = "1234567890"
service.audit_resources = []
service.trails = {}
regional_client = MagicMock()
regional_client.region = "cn-shenzhen"
regional_client.describe_trails.side_effect = [
ConnectionResetError(
"('Connection aborted.', ConnectionResetError(54, 'Connection reset by peer'))"
),
SimpleNamespace(
body=SimpleNamespace(
trail_list=[
SimpleNamespace(
name="trail-1",
trail_region="All",
home_region="cn-hangzhou",
status="Enable",
oss_bucket_name="bucket-1",
oss_bucket_location="cn-hangzhou",
sls_project_arn="",
event_rw="All",
create_time="2026-01-01T00:00:00Z",
)
]
)
),
]
with patch.object(
actiontrail_service_module,
"actiontrail_models",
SimpleNamespace(DescribeTrailsRequest=MagicMock(return_value=object())),
):
service._describe_trails(regional_client)
assert regional_client.describe_trails.call_count == 2
assert len(service.trails) == 1
@@ -1,4 +1,7 @@
from unittest.mock import patch
from datetime import datetime, timezone
from threading import Lock
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from tests.providers.alibabacloud.alibabacloud_fixtures import (
set_mocked_alibabacloud_provider,
@@ -22,3 +25,247 @@ class TestCSService:
assert cs_client.service == "cs"
assert cs_client.provider == alibabacloud_provider
def test_get_cluster_detail_uses_requestless_sdk_and_parses_response(self):
from prowler.providers.alibabacloud.services.cs import (
cs_service as cs_service_module,
)
service = cs_service_module.CS.__new__(cs_service_module.CS)
regional_client = MagicMock(region="cn-hangzhou")
regional_client.describe_cluster_detail.return_value = SimpleNamespace(
body=SimpleNamespace(
meta_data='{"AuditProjectName":"audit-project","Addons":[{"name":"terway","disabled":false}],"RBACEnabled":"true"}',
parameters={"authorization_mode": "RBAC", "endpoint_public": "false"},
master_url="",
)
)
with patch.object(cs_service_module, "cs_models", SimpleNamespace()):
result = service._get_cluster_detail(regional_client, "cluster-id")
regional_client.describe_cluster_detail.assert_called_once_with("cluster-id")
assert result == {
"meta_data": {
"AuditProjectName": "audit-project",
"Addons": [{"name": "terway", "disabled": False}],
"RBACEnabled": "true",
},
"parameters": {
"authorization_mode": "RBAC",
"endpoint_public": "false",
},
"master_url": "",
}
def test_get_last_cluster_check_uses_list_cluster_checks(self):
from prowler.providers.alibabacloud.services.cs import (
cs_service as cs_service_module,
)
service = cs_service_module.CS.__new__(cs_service_module.CS)
regional_client = MagicMock(region="cn-hangzhou")
request = object()
most_recent = datetime(2026, 4, 22, tzinfo=timezone.utc)
older = datetime(2026, 4, 20, tzinfo=timezone.utc)
regional_client.list_cluster_checks.return_value = SimpleNamespace(
body=SimpleNamespace(
checks=[
SimpleNamespace(status="Succeeded", finished_at=older),
SimpleNamespace(status="Failed", finished_at=None),
SimpleNamespace(status="Succeeded", finished_at=most_recent),
]
)
)
mock_models = SimpleNamespace(
ListClusterChecksRequest=MagicMock(return_value=request)
)
with patch.object(cs_service_module, "cs_models", mock_models):
result = service._get_last_cluster_check(regional_client, "cluster-id")
mock_models.ListClusterChecksRequest.assert_called_once_with()
regional_client.list_cluster_checks.assert_called_once_with(
"cluster-id", request
)
assert result == most_recent
def test_describe_clusters_populates_clusters_with_sdk_6_1_0_shape(self):
from prowler.providers.alibabacloud.services.cs import (
cs_service as cs_service_module,
)
service = cs_service_module.CS.__new__(cs_service_module.CS)
service.audit_resources = []
service.clusters = []
service.regional_clients = {}
service._cluster_ids_lock = Lock()
service._seen_cluster_ids = set()
describe_clusters_request = object()
describe_node_pools_request = object()
list_checks_request = object()
regional_client = MagicMock(region="cn-hangzhou")
regional_client.describe_clusters_v1.return_value = SimpleNamespace(
body=SimpleNamespace(
clusters=[
SimpleNamespace(
cluster_id="c-1",
name="test-cluster",
cluster_type="ManagedKubernetes",
state="running",
)
]
)
)
regional_client.describe_cluster_detail.return_value = SimpleNamespace(
body=SimpleNamespace(
meta_data='{"AuditProjectName":"audit-project","Addons":[{"name":"terway","disabled":false}],"RBACEnabled":"true"}',
parameters={"authorization_mode": "RBAC"},
master_url="",
)
)
regional_client.describe_cluster_node_pools.return_value = SimpleNamespace(
body=SimpleNamespace(
nodepools=[
SimpleNamespace(kubernetes_config=SimpleNamespace(cms_enabled=True))
]
)
)
regional_client.list_cluster_checks.return_value = SimpleNamespace(
body=SimpleNamespace(
checks=[
SimpleNamespace(
status="Succeeded",
finished_at=datetime(2026, 4, 22, tzinfo=timezone.utc),
)
]
)
)
mock_models = SimpleNamespace(
DescribeClustersV1Request=MagicMock(return_value=describe_clusters_request),
DescribeClusterNodePoolsRequest=MagicMock(
return_value=describe_node_pools_request
),
ListClusterChecksRequest=MagicMock(return_value=list_checks_request),
)
with patch.object(cs_service_module, "cs_models", mock_models):
service._describe_clusters(regional_client)
regional_client.describe_clusters_v1.assert_called_once_with(
describe_clusters_request
)
regional_client.describe_cluster_detail.assert_called_once_with("c-1")
regional_client.describe_cluster_node_pools.assert_called_once_with(
"c-1", describe_node_pools_request
)
regional_client.list_cluster_checks.assert_called_once_with(
"c-1", list_checks_request
)
assert len(service.clusters) == 1
cluster = service.clusters[0]
assert cluster.id == "c-1"
assert cluster.log_service_enabled is True
assert cluster.cloudmonitor_enabled is True
assert cluster.rbac_enabled is True
assert cluster.network_policy_enabled is True
assert cluster.eni_multiple_ip_enabled is True
assert cluster.private_cluster_enabled is True
def test_describe_clusters_uses_cluster_region_and_deduplicates(self):
from prowler.providers.alibabacloud.services.cs import (
cs_service as cs_service_module,
)
service = cs_service_module.CS.__new__(cs_service_module.CS)
service.audit_resources = []
service.clusters = []
service._cluster_ids_lock = Lock()
service._seen_cluster_ids = set()
list_request = object()
node_pools_request = object()
checks_request = object()
canonical_client = MagicMock(region="ap-southeast-1")
duplicate_client = MagicMock(region="cn-shenzhen")
service.regional_clients = {
"ap-southeast-1": canonical_client,
"cn-shenzhen": duplicate_client,
}
for client in (canonical_client, duplicate_client):
client.describe_clusters_v1.return_value = SimpleNamespace(
body=SimpleNamespace(
clusters=[
SimpleNamespace(
cluster_id="c-1",
name="test-cluster",
cluster_type="ManagedKubernetes",
state="running",
region_id="ap-southeast-1",
)
]
)
)
canonical_client.describe_cluster_detail.return_value = SimpleNamespace(
body=SimpleNamespace(
meta_data='{"AuditProjectName":"audit-project","Addons":[]}',
parameters={"authorization_mode": "RBAC"},
master_url="",
)
)
canonical_client.describe_cluster_node_pools.return_value = SimpleNamespace(
body=SimpleNamespace(nodepools=[])
)
canonical_client.list_cluster_checks.return_value = SimpleNamespace(
body=SimpleNamespace(checks=[])
)
mock_models = SimpleNamespace(
DescribeClustersV1Request=MagicMock(return_value=list_request),
DescribeClusterNodePoolsRequest=MagicMock(return_value=node_pools_request),
ListClusterChecksRequest=MagicMock(return_value=checks_request),
)
with patch.object(cs_service_module, "cs_models", mock_models):
service._describe_clusters(duplicate_client)
service._describe_clusters(canonical_client)
assert len(service.clusters) == 1
assert service.clusters[0].region == "ap-southeast-1"
canonical_client.describe_cluster_detail.assert_called_once_with("c-1")
duplicate_client.describe_cluster_detail.assert_not_called()
def test_check_cluster_addons_handles_null_addons_without_logging_error(self):
from prowler.providers.alibabacloud.services.cs import (
cs_service as cs_service_module,
)
service = cs_service_module.CS.__new__(cs_service_module.CS)
with patch.object(cs_service_module.logger, "error") as logger_error:
result = service._check_cluster_addons(
{"meta_data": {"Addons": None}},
"cn-hangzhou",
)
assert result == {
"dashboard_enabled": False,
"network_policy_enabled": False,
"eni_multiple_ip_enabled": False,
}
logger_error.assert_not_called()
def test_check_public_access_handles_false_string(self):
from prowler.providers.alibabacloud.services.cs.cs_service import CS
service = CS.__new__(CS)
result = service._check_public_access(
{"parameters": {"endpoint_public": "false"}, "master_url": ""},
"cn-hangzhou",
)
assert result is False
@@ -26,6 +26,8 @@ def _build_oss_service(audit_resources=None):
service.client = client
service.session = MagicMock()
service.session.get_credentials.return_value = _DummyCreds()
service._bucket_inventory_lock = Lock()
service._bucket_inventory_loaded = False
# Avoid real thread pool in tests
service.__threading_call__ = lambda call, iterator=None: [
call(item) for item in ((iterator or service.regional_clients.values()))
@@ -97,3 +99,37 @@ def test_list_buckets_rejects_xxe_payload():
oss._list_buckets()
assert oss.buckets == {}
def test_list_buckets_userdisable_is_not_logged_as_error():
oss = _build_oss_service()
with (
patch("requests.get") as get_mock,
patch(
"prowler.providers.alibabacloud.services.oss.oss_service.logger.error"
) as logger_error,
):
get_mock.return_value = MagicMock(
status_code=403,
text="<Error><Code>UserDisable</Code><Message>UserDisable</Message></Error>",
)
oss._list_buckets()
assert oss.buckets == {}
logger_error.assert_not_called()
def test_list_buckets_inventory_is_loaded_once_across_regions():
oss = _build_oss_service()
other_client = MagicMock()
other_client.region = "us-east-1"
oss.regional_clients["us-east-1"] = other_client
with patch("requests.get") as get_mock:
get_mock.return_value = MagicMock(
status_code=200, text=_fake_oss_list_response()
)
oss.__threading_call__(oss._list_buckets)
assert get_mock.call_count == 1
@@ -1,4 +1,5 @@
from unittest.mock import patch
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from tests.providers.alibabacloud.alibabacloud_fixtures import (
set_mocked_alibabacloud_provider,
@@ -22,3 +23,47 @@ class TestRDSService:
assert rds_client.service == "rds"
assert rds_client.provider == alibabacloud_provider
def test_describe_instances_sets_region_id_on_list_request(self):
from prowler.providers.alibabacloud.services.rds import (
rds_service as rds_service_module,
)
service = rds_service_module.RDS.__new__(rds_service_module.RDS)
service.audit_resources = []
service.instances = []
request = SimpleNamespace(region_id=None)
regional_client = MagicMock(region="cn-qingdao")
regional_client.describe_dbinstances.return_value = SimpleNamespace(
body=SimpleNamespace(items=None)
)
mock_models = SimpleNamespace(
DescribeDBInstancesRequest=MagicMock(return_value=request)
)
with patch.object(rds_service_module, "rds_models", mock_models):
service._describe_instances(regional_client)
assert request.region_id == "cn-qingdao"
def test_describe_db_instance_attribute_sets_region_id(self):
from prowler.providers.alibabacloud.services.rds import (
rds_service as rds_service_module,
)
service = rds_service_module.RDS.__new__(rds_service_module.RDS)
request = SimpleNamespace(dbinstance_id=None, region_id=None)
regional_client = MagicMock(region="cn-qingdao")
regional_client.describe_dbinstance_attribute.return_value = SimpleNamespace(
body=SimpleNamespace(items=None)
)
mock_models = SimpleNamespace(
DescribeDBInstanceAttributeRequest=MagicMock(return_value=request)
)
with patch.object(rds_service_module, "rds_models", mock_models):
service._describe_db_instance_attribute(regional_client, "rm-test")
assert request.dbinstance_id == "rm-test"
assert request.region_id == "cn-qingdao"
@@ -1,4 +1,5 @@
from unittest.mock import patch
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from tests.providers.alibabacloud.alibabacloud_fixtures import (
set_mocked_alibabacloud_provider,
@@ -24,3 +25,38 @@ class TestSecurityCenterService:
assert securitycenter_client.service == "securitycenter"
assert securitycenter_client.provider == alibabacloud_provider
def test_get_edition_retries_transient_service_unavailable(self):
from prowler.providers.alibabacloud.services.securitycenter import (
securitycenter_service as securitycenter_service_module,
)
class ServiceUnavailableError(Exception):
def __init__(self):
super().__init__("ServiceUnavailable")
self.code = "ServiceUnavailable"
self.statusCode = 503
service = securitycenter_service_module.SecurityCenter.__new__(
securitycenter_service_module.SecurityCenter
)
service.client = MagicMock()
service.client.describe_version_config.side_effect = [
ServiceUnavailableError(),
SimpleNamespace(body=SimpleNamespace(version=5)),
]
service.edition = None
service.version = None
with patch.object(
securitycenter_service_module,
"sas_models",
SimpleNamespace(
DescribeVersionConfigRequest=MagicMock(return_value=object())
),
):
service._get_edition()
assert service.edition == "Advanced"
assert service.version == 5
assert service.client.describe_version_config.call_count == 2
@@ -1,4 +1,5 @@
from unittest.mock import patch
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
from tests.providers.alibabacloud.alibabacloud_fixtures import (
set_mocked_alibabacloud_provider,
@@ -22,3 +23,47 @@ class TestSLSService:
assert sls_client.service == "sls"
assert sls_client.provider == alibabacloud_provider
def test_get_alerts_retries_transient_list_project_timeout(self):
from prowler.providers.alibabacloud.services.sls import (
sls_service as sls_service_module,
)
class ReadTimeoutError(Exception):
pass
service = sls_service_module.Sls.__new__(sls_service_module.Sls)
service.audited_account = "1234567890"
service.regional_clients = {
"cn-hangzhou": MagicMock(),
}
service.alerts = []
client = service.regional_clients["cn-hangzhou"]
client.list_project.side_effect = [
ReadTimeoutError(
"HTTPSConnectionPool(host='cn-hangzhou.log.aliyuncs.com', port=443): Read timed out. (read timeout=10.0)"
),
SimpleNamespace(
body=SimpleNamespace(
projects=[
SimpleNamespace(project_name="project-1"),
]
)
),
]
client.list_alerts.return_value = SimpleNamespace(
body=SimpleNamespace(results=[])
)
with patch.object(
sls_service_module,
"sls_models",
SimpleNamespace(
ListProjectRequest=MagicMock(return_value=object()),
ListAlertsRequest=MagicMock(return_value=object()),
),
):
service._get_alerts()
assert client.list_project.call_count == 2