Compare commits

..

12 Commits

Author SHA1 Message Date
pedrooot
7732281ecf feat(alibabacloud): change formating and improve checks 2025-10-27 10:42:37 +01:00
pedrooot
55e846b3d7 feat(provider): add alibabacloud provider 2025-10-24 13:10:49 +02:00
Andoni Alonso
e6d1b5639b chore(github): include roadmap in features request template (#9000) 2025-10-23 15:06:34 +02:00
Alan Buscaglia
b1856e42f0 chore: update changelog for release v5.13.0 (#8996) 2025-10-23 13:54:30 +02:00
Víctor Fernández Poyatos
ba8dbb0d28 fix(s3): file uploading for threatscore (#8993) 2025-10-23 16:07:06 +05:45
Daniel Barranquero
b436cc1cac chore(sdk): update changelog to released (#8994) 2025-10-23 15:55:50 +05:45
Josema Camacho
51baa88644 chore(api): Update changelog for API's version 1.14.0 to Prowler 5.13.0 (#8992) 2025-10-23 12:03:07 +02:00
Rubén De la Torre Vico
5098b12e97 chore(mcp): update changelog to released (#8991) 2025-10-23 11:47:58 +02:00
Daniel Barranquero
3d1e7015a6 fix(entra): value errors due tu enums (#8919) 2025-10-23 11:36:51 +02:00
Alejandro Bailo
0b7f02f7e4 feat: Check Findings component (#8976)
Co-authored-by: Alan Buscaglia <gentlemanprogramming@gmail.com>
2025-10-23 10:38:25 +02:00
Daniel Barranquero
c0396e97bf feat(docs): add new provider e2e guide (#8430)
Co-authored-by: HugoPBrito <hugopbrit@gmail.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-10-23 10:09:15 +02:00
Andoni Alonso
8d4fa46038 chore: script to generate AWS accounts list from AWS Org for bulk provisioning (#8903) 2025-10-22 16:23:14 -04:00
511 changed files with 20648 additions and 2824 deletions

View File

@@ -8,7 +8,7 @@ body:
attributes:
label: Feature search
options:
- label: I have searched the existing issues and this feature has not been requested yet
- label: I have searched the existing issues and this feature has not been requested yet or is already in our [Public Roadmap](https://roadmap.prowler.com/roadmap)
required: true
- type: dropdown
id: component

View File

@@ -2,7 +2,7 @@
All notable changes to the **Prowler API** are documented in this file.
## [1.14.0] (Prowler UNRELEASED)
## [1.14.0] (Prowler 5.13.0)
### Added
- Default JWT keys are generated and stored if they are missing from configuration [(#8655)](https://github.com/prowler-cloud/prowler/pull/8655)

View File

@@ -24,7 +24,7 @@ class Migration(migrations.Migration):
(
"name",
models.CharField(
max_length=255,
max_length=100,
validators=[django.core.validators.MinLengthValidator(3)],
),
),

View File

@@ -2689,6 +2689,55 @@ class TestScanViewSet:
== "There is a problem with credentials."
)
@patch("api.v1.views.ScanViewSet._get_task_status")
@patch("api.v1.views.get_s3_client")
@patch("api.v1.views.env.str")
def test_threatscore_s3_wildcard(
self,
mock_env_str,
mock_get_s3_client,
mock_get_task_status,
authenticated_client,
scans_fixture,
):
"""
When the threatscore endpoint is called with an S3 output_location,
the view should list objects in S3 using wildcard pattern matching,
retrieve the matching PDF file, and return it with HTTP 200 and proper headers.
"""
scan = scans_fixture[0]
scan.state = StateChoices.COMPLETED
bucket = "test-bucket"
zip_key = "tenant-id/scan-id/prowler-output-foo.zip"
scan.output_location = f"s3://{bucket}/{zip_key}"
scan.save()
pdf_key = os.path.join(
os.path.dirname(zip_key),
"threatscore",
"prowler-output-123_threatscore_report.pdf",
)
mock_s3_client = Mock()
mock_s3_client.list_objects_v2.return_value = {"Contents": [{"Key": pdf_key}]}
mock_s3_client.get_object.return_value = {"Body": io.BytesIO(b"pdf-bytes")}
mock_env_str.return_value = bucket
mock_get_s3_client.return_value = mock_s3_client
mock_get_task_status.return_value = None
url = reverse("scan-threatscore", kwargs={"pk": scan.id})
response = authenticated_client.get(url)
assert response.status_code == status.HTTP_200_OK
assert response["Content-Type"] == "application/pdf"
assert response["Content-Disposition"].endswith(
'"prowler-output-123_threatscore_report.pdf"'
)
assert response.content == b"pdf-bytes"
mock_s3_client.list_objects_v2.assert_called_once()
mock_s3_client.get_object.assert_called_once_with(Bucket=bucket, Key=pdf_key)
def test_report_s3_success(self, authenticated_client, scans_fixture, monkeypatch):
"""
When output_location is an S3 URL and the S3 client returns the file successfully,

View File

@@ -1,3 +1,4 @@
import fnmatch
import glob
import logging
import os
@@ -1775,7 +1776,18 @@ class ScanViewSet(BaseRLSViewSet):
status=status.HTTP_502_BAD_GATEWAY,
)
contents = resp.get("Contents", [])
keys = [obj["Key"] for obj in contents if obj["Key"].endswith(suffix)]
keys = []
for obj in contents:
key = obj["Key"]
key_basename = os.path.basename(key)
if any(ch in suffix for ch in ("*", "?", "[")):
if fnmatch.fnmatch(key_basename, suffix):
keys.append(key)
elif key_basename == suffix:
keys.append(key)
elif key.endswith(suffix):
# Backward compatibility if suffix already includes directories
keys.append(key)
if not keys:
return Response(
{

View File

@@ -20,10 +20,10 @@ from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.compliance.aws_well_architected.aws_well_architected import (
AWSWellArchitected,
)
from prowler.lib.outputs.compliance.c5.c5_aws import AWSC5
from prowler.lib.outputs.compliance.ccc.ccc_aws import CCC_AWS
from prowler.lib.outputs.compliance.ccc.ccc_azure import CCC_Azure
from prowler.lib.outputs.compliance.ccc.ccc_gcp import CCC_GCP
from prowler.lib.outputs.compliance.c5.c5_aws import AWSC5
from prowler.lib.outputs.compliance.cis.cis_aws import AWSCIS
from prowler.lib.outputs.compliance.cis.cis_azure import AzureCIS
from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
@@ -183,18 +183,21 @@ def get_s3_client():
return s3_client
def _upload_to_s3(tenant_id: str, zip_path: str, scan_id: str) -> str | None:
def _upload_to_s3(
tenant_id: str, scan_id: str, local_path: str, relative_key: str
) -> str | None:
"""
Upload the specified ZIP file to an S3 bucket.
If the S3 bucket environment variables are not configured,
the function returns None without performing an upload.
Upload a local artifact to an S3 bucket under the tenant/scan prefix.
Args:
tenant_id (str): The tenant identifier, used as part of the S3 key prefix.
zip_path (str): The local file system path to the ZIP file to be uploaded.
scan_id (str): The scan identifier, used as part of the S3 key prefix.
tenant_id (str): The tenant identifier used as the first segment of the S3 key.
scan_id (str): The scan identifier used as the second segment of the S3 key.
local_path (str): Filesystem path to the artifact to upload.
relative_key (str): Object key relative to `<tenant_id>/<scan_id>/`.
Returns:
str: The S3 URI of the uploaded file (e.g., "s3://<bucket>/<key>") if successful.
None: If the required environment variables for the S3 bucket are not set.
str | None: S3 URI of the uploaded artifact, or None if the upload is skipped.
Raises:
botocore.exceptions.ClientError: If the upload attempt to S3 fails for any reason.
"""
@@ -202,27 +205,19 @@ def _upload_to_s3(tenant_id: str, zip_path: str, scan_id: str) -> str | None:
if not bucket:
return
if not relative_key:
return
if not os.path.isfile(local_path):
return
try:
s3 = get_s3_client()
# Upload the ZIP file (outputs) to the S3 bucket
zip_key = f"{tenant_id}/{scan_id}/{os.path.basename(zip_path)}"
s3.upload_file(
Filename=zip_path,
Bucket=bucket,
Key=zip_key,
)
s3_key = f"{tenant_id}/{scan_id}/{relative_key}"
s3.upload_file(Filename=local_path, Bucket=bucket, Key=s3_key)
# Upload the compliance directory to the S3 bucket
compliance_dir = os.path.join(os.path.dirname(zip_path), "compliance")
for filename in os.listdir(compliance_dir):
local_path = os.path.join(compliance_dir, filename)
if not os.path.isfile(local_path):
continue
file_key = f"{tenant_id}/{scan_id}/compliance/{filename}"
s3.upload_file(Filename=local_path, Bucket=bucket, Key=file_key)
return f"s3://{base.DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET}/{zip_key}"
return f"s3://{base.DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET}/{s3_key}"
except (ClientError, NoCredentialsError, ParamValidationError, ValueError) as e:
logger.error(f"S3 upload failed: {str(e)}")

View File

@@ -1317,7 +1317,12 @@ def generate_threatscore_report_job(
min_risk_level=4,
)
upload_uri = _upload_to_s3(tenant_id, pdf_path, scan_id)
upload_uri = _upload_to_s3(
tenant_id,
scan_id,
pdf_path,
f"threatscore/{Path(pdf_path).name}",
)
if upload_uri:
try:
rmtree(Path(pdf_path).parent, ignore_errors=True)

View File

@@ -1,3 +1,4 @@
import os
from datetime import datetime, timedelta, timezone
from pathlib import Path
from shutil import rmtree
@@ -413,7 +414,24 @@ def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str):
writer._data.clear()
compressed = _compress_output_files(out_dir)
upload_uri = _upload_to_s3(tenant_id, compressed, scan_id)
upload_uri = _upload_to_s3(
tenant_id,
scan_id,
compressed,
os.path.basename(compressed),
)
compliance_dir_path = Path(comp_dir).parent
if compliance_dir_path.exists():
for artifact_path in sorted(compliance_dir_path.iterdir()):
if artifact_path.is_file():
_upload_to_s3(
tenant_id,
scan_id,
str(artifact_path),
f"compliance/{artifact_path.name}",
)
# S3 integrations (need output_directory)
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):

View File

@@ -72,17 +72,26 @@ class TestOutputs:
client_mock = MagicMock()
mock_get_client.return_value = client_mock
result = _upload_to_s3("tenant-id", str(zip_path), "scan-id")
result = _upload_to_s3(
"tenant-id",
"scan-id",
str(zip_path),
"outputs.zip",
)
expected_uri = "s3://test-bucket/tenant-id/scan-id/outputs.zip"
assert result == expected_uri
assert client_mock.upload_file.call_count == 2
client_mock.upload_file.assert_called_once_with(
Filename=str(zip_path),
Bucket="test-bucket",
Key="tenant-id/scan-id/outputs.zip",
)
@patch("tasks.jobs.export.get_s3_client")
@patch("tasks.jobs.export.base")
def test_upload_to_s3_missing_bucket(self, mock_base, mock_get_client):
mock_base.DJANGO_OUTPUT_S3_AWS_OUTPUT_BUCKET = ""
result = _upload_to_s3("tenant", "/tmp/fake.zip", "scan")
result = _upload_to_s3("tenant", "scan", "/tmp/fake.zip", "fake.zip")
assert result is None
@patch("tasks.jobs.export.get_s3_client")
@@ -101,11 +110,15 @@ class TestOutputs:
client_mock = MagicMock()
mock_get_client.return_value = client_mock
result = _upload_to_s3("tenant", str(zip_path), "scan")
result = _upload_to_s3(
"tenant",
"scan",
str(compliance_dir / "subdir"),
"compliance/subdir",
)
expected_uri = "s3://test-bucket/tenant/scan/results.zip"
assert result == expected_uri
client_mock.upload_file.assert_called_once()
assert result is None
client_mock.upload_file.assert_not_called()
@patch(
"tasks.jobs.export.get_s3_client",
@@ -126,7 +139,12 @@ class TestOutputs:
compliance_dir.mkdir()
(compliance_dir / "report.csv").write_text("csv")
_upload_to_s3("tenant", str(zip_path), "scan")
_upload_to_s3(
"tenant",
"scan",
str(zip_path),
"zipfile.zip",
)
mock_logger.assert_called()
@patch("tasks.jobs.export.rls_transaction")

View File

@@ -85,6 +85,12 @@ class TestGenerateThreatscoreReport:
only_failed=True,
min_risk_level=4,
)
mock_upload.assert_called_once_with(
self.tenant_id,
self.scan_id,
"/tmp/threatscore_path_threatscore_report.pdf",
"threatscore/threatscore_path_threatscore_report.pdf",
)
mock_rmtree.assert_called_once_with(
Path("/tmp/threatscore_path_threatscore_report.pdf").parent,
ignore_errors=True,

View File

@@ -0,0 +1,75 @@
"""
CIS Alibaba Cloud Compliance Dashboard
This module generates compliance reports for the CIS Alibaba Cloud Foundations Benchmark.
"""
import warnings
from dashboard.common_methods import get_section_containers_3_levels
warnings.filterwarnings("ignore")
def get_table(data):
"""
Generate compliance table for CIS Alibaba Cloud framework
This function processes compliance data and formats it for dashboard display,
with sections, subsections, and individual requirements.
Args:
data: DataFrame containing compliance data with columns:
- REQUIREMENTS_ID: Requirement identifier (e.g., "2.1", "4.1")
- REQUIREMENTS_DESCRIPTION: Description of the requirement
- REQUIREMENTS_ATTRIBUTES_SECTION: Main section (e.g., "2. Storage")
- REQUIREMENTS_ATTRIBUTES_SUBSECTION: Subsection (e.g., "2.1 ECS Disk Encryption")
- CHECKID: Associated Prowler check ID
- STATUS: Check status (PASS/FAIL)
- REGION: Alibaba Cloud region
- ACCOUNTID: Alibaba Cloud account ID
- RESOURCEID: Resource identifier
Returns:
Dashboard table with hierarchical compliance structure
"""
# Format requirement descriptions with ID prefix and truncate if too long
data["REQUIREMENTS_DESCRIPTION"] = (
data["REQUIREMENTS_ID"] + " - " + data["REQUIREMENTS_DESCRIPTION"]
)
data["REQUIREMENTS_DESCRIPTION"] = data["REQUIREMENTS_DESCRIPTION"].apply(
lambda x: x[:150] + "..." if len(str(x)) > 150 else x
)
# Truncate section names if too long
data["REQUIREMENTS_ATTRIBUTES_SECTION"] = data[
"REQUIREMENTS_ATTRIBUTES_SECTION"
].apply(lambda x: x[:80] + "..." if len(str(x)) > 80 else x)
# Truncate subsection names if too long
data["REQUIREMENTS_ATTRIBUTES_SUBSECTION"] = data[
"REQUIREMENTS_ATTRIBUTES_SUBSECTION"
].apply(lambda x: x[:150] + "..." if len(str(x)) > 150 else x)
# Select relevant columns for display
display_data = data[
[
"REQUIREMENTS_DESCRIPTION",
"REQUIREMENTS_ATTRIBUTES_SECTION",
"REQUIREMENTS_ATTRIBUTES_SUBSECTION",
"CHECKID",
"STATUS",
"REGION",
"ACCOUNTID",
"RESOURCEID",
]
]
# Generate hierarchical table with 3 levels (Section > Subsection > Requirement)
return get_section_containers_3_levels(
display_data,
"REQUIREMENTS_ATTRIBUTES_SECTION",
"REQUIREMENTS_ATTRIBUTES_SUBSECTION",
"REQUIREMENTS_DESCRIPTION",
)

File diff suppressed because it is too large Load Diff

View File

@@ -5,8 +5,7 @@ title: 'Prowler Services'
Here you can find how to create a new service, or to complement an existing one, for a [Prowler Provider](/developer-guide/provider).
<Note>
First ensure that the provider you want to add the service is already created. It can be checked [here](https://github.com/prowler-cloud/prowler/tree/master/prowler/providers). If the provider is not present, please refer to the [Provider](/developer-guide/provider) documentation to create it from scratch.
First ensure that the provider you want to add the service is already created. It can be checked [here](https://github.com/prowler-cloud/prowler/tree/master/prowler/providers). If the provider is not present, please refer to the [Provider](./provider.md) documentation to create it from scratch.
</Note>
## Introduction
@@ -201,11 +200,11 @@ class <Item>(BaseModel):
#### Service Attributes
*Optimized Data Storage with Python Dictionaries*
_Optimized Data Storage with Python Dictionaries_
Each group of resources within a service should be structured as a Python [dictionary](https://docs.python.org/3/tutorial/datastructures.html#dictionaries) to enable efficient lookups. The dictionary lookup operation has [O(1) complexity](https://en.wikipedia.org/wiki/Big_O_notation#Orders_of_common_functions), and lookups are constantly executed.
*Assigning Unique Identifiers*
_Assigning Unique Identifiers_
Each dictionary key must be a unique ID to identify the resource in a univocal way.
@@ -241,6 +240,301 @@ Provider-Specific Permissions Documentation:
- [M365](/user-guide/providers/microsoft365/authentication#required-permissions)
- [GitHub](/user-guide/providers/github/authentication)
## Service Architecture and Cross-Service Communication
### Core Principle: Service Isolation with Client Communication
Each service must contain **ONLY** the information unique to that specific service. When a check requires information from multiple services, it must use the **client objects** of other services rather than directly accessing their data structures.
This architecture ensures:
- **Loose coupling** between services
- **Clear separation of concerns**
- **Maintainable and testable code**
- **Consistent data access patterns**
### Cross-Service Communication Pattern
Instead of services directly accessing each other's internal data, checks should import and use client objects:
**❌ INCORRECT - Direct data access:**
```python
# DON'T DO THIS
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import cloudtrail_service
from prowler.providers.aws.services.s3.s3_service import s3_service
class cloudtrail_bucket_requires_mfa_delete(Check):
def execute(self):
# WRONG: Directly accessing service data
for trail in cloudtrail_service.trails.values():
for bucket in s3_service.buckets.values():
# Direct access violates separation of concerns
```
**✅ CORRECT - Client-based communication:**
```python
# DO THIS INSTEAD
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import cloudtrail_client
from prowler.providers.aws.services.s3.s3_client import s3_client
class cloudtrail_bucket_requires_mfa_delete(Check):
def execute(self):
# CORRECT: Using client objects for cross-service communication
for trail in cloudtrail_client.trails.values():
trail_bucket = trail.s3_bucket
for bucket in s3_client.buckets.values():
if trail_bucket == bucket.name:
# Use bucket properties through s3_client
if bucket.mfa_delete:
# Implementation logic
```
### Real-World Example: CloudTrail + S3 Integration
This example demonstrates how CloudTrail checks validate S3 bucket configurations:
```python
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.cloudtrail.cloudtrail_client import cloudtrail_client
from prowler.providers.aws.services.s3.s3_client import s3_client
class cloudtrail_bucket_requires_mfa_delete(Check):
def execute(self):
findings = []
if cloudtrail_client.trails is not None:
for trail in cloudtrail_client.trails.values():
if trail.is_logging:
trail_bucket_is_in_account = False
trail_bucket = trail.s3_bucket
# Cross-service communication: CloudTrail check uses S3 client
for bucket in s3_client.buckets.values():
if trail_bucket == bucket.name:
trail_bucket_is_in_account = True
if bucket.mfa_delete:
report.status = "PASS"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) has MFA delete enabled."
# Handle cross-account scenarios
if not trail_bucket_is_in_account:
report.status = "MANUAL"
report.status_extended = f"Trail {trail.name} bucket ({trail_bucket}) is a cross-account bucket or out of Prowler's audit scope, please check it manually."
findings.append(report)
return findings
```
**Key Benefits:**
- **CloudTrail service** only contains CloudTrail-specific data (trails, configurations)
- **S3 service** only contains S3-specific data (buckets, policies, ACLs)
- **Check logic** orchestrates between services using their public client interfaces
- **Cross-account detection** is handled gracefully when resources span accounts
### Service Consolidation Guidelines
**When to combine services in the same file:**
Implement multiple services as **separate classes in the same file** when two services are **practically the same** or one is a **direct extension** of another.
**Example: S3 and S3Control**
S3Control is an extension of S3 that provides account-level controls and access points. Both are implemented in `s3_service.py`:
```python
# File: prowler/providers/aws/services/s3/s3_service.py
class S3(AWSService):
"""Standard S3 service for bucket operations"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.buckets = {}
self.regions_with_buckets = []
# S3-specific initialization
self._list_buckets(provider)
self._get_bucket_versioning()
# ... other S3-specific operations
class S3Control(AWSService):
"""S3Control service for account-level and access point operations"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.account_public_access_block = None
self.access_points = {}
# S3Control-specific initialization
self._get_public_access_block()
self._list_access_points()
# ... other S3Control-specific operations
```
**Separate client files:**
```python
# File: prowler/providers/aws/services/s3/s3_client.py
from prowler.providers.aws.services.s3.s3_service import S3
s3_client = S3(Provider.get_global_provider())
# File: prowler/providers/aws/services/s3/s3control_client.py
from prowler.providers.aws.services.s3.s3_service import S3Control
s3control_client = S3Control(Provider.get_global_provider())
```
**When NOT to consolidate services:**
Keep services separate when they:
- **Operate on different resource types** (EC2 vs RDS)
- **Have different authentication mechanisms** (different API endpoints)
- **Serve different operational domains** (IAM vs CloudTrail)
- **Have different regional behaviors** (global vs regional services)
### Cross-Service Dependencies Guidelines
**1. Always use client imports:**
```python
# Correct pattern
from prowler.providers.aws.services.service_a.service_a_client import service_a_client
from prowler.providers.aws.services.service_b.service_b_client import service_b_client
```
**2. Handle missing resources gracefully:**
```python
# Handle cross-service scenarios
resource_found_in_account = False
for external_resource in other_service_client.resources.values():
if target_resource_id == external_resource.id:
resource_found_in_account = True
# Process found resource
break
if not resource_found_in_account:
# Handle cross-account or missing resource scenarios
report.status = "MANUAL"
report.status_extended = "Resource is cross-account or out of audit scope"
```
**3. Document cross-service dependencies:**
```python
class check_with_dependencies(Check):
"""
Check Description
Dependencies:
- service_a_client: For primary resource information
- service_b_client: For related resource validation
- service_c_client: For policy analysis
"""
```
## Regional Service Implementation
When implementing services for regional providers (like AWS, Azure, GCP), special considerations are needed to handle resource discovery across multiple geographic locations. This section provides a complete guide using AWS as the reference example.
### Regional vs Non-Regional Services
**Regional Services:** Require iteration across multiple geographic locations where resources may exist (e.g., EC2 instances, VPC, RDS databases).
**Non-Regional/Global Services:** Operate at a global or tenant level without regional concepts (e.g., IAM users, Route53 hosted zones).
### AWS Regional Implementation Example
AWS is the perfect example of a regional provider. Here's how Prowler handles AWS's regional architecture:
```python
# File: prowler/providers/aws/services/ec2/ec2_service.py
class EC2(AWSService):
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
self.instances = {}
self.security_groups = {}
# Regional resource discovery across all AWS regions
self.__threading_call__(self._describe_instances)
self.__threading_call__(self._describe_security_groups)
def _describe_instances(self, regional_client):
"""Discover EC2 instances in a specific region"""
try:
describe_instances_paginator = regional_client.get_paginator("describe_instances")
for page in describe_instances_paginator.paginate():
for reservation in page["Reservations"]:
for instance in reservation["Instances"]:
# Each instance includes its region
self.instances[instance["InstanceId"]] = Instance(
id=instance["InstanceId"],
region=regional_client.region,
state=instance["State"]["Name"],
# ... other properties
)
except Exception as error:
logger.error(f"Failed to describe instances in {regional_client.region}: {error}")
```
#### Regional Check Execution
```python
# File: prowler/providers/aws/services/ec2/ec2_instance_public_ip/ec2_instance_public_ip.py
class ec2_instance_public_ip(Check):
def execute(self):
findings = []
# Automatically iterates across ALL AWS regions where instances exist
for instance in ec2_client.instances.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=instance)
report.region = instance.region # Critical: region attribution
report.resource_arn = f"arn:aws:ec2:{instance.region}:{instance.account_id}:instance/{instance.id}"
if instance.public_ip:
report.status = "FAIL"
report.status_extended = f"Instance {instance.id} in {instance.region} has public IP {instance.public_ip}"
else:
report.status = "PASS"
report.status_extended = f"Instance {instance.id} in {instance.region} does not have a public IP"
findings.append(report)
return findings
```
#### Key AWS Regional Features
**Region-Specific ARNs:**
```
arn:aws:ec2:us-east-1:123456789012:instance/i-1234567890abcdef0
arn:aws:s3:eu-west-1:123456789012:bucket/my-bucket
arn:aws:rds:ap-southeast-2:123456789012:db:my-database
```
**Parallel Processing:**
- Each region processed independently in separate threads
- Failed regions don't affect other regions
- User can filter specific regions: `-f us-east-1`
**Global vs Regional Services:**
- **Regional**: EC2, RDS, VPC (require region iteration)
- **Global**: IAM, Route53, CloudFront (single `us-east-1` call)
This architecture allows Prowler to efficiently scan AWS accounts with resources spread across multiple regions while maintaining performance and error isolation.
### Regional Service Best Practices
1. **Use Threading for Regional Discovery**: Leverage the `__threading_call__` method to parallelize resource discovery across regions
2. **Store Region Information**: Always include region metadata in resource objects for proper attribution
3. **Handle Regional Failures Gracefully**: Ensure that failures in one region don't affect others
4. **Optimize for Performance**: Use paginated calls and efficient data structures for large-scale resource discovery
5. **Support Region Filtering**: Allow users to limit scans to specific regions for focused audits
## Best Practices
- When available in the provider, use threading or parallelization utilities for all methods that can be parallelized by to maximize performance and reduce scan time.
@@ -252,3 +546,5 @@ Provider-Specific Permissions Documentation:
- Collect and store resource tags and additional attributes to support richer checks and reporting.
- Leverage shared utility helpers for session setup, identifier parsing, and other cross-cutting concerns to avoid code duplication. This kind of code is typically stored in a `lib` folder in the service folder.
- Keep code modular, maintainable, and well-documented for ease of extension and troubleshooting.
- **Each service should contain only information unique to that specific service** - use client objects for cross-service communication.
- **Handle cross-account and missing resources gracefully** when checks span multiple services.

View File

@@ -114,7 +114,8 @@
"group": "Tutorials",
"pages": [
"user-guide/tutorials/prowler-app-sso-entra",
"user-guide/tutorials/bulk-provider-provisioning"
"user-guide/tutorials/bulk-provider-provisioning",
"user-guide/tutorials/aws-organizations-bulk-provisioning"
]
}
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 71 KiB

View File

@@ -0,0 +1,491 @@
---
title: 'AWS Organizations Bulk Provisioning in Prowler'
---
Prowler offers an automated tool to discover and provision all AWS accounts within an AWS Organization. This streamlines onboarding for organizations managing multiple AWS accounts by automatically generating the configuration needed for bulk provisioning.
The tool, `aws_org_generator.py`, complements the [Bulk Provider Provisioning](./bulk-provider-provisioning) tool and is available in the Prowler repository at: [util/prowler-bulk-provisioning](https://github.com/prowler-cloud/prowler/tree/master/util/prowler-bulk-provisioning)
<Note>
Native support for bulk provisioning AWS Organizations and similar multi-account structures directly in the Prowler UI/API is on the official roadmap.
Track progress and vote for this feature at: [Bulk Provisioning in the UI/API for AWS Organizations](https://roadmap.prowler.com/p/builk-provisioning-in-the-uiapi-for-aws-organizations-and-alike)
</Note>
{/* TODO: Add screenshot of the tool in action */}
## Overview
The AWS Organizations Bulk Provisioning tool simplifies multi-account onboarding by:
* Automatically discovering all active accounts in an AWS Organization
* Generating YAML configuration files for bulk provisioning
* Supporting account filtering and custom role configurations
* Eliminating manual entry of account IDs and role ARNs
## Prerequisites
### Requirements
* Python 3.7 or higher
* AWS credentials with Organizations read access
* ProwlerRole (or custom role) deployed across all target accounts
* Prowler API key (from Prowler Cloud or self-hosted Prowler App)
* For self-hosted Prowler App, remember to [point to your API base URL](./bulk-provider-provisioning#custom-api-endpoints)
* Learn how to create API keys: [Prowler App API Keys](../providers/prowler-app-api-keys)
### Deploying ProwlerRole Across AWS Organizations
Before using the AWS Organizations generator, deploy the ProwlerRole across all accounts in the organization using CloudFormation StackSets.
<Note>
**Follow the official documentation:**
[Deploying Prowler IAM Roles Across AWS Organizations](../providers/aws/organizations#deploying-prowler-iam-roles-across-aws-organizations)
**Key points:**
* Use CloudFormation StackSets from the management account
* Deploy to all organizational units (OUs) or specific OUs
* Use an external ID for enhanced security
* Ensure the role has necessary permissions for Prowler scans
</Note>
### Installation
Clone the repository and install required dependencies:
```bash
git clone https://github.com/prowler-cloud/prowler.git
cd prowler/util/prowler-bulk-provisioning
pip install -r requirements-aws-org.txt
```
### AWS Credentials Setup
Configure AWS credentials with Organizations read access:
* **Management account credentials**, or
* **Delegated administrator account** with `organizations:ListAccounts` permission
Required IAM permissions:
```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"organizations:ListAccounts",
"organizations:DescribeOrganization"
],
"Resource": "*"
}
]
}
```
### Prowler API Key Setup
Configure your Prowler API key:
```bash
export PROWLER_API_KEY="pk_example-api-key"
```
To create an API key:
1. Log in to Prowler Cloud or Prowler App
2. Click **Profile** → **Account**
3. Click **Create API Key**
4. Provide a descriptive name and optionally set an expiration date
5. Copy the generated API key (it will only be shown once)
For detailed instructions, see: [Prowler App API Keys](../providers/prowler-app-api-keys)
## Basic Usage
### Generate Configuration for All Accounts
To generate a YAML configuration file for all active accounts in the organization:
```bash
python aws_org_generator.py -o aws-accounts.yaml --external-id prowler-ext-id-2024
```
This command:
1. Lists all ACTIVE accounts in the organization
2. Generates YAML entries for each account
3. Saves the configuration to `aws-accounts.yaml`
**Output:**
```
Fetching accounts from AWS Organizations...
Found 47 active accounts in organization
Generated configuration for 47 accounts
Configuration written to: aws-accounts.yaml
Next steps:
1. Review the generated file: cat aws-accounts.yaml | head -n 20
2. Run bulk provisioning: python prowler_bulk_provisioning.py aws-accounts.yaml
```
### Review Generated Configuration
Review the generated YAML configuration:
```bash
head -n 20 aws-accounts.yaml
```
**Example output:**
```yaml
- provider: aws
uid: '111111111111'
alias: Production-Account
auth_method: role
credentials:
role_arn: arn:aws:iam::111111111111:role/ProwlerRole
external_id: prowler-ext-id-2024
- provider: aws
uid: '222222222222'
alias: Development-Account
auth_method: role
credentials:
role_arn: arn:aws:iam::222222222222:role/ProwlerRole
external_id: prowler-ext-id-2024
```
### Dry Run Mode
Test the configuration without writing a file:
```bash
python aws_org_generator.py \
--external-id prowler-ext-id-2024 \
--dry-run
```
## Advanced Configuration
### Using a Specific AWS Profile
Specify an AWS profile when multiple profiles are configured:
```bash
python aws_org_generator.py \
-o aws-accounts.yaml \
--profile org-management-admin \
--external-id prowler-ext-id-2024
```
### Excluding Specific Accounts
Exclude the management account or other accounts from provisioning:
```bash
python aws_org_generator.py \
-o aws-accounts.yaml \
--external-id prowler-ext-id-2024 \
--exclude 123456789012,210987654321
```
Common exclusion scenarios:
* Management account (requires different permissions)
* Break-glass accounts (emergency access)
* Suspended or archived accounts
### Including Only Specific Accounts
Generate configuration for specific accounts only:
```bash
python aws_org_generator.py \
-o aws-accounts.yaml \
--external-id prowler-ext-id-2024 \
--include 111111111111,222222222222,333333333333
```
### Custom Role Name
Specify a custom role name if not using the default `ProwlerRole`:
```bash
python aws_org_generator.py \
-o aws-accounts.yaml \
--role-name ProwlerExecutionRole \
--external-id prowler-ext-id-2024
```
### Custom Alias Format
Customize account aliases using template variables:
```bash
# Use account name and ID
python aws_org_generator.py \
-o aws-accounts.yaml \
--alias-format "{name}-{id}" \
--external-id prowler-ext-id-2024
# Use email prefix
python aws_org_generator.py \
-o aws-accounts.yaml \
--alias-format "{email}" \
--external-id prowler-ext-id-2024
```
Available template variables:
* `{name}` - Account name
* `{id}` - Account ID
* `{email}` - Account email
### Additional Role Assumption Options
Configure optional role assumption parameters:
```bash
python aws_org_generator.py \
-o aws-accounts.yaml \
--role-name ProwlerRole \
--external-id prowler-ext-id-2024 \
--session-name prowler-scan-session \
--duration-seconds 3600
```
## Complete Workflow Example
<Steps>
<Step title="Deploy ProwlerRole Using StackSets">
1. Log in to the AWS management account
2. Open CloudFormation → StackSets
3. Create a new StackSet using the [Prowler role template](https://github.com/prowler-cloud/prowler/blob/master/permissions/templates/cloudformation/prowler-scan-role.yml)
4. Deploy to all organizational units
5. Use a unique external ID (e.g., `prowler-org-2024-abc123`)
{/* TODO: Add screenshot of CloudFormation StackSets deployment */}
</Step>
<Step title="Generate YAML Configuration">
Configure AWS credentials and generate the YAML file:
```bash
# Using management account credentials
export AWS_PROFILE=org-management
# Generate configuration
python aws_org_generator.py \
-o aws-org-accounts.yaml \
--external-id prowler-org-2024-abc123 \
--exclude 123456789012
```
**Output:**
```
Fetching accounts from AWS Organizations...
Using AWS profile: org-management
Found 47 active accounts in organization
Generated configuration for 46 accounts
Configuration written to: aws-org-accounts.yaml
Next steps:
1. Review the generated file: cat aws-org-accounts.yaml | head -n 20
2. Run bulk provisioning: python prowler_bulk_provisioning.py aws-org-accounts.yaml
```
</Step>
<Step title="Review Generated Configuration">
Verify the generated YAML configuration:
```bash
# View first 20 lines
head -n 20 aws-org-accounts.yaml
# Check for unexpected accounts
grep "uid:" aws-org-accounts.yaml
# Verify role ARNs
grep "role_arn:" aws-org-accounts.yaml | head -5
# Count accounts
grep "provider: aws" aws-org-accounts.yaml | wc -l
```
</Step>
<Step title="Run Bulk Provisioning">
Provision all accounts to Prowler Cloud or Prowler App:
```bash
# Set Prowler API key
export PROWLER_API_KEY="pk_example-api-key"
# Run bulk provisioning with connection testing
python prowler_bulk_provisioning.py aws-org-accounts.yaml
```
**With custom options:**
```bash
python prowler_bulk_provisioning.py aws-org-accounts.yaml \
--concurrency 10 \
--timeout 120
```
**Successful output:**
```
[1] ✅ Created provider (id=db9a8985-f9ec-4dd8-b5a0-e05ab3880bed)
[1] ✅ Created secret (id=466f76c6-5878-4602-a4bc-13f9522c1fd2)
[1] ✅ Connection test: Connected
[2] ✅ Created provider (id=7a99f789-0cf5-4329-8279-2d443a962676)
[2] ✅ Created secret (id=c5702180-f7c4-40fd-be0e-f6433479b126)
[2] ✅ Connection test: Connected
Done. Success: 47 Failures: 0
```
{/* TODO: Add screenshot of successful bulk provisioning output */}
</Step>
</Steps>
## Command Reference
### Full Command-Line Options
```bash
python aws_org_generator.py \
-o OUTPUT_FILE \
--role-name ROLE_NAME \
--external-id EXTERNAL_ID \
--session-name SESSION_NAME \
--duration-seconds SECONDS \
--alias-format FORMAT \
--exclude ACCOUNT_IDS \
--include ACCOUNT_IDS \
--profile AWS_PROFILE \
--region AWS_REGION \
--dry-run
```
## Troubleshooting
### Error: "No AWS credentials found"
**Solution:** Configure AWS credentials using one of these methods:
```bash
# Method 1: AWS CLI configure
aws configure
# Method 2: Environment variables
export AWS_ACCESS_KEY_ID=your-key-id
export AWS_SECRET_ACCESS_KEY=your-secret-key
# Method 3: Use AWS profile
export AWS_PROFILE=org-management
```
### Error: "Access denied to AWS Organizations API"
**Cause:** Current credentials don't have permission to list organization accounts.
**Solution:**
* Ensure management account credentials are used
* Verify IAM permissions include `organizations:ListAccounts`
* Check IAM policies for Organizations access
### Error: "AWS Organizations is not enabled"
**Cause:** The account is not part of an organization.
**Solution:** This tool requires an AWS Organization. Create one in the AWS Organizations console or use standard bulk provisioning for standalone accounts.
### No Accounts Generated After Filters
**Cause:** All accounts were filtered out by `--exclude` or `--include` options.
**Solution:** Review filter options and verify account IDs are correct:
```bash
# List all accounts in organization
aws organizations list-accounts --query "Accounts[?Status=='ACTIVE'].[Id,Name]" --output table
```
### Connection Test Failures During Bulk Provisioning
**Cause:** ProwlerRole may not be deployed correctly or credentials are invalid.
**Solution:**
* Verify StackSet deployment status in CloudFormation
* Check role trust policy includes correct external ID
* Test role assumption manually:
```bash
aws sts assume-role \
--role-arn arn:aws:iam::123456789012:role/ProwlerRole \
--role-session-name test \
--external-id prowler-ext-id-2024
```
## Security Best Practices
### Use External ID
Always use an external ID when assuming cross-account roles:
```bash
python aws_org_generator.py \
-o aws-accounts.yaml \
--external-id $(uuidgen | tr '[:upper:]' '[:lower:]')
```
The external ID must match the one configured in the ProwlerRole trust policy across all accounts.
### Exclude Sensitive Accounts
Exclude accounts that shouldn't be scanned or require special handling:
```bash
python aws_org_generator.py \
-o aws-accounts.yaml \
--external-id prowler-ext-id \
--exclude 123456789012,111111111111 # management, break-glass accounts
```
### Review Generated Configuration
Always review the generated YAML before provisioning:
```bash
# Check for unexpected accounts
grep "uid:" aws-org-accounts.yaml
# Verify role ARNs
grep "role_arn:" aws-org-accounts.yaml | head -5
# Count accounts
grep "provider: aws" aws-org-accounts.yaml | wc -l
```
## Next Steps
<Columns cols={2}>
<Card title="Bulk Provider Provisioning" icon="terminal" href="/user-guide/tutorials/bulk-provider-provisioning">
Learn how to bulk provision providers in Prowler.
</Card>
<Card title="Prowler App" icon="pen-to-square" href="/user-guide/tutorials/prowler-app">
Detailed instructions on how to use Prowler.
</Card>
</Columns>

View File

@@ -17,14 +17,18 @@ The Bulk Provider Provisioning tool automates the creation of cloud providers in
* Testing connections to verify successful authentication
* Processing multiple providers concurrently for efficiency
<Tip>
**Using AWS Organizations?** For organizations with many AWS accounts, use the automated [AWS Organizations Bulk Provisioning](./aws-organizations-bulk-provisioning) tool to automatically discover and generate configuration for all accounts in your organization.
</Tip>
## Prerequisites
### Requirements
* Python 3.7 or higher
* Prowler API token (from Prowler Cloud or self-hosted Prowler App)
* Prowler API key (from Prowler Cloud or self-hosted Prowler App)
* For self-hosted Prowler App, remember to [point to your API base URL](#custom-api-endpoints)
* Learn how to create API keys: [Prowler App API Keys](../providers/prowler-app-api-keys)
* Authentication credentials for target cloud providers
### Installation
@@ -39,28 +43,21 @@ pip install -r requirements.txt
### Authentication Setup
Configure your Prowler API token:
Configure your Prowler API key:
```bash
export PROWLER_API_TOKEN="your-prowler-api-token"
export PROWLER_API_KEY="pk_example-api-key"
```
To obtain an API token programmatically:
To create an API key:
```bash
export PROWLER_API_TOKEN=$(curl --location 'https://api.prowler.com/api/v1/tokens' \
--header 'Content-Type: application/vnd.api+json' \
--header 'Accept: application/vnd.api+json' \
--data-raw '{
"data": {
"type": "tokens",
"attributes": {
"email": "your@email.com",
"password": "your-password"
}
}
}' | jq -r .data.attributes.access)
```
1. Log in to Prowler Cloud or Prowler App
2. Click **Profile** → **Account**
3. Click **Create API Key**
4. Provide a descriptive name and optionally set an expiration date
5. Copy the generated API key (it will only be shown once)
For detailed instructions, see: [Prowler App API Keys](../providers/prowler-app-api-keys)
## Configuration File Structure
@@ -340,11 +337,11 @@ Done. Success: 2 Failures: 0
## Troubleshooting
### Invalid API Token
### Invalid API Key
```
Error: 401 Unauthorized
Solution: Verify your PROWLER_API_TOKEN or --token parameter
Solution: Verify your PROWLER_API_KEY environment variable or --api-key parameter
```
### Network Timeouts

View File

@@ -2,7 +2,7 @@
All notable changes to the **Prowler MCP Server** are documented in this file.
## [0.1.0] (Prowler UNRELEASED)
## [0.1.0] (Prowler 5.13.0)
### Added
- Initial release of Prowler MCP Server [(#8695)](https://github.com/prowler-cloud/prowler/pull/8695)

View File

@@ -2,7 +2,7 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [v5.13.0] (Prowler UNRELEASED)
## [v5.13.0] (Prowler v5.13.0)
### Added
- Support for AdditionalURLs in outputs [(#8651)](https://github.com/prowler-cloud/prowler/pull/8651)
@@ -17,6 +17,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Oracle Cloud provider with CIS 3.0 benchmark [(#8893)](https://github.com/prowler-cloud/prowler/pull/8893)
- Support for Atlassian Document Format (ADF) in Jira integration [(#8878)](https://github.com/prowler-cloud/prowler/pull/8878)
- Add Common Cloud Controls for AWS, Azure and GCP [(#8000)](https://github.com/prowler-cloud/prowler/pull/8000)
- Improve Provider documentation guide [(#8430)](https://github.com/prowler-cloud/prowler/pull/8430)
- `cloudstorage_bucket_lifecycle_management_enabled` check for GCP provider [(#8936)](https://github.com/prowler-cloud/prowler/pull/8936)
### Changed
@@ -51,6 +52,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Prowler ThreatScore scoring calculation CLI [(#8582)](https://github.com/prowler-cloud/prowler/pull/8582)
- Add missing attributes for Mitre Attack AWS, Azure and GCP [(#8907)](https://github.com/prowler-cloud/prowler/pull/8907)
- Fix KeyError in CloudSQL and Monitoring services in GCP provider [(#8909)](https://github.com/prowler-cloud/prowler/pull/8909)
- Fix Value Errors in Entra service for M365 provider [(#8919)](https://github.com/prowler-cloud/prowler/pull/8919)
- Fix ResourceName in GCP provider [(#8928)](https://github.com/prowler-cloud/prowler/pull/8928)
- Fix KeyError in `elb_ssl_listeners_use_acm_certificate` check and handle None cluster version in `eks_cluster_uses_a_supported_version` check [(#8791)](https://github.com/prowler-cloud/prowler/pull/8791)
- Fix file extension parsing for compliance reports [(#8791)](https://github.com/prowler-cloud/prowler/pull/8791)

View File

@@ -49,10 +49,10 @@ from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.compliance.aws_well_architected.aws_well_architected import (
AWSWellArchitected,
)
from prowler.lib.outputs.compliance.c5.c5_aws import AWSC5
from prowler.lib.outputs.compliance.ccc.ccc_aws import CCC_AWS
from prowler.lib.outputs.compliance.ccc.ccc_azure import CCC_Azure
from prowler.lib.outputs.compliance.ccc.ccc_gcp import CCC_GCP
from prowler.lib.outputs.compliance.c5.c5_aws import AWSC5
from prowler.lib.outputs.compliance.cis.cis_aws import AWSCIS
from prowler.lib.outputs.compliance.cis.cis_azure import AzureCIS
from prowler.lib.outputs.compliance.cis.cis_gcp import GCPCIS
@@ -102,7 +102,6 @@ from prowler.providers.aws.lib.s3.s3 import S3
from prowler.providers.aws.lib.security_hub.security_hub import SecurityHub
from prowler.providers.aws.models import AWSOutputOptions
from prowler.providers.azure.models import AzureOutputOptions
from prowler.providers.cloudflare.models import CloudflareOutputOptions
from prowler.providers.common.provider import Provider
from prowler.providers.common.quick_inventory import run_provider_quick_inventory
from prowler.providers.gcp.models import GCPOutputOptions
@@ -114,6 +113,7 @@ from prowler.providers.m365.models import M365OutputOptions
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
from prowler.providers.nhn.models import NHNOutputOptions
from prowler.providers.oraclecloud.models import OCIOutputOptions
from prowler.providers.alibabacloud.models import AlibabaCloudOutputOptions
def prowler():
@@ -337,8 +337,8 @@ def prowler():
output_options = OCIOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "cloudflare":
output_options = CloudflareOutputOptions(
elif provider == "alibabacloud":
output_options = AlibabaCloudOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)

View File

@@ -0,0 +1,56 @@
{
"Framework": "CIS",
"Provider": "alibabacloud",
"Version": "1.0",
"Description": "CIS Alibaba Cloud Foundations Benchmark",
"Requirements": [
{
"Id": "2.1",
"Description": "Ensure that encryption is enabled for ECS disks",
"Attributes": [
{
"Section": "2. Storage",
"SubSection": "2.1 ECS Disk Encryption",
"Profile": "Level 1",
"AssessmentStatus": "Automated",
"Description": "ECS disk encryption provides an additional layer of security by encrypting data stored on ECS instances. This helps protect sensitive data from unauthorized access if the physical storage media is compromised or if snapshots are inadvertently shared.",
"RationaleStatement": "Encrypting data at rest reduces the risk of unauthorized access to sensitive information stored on ECS disks. Without encryption, data stored on disks is vulnerable if an attacker gains physical access to the storage media or if disk snapshots are exposed.",
"ImpactStatement": "Enabling encryption may have a minimal performance impact on disk I/O operations. Additionally, encrypted disks cannot be converted to unencrypted disks, so this decision should be made during the initial disk creation or through migration.",
"RemediationProcedure": "To enable encryption on new ECS disks:\n1. Create a KMS key in the Alibaba Cloud KMS console\n2. When creating a new ECS disk, enable the 'Encrypted' option\n3. Select the KMS key to use for encryption\n\nFor existing unencrypted disks:\n1. Create a snapshot of the unencrypted disk\n2. Create a new encrypted disk from the snapshot using a KMS key\n3. Detach the old disk and attach the new encrypted disk\n4. Delete the old unencrypted disk and snapshot",
"AuditProcedure": "Using the Prowler CLI: prowler alibabacloud --checks ecs_disk_encryption_enabled\n\nUsing the Alibaba Cloud Console:\n1. Log in to the ECS console\n2. Navigate to Storage & Snapshots > Disks\n3. For each disk, check the 'Encrypted' column\n4. Verify that all disks show 'Yes' for encryption",
"AdditionalInformation": "References:\n- Alibaba Cloud ECS Disk Encryption: https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview\n- Alibaba Cloud KMS: https://www.alibabacloud.com/help/en/kms/",
"References": [
"https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview"
]
}
],
"Checks": [
"ecs_disk_encryption_enabled"
]
},
{
"Id": "4.1",
"Description": "Ensure no security groups allow ingress from 0.0.0.0/0 to port 22",
"Attributes": [
{
"Section": "4. Networking",
"SubSection": "4.1 Security Groups",
"Profile": "Level 1",
"AssessmentStatus": "Automated",
"Description": "Security groups should restrict SSH access (port 22) to specific trusted IP addresses rather than allowing access from the entire internet (0.0.0.0/0). Unrestricted SSH access increases the attack surface and risk of unauthorized access.",
"RationaleStatement": "Allowing unrestricted SSH access from the internet exposes ECS instances to brute force attacks, unauthorized access attempts, and potential compromise. Attackers routinely scan the internet for open SSH ports to exploit weak credentials or known vulnerabilities.",
"ImpactStatement": "Restricting SSH access to specific IP addresses or ranges may require maintaining an allowlist of trusted IPs. Organizations should implement bastion hosts, VPN access, or use Alibaba Cloud's Session Manager for secure remote access without exposing SSH ports to the internet.",
"RemediationProcedure": "To restrict SSH access:\n1. Identify the trusted IP addresses or ranges that require SSH access\n2. In the ECS console, navigate to Network & Security > Security Groups\n3. Select the security group\n4. Find the rule allowing 0.0.0.0/0 access to port 22\n5. Modify the rule to specify the trusted IP address or range\n6. Consider implementing a bastion host or VPN for centralized access control",
"AuditProcedure": "Using the Prowler CLI: prowler alibabacloud --checks ecs_instance_ssh_access_restricted\n\nUsing the Alibaba Cloud Console:\n1. Log in to the ECS console\n2. Navigate to Network & Security > Security Groups\n3. For each security group, review the inbound rules\n4. Verify that no rule allows access from 0.0.0.0/0 to port 22",
"AdditionalInformation": "Best practices:\n- Use bastion hosts or jump servers for SSH access\n- Implement VPN or Direct Connect for secure remote access\n- Consider using Alibaba Cloud Session Manager\n- Enable Multi-Factor Authentication (MFA) for SSH access\n- Regularly review and update security group rules\n\nReferences:\n- Alibaba Cloud Security Groups: https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview",
"References": [
"https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview"
]
}
],
"Checks": [
"ecs_instance_ssh_access_restricted"
]
}
]
}

View File

@@ -659,6 +659,33 @@ class Check_Report_Kubernetes(Check_Report):
self.namespace = "cluster-wide"
@dataclass
class Check_Report_AlibabaCloud(Check_Report):
"""Contains the Alibaba Cloud Check's finding information."""
resource_name: str
resource_id: str
resource_arn: str
region: str
account_uid: str
def __init__(self, metadata: Dict, resource: Any) -> None:
"""Initialize the Alibaba Cloud Check's finding information.
Args:
metadata: The metadata of the check.
resource: Basic information about the resource.
"""
super().__init__(metadata, resource)
self.resource_id = (
getattr(resource, "id", None) or getattr(resource, "name", None) or ""
)
self.resource_name = getattr(resource, "name", "")
self.resource_arn = getattr(resource, "arn", "")
self.region = getattr(resource, "region", "")
self.account_uid = ""
@dataclass
class CheckReportGithub(Check_Report):
"""Contains the GitHub Check's finding information."""
@@ -694,37 +721,6 @@ class CheckReportGithub(Check_Report):
)
@dataclass
class CheckReportCloudflare(Check_Report):
"""Contains the Cloudflare Check's finding information."""
resource_name: str
resource_id: str
zone_name: str
def __init__(
self,
metadata: Dict,
resource: Any,
resource_name: str = None,
resource_id: str = None,
zone_name: str = None,
) -> None:
"""Initialize the Cloudflare Check's finding information.
Args:
metadata: The metadata of the check.
resource: Basic information about the resource. Defaults to None.
resource_name: The name of the resource related with the finding.
resource_id: The id of the resource related with the finding.
zone_name: The zone name of the resource related with the finding.
"""
super().__init__(metadata, resource)
self.resource_name = resource_name or getattr(resource, "name", "")
self.resource_id = resource_id or getattr(resource, "id", "")
self.zone_name = zone_name or getattr(resource, "zone_name", "")
@dataclass
class CheckReportM365(Check_Report):
"""Contains the M365 Check's finding information."""

View File

@@ -26,7 +26,7 @@ def recover_checks_from_provider(
# We need to exclude common shared libraries in services
if (
check_module_name.count(".") == 6
and "lib" not in check_module_name
and ".lib." not in check_module_name # Exclude .lib. directories, not "lib" substring
and (not check_module_name.endswith("_fixer") or include_fixers)
):
check_path = module_name.module_finder.path

View File

@@ -337,20 +337,25 @@ class Finding(BaseModel):
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.region
elif provider.type == "cloudflare":
output_data["auth_method"] = provider.auth_method
elif provider.type == "alibabacloud":
output_data["auth_method"] = (
"STS Token"
if get_nested_attribute(
provider, "session.credentials.security_token"
)
else "AccessKey"
)
output_data["account_uid"] = get_nested_attribute(
provider, "identity.account_id"
)
output_data["account_name"] = get_nested_attribute(
provider, "identity.account_name"
)
output_data["account_email"] = get_nested_attribute(
provider, "identity.account_email"
)
# Use account_name if available, otherwise use account_id
account_name = get_nested_attribute(provider, "identity.account_name")
if not account_name:
account_name = get_nested_attribute(provider, "identity.account_id")
output_data["account_name"] = account_name
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.zone_name
output_data["resource_uid"] = check_output.resource_arn
output_data["region"] = check_output.region
# check_output Unique ID
# TODO: move this to a function

View File

@@ -1020,6 +1020,70 @@ class HTML(Output):
)
return ""
@staticmethod
def get_alibabacloud_assessment_summary(provider: Provider) -> str:
"""
get_alibabacloud_assessment_summary gets the HTML assessment summary for the Alibaba Cloud provider
Args:
provider (Provider): the Alibaba Cloud provider object
Returns:
str: the HTML assessment summary
"""
try:
# Get audited regions from provider, not identity
if hasattr(provider, "audited_regions") and provider.audited_regions:
if isinstance(provider.audited_regions, list):
audited_regions = ", ".join(provider.audited_regions)
else:
audited_regions = str(provider.audited_regions)
else:
audited_regions = "All Regions"
auth_method = (
"STS Token"
if provider.session.credentials.security_token
else "AccessKey"
)
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
Alibaba Cloud Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Account ID:</b> {provider.identity.account_id}
</li>
<li class="list-group-item">
<b>Audited Regions:</b> {audited_regions}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Alibaba Cloud Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Authentication Method:</b> {auth_method}
</li>
<li class="list-group-item">
<b>Account ARN:</b> {provider.identity.account_arn}
</li>
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_assessment_summary(provider: Provider) -> str:
"""

View File

@@ -67,9 +67,6 @@ def display_summary_table(
elif provider.type == "llm":
entity_type = "LLM"
audited_entities = provider.model
elif provider.type == "cloudflare":
entity_type = "Account"
audited_entities = provider.identity.account_name
elif provider.type == "oci":
entity_type = "Tenancy"
audited_entities = (
@@ -77,6 +74,9 @@ def display_summary_table(
if provider.identity.tenancy_name != "unknown"
else provider.identity.tenancy_id
)
elif provider.type == "alibabacloud":
entity_type = "Account"
audited_entities = provider.identity.account_id
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):

View File

@@ -0,0 +1,506 @@
"""
Alibaba Cloud Provider
This module implements the Alibaba Cloud provider for Prowler security auditing.
"""
import sys
from typing import Optional
from colorama import Fore, Style
from prowler.config.config import (
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.alibabacloud.config import (
ALIBABACLOUD_REGIONS,
ALIBABACLOUD_RAM_SESSION_NAME,
)
from prowler.providers.alibabacloud.exceptions.exceptions import (
AlibabaCloudAccountNotFoundError,
AlibabaCloudAssumeRoleError,
AlibabaCloudAuthenticationError,
AlibabaCloudNoCredentialsError,
AlibabaCloudSetUpSessionError,
)
from prowler.providers.alibabacloud.lib.mutelist.mutelist import AlibabaCloudMutelist
from prowler.providers.alibabacloud.models import (
AlibabaCloudAssumeRoleInfo,
AlibabaCloudCredentials,
AlibabaCloudIdentityInfo,
AlibabaCloudSession,
)
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
class AlibabacloudProvider(Provider):
"""
AlibabacloudProvider class implements the Alibaba Cloud provider for Prowler
This class handles:
- Authentication with Alibaba Cloud using AccessKey credentials or STS tokens
- RAM role assumption for cross-account auditing
- Region management and filtering
- Resource discovery and auditing
- Mutelist management for finding suppression
Attributes:
_type: Provider type identifier ("alibabacloud")
_identity: Alibaba Cloud account identity information
_session: Alibaba Cloud session with credentials
_regions: List of regions to audit
_mutelist: Mutelist for finding suppression
_audit_config: Audit configuration dictionary
_fixer_config: Fixer configuration dictionary
audit_metadata: Audit execution metadata
"""
_type: str = "alibabacloud"
_identity: AlibabaCloudIdentityInfo
_session: AlibabaCloudSession
_regions: list = []
_mutelist: AlibabaCloudMutelist
_audit_config: dict
_fixer_config: dict
audit_metadata: Audit_Metadata
def __init__(
self,
access_key_id: str = None,
access_key_secret: str = None,
security_token: str = None,
region_ids: list = None,
filter_regions: list = None,
ram_role_arn: str = None,
ram_session_name: str = None,
ram_session_duration: int = 3600,
ram_external_id: str = None,
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = None,
resource_tags: list[str] = [],
resource_ids: list[str] = [],
):
"""
Initialize Alibaba Cloud provider
Args:
access_key_id: Alibaba Cloud AccessKey ID
access_key_secret: Alibaba Cloud AccessKey Secret
security_token: STS security token for temporary credentials
region_ids: List of region IDs to audit
filter_regions: List of region IDs to exclude from audit
ram_role_arn: RAM role ARN to assume
ram_session_name: Session name for RAM role assumption
ram_session_duration: Session duration in seconds (900-43200)
ram_external_id: External ID for RAM role assumption
config_path: Path to audit configuration file
config_content: Configuration content dictionary
fixer_config: Fixer configuration dictionary
mutelist_path: Path to mutelist file
mutelist_content: Mutelist content dictionary
resource_tags: List of resource tags to filter (key=value format)
resource_ids: List of specific resource IDs to audit
"""
logger.info("Initializing Alibaba Cloud provider...")
# Validate and load configuration
self._audit_config = load_and_validate_config_file(
self._type, config_path
)
if self._audit_config is None:
self._audit_config = {}
# Override with config_content if provided
if config_content:
self._audit_config.update(config_content)
self._fixer_config = fixer_config
# Setup session and authenticate
try:
self._session = self.setup_session(
access_key_id,
access_key_secret,
security_token,
ram_role_arn,
ram_session_name or ALIBABACLOUD_RAM_SESSION_NAME,
ram_session_duration,
ram_external_id,
)
except Exception as error:
logger.critical(f"Failed to set up Alibaba Cloud session: {error}")
raise AlibabaCloudSetUpSessionError(str(error))
# Get account identity
try:
self._identity = self._set_identity()
except Exception as error:
logger.critical(
f"Failed to retrieve Alibaba Cloud account identity: {error}"
)
raise AlibabaCloudAccountNotFoundError(str(error))
# Setup regions
self._regions = self._setup_regions(region_ids, filter_regions)
# Setup mutelist
self._mutelist = self._setup_mutelist(mutelist_path, mutelist_content)
# Set audit metadata
self.audit_metadata = Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
)
# Set as global provider
Provider.set_global_provider(self)
logger.info(
f"Alibaba Cloud provider initialized for account {self._identity.account_id}"
)
@property
def type(self) -> str:
"""Return provider type"""
return self._type
@property
def identity(self) -> AlibabaCloudIdentityInfo:
"""Return provider identity"""
return self._identity
@property
def session(self) -> AlibabaCloudSession:
"""Return provider session"""
return self._session
@property
def audit_config(self) -> dict:
"""Return audit configuration"""
return self._audit_config
@property
def fixer_config(self) -> dict:
"""Return fixer configuration"""
return self._fixer_config
@property
def mutelist(self) -> AlibabaCloudMutelist:
"""Return mutelist"""
return self._mutelist
def setup_session(
self,
access_key_id: str,
access_key_secret: str,
security_token: str = None,
ram_role_arn: str = None,
ram_session_name: str = ALIBABACLOUD_RAM_SESSION_NAME,
ram_session_duration: int = 3600,
ram_external_id: str = None,
) -> AlibabaCloudSession:
"""
Setup Alibaba Cloud session with authentication
Args:
access_key_id: AccessKey ID
access_key_secret: AccessKey Secret
security_token: STS security token (optional)
ram_role_arn: RAM role to assume (optional)
ram_session_name: Session name for role assumption
ram_session_duration: Session duration in seconds
ram_external_id: External ID for role assumption
Returns:
AlibabaCloudSession: Configured session object
Raises:
AlibabaCloudNoCredentialsError: If credentials are missing
AlibabaCloudAuthenticationError: If authentication fails
AlibabaCloudAssumeRoleError: If role assumption fails
"""
# Validate credentials
if not access_key_id or not access_key_secret:
logger.critical("Alibaba Cloud credentials are required")
raise AlibabaCloudNoCredentialsError()
try:
# Create credentials object
credentials = AlibabaCloudCredentials(
access_key_id=access_key_id,
access_key_secret=access_key_secret,
security_token=security_token,
)
# If RAM role is specified, assume the role
if ram_role_arn:
logger.info(f"Assuming RAM role: {ram_role_arn}")
credentials = self._assume_role(
credentials,
ram_role_arn,
ram_session_name,
ram_session_duration,
ram_external_id,
)
# Create session
session = AlibabaCloudSession(
credentials=credentials,
region_id="cn-hangzhou", # Default region for global APIs
)
logger.info("Alibaba Cloud session established successfully")
return session
except Exception as error:
logger.critical(f"Authentication failed: {error}")
raise AlibabaCloudAuthenticationError(str(error))
def _assume_role(
self,
credentials: AlibabaCloudCredentials,
role_arn: str,
session_name: str,
session_duration: int,
external_id: str = None,
) -> AlibabaCloudCredentials:
"""
Assume a RAM role and return temporary credentials
Args:
credentials: Current credentials
role_arn: RAM role ARN to assume
session_name: Session name
session_duration: Session duration in seconds
external_id: External ID (optional)
Returns:
AlibabaCloudCredentials: Temporary credentials from STS
Raises:
AlibabaCloudAssumeRoleError: If role assumption fails
"""
try:
# Note: In a real implementation, this would use Alibaba Cloud STS SDK
# to call AssumeRole API and get temporary credentials
# For now, we'll return the original credentials as a placeholder
logger.warning(
"RAM role assumption not yet fully implemented - using provided credentials"
)
# TODO: Implement actual STS AssumeRole call
# from alibabacloud_sts20150401.client import Client as StsClient
# from alibabacloud_sts20150401.models import AssumeRoleRequest
#
# sts_client = StsClient(config)
# request = AssumeRoleRequest(
# role_arn=role_arn,
# role_session_name=session_name,
# duration_seconds=session_duration,
# external_id=external_id
# )
# response = sts_client.assume_role(request)
#
# return AlibabaCloudCredentials(
# access_key_id=response.body.credentials.access_key_id,
# access_key_secret=response.body.credentials.access_key_secret,
# security_token=response.body.credentials.security_token,
# expiration=response.body.credentials.expiration,
# )
return credentials
except Exception as error:
logger.critical(f"Failed to assume RAM role {role_arn}: {error}")
raise AlibabaCloudAssumeRoleError(role_arn, str(error))
def _set_identity(self) -> AlibabaCloudIdentityInfo:
"""
Retrieve Alibaba Cloud account identity information
Returns:
AlibabaCloudIdentityInfo: Account identity details
Raises:
AlibabaCloudAccountNotFoundError: If identity cannot be retrieved
"""
try:
logger.info("Retrieving Alibaba Cloud account identity...")
# Derive account ID from AccessKey ID
# Alibaba Cloud AccessKey IDs follow the format: LTAI{account_id_hash}...
# For a more accurate implementation, you would call STS GetCallerIdentity API
# For now, we'll use the AccessKey ID as a unique identifier
access_key_id = self._session.credentials.access_key_id
# Simple implementation: Use the first 12 characters of AccessKey ID as account identifier
# In production, you should call:
# from alibabacloud_sts20150401.client import Client as StsClient
# sts_client = StsClient(config)
# response = sts_client.get_caller_identity()
# account_id = response.body.account_id
# For now, create a unique identifier from the AccessKey
account_id = access_key_id[:20] if access_key_id else "unknown"
account_arn = f"acs:ram::{account_id}:root"
identity = AlibabaCloudIdentityInfo(
account_id=account_id,
account_arn=account_arn,
)
logger.info(f"Account ID: {identity.account_id}")
return identity
except Exception as error:
logger.critical(f"Failed to get account identity: {error}")
raise AlibabaCloudAccountNotFoundError(str(error))
def _setup_regions(
self, region_ids: list = None, filter_regions: list = None
) -> list:
"""
Setup regions to audit
Args:
region_ids: Specific regions to audit (None = all regions)
filter_regions: Regions to exclude from audit
Returns:
list: Final list of regions to audit
"""
# Start with specified regions or all regions
if region_ids:
regions = [r for r in region_ids if r in ALIBABACLOUD_REGIONS]
logger.info(f"Auditing specified regions: {', '.join(regions)}")
else:
regions = ALIBABACLOUD_REGIONS.copy()
logger.info("Auditing all Alibaba Cloud regions")
# Apply filters
if filter_regions:
regions = [r for r in regions if r not in filter_regions]
logger.info(f"Excluded regions: {', '.join(filter_regions)}")
logger.info(f"Total regions to audit: {len(regions)}")
return regions
def _setup_mutelist(
self, mutelist_path: str = None, mutelist_content: dict = None
) -> AlibabaCloudMutelist:
"""
Setup mutelist for finding suppression
Args:
mutelist_path: Path to mutelist file
mutelist_content: Mutelist content dictionary
Returns:
AlibabaCloudMutelist: Configured mutelist instance
"""
try:
# Use default path if not provided
if not mutelist_path and not mutelist_content:
mutelist_path = get_default_mute_file_path(self._type)
mutelist = AlibabaCloudMutelist(
mutelist_path=mutelist_path,
mutelist_content=mutelist_content,
provider=self._type,
identity=self._identity,
)
logger.info("Mutelist loaded successfully")
return mutelist
except Exception as error:
logger.warning(f"Error loading mutelist: {error}")
# Return empty mutelist on error
return AlibabaCloudMutelist()
def print_credentials(self) -> None:
"""
Display Alibaba Cloud credentials and configuration in CLI
This method prints the current provider configuration including:
- Account ID
- Regions being audited
- Authentication method
"""
# Account information
report_lines = []
report_lines.append(
f"{Fore.CYAN}Account ID:{Style.RESET_ALL} {self._identity.account_id}"
)
if self._identity.user_name:
report_lines.append(
f"{Fore.CYAN}User:{Style.RESET_ALL} {self._identity.user_name}"
)
# Regions
region_count = len(self._regions)
regions_display = (
", ".join(self._regions[:5])
+ (f" ... (+{region_count - 5} more)" if region_count > 5 else "")
)
report_lines.append(
f"{Fore.CYAN}Regions ({region_count}):{Style.RESET_ALL} {regions_display}"
)
# Authentication method
auth_method = (
"STS Token" if self._session.credentials.security_token else "AccessKey"
)
report_lines.append(
f"{Fore.CYAN}Authentication:{Style.RESET_ALL} {auth_method}"
)
# Print formatted box
print_boxes(report_lines, "Alibaba Cloud Provider Configuration")
def test_connection(self) -> Connection:
"""
Test connection to Alibaba Cloud
Returns:
Connection: Connection test result with status and error (if any)
"""
try:
logger.info("Testing connection to Alibaba Cloud...")
# TODO: Implement actual connection test with a simple API call
# For example, call DescribeRegions or GetCallerIdentity
# from alibabacloud_ecs20140526.client import Client as EcsClient
#
# ecs_client = EcsClient(config)
# ecs_client.describe_regions()
logger.info("Connection test successful")
return Connection(is_connected=True)
except Exception as error:
logger.error(f"Connection test failed: {error}")
return Connection(is_connected=False, error=str(error))
def validate_arguments(self) -> None:
"""
Validate provider arguments
This method is called after initialization to ensure all arguments
and configurations are valid.
"""
# Validation is handled in the CLI arguments parser
# This method can be used for additional runtime validations
pass

View File

@@ -0,0 +1,45 @@
"""
Alibaba Cloud Provider Configuration
This module contains configuration constants for the Alibaba Cloud provider.
"""
# Default Alibaba Cloud regions
ALIBABACLOUD_REGIONS = [
"cn-hangzhou", # China (Hangzhou)
"cn-shanghai", # China (Shanghai)
"cn-qingdao", # China (Qingdao)
"cn-beijing", # China (Beijing)
"cn-zhangjiakou", # China (Zhangjiakou)
"cn-huhehaote", # China (Hohhot)
"cn-wulanchabu", # China (Ulanqab)
"cn-shenzhen", # China (Shenzhen)
"cn-heyuan", # China (Heyuan)
"cn-guangzhou", # China (Guangzhou)
"cn-chengdu", # China (Chengdu)
"cn-hongkong", # China (Hong Kong)
"ap-northeast-1", # Japan (Tokyo)
"ap-southeast-1", # Singapore
"ap-southeast-2", # Australia (Sydney)
"ap-southeast-3", # Malaysia (Kuala Lumpur)
"ap-southeast-5", # Indonesia (Jakarta)
"ap-southeast-6", # Philippines (Manila)
"ap-southeast-7", # Thailand (Bangkok)
"ap-south-1", # India (Mumbai)
"us-west-1", # US (Silicon Valley)
"us-east-1", # US (Virginia)
"eu-west-1", # UK (London)
"eu-central-1", # Germany (Frankfurt)
"me-east-1", # UAE (Dubai)
]
# Alibaba Cloud SDK configuration
ALIBABACLOUD_SDK_USER_AGENT = "Prowler"
ALIBABACLOUD_SDK_MAX_RETRIES = 3
ALIBABACLOUD_SDK_TIMEOUT = 30 # seconds
# Alibaba Cloud ARN format
ALIBABACLOUD_ARN_FORMAT = "acs:{service}:{region}:{account_id}:{resource}"
# Default RAM role session name
ALIBABACLOUD_RAM_SESSION_NAME = "ProwlerSession"

View File

@@ -0,0 +1,90 @@
"""
Alibaba Cloud Provider Exceptions
This module contains exception classes for the Alibaba Cloud provider.
"""
class AlibabaCloudException(Exception):
"""Base exception for Alibaba Cloud provider errors"""
pass
class AlibabaCloudAuthenticationError(AlibabaCloudException):
"""
AlibabaCloudAuthenticationError is raised when authentication fails
This can occur due to:
- Invalid AccessKey credentials
- Expired STS tokens
- Insufficient permissions
"""
def __init__(self, message="Authentication to Alibaba Cloud failed"):
super().__init__(message)
class AlibabaCloudSetUpSessionError(AlibabaCloudException):
"""
AlibabaCloudSetUpSessionError is raised when session setup fails
"""
def __init__(self, message="Failed to set up Alibaba Cloud session"):
super().__init__(message)
class AlibabaCloudAPIError(AlibabaCloudException):
"""
AlibabaCloudAPIError is raised when an API call fails
"""
def __init__(self, service: str, operation: str, error: str):
message = f"Alibaba Cloud API Error - Service: {service}, Operation: {operation}, Error: {error}"
super().__init__(message)
class AlibabaCloudNoCredentialsError(AlibabaCloudException):
"""
AlibabaCloudNoCredentialsError is raised when no credentials are found
"""
def __init__(self, message="No Alibaba Cloud credentials found"):
super().__init__(message)
class AlibabaCloudInvalidRegionError(AlibabaCloudException):
"""
AlibabaCloudInvalidRegionError is raised when an invalid region is specified
"""
def __init__(self, region: str):
message = f"Invalid Alibaba Cloud region: {region}"
super().__init__(message)
class AlibabaCloudAssumeRoleError(AlibabaCloudException):
"""
AlibabaCloudAssumeRoleError is raised when assuming a RAM role fails
"""
def __init__(self, role_arn: str, error: str):
message = f"Failed to assume RAM role {role_arn}: {error}"
super().__init__(message)
class AlibabaCloudInvalidAccessKeyError(AlibabaCloudException):
"""
AlibabaCloudInvalidAccessKeyError is raised when AccessKey credentials are invalid
"""
def __init__(self, message="Invalid Alibaba Cloud AccessKey credentials"):
super().__init__(message)
class AlibabaCloudAccountNotFoundError(AlibabaCloudException):
"""
AlibabaCloudAccountNotFoundError is raised when account information cannot be retrieved
"""
def __init__(self, message="Unable to retrieve Alibaba Cloud account information"):
super().__init__(message)
class AlibabaCloudConfigValidationError(AlibabaCloudException):
"""
AlibabaCloudConfigValidationError is raised when configuration validation fails
"""
def __init__(self, message="Alibaba Cloud configuration validation failed"):
super().__init__(message)

View File

@@ -0,0 +1,203 @@
"""
Alibaba Cloud Provider CLI Arguments
This module defines the command-line interface arguments for the Alibaba Cloud provider.
"""
import os
from argparse import ArgumentTypeError, Namespace
from prowler.providers.alibabacloud.config import ALIBABACLOUD_REGIONS
def init_parser(self):
"""
Initialize Alibaba Cloud provider CLI argument parser
This function creates the argument parser for Alibaba Cloud provider and defines
all the command-line arguments that can be used when auditing Alibaba Cloud.
Args:
self: The ProwlerArgumentParser instance
"""
# Create Alibaba Cloud subparser
alibabacloud_parser = self.subparsers.add_parser(
"alibabacloud",
parents=[self.common_providers_parser],
help="Alibaba Cloud Provider",
)
# Authentication group
alibabacloud_auth = alibabacloud_parser.add_argument_group(
"Alibaba Cloud Authentication"
)
alibabacloud_auth.add_argument(
"--access-key-id",
nargs="?",
default=os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID"),
help="Alibaba Cloud AccessKey ID (also reads from ALIBABA_CLOUD_ACCESS_KEY_ID environment variable)",
)
alibabacloud_auth.add_argument(
"--access-key-secret",
nargs="?",
default=os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
help="Alibaba Cloud AccessKey Secret (also reads from ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable)",
)
alibabacloud_auth.add_argument(
"--security-token",
nargs="?",
default=os.environ.get("ALIBABA_CLOUD_SECURITY_TOKEN"),
help="Alibaba Cloud STS Security Token for temporary credentials (also reads from ALIBABA_CLOUD_SECURITY_TOKEN environment variable)",
)
# RAM Role assumption group
alibabacloud_role = alibabacloud_parser.add_argument_group(
"Alibaba Cloud RAM Role Assumption"
)
alibabacloud_role.add_argument(
"--ram-role-arn",
nargs="?",
default=None,
help="RAM Role ARN to assume for the audit (format: acs:ram::account-id:role/role-name)",
)
alibabacloud_role.add_argument(
"--ram-session-name",
nargs="?",
default="ProwlerAuditSession",
help="Session name for RAM role assumption (default: ProwlerAuditSession)",
)
alibabacloud_role.add_argument(
"--ram-session-duration",
nargs="?",
type=int,
default=3600,
help="Session duration in seconds for RAM role assumption (900-43200, default: 3600)",
)
alibabacloud_role.add_argument(
"--ram-external-id",
nargs="?",
default=None,
help="External ID for RAM role assumption (optional)",
)
# Regions group
alibabacloud_regions = alibabacloud_parser.add_argument_group(
"Alibaba Cloud Regions"
)
alibabacloud_regions.add_argument(
"--region-id",
"--region-ids",
nargs="+",
default=[],
dest="region_ids",
help=f"Alibaba Cloud Region IDs to audit (space-separated). Available regions: {', '.join(ALIBABACLOUD_REGIONS[:10])}... (default: all regions)",
)
alibabacloud_regions.add_argument(
"--filter-region",
"--filter-regions",
nargs="+",
default=[],
dest="filter_regions",
help="Alibaba Cloud Region IDs to exclude from the audit (space-separated)",
)
# Resource filtering group
alibabacloud_resources = alibabacloud_parser.add_argument_group(
"Alibaba Cloud Resource Filtering"
)
alibabacloud_resources.add_argument(
"--resource-tags",
nargs="+",
default=[],
help="Filter resources by tags (format: key=value)",
)
alibabacloud_resources.add_argument(
"--resource-ids",
nargs="+",
default=[],
help="Specific resource IDs to audit (space-separated)",
)
def validate_arguments(arguments: Namespace) -> tuple[bool, str]:
"""
Validate Alibaba Cloud provider arguments
This function validates the command-line arguments provided for the Alibaba Cloud provider.
It checks for required credentials, validates region specifications, and ensures
configuration coherence.
Args:
arguments: Parsed command-line arguments
Returns:
tuple: (is_valid: bool, error_message: str)
- is_valid: True if arguments are valid, False otherwise
- error_message: Error description if invalid, empty string if valid
"""
# Check for required credentials
if not arguments.access_key_id or not arguments.access_key_secret:
return (
False,
"Alibaba Cloud AccessKey credentials are required. "
"Provide --access-key-id and --access-key-secret, "
"or set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.",
)
# Validate RAM role session duration
if arguments.ram_role_arn:
if not (900 <= arguments.ram_session_duration <= 43200):
return (
False,
f"RAM role session duration must be between 900 and 43200 seconds. "
f"Provided: {arguments.ram_session_duration}",
)
# Validate region IDs
if arguments.region_ids:
invalid_regions = [
region for region in arguments.region_ids
if region not in ALIBABACLOUD_REGIONS
]
if invalid_regions:
return (
False,
f"Invalid Alibaba Cloud region(s): {', '.join(invalid_regions)}. "
f"Valid regions are: {', '.join(ALIBABACLOUD_REGIONS)}",
)
# Validate filter regions
if arguments.filter_regions:
invalid_filter_regions = [
region for region in arguments.filter_regions
if region not in ALIBABACLOUD_REGIONS
]
if invalid_filter_regions:
return (
False,
f"Invalid Alibaba Cloud filter region(s): {', '.join(invalid_filter_regions)}. "
f"Valid regions are: {', '.join(ALIBABACLOUD_REGIONS)}",
)
# Validate resource tags format
if arguments.resource_tags:
for tag in arguments.resource_tags:
if "=" not in tag:
return (
False,
f"Invalid resource tag format: '{tag}'. Expected format: key=value",
)
# All validations passed
return (True, "")

View File

@@ -0,0 +1,13 @@
"""Utility classes for Alibaba Cloud checks"""
from dataclasses import dataclass
@dataclass
class GenericAlibabaCloudResource:
"""Generic resource for checks that don't have a specific resource type"""
id: str
name: str
arn: str
region: str

View File

@@ -0,0 +1,82 @@
"""
Alibaba Cloud Provider Mutelist
This module implements the mutelist functionality for suppressing findings
in Alibaba Cloud audits.
"""
from prowler.lib.mutelist.mutelist import Mutelist
class AlibabaCloudMutelist(Mutelist):
"""
AlibabaCloudMutelist handles finding suppression for Alibaba Cloud resources
This class extends the base Mutelist class to provide Alibaba Cloud-specific
mutelist functionality, allowing users to suppress specific findings based on
resource identifiers, regions, checks, or other criteria.
Example mutelist entry:
{
"Accounts": ["1234567890"],
"Checks": {
"ecs_*": {
"Regions": ["cn-hangzhou", "cn-shanghai"],
"Resources": ["i-abc123", "i-def456"]
}
}
}
"""
def __init__(
self,
mutelist_path: str = None,
mutelist_content: dict = None,
provider: str = "alibabacloud",
identity: dict = None,
):
"""
Initialize Alibaba Cloud mutelist
Args:
mutelist_path: Path to the mutelist file (YAML or JSON)
mutelist_content: Mutelist content as a dictionary
provider: Provider name (default: "alibabacloud")
identity: Alibaba Cloud identity information
"""
super().__init__(
mutelist_path=mutelist_path,
mutelist_content=mutelist_content,
)
self.identity = identity
self.provider = provider
def is_finding_muted(
self,
finding,
account_id: str = None,
region: str = None,
check_id: str = None,
resource_id: str = None,
) -> bool:
"""
Check if a finding should be muted based on mutelist rules
Args:
finding: The finding object to check
account_id: Alibaba Cloud account ID
region: Alibaba Cloud region ID
check_id: Check identifier
resource_id: Resource identifier
Returns:
bool: True if the finding is muted, False otherwise
"""
# Use the parent class implementation which handles the core logic
return super().is_muted(
account_uid=account_id or getattr(finding, "account_uid", None),
region=region or getattr(finding, "region", None),
check_id=check_id or getattr(finding, "check_metadata", {}).get("CheckID"),
resource_id=resource_id or getattr(finding, "resource_uid", None),
finding_tags=getattr(finding, "resource_tags", []),
)

View File

@@ -0,0 +1,146 @@
"""
Alibaba Cloud Service Base Class
This module provides the base class for all Alibaba Cloud service implementations.
"""
import threading
from prowler.lib.logger import logger
class AlibabaCloudService:
"""
Base class for Alibaba Cloud service implementations
This class provides common functionality for all Alibaba Cloud services, including:
- Regional client management
- Multi-threading support for API calls
- Error handling patterns
- Resource auditing metadata
Attributes:
provider: The Alibaba Cloud provider instance
service: Service name (e.g., "ecs", "oss", "ram")
account_id: Alibaba Cloud account ID
regions: List of regions to audit
audit_config: Audit configuration dictionary
regional_clients: Dictionary of regional SDK clients
"""
def __init__(self, service: str, provider):
"""
Initialize the Alibaba Cloud service
Args:
service: Service identifier (e.g., "ecs", "oss")
provider: AlibabaCloudProvider instance
"""
self.provider = provider
self.service = service
self.account_id = provider.identity.account_id
self.regions = provider._regions
self.audit_config = provider.audit_config
self.regional_clients = {}
logger.info(f"Initializing Alibaba Cloud {service.upper()} service")
def __threading_call__(self, call, iterator):
"""
Execute function calls in parallel using threading
This method is used to parallelize API calls across regions or resources
to improve audit performance.
Args:
call: Function to execute
iterator: Iterable of arguments to pass to the function
Example:
self.__threading_call__(self._describe_instances, self.regions)
"""
threads = []
for item in iterator:
thread = threading.Thread(target=call, args=(item,))
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
def _create_regional_client(self, region: str, _endpoint_override: str = None):
"""
Create a regional Alibaba Cloud SDK client
This method should be overridden by service implementations to create
service-specific clients.
Args:
region: Region identifier
_endpoint_override: Optional endpoint override URL (unused in base)
Returns:
SDK client instance for the specified region
"""
raise NotImplementedError(
f"Service {self.service} must implement _create_regional_client()"
)
def _list_resources(self):
"""
List all resources for this service
This method should be overridden by service implementations to list
service-specific resources.
"""
raise NotImplementedError(
f"Service {self.service} must implement _list_resources()"
)
def _get_resource_details(self, resource):
"""
Get detailed information about a resource
This method can be overridden by service implementations to fetch
additional resource details.
Args:
resource: Resource identifier or object
"""
def _handle_api_error(self, error: Exception, operation: str, region: str = None):
"""
Handle Alibaba Cloud API errors with consistent logging
Args:
error: The exception that occurred
operation: The API operation that failed
region: The region where the error occurred (if applicable)
"""
region_info = f" in region {region}" if region else ""
logger.warning(
f"{self.service.upper()} {operation} failed{region_info}: {str(error)}"
)
def generate_resource_arn(
self, resource_type: str, resource_id: str, region: str = ""
) -> str:
"""
Generate Alibaba Cloud Resource Name (ARN) in ACS format
Format: acs:{service}:{region}:{account-id}:{resource-type}/{resource-id}
Args:
resource_type: Type of resource (e.g., "instance", "bucket")
resource_id: Resource identifier
region: Region identifier (optional for global resources)
Returns:
str: Formatted ARN string
Example:
arn = self.generate_resource_arn("instance", "i-abc123", "cn-hangzhou")
# Returns: "acs:ecs:cn-hangzhou:123456789:instance/i-abc123"
"""
return f"acs:{self.service}:{region}:{self.account_id}:{resource_type}/{resource_id}"

View File

@@ -0,0 +1,129 @@
"""
Alibaba Cloud Provider Models
This module contains data models for the Alibaba Cloud provider.
"""
from argparse import Namespace
from dataclasses import dataclass
from typing import Optional
from prowler.providers.common.models import ProviderOutputOptions
@dataclass
class AlibabaCloudIdentityInfo:
"""
AlibabaCloudIdentityInfo contains the Alibaba Cloud identity information
Attributes:
account_id: Alibaba Cloud account ID
account_arn: Alibaba Cloud account ARN
user_id: RAM user ID (optional)
user_name: RAM user name (optional)
account_name: Account alias/name (optional)
"""
account_id: str
account_arn: str
user_id: Optional[str] = None
user_name: Optional[str] = None
account_name: Optional[str] = None
@dataclass
class AlibabaCloudRegion:
"""
AlibabaCloudRegion contains region information
Attributes:
region_id: Region identifier (e.g., "cn-hangzhou")
local_name: Localized region name
region_endpoint: Region endpoint URL
"""
region_id: str
local_name: str
region_endpoint: Optional[str] = None
@dataclass
class AlibabaCloudCredentials:
"""
AlibabaCloudCredentials contains authentication credentials
Attributes:
access_key_id: AccessKey ID
access_key_secret: AccessKey Secret
security_token: STS security token (optional)
expiration: Token expiration timestamp (optional)
"""
access_key_id: str
access_key_secret: str
security_token: Optional[str] = None
expiration: Optional[str] = None
@dataclass
class AlibabaCloudAssumeRoleInfo:
"""
AlibabaCloudAssumeRoleInfo contains RAM role assumption information
Attributes:
role_arn: RAM role ARN to assume
role_session_name: Session name for the assumed role
external_id: External ID for role assumption (optional)
session_duration: Duration in seconds (900-43200, default 3600)
"""
role_arn: str
role_session_name: str
external_id: Optional[str] = None
session_duration: int = 3600
@dataclass
class AlibabaCloudSession:
"""
AlibabaCloudSession contains the session configuration
Attributes:
credentials: Alibaba Cloud credentials
region_id: Default region for the session
"""
credentials: AlibabaCloudCredentials
region_id: str = "cn-hangzhou"
class AlibabaCloudOutputOptions(ProviderOutputOptions):
"""
AlibabaCloudOutputOptions contains the output configuration for Alibaba Cloud
This class extends ProviderOutputOptions to provide Alibaba Cloud-specific
output filename generation based on the account ID.
"""
def __init__(self, arguments: Namespace, bulk_checks_metadata: dict, identity: AlibabaCloudIdentityInfo):
"""
Initialize Alibaba Cloud output options
Args:
arguments: Command-line arguments
bulk_checks_metadata: Metadata for all checks
identity: Alibaba Cloud identity information
"""
# Call parent class init
super().__init__(arguments, bulk_checks_metadata)
# Import here to avoid circular dependency
from prowler.config.config import output_file_timestamp
# Check if custom output filename was provided
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
# Use account ID for output filename
account_identifier = identity.account_id
self.output_filename = (
f"prowler-output-{account_identifier}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename

View File

@@ -0,0 +1 @@
"""Alibaba Cloud ACK (Container Service for Kubernetes) module"""

View File

@@ -0,0 +1,6 @@
"""Alibaba Cloud ACK Client Singleton"""
from prowler.providers.alibabacloud.services.ack.ack_service import ACK
from prowler.providers.common.provider import Provider
ack_client = ACK(Provider.get_global_provider())

View File

@@ -0,0 +1 @@
"""ACK Audit Logging Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_audit_log",
"CheckTitle": "Check ACK Audit Logging",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK Audit Logging.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_audit_log(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have audit logging enabled."
if cluster.audit_log_enabled:
report.status = "PASS"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} has audit logging enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Encryption Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_encryption",
"CheckTitle": "Check ACK Encryption",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "high",
"ResourceType": "Cluster",
"Description": "Checks ACK Encryption.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_encryption(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} does not have encryption enabled."
)
if cluster.encryption_enabled:
report.status = "PASS"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} has encryption enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Network Policy Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_network_policy",
"CheckTitle": "Check ACK Network Policy",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK Network Policy.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,18 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_network_policy(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have network policy support enabled."
if cluster.network_policy_enabled:
report.status = "PASS"
report.status_extended = f"ACK cluster {cluster.cluster_name} has network policy support enabled."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Private Zone Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_private_zone",
"CheckTitle": "Check ACK Private Zone",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "low",
"ResourceType": "Cluster",
"Description": "Checks ACK Private Zone.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_private_zone(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} does not have PrivateZone enabled."
)
if cluster.private_zone_enabled:
report.status = "PASS"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} has PrivateZone enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Public Access Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_public_access",
"CheckTitle": "Check ACK Public Access",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "high",
"ResourceType": "Cluster",
"Description": "Checks ACK Public Access.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_public_access(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} API server is publicly accessible."
)
if not cluster.public_access:
report.status = "PASS"
report.status_extended = f"ACK cluster {cluster.cluster_name} API server is not publicly accessible."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK RBAC Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_rbac",
"CheckTitle": "Check ACK RBAC",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK RBAC.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_rbac(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} does not have RBAC enabled."
)
if cluster.rbac_enabled:
report.status = "PASS"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} has RBAC enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Security Group Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_security_group",
"CheckTitle": "Check ACK Security Group",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK Security Group.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,18 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_security_group(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have a security group configured."
if cluster.security_group_id:
report.status = "PASS"
report.status_extended = f"ACK cluster {cluster.cluster_name} has security group {cluster.security_group_id} configured."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK VPC Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_vpc",
"CheckTitle": "Check ACK VPC",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK VPC.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_vpc(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} is not deployed in a VPC."
)
if cluster.vpc_id:
report.status = "PASS"
report.status_extended = f"ACK cluster {cluster.cluster_name} is deployed in VPC {cluster.vpc_id}."
findings.append(report)
return findings

View File

@@ -0,0 +1,194 @@
"""Alibaba Cloud ACK Service"""
from dataclasses import dataclass
from prowler.lib.logger import logger
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
@dataclass
class Cluster:
"""ACK Cluster"""
cluster_id: str
cluster_name: str
arn: str
region: str
cluster_type: str = "Kubernetes"
state: str = "running"
public_access: bool = True # Will trigger check
private_zone_enabled: bool = False
network_policy_enabled: bool = False # Will trigger check
rbac_enabled: bool = True
encryption_enabled: bool = False # Will trigger check
audit_log_enabled: bool = False # Will trigger check
security_group_id: str = ""
vpc_id: str = ""
master_url: str = ""
def __post_init__(self):
pass
class ACK(AlibabaCloudService):
def __init__(self, provider):
super().__init__("ack", provider)
self.clusters = {}
logger.info("Collecting ACK clusters...")
self._describe_clusters()
logger.info(f"ACK service initialized - Clusters: {len(self.clusters)}")
def _describe_clusters(self):
for region in self.regions:
try:
from alibabacloud_cs20151215.client import Client as AckClient
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id=region,
)
if self.provider.session.credentials.security_token:
config.security_token = (
self.provider.session.credentials.security_token
)
# Create ACK client
client = AckClient(config)
# List clusters
response = client.describe_clusters_v1()
# Process clusters
if response.body and response.body.clusters:
for cluster_data in response.body.clusters:
cluster_id = cluster_data.cluster_id
arn = self.generate_resource_arn("cluster", cluster_id, region)
# Get detailed cluster info
try:
detail_response = client.describe_cluster_detail(cluster_id)
detail = (
detail_response.body
if detail_response.body
else cluster_data
)
# Check audit log
audit_log_enabled = False
if hasattr(detail, "parameters") and detail.parameters:
audit_log_enabled = (
detail.parameters.get("AuditLogEnabled", "false")
== "true"
)
# Check encryption
encryption_enabled = False
if hasattr(detail, "parameters") and detail.parameters:
encryption_enabled = (
detail.parameters.get("EncryptionEnabled", "false")
== "true"
)
# Check network policy
network_policy_enabled = False
if hasattr(detail, "parameters") and detail.parameters:
network_policy_enabled = (
detail.parameters.get("NetworkPlugin", "")
== "terway"
)
# Check RBAC
rbac_enabled = (
True # Default to true for modern ACK clusters
)
if hasattr(detail, "parameters") and detail.parameters:
rbac_enabled = (
detail.parameters.get("RBACEnabled", "true")
== "true"
)
# Check private zone
private_zone_enabled = False
if hasattr(detail, "private_zone", "false"):
private_zone_enabled = detail.private_zone
cluster = Cluster(
cluster_id=cluster_id,
cluster_name=(
cluster_data.name
if cluster_data.name
else cluster_id
),
arn=arn,
region=region,
cluster_type=(
cluster_data.cluster_type
if hasattr(cluster_data, "cluster_type")
else "Kubernetes"
),
state=(
cluster_data.state
if hasattr(cluster_data, "state")
else "running"
),
public_access=(
hasattr(cluster_data, "public_slb")
and cluster_data.public_slb
if hasattr(cluster_data, "public_slb")
else False
),
private_zone_enabled=private_zone_enabled,
network_policy_enabled=network_policy_enabled,
rbac_enabled=rbac_enabled,
encryption_enabled=encryption_enabled,
audit_log_enabled=audit_log_enabled,
security_group_id=(
cluster_data.security_group_id
if hasattr(cluster_data, "security_group_id")
else ""
),
vpc_id=(
cluster_data.vpc_id
if hasattr(cluster_data, "vpc_id")
else ""
),
master_url=(
cluster_data.master_url
if hasattr(cluster_data, "master_url")
else ""
),
)
self.clusters[arn] = cluster
logger.info(f"Found ACK cluster: {cluster_id} in {region}")
except Exception as detail_error:
logger.warning(
f"Could not get details for cluster {cluster_id}: {detail_error}"
)
# Use basic cluster data
cluster = Cluster(
cluster_id=cluster_id,
cluster_name=(
cluster_data.name
if cluster_data.name
else cluster_id
),
arn=arn,
region=region,
vpc_id=(
cluster_data.vpc_id
if hasattr(cluster_data, "vpc_id")
else ""
),
)
self.clusters[arn] = cluster
else:
logger.info(f"No ACK clusters found in {region}")
except Exception as error:
self._handle_api_error(error, "DescribeClusters", region)

View File

@@ -0,0 +1 @@
"""Alibaba Cloud ActionTrail module"""

View File

@@ -0,0 +1,6 @@
"""Alibaba Cloud ActionTrail Client Singleton"""
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_service import ActionTrail
from prowler.providers.common.provider import Provider
actiontrail_client = ActionTrail(Provider.get_global_provider())

View File

@@ -0,0 +1,148 @@
"""
Alibaba Cloud ActionTrail Service
This module provides the service class for Alibaba Cloud ActionTrail.
"""
from dataclasses import dataclass
from prowler.lib.logger import logger
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
@dataclass
class Trail:
"""ActionTrail Trail"""
name: str
arn: str
region: str
status: str = "Disabled" # Will trigger check
oss_bucket_name: str = ""
oss_key_prefix: str = ""
sls_project_arn: str = ""
sls_write_role_arn: str = ""
event_rw: str = "Write" # Should be "All"
trail_region: str = "All"
is_organization_trail: bool = False
mns_topic_arn: str = ""
def __post_init__(self):
pass
class ActionTrail(AlibabaCloudService):
"""
Alibaba Cloud ActionTrail service class
Handles collection of ActionTrail resources including trails and their configurations.
"""
def __init__(self, provider):
"""Initialize ActionTrail service"""
super().__init__("actiontrail", provider)
self.trails = {}
logger.info("Collecting ActionTrail trails...")
self._describe_trails()
logger.info(f"ActionTrail service initialized - Trails: {len(self.trails)}")
def _describe_trails(self):
"""Describe all ActionTrail trails"""
# ActionTrail is a global service, but we'll check it once
try:
from alibabacloud_actiontrail20200706 import models
from alibabacloud_actiontrail20200706.client import (
Client as ActionTrailClient,
)
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration (use cn-hangzhou as default region)
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id="cn-hangzhou",
)
if self.provider.session.credentials.security_token:
config.security_token = self.provider.session.credentials.security_token
# Create ActionTrail client
client = ActionTrailClient(config)
# List trails
request = models.ListTrailsRequest()
response = client.list_trails(request)
# Process trails
if response.body.trails:
for trail_data in response.body.trails:
trail_name = trail_data.name if trail_data.name else "unknown"
arn = self.generate_resource_arn("trail", trail_name, "")
# Get trail status
try:
status_request = models.GetTrailStatusRequest(name=trail_name)
status_response = client.get_trail_status(status_request)
status = (
"Enabled" if status_response.body.is_logging else "Disabled"
)
except Exception:
status = "Unknown"
trail = Trail(
name=trail_name,
arn=arn,
region="global",
status=status,
oss_bucket_name=(
trail_data.oss_bucket_name
if hasattr(trail_data, "oss_bucket_name")
else ""
),
oss_key_prefix=(
trail_data.oss_key_prefix
if hasattr(trail_data, "oss_key_prefix")
else ""
),
sls_project_arn=(
trail_data.sls_project_arn
if hasattr(trail_data, "sls_project_arn")
else ""
),
sls_write_role_arn=(
trail_data.sls_write_role_arn
if hasattr(trail_data, "sls_write_role_arn")
else ""
),
event_rw=(
trail_data.event_rw
if hasattr(trail_data, "event_rw")
else "Write"
),
trail_region=(
trail_data.trail_region
if hasattr(trail_data, "trail_region")
else "All"
),
is_organization_trail=(
trail_data.is_organization_trail
if hasattr(trail_data, "is_organization_trail")
else False
),
mns_topic_arn=(
trail_data.mns_topic_arn
if hasattr(trail_data, "mns_topic_arn")
else ""
),
)
self.trails[arn] = trail
logger.info(f"Found ActionTrail trail: {trail_name}")
else:
logger.info("No ActionTrail trails found")
except Exception as error:
self._handle_api_error(error, "DescribeTrails", "global")

View File

@@ -0,0 +1 @@
"""ActionTrail Trail Enabled Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "actiontrail_trail_enabled",
"CheckTitle": "Ensure at least one ActionTrail trail is enabled",
"CheckType": [],
"ServiceName": "actiontrail",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "high",
"ResourceType": "Trail",
"Description": "Ensures that at least one ActionTrail trail is enabled to log API activity for security auditing. ActionTrail records API calls and events for compliance, security analysis, and troubleshooting.",
"Risk": "Without ActionTrail enabled, there is no audit log of API activity, making it impossible to track changes, detect unauthorized access, or investigate security incidents. This can lead to compliance violations and delayed incident response.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ActionTrail/trail-enabled.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable at least one ActionTrail trail. Go to ActionTrail Console and create/enable a trail to log API activity across all regions.",
"Url": "https://www.alibabacloud.com/help/en/actiontrail/latest/create-a-trail-to-deliver-events-to-oss"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,41 @@
from dataclasses import dataclass
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_client import (
actiontrail_client,
)
@dataclass
class ActionTrailConfig:
id: str
name: str
arn: str
region: str
class actiontrail_trail_enabled(Check):
def execute(self):
findings = []
enabled_trails = [
trail
for trail in actiontrail_client.trails.values()
if trail.status == "Enabled"
]
config = ActionTrailConfig(
id="actiontrail-configuration",
name="ActionTrail Configuration",
arn=f"acs:actiontrail::{actiontrail_client.account_id}:configuration",
region="global",
)
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=config)
report.status = "FAIL"
report.status_extended = "No enabled ActionTrail trails found."
if len(enabled_trails) > 0:
trail_names = [trail.name for trail in enabled_trails]
report.status = "PASS"
report.status_extended = f"ActionTrail has {len(enabled_trails)} enabled trail(s): {', '.join(trail_names)}."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ActionTrail Trail Logs All Events Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "actiontrail_trail_logs_all_events",
"CheckTitle": "Ensure ActionTrail trails log all events (read and write)",
"CheckType": [],
"ServiceName": "actiontrail",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Trail",
"Description": "Ensures that ActionTrail trails are configured to log both read and write events. Logging all events provides complete audit coverage for security analysis and compliance.",
"Risk": "Logging only write events means read-only API calls are not recorded, potentially missing important security events such as unauthorized data access attempts or reconnaissance activities.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ActionTrail/trail-log-all-events.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Update ActionTrail trails to log all events. Go to ActionTrail Console, select the trail, and configure it to log both read and write events.",
"Url": "https://www.alibabacloud.com/help/en/actiontrail/latest/create-a-trail-to-deliver-events-to-oss"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_client import (
actiontrail_client,
)
class actiontrail_trail_logs_all_events(Check):
def execute(self):
findings = []
for trail in actiontrail_client.trails.values():
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=trail)
report.status = "FAIL"
report.status_extended = (
f"ActionTrail trail {trail.name} only logs {trail.event_rw} events."
)
if trail.event_rw == "All":
report.status = "PASS"
report.status_extended = (
f"ActionTrail trail {trail.name} logs all events."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""Alibaba Cloud ECS (Elastic Compute Service) module"""

View File

@@ -0,0 +1,11 @@
"""
Alibaba Cloud ECS Client Singleton
This module provides the singleton ECS client instance.
"""
from prowler.providers.alibabacloud.services.ecs.ecs_service import ECS
from prowler.providers.common.provider import Provider
# Initialize ECS client singleton
ecs_client = ECS(Provider.get_global_provider())

View File

@@ -0,0 +1,37 @@
{
"Provider": "alibabacloud",
"CheckID": "ecs_disk_encryption_enabled",
"CheckTitle": "Ensure ECS Disk Encryption is Enabled",
"CheckType": [
"Security",
"Encryption"
],
"ServiceName": "ecs",
"SubServiceName": "disk",
"ResourceIdTemplate": "arn:acs:ecs:region:account-id:disk/disk-id",
"Severity": "medium",
"ResourceType": "AlibabaCloud::ECS::Disk",
"Description": "Ensure that all ECS disks (system disks and data disks) are encrypted to protect data at rest. Disk encryption provides an additional layer of security by encrypting data stored on ECS instances using Alibaba Cloud KMS.",
"Risk": "Unencrypted disks can expose sensitive data if the physical storage media is compromised, if disk snapshots are inadvertently shared, or if unauthorized users gain access to the disk. Without encryption, data at rest is vulnerable to unauthorized access.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ECS/disk-encryption.html",
"Remediation": {
"Code": {
"CLI": "# Note: Encryption must be enabled when creating a disk. Existing disks cannot be encrypted in-place.\n# Create a snapshot of the unencrypted disk, then create a new encrypted disk from the snapshot:\naliyun ecs CreateSnapshot --DiskId <disk-id> --SnapshotName encrypted-snapshot\naliyun ecs CreateDisk --SnapshotId <snapshot-id> --Encrypted true --KMSKeyId <kms-key-id>",
"NativeIaC": "",
"Other": "For existing disks, you must create a snapshot, create a new encrypted disk from the snapshot, detach the old disk, attach the new encrypted disk, and then delete the old disk.",
"Terraform": "resource \"alicloud_disk\" \"encrypted_disk\" {\n disk_name = \"encrypted-disk\"\n size = 100\n category = \"cloud_essd\"\n encrypted = true\n kms_key_id = var.kms_key_id\n zone_id = \"cn-hangzhou-h\"\n}"
},
"Recommendation": {
"Text": "Enable disk encryption for all new ECS disks. For existing unencrypted disks, create encrypted copies using snapshots and KMS. Use customer-managed KMS keys for enhanced control over encryption keys. Regularly rotate encryption keys and audit key usage.",
"Url": "https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview"
}
},
"Categories": [
"encryption",
"data-protection"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ECS disk encryption cannot be enabled on existing disks. You must create new encrypted disks or create encrypted copies from snapshots. Consider enabling default encryption at the account level to ensure all new disks are automatically encrypted.",
"Compliance": []
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
class ecs_disk_encryption_enabled(Check):
def execute(self):
findings = []
for disk in ecs_client.disks.values():
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=disk)
report.status = "FAIL"
report.status_extended = (
f"ECS disk {disk.name} ({disk.id}) is not encrypted."
)
if disk.encrypted:
report.status = "PASS"
report.status_extended = (
f"ECS disk {disk.name} ({disk.id}) is encrypted."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ECS Instance Public IP Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ecs_instance_public_ip",
"CheckTitle": "Ensure ECS instances do not have unnecessary public IPs",
"CheckType": [],
"ServiceName": "ecs",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Instance",
"Description": "Ensures ECS instances do not have unnecessary public IP addresses.",
"Risk": "Public IP addresses increase attack surface.",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "Use EIP or NAT Gateway instead.", "Url": ""}},
"Categories": ["internet-exposed"],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
class ecs_instance_public_ip(Check):
def execute(self):
findings = []
for instance in ecs_client.instances.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=instance
)
report.status = "FAIL"
report.status_extended = (
f"ECS instance {instance.name} has public IP {instance.public_ip}."
)
if not instance.public_ip:
report.status = "PASS"
report.status_extended = (
f"ECS instance {instance.name} does not have a public IP address."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,37 @@
{
"Provider": "alibabacloud",
"CheckID": "ecs_instance_ssh_access_restricted",
"CheckTitle": "Check for Unrestricted SSH Access",
"CheckType": [
"Security",
"Network"
],
"ServiceName": "ecs",
"SubServiceName": "security_group",
"ResourceIdTemplate": "arn:acs:ecs:region:account-id:security-group/sg-id",
"Severity": "high",
"ResourceType": "AlibabaCloud::ECS::SecurityGroup",
"Description": "Ensure ECS security groups do not allow unrestricted SSH access (port 22) from the internet (0.0.0.0/0). Unrestricted SSH access increases the risk of unauthorized access, brute force attacks, and potential security breaches.",
"Risk": "Allowing unrestricted SSH access from the internet can expose instances to brute force attacks, unauthorized access attempts, and potential compromise. Attackers can scan for open SSH ports and attempt to gain unauthorized access to instances.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ECS/unrestricted-ssh-access.html",
"Remediation": {
"Code": {
"CLI": "aliyun ecs ModifySecurityGroupRule --SecurityGroupId <sg-id> --IpProtocol tcp --PortRange 22/22 --SourceCidrIp <your-ip>/32",
"NativeIaC": "",
"Other": "",
"Terraform": "resource \"alicloud_security_group_rule\" \"allow_ssh_from_specific_ip\" {\n type = \"ingress\"\n ip_protocol = \"tcp\"\n port_range = \"22/22\"\n security_group_id = alicloud_security_group.group.id\n cidr_ip = \"203.0.113.0/24\"\n}"
},
"Recommendation": {
"Text": "Restrict SSH access to specific trusted IP addresses or IP ranges. Implement a bastion host or VPN for secure remote access. Consider using Alibaba Cloud's Session Manager for instance access without opening SSH ports.",
"Url": "https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview"
}
},
"Categories": [
"internet-exposed",
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check examines security group rules to identify unrestricted SSH access. It is recommended to use the principle of least privilege and only allow SSH access from known, trusted IP addresses.",
"Compliance": []
}

View File

@@ -0,0 +1,28 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
class ecs_instance_ssh_access_restricted(Check):
def execute(self):
findings = []
for security_group in ecs_client.security_groups.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=security_group
)
has_unrestricted_ssh = False
for rule in security_group.rules:
if (
rule.get("direction") == "ingress"
and rule.get("protocol") == "tcp"
and "22" in rule.get("port_range", "")
and rule.get("source") == "0.0.0.0/0"
):
has_unrestricted_ssh = True
break
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) allows unrestricted SSH access."
if not has_unrestricted_ssh:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) does not allow unrestricted SSH access."
findings.append(report)
return findings

View File

@@ -0,0 +1,496 @@
"""
Alibaba Cloud ECS Service
This module provides the service class for Alibaba Cloud Elastic Compute Service (ECS).
"""
from dataclasses import dataclass
from typing import Optional
from prowler.lib.logger import logger
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
@dataclass
class Instance:
"""
Represents an Alibaba Cloud ECS instance
Attributes:
id: Instance ID
name: Instance name
arn: Instance ARN
region: Region where the instance is located
status: Instance status (Running, Stopped, etc.)
instance_type: Instance type/specification
public_ip: Public IP address (if any)
private_ip: Private IP address
security_groups: List of security group IDs
vpc_id: VPC ID
zone_id: Availability zone ID
image_id: OS image ID
tags: Instance tags
created_time: Creation timestamp
expired_time: Expiration timestamp (for subscription instances)
"""
id: str
name: str
arn: str
region: str
status: str = ""
instance_type: str = ""
public_ip: Optional[str] = None
private_ip: str = ""
security_groups: list = None
vpc_id: str = ""
zone_id: str = ""
image_id: str = ""
tags: dict = None
created_time: str = ""
expired_time: str = ""
def __post_init__(self):
if self.security_groups is None:
self.security_groups = []
if self.tags is None:
self.tags = {}
@dataclass
class Disk:
"""
Represents an Alibaba Cloud ECS disk
Attributes:
id: Disk ID
name: Disk name
arn: Disk ARN
region: Region where the disk is located
disk_type: Type of disk (system, data)
category: Disk category (cloud_ssd, cloud_essd, etc.)
size: Disk size in GB
encrypted: Whether the disk is encrypted
kms_key_id: KMS key ID used for encryption (if encrypted)
status: Disk status (Available, In_use, etc.)
instance_id: Attached instance ID (if attached)
zone_id: Availability zone ID
tags: Disk tags
"""
id: str
name: str
arn: str
region: str
disk_type: str = ""
category: str = ""
size: int = 0
encrypted: bool = False
kms_key_id: Optional[str] = None
status: str = ""
instance_id: Optional[str] = None
zone_id: str = ""
tags: dict = None
def __post_init__(self):
if self.tags is None:
self.tags = {}
@dataclass
class SecurityGroup:
"""
Represents an Alibaba Cloud security group
Attributes:
id: Security group ID
name: Security group name
arn: Security group ARN
region: Region where the security group is located
vpc_id: VPC ID
description: Security group description
rules: List of security group rules
tags: Security group tags
"""
id: str
name: str
arn: str
region: str
vpc_id: str = ""
description: str = ""
rules: list = None
tags: dict = None
def __post_init__(self):
if self.rules is None:
self.rules = []
if self.tags is None:
self.tags = {}
class ECS(AlibabaCloudService):
"""
Alibaba Cloud ECS service class
This class handles the collection of ECS resources including instances,
disks, and security groups for auditing.
"""
def __init__(self, provider):
"""
Initialize ECS service
Args:
provider: AlibabaCloudProvider instance
"""
super().__init__("ecs", provider)
# Initialize resource dictionaries
self.instances = {}
self.disks = {}
self.security_groups = {}
# Collect ECS resources
logger.info("Collecting ECS instances...")
self._describe_instances()
logger.info("Collecting ECS disks...")
self._describe_disks()
logger.info("Collecting ECS security groups...")
self._describe_security_groups()
logger.info(
f"ECS service initialized - Instances: {len(self.instances)}, "
f"Disks: {len(self.disks)}, Security Groups: {len(self.security_groups)}"
)
def _describe_instances(self):
"""
Describe ECS instances across all regions
This method collects all ECS instances and their details.
"""
logger.info("Describing ECS instances across regions...")
for region in self.regions:
try:
from alibabacloud_ecs20140526 import models
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id=region,
)
# Add security token if present (for STS)
if self.provider.session.credentials.security_token:
config.security_token = (
self.provider.session.credentials.security_token
)
# Create ECS client
client = EcsClient(config)
# Describe instances
request = models.DescribeInstancesRequest(
page_size=100, region_id=region
)
response = client.describe_instances(request)
# Process instances
if response.body.instances and response.body.instances.instance:
for instance_data in response.body.instances.instance:
instance_id = instance_data.instance_id
arn = self.generate_resource_arn(
"instance", instance_id, region
)
# Get public IP
public_ip = None
if (
instance_data.public_ip_address
and instance_data.public_ip_address.ip_address
):
public_ip = (
instance_data.public_ip_address.ip_address[0]
if instance_data.public_ip_address.ip_address
else None
)
elif (
instance_data.eip_address
and instance_data.eip_address.ip_address
):
public_ip = instance_data.eip_address.ip_address
# Get private IP
private_ip = ""
if (
instance_data.vpc_attributes
and instance_data.vpc_attributes.private_ip_address
):
private_ip = (
instance_data.vpc_attributes.private_ip_address.ip_address[
0
]
if instance_data.vpc_attributes.private_ip_address.ip_address
else ""
)
# Get security groups
security_groups = []
if (
instance_data.security_group_ids
and instance_data.security_group_ids.security_group_id
):
security_groups = (
instance_data.security_group_ids.security_group_id
)
# Get VPC ID
vpc_id = (
instance_data.vpc_attributes.vpc_id
if instance_data.vpc_attributes
else ""
)
# Get tags
tags = {}
if instance_data.tags and instance_data.tags.tag:
for tag in instance_data.tags.tag:
tags[tag.tag_key] = tag.tag_value
instance = Instance(
id=instance_id,
name=instance_data.instance_name or instance_id,
arn=arn,
region=region,
status=instance_data.status,
instance_type=instance_data.instance_type,
public_ip=public_ip,
private_ip=private_ip,
security_groups=security_groups,
vpc_id=vpc_id,
zone_id=instance_data.zone_id,
image_id=instance_data.image_id,
tags=tags,
created_time=instance_data.creation_time,
expired_time=(
instance_data.expired_time
if hasattr(instance_data, "expired_time")
else ""
),
)
self.instances[arn] = instance
logger.info(f"Found ECS instance: {instance_id} in {region}")
else:
logger.info(f"No ECS instances found in {region}")
except Exception as error:
self._handle_api_error(error, "DescribeInstances", region)
def _describe_disks(self):
"""
Describe ECS disks across all regions
This method collects all ECS disks and their encryption status.
"""
logger.info("Describing ECS disks across regions...")
for region in self.regions:
try:
from alibabacloud_ecs20140526 import models
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id=region,
)
if self.provider.session.credentials.security_token:
config.security_token = (
self.provider.session.credentials.security_token
)
# Create ECS client
client = EcsClient(config)
# Describe disks
request = models.DescribeDisksRequest(page_size=100, region_id=region)
response = client.describe_disks(request)
# Process disks
if response.body.disks and response.body.disks.disk:
for disk_data in response.body.disks.disk:
disk_id = disk_data.disk_id
arn = self.generate_resource_arn("disk", disk_id, region)
# Get tags
tags = {}
if disk_data.tags and disk_data.tags.tag:
for tag in disk_data.tags.tag:
tags[tag.tag_key] = tag.tag_value
disk = Disk(
id=disk_id,
name=disk_data.disk_name or disk_id,
arn=arn,
region=region,
disk_type=disk_data.type if disk_data.type else "",
category=disk_data.category if disk_data.category else "",
size=disk_data.size if disk_data.size else 0,
encrypted=(
disk_data.encrypted
if hasattr(disk_data, "encrypted")
else False
),
kms_key_id=(
disk_data.kms_key_id
if hasattr(disk_data, "kms_key_id")
else None
),
status=disk_data.status if disk_data.status else "",
instance_id=(
disk_data.instance_id if disk_data.instance_id else None
),
zone_id=disk_data.zone_id if disk_data.zone_id else "",
tags=tags,
)
self.disks[arn] = disk
logger.info(f"Found ECS disk: {disk_id} in {region}")
else:
logger.info(f"No ECS disks found in {region}")
except Exception as error:
self._handle_api_error(error, "DescribeDisks", region)
def _describe_security_groups(self):
"""
Describe security groups and their rules
This method collects security groups and analyzes their rules.
"""
logger.info("Describing security groups across regions...")
for region in self.regions:
try:
from alibabacloud_ecs20140526 import models
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id=region,
)
if self.provider.session.credentials.security_token:
config.security_token = (
self.provider.session.credentials.security_token
)
# Create ECS client
client = EcsClient(config)
# Describe security groups
request = models.DescribeSecurityGroupsRequest(
page_size=100, region_id=region
)
response = client.describe_security_groups(request)
# Process security groups
if (
response.body.security_groups
and response.body.security_groups.security_group
):
for sg_data in response.body.security_groups.security_group:
sg_id = sg_data.security_group_id
arn = self.generate_resource_arn(
"security-group", sg_id, region
)
# Get tags
tags = {}
if sg_data.tags and sg_data.tags.tag:
for tag in sg_data.tags.tag:
tags[tag.tag_key] = tag.tag_value
# Get security group rules
rules = []
try:
rules_request = (
models.DescribeSecurityGroupAttributeRequest(
security_group_id=sg_id, region_id=region
)
)
rules_response = client.describe_security_group_attribute(
rules_request
)
# Process ingress rules
if (
rules_response.body.permissions
and rules_response.body.permissions.permission
):
for perm in rules_response.body.permissions.permission:
rule = {
"direction": (
perm.direction
if perm.direction
else "ingress"
),
"protocol": (
perm.ip_protocol if perm.ip_protocol else ""
),
"port_range": (
perm.port_range if perm.port_range else ""
),
"source": (
perm.source_cidr_ip
if hasattr(perm, "source_cidr_ip")
and perm.source_cidr_ip
else ""
),
"source_group_id": (
perm.source_group_id
if hasattr(perm, "source_group_id")
and perm.source_group_id
else ""
),
}
rules.append(rule)
except Exception as rules_error:
logger.warning(
f"Could not retrieve rules for security group {sg_id}: {rules_error}"
)
security_group = SecurityGroup(
id=sg_id,
name=sg_data.security_group_name or sg_id,
arn=arn,
region=region,
vpc_id=sg_data.vpc_id if sg_data.vpc_id else "",
description=(
sg_data.description if sg_data.description else ""
),
rules=rules,
tags=tags,
)
self.security_groups[arn] = security_group
logger.info(f"Found security group: {sg_id} in {region}")
else:
logger.info(f"No security groups found in {region}")
except Exception as error:
self._handle_api_error(error, "DescribeSecurityGroups", region)

View File

@@ -0,0 +1 @@
"""Alibaba Cloud OSS (Object Storage Service) module"""

View File

@@ -0,0 +1 @@
"""OSS Bucket CORS Not Overly Permissive Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_cors_not_overly_permissive",
"CheckTitle": "Ensure OSS bucket CORS rules are not overly permissive",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Bucket",
"Description": "Ensures that OSS bucket CORS (Cross-Origin Resource Sharing) rules are not overly permissive. Overly permissive CORS rules can allow unauthorized websites to access bucket resources.",
"Risk": "Overly permissive CORS rules allowing all origins (*) can enable malicious websites to access bucket resources, potentially leading to data leakage or unauthorized operations.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-cors.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Update CORS rules to allow only specific trusted origins instead of '*'. Go to OSS Console > Buckets, select the bucket, and configure CORS rules with specific allowed origins.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/cors"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,30 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_cors_not_overly_permissive(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
if not bucket.cors_rules or len(bucket.cors_rules) == 0:
report.status = "PASS"
report.status_extended = (
f"OSS bucket {bucket.name} does not have CORS rules configured."
)
else:
overly_permissive = False
for rule in bucket.cors_rules:
allowed_origins = rule.get("AllowedOrigin", [])
if "*" in allowed_origins:
overly_permissive = True
break
report.status = "FAIL"
report.status_extended = f"OSS bucket {bucket.name} has overly permissive CORS rules allowing all origins."
if not overly_permissive:
report.status = "PASS"
report.status_extended = f"OSS bucket {bucket.name} has appropriately restrictive CORS rules."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Default Encryption KMS Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_default_encryption_kms",
"CheckTitle": "Ensure OSS buckets use KMS for default encryption",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "low",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets use KMS (Key Management Service) for encryption instead of AES256. KMS provides better key management, rotation, and audit capabilities compared to AES256.",
"Risk": "Using AES256 encryption instead of KMS means missing out on centralized key management, automatic key rotation, and comprehensive audit logging capabilities.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-encryption-kms.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Update OSS bucket encryption to use KMS instead of AES256. Go to OSS Console > Buckets, select the bucket, and configure server-side encryption with KMS.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/server-side-encryption"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "KMS encryption may have additional costs compared to AES256."
}

View File

@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_default_encryption_kms(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
if bucket.encryption_enabled and bucket.encryption_algorithm:
if "KMS" in bucket.encryption_algorithm.upper():
report.status = "PASS"
report.status_extended = (
f"OSS bucket {bucket.name} uses KMS for encryption."
)
else:
report.status_extended = f"OSS bucket {bucket.name} uses {bucket.encryption_algorithm} for encryption."
else:
report.status_extended = (
f"OSS bucket {bucket.name} does not have encryption enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Encryption Enabled Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_encryption_enabled",
"CheckTitle": "Ensure OSS buckets have encryption enabled",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "high",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets have server-side encryption enabled to protect data at rest. Encryption protects sensitive data from unauthorized access.",
"Risk": "Unencrypted data at rest is vulnerable to unauthorized access if storage media is compromised. Encryption ensures data confidentiality even if physical access is obtained.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-encryption.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable server-side encryption for OSS buckets. Go to OSS Console > Buckets, select the bucket, and enable server-side encryption with AES256 or KMS.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/server-side-encryption"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_encryption_enabled(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
report.status_extended = (
f"OSS bucket {bucket.name} does not have encryption enabled."
)
if bucket.encryption_enabled:
report.status = "PASS"
report.status_extended = f"OSS bucket {bucket.name} has encryption enabled with {bucket.encryption_algorithm}."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Lifecycle Rules Check"""

Some files were not shown because too many files have changed in this diff Show More