Compare commits

...

2 Commits

Author SHA1 Message Date
pedrooot
7732281ecf feat(alibabacloud): change formating and improve checks 2025-10-27 10:42:37 +01:00
pedrooot
55e846b3d7 feat(provider): add alibabacloud provider 2025-10-24 13:10:49 +02:00
423 changed files with 15395 additions and 1 deletions

View File

@@ -0,0 +1,75 @@
"""
CIS Alibaba Cloud Compliance Dashboard
This module generates compliance reports for the CIS Alibaba Cloud Foundations Benchmark.
"""
import warnings
from dashboard.common_methods import get_section_containers_3_levels
warnings.filterwarnings("ignore")
def get_table(data):
"""
Generate compliance table for CIS Alibaba Cloud framework
This function processes compliance data and formats it for dashboard display,
with sections, subsections, and individual requirements.
Args:
data: DataFrame containing compliance data with columns:
- REQUIREMENTS_ID: Requirement identifier (e.g., "2.1", "4.1")
- REQUIREMENTS_DESCRIPTION: Description of the requirement
- REQUIREMENTS_ATTRIBUTES_SECTION: Main section (e.g., "2. Storage")
- REQUIREMENTS_ATTRIBUTES_SUBSECTION: Subsection (e.g., "2.1 ECS Disk Encryption")
- CHECKID: Associated Prowler check ID
- STATUS: Check status (PASS/FAIL)
- REGION: Alibaba Cloud region
- ACCOUNTID: Alibaba Cloud account ID
- RESOURCEID: Resource identifier
Returns:
Dashboard table with hierarchical compliance structure
"""
# Format requirement descriptions with ID prefix and truncate if too long
data["REQUIREMENTS_DESCRIPTION"] = (
data["REQUIREMENTS_ID"] + " - " + data["REQUIREMENTS_DESCRIPTION"]
)
data["REQUIREMENTS_DESCRIPTION"] = data["REQUIREMENTS_DESCRIPTION"].apply(
lambda x: x[:150] + "..." if len(str(x)) > 150 else x
)
# Truncate section names if too long
data["REQUIREMENTS_ATTRIBUTES_SECTION"] = data[
"REQUIREMENTS_ATTRIBUTES_SECTION"
].apply(lambda x: x[:80] + "..." if len(str(x)) > 80 else x)
# Truncate subsection names if too long
data["REQUIREMENTS_ATTRIBUTES_SUBSECTION"] = data[
"REQUIREMENTS_ATTRIBUTES_SUBSECTION"
].apply(lambda x: x[:150] + "..." if len(str(x)) > 150 else x)
# Select relevant columns for display
display_data = data[
[
"REQUIREMENTS_DESCRIPTION",
"REQUIREMENTS_ATTRIBUTES_SECTION",
"REQUIREMENTS_ATTRIBUTES_SUBSECTION",
"CHECKID",
"STATUS",
"REGION",
"ACCOUNTID",
"RESOURCEID",
]
]
# Generate hierarchical table with 3 levels (Section > Subsection > Requirement)
return get_section_containers_3_levels(
display_data,
"REQUIREMENTS_ATTRIBUTES_SECTION",
"REQUIREMENTS_ATTRIBUTES_SUBSECTION",
"REQUIREMENTS_DESCRIPTION",
)

View File

@@ -113,6 +113,7 @@ from prowler.providers.m365.models import M365OutputOptions
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
from prowler.providers.nhn.models import NHNOutputOptions
from prowler.providers.oraclecloud.models import OCIOutputOptions
from prowler.providers.alibabacloud.models import AlibabaCloudOutputOptions
def prowler():
@@ -336,6 +337,10 @@ def prowler():
output_options = OCIOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
elif provider == "alibabacloud":
output_options = AlibabaCloudOutputOptions(
args, bulk_checks_metadata, global_provider.identity
)
# Run the quick inventory for the provider if available
if hasattr(args, "quick_inventory") and args.quick_inventory:

View File

@@ -0,0 +1,56 @@
{
"Framework": "CIS",
"Provider": "alibabacloud",
"Version": "1.0",
"Description": "CIS Alibaba Cloud Foundations Benchmark",
"Requirements": [
{
"Id": "2.1",
"Description": "Ensure that encryption is enabled for ECS disks",
"Attributes": [
{
"Section": "2. Storage",
"SubSection": "2.1 ECS Disk Encryption",
"Profile": "Level 1",
"AssessmentStatus": "Automated",
"Description": "ECS disk encryption provides an additional layer of security by encrypting data stored on ECS instances. This helps protect sensitive data from unauthorized access if the physical storage media is compromised or if snapshots are inadvertently shared.",
"RationaleStatement": "Encrypting data at rest reduces the risk of unauthorized access to sensitive information stored on ECS disks. Without encryption, data stored on disks is vulnerable if an attacker gains physical access to the storage media or if disk snapshots are exposed.",
"ImpactStatement": "Enabling encryption may have a minimal performance impact on disk I/O operations. Additionally, encrypted disks cannot be converted to unencrypted disks, so this decision should be made during the initial disk creation or through migration.",
"RemediationProcedure": "To enable encryption on new ECS disks:\n1. Create a KMS key in the Alibaba Cloud KMS console\n2. When creating a new ECS disk, enable the 'Encrypted' option\n3. Select the KMS key to use for encryption\n\nFor existing unencrypted disks:\n1. Create a snapshot of the unencrypted disk\n2. Create a new encrypted disk from the snapshot using a KMS key\n3. Detach the old disk and attach the new encrypted disk\n4. Delete the old unencrypted disk and snapshot",
"AuditProcedure": "Using the Prowler CLI: prowler alibabacloud --checks ecs_disk_encryption_enabled\n\nUsing the Alibaba Cloud Console:\n1. Log in to the ECS console\n2. Navigate to Storage & Snapshots > Disks\n3. For each disk, check the 'Encrypted' column\n4. Verify that all disks show 'Yes' for encryption",
"AdditionalInformation": "References:\n- Alibaba Cloud ECS Disk Encryption: https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview\n- Alibaba Cloud KMS: https://www.alibabacloud.com/help/en/kms/",
"References": [
"https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview"
]
}
],
"Checks": [
"ecs_disk_encryption_enabled"
]
},
{
"Id": "4.1",
"Description": "Ensure no security groups allow ingress from 0.0.0.0/0 to port 22",
"Attributes": [
{
"Section": "4. Networking",
"SubSection": "4.1 Security Groups",
"Profile": "Level 1",
"AssessmentStatus": "Automated",
"Description": "Security groups should restrict SSH access (port 22) to specific trusted IP addresses rather than allowing access from the entire internet (0.0.0.0/0). Unrestricted SSH access increases the attack surface and risk of unauthorized access.",
"RationaleStatement": "Allowing unrestricted SSH access from the internet exposes ECS instances to brute force attacks, unauthorized access attempts, and potential compromise. Attackers routinely scan the internet for open SSH ports to exploit weak credentials or known vulnerabilities.",
"ImpactStatement": "Restricting SSH access to specific IP addresses or ranges may require maintaining an allowlist of trusted IPs. Organizations should implement bastion hosts, VPN access, or use Alibaba Cloud's Session Manager for secure remote access without exposing SSH ports to the internet.",
"RemediationProcedure": "To restrict SSH access:\n1. Identify the trusted IP addresses or ranges that require SSH access\n2. In the ECS console, navigate to Network & Security > Security Groups\n3. Select the security group\n4. Find the rule allowing 0.0.0.0/0 access to port 22\n5. Modify the rule to specify the trusted IP address or range\n6. Consider implementing a bastion host or VPN for centralized access control",
"AuditProcedure": "Using the Prowler CLI: prowler alibabacloud --checks ecs_instance_ssh_access_restricted\n\nUsing the Alibaba Cloud Console:\n1. Log in to the ECS console\n2. Navigate to Network & Security > Security Groups\n3. For each security group, review the inbound rules\n4. Verify that no rule allows access from 0.0.0.0/0 to port 22",
"AdditionalInformation": "Best practices:\n- Use bastion hosts or jump servers for SSH access\n- Implement VPN or Direct Connect for secure remote access\n- Consider using Alibaba Cloud Session Manager\n- Enable Multi-Factor Authentication (MFA) for SSH access\n- Regularly review and update security group rules\n\nReferences:\n- Alibaba Cloud Security Groups: https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview",
"References": [
"https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview"
]
}
],
"Checks": [
"ecs_instance_ssh_access_restricted"
]
}
]
}

View File

@@ -659,6 +659,33 @@ class Check_Report_Kubernetes(Check_Report):
self.namespace = "cluster-wide"
@dataclass
class Check_Report_AlibabaCloud(Check_Report):
"""Contains the Alibaba Cloud Check's finding information."""
resource_name: str
resource_id: str
resource_arn: str
region: str
account_uid: str
def __init__(self, metadata: Dict, resource: Any) -> None:
"""Initialize the Alibaba Cloud Check's finding information.
Args:
metadata: The metadata of the check.
resource: Basic information about the resource.
"""
super().__init__(metadata, resource)
self.resource_id = (
getattr(resource, "id", None) or getattr(resource, "name", None) or ""
)
self.resource_name = getattr(resource, "name", "")
self.resource_arn = getattr(resource, "arn", "")
self.region = getattr(resource, "region", "")
self.account_uid = ""
@dataclass
class CheckReportGithub(Check_Report):
"""Contains the GitHub Check's finding information."""

View File

@@ -26,7 +26,7 @@ def recover_checks_from_provider(
# We need to exclude common shared libraries in services
if (
check_module_name.count(".") == 6
and "lib" not in check_module_name
and ".lib." not in check_module_name # Exclude .lib. directories, not "lib" substring
and (not check_module_name.endswith("_fixer") or include_fixers)
):
check_path = module_name.module_finder.path

View File

@@ -337,6 +337,26 @@ class Finding(BaseModel):
output_data["resource_uid"] = check_output.resource_id
output_data["region"] = check_output.region
elif provider.type == "alibabacloud":
output_data["auth_method"] = (
"STS Token"
if get_nested_attribute(
provider, "session.credentials.security_token"
)
else "AccessKey"
)
output_data["account_uid"] = get_nested_attribute(
provider, "identity.account_id"
)
# Use account_name if available, otherwise use account_id
account_name = get_nested_attribute(provider, "identity.account_name")
if not account_name:
account_name = get_nested_attribute(provider, "identity.account_id")
output_data["account_name"] = account_name
output_data["resource_name"] = check_output.resource_name
output_data["resource_uid"] = check_output.resource_arn
output_data["region"] = check_output.region
# check_output Unique ID
# TODO: move this to a function
# TODO: in Azure, GCP and K8s there are findings without resource_name

View File

@@ -1020,6 +1020,70 @@ class HTML(Output):
)
return ""
@staticmethod
def get_alibabacloud_assessment_summary(provider: Provider) -> str:
"""
get_alibabacloud_assessment_summary gets the HTML assessment summary for the Alibaba Cloud provider
Args:
provider (Provider): the Alibaba Cloud provider object
Returns:
str: the HTML assessment summary
"""
try:
# Get audited regions from provider, not identity
if hasattr(provider, "audited_regions") and provider.audited_regions:
if isinstance(provider.audited_regions, list):
audited_regions = ", ".join(provider.audited_regions)
else:
audited_regions = str(provider.audited_regions)
else:
audited_regions = "All Regions"
auth_method = (
"STS Token"
if provider.session.credentials.security_token
else "AccessKey"
)
return f"""
<div class="col-md-2">
<div class="card">
<div class="card-header">
Alibaba Cloud Assessment Summary
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Account ID:</b> {provider.identity.account_id}
</li>
<li class="list-group-item">
<b>Audited Regions:</b> {audited_regions}
</li>
</ul>
</div>
</div>
<div class="col-md-4">
<div class="card">
<div class="card-header">
Alibaba Cloud Credentials
</div>
<ul class="list-group list-group-flush">
<li class="list-group-item">
<b>Authentication Method:</b> {auth_method}
</li>
<li class="list-group-item">
<b>Account ARN:</b> {provider.identity.account_arn}
</li>
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_assessment_summary(provider: Provider) -> str:
"""

View File

@@ -74,6 +74,9 @@ def display_summary_table(
if provider.identity.tenancy_name != "unknown"
else provider.identity.tenancy_id
)
elif provider.type == "alibabacloud":
entity_type = "Account"
audited_entities = provider.identity.account_id
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):

View File

@@ -0,0 +1,506 @@
"""
Alibaba Cloud Provider
This module implements the Alibaba Cloud provider for Prowler security auditing.
"""
import sys
from typing import Optional
from colorama import Fore, Style
from prowler.config.config import (
get_default_mute_file_path,
load_and_validate_config_file,
)
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.alibabacloud.config import (
ALIBABACLOUD_REGIONS,
ALIBABACLOUD_RAM_SESSION_NAME,
)
from prowler.providers.alibabacloud.exceptions.exceptions import (
AlibabaCloudAccountNotFoundError,
AlibabaCloudAssumeRoleError,
AlibabaCloudAuthenticationError,
AlibabaCloudNoCredentialsError,
AlibabaCloudSetUpSessionError,
)
from prowler.providers.alibabacloud.lib.mutelist.mutelist import AlibabaCloudMutelist
from prowler.providers.alibabacloud.models import (
AlibabaCloudAssumeRoleInfo,
AlibabaCloudCredentials,
AlibabaCloudIdentityInfo,
AlibabaCloudSession,
)
from prowler.providers.common.models import Audit_Metadata, Connection
from prowler.providers.common.provider import Provider
class AlibabacloudProvider(Provider):
"""
AlibabacloudProvider class implements the Alibaba Cloud provider for Prowler
This class handles:
- Authentication with Alibaba Cloud using AccessKey credentials or STS tokens
- RAM role assumption for cross-account auditing
- Region management and filtering
- Resource discovery and auditing
- Mutelist management for finding suppression
Attributes:
_type: Provider type identifier ("alibabacloud")
_identity: Alibaba Cloud account identity information
_session: Alibaba Cloud session with credentials
_regions: List of regions to audit
_mutelist: Mutelist for finding suppression
_audit_config: Audit configuration dictionary
_fixer_config: Fixer configuration dictionary
audit_metadata: Audit execution metadata
"""
_type: str = "alibabacloud"
_identity: AlibabaCloudIdentityInfo
_session: AlibabaCloudSession
_regions: list = []
_mutelist: AlibabaCloudMutelist
_audit_config: dict
_fixer_config: dict
audit_metadata: Audit_Metadata
def __init__(
self,
access_key_id: str = None,
access_key_secret: str = None,
security_token: str = None,
region_ids: list = None,
filter_regions: list = None,
ram_role_arn: str = None,
ram_session_name: str = None,
ram_session_duration: int = 3600,
ram_external_id: str = None,
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
mutelist_path: str = None,
mutelist_content: dict = None,
resource_tags: list[str] = [],
resource_ids: list[str] = [],
):
"""
Initialize Alibaba Cloud provider
Args:
access_key_id: Alibaba Cloud AccessKey ID
access_key_secret: Alibaba Cloud AccessKey Secret
security_token: STS security token for temporary credentials
region_ids: List of region IDs to audit
filter_regions: List of region IDs to exclude from audit
ram_role_arn: RAM role ARN to assume
ram_session_name: Session name for RAM role assumption
ram_session_duration: Session duration in seconds (900-43200)
ram_external_id: External ID for RAM role assumption
config_path: Path to audit configuration file
config_content: Configuration content dictionary
fixer_config: Fixer configuration dictionary
mutelist_path: Path to mutelist file
mutelist_content: Mutelist content dictionary
resource_tags: List of resource tags to filter (key=value format)
resource_ids: List of specific resource IDs to audit
"""
logger.info("Initializing Alibaba Cloud provider...")
# Validate and load configuration
self._audit_config = load_and_validate_config_file(
self._type, config_path
)
if self._audit_config is None:
self._audit_config = {}
# Override with config_content if provided
if config_content:
self._audit_config.update(config_content)
self._fixer_config = fixer_config
# Setup session and authenticate
try:
self._session = self.setup_session(
access_key_id,
access_key_secret,
security_token,
ram_role_arn,
ram_session_name or ALIBABACLOUD_RAM_SESSION_NAME,
ram_session_duration,
ram_external_id,
)
except Exception as error:
logger.critical(f"Failed to set up Alibaba Cloud session: {error}")
raise AlibabaCloudSetUpSessionError(str(error))
# Get account identity
try:
self._identity = self._set_identity()
except Exception as error:
logger.critical(
f"Failed to retrieve Alibaba Cloud account identity: {error}"
)
raise AlibabaCloudAccountNotFoundError(str(error))
# Setup regions
self._regions = self._setup_regions(region_ids, filter_regions)
# Setup mutelist
self._mutelist = self._setup_mutelist(mutelist_path, mutelist_content)
# Set audit metadata
self.audit_metadata = Audit_Metadata(
services_scanned=0,
expected_checks=[],
completed_checks=0,
audit_progress=0,
)
# Set as global provider
Provider.set_global_provider(self)
logger.info(
f"Alibaba Cloud provider initialized for account {self._identity.account_id}"
)
@property
def type(self) -> str:
"""Return provider type"""
return self._type
@property
def identity(self) -> AlibabaCloudIdentityInfo:
"""Return provider identity"""
return self._identity
@property
def session(self) -> AlibabaCloudSession:
"""Return provider session"""
return self._session
@property
def audit_config(self) -> dict:
"""Return audit configuration"""
return self._audit_config
@property
def fixer_config(self) -> dict:
"""Return fixer configuration"""
return self._fixer_config
@property
def mutelist(self) -> AlibabaCloudMutelist:
"""Return mutelist"""
return self._mutelist
def setup_session(
self,
access_key_id: str,
access_key_secret: str,
security_token: str = None,
ram_role_arn: str = None,
ram_session_name: str = ALIBABACLOUD_RAM_SESSION_NAME,
ram_session_duration: int = 3600,
ram_external_id: str = None,
) -> AlibabaCloudSession:
"""
Setup Alibaba Cloud session with authentication
Args:
access_key_id: AccessKey ID
access_key_secret: AccessKey Secret
security_token: STS security token (optional)
ram_role_arn: RAM role to assume (optional)
ram_session_name: Session name for role assumption
ram_session_duration: Session duration in seconds
ram_external_id: External ID for role assumption
Returns:
AlibabaCloudSession: Configured session object
Raises:
AlibabaCloudNoCredentialsError: If credentials are missing
AlibabaCloudAuthenticationError: If authentication fails
AlibabaCloudAssumeRoleError: If role assumption fails
"""
# Validate credentials
if not access_key_id or not access_key_secret:
logger.critical("Alibaba Cloud credentials are required")
raise AlibabaCloudNoCredentialsError()
try:
# Create credentials object
credentials = AlibabaCloudCredentials(
access_key_id=access_key_id,
access_key_secret=access_key_secret,
security_token=security_token,
)
# If RAM role is specified, assume the role
if ram_role_arn:
logger.info(f"Assuming RAM role: {ram_role_arn}")
credentials = self._assume_role(
credentials,
ram_role_arn,
ram_session_name,
ram_session_duration,
ram_external_id,
)
# Create session
session = AlibabaCloudSession(
credentials=credentials,
region_id="cn-hangzhou", # Default region for global APIs
)
logger.info("Alibaba Cloud session established successfully")
return session
except Exception as error:
logger.critical(f"Authentication failed: {error}")
raise AlibabaCloudAuthenticationError(str(error))
def _assume_role(
self,
credentials: AlibabaCloudCredentials,
role_arn: str,
session_name: str,
session_duration: int,
external_id: str = None,
) -> AlibabaCloudCredentials:
"""
Assume a RAM role and return temporary credentials
Args:
credentials: Current credentials
role_arn: RAM role ARN to assume
session_name: Session name
session_duration: Session duration in seconds
external_id: External ID (optional)
Returns:
AlibabaCloudCredentials: Temporary credentials from STS
Raises:
AlibabaCloudAssumeRoleError: If role assumption fails
"""
try:
# Note: In a real implementation, this would use Alibaba Cloud STS SDK
# to call AssumeRole API and get temporary credentials
# For now, we'll return the original credentials as a placeholder
logger.warning(
"RAM role assumption not yet fully implemented - using provided credentials"
)
# TODO: Implement actual STS AssumeRole call
# from alibabacloud_sts20150401.client import Client as StsClient
# from alibabacloud_sts20150401.models import AssumeRoleRequest
#
# sts_client = StsClient(config)
# request = AssumeRoleRequest(
# role_arn=role_arn,
# role_session_name=session_name,
# duration_seconds=session_duration,
# external_id=external_id
# )
# response = sts_client.assume_role(request)
#
# return AlibabaCloudCredentials(
# access_key_id=response.body.credentials.access_key_id,
# access_key_secret=response.body.credentials.access_key_secret,
# security_token=response.body.credentials.security_token,
# expiration=response.body.credentials.expiration,
# )
return credentials
except Exception as error:
logger.critical(f"Failed to assume RAM role {role_arn}: {error}")
raise AlibabaCloudAssumeRoleError(role_arn, str(error))
def _set_identity(self) -> AlibabaCloudIdentityInfo:
"""
Retrieve Alibaba Cloud account identity information
Returns:
AlibabaCloudIdentityInfo: Account identity details
Raises:
AlibabaCloudAccountNotFoundError: If identity cannot be retrieved
"""
try:
logger.info("Retrieving Alibaba Cloud account identity...")
# Derive account ID from AccessKey ID
# Alibaba Cloud AccessKey IDs follow the format: LTAI{account_id_hash}...
# For a more accurate implementation, you would call STS GetCallerIdentity API
# For now, we'll use the AccessKey ID as a unique identifier
access_key_id = self._session.credentials.access_key_id
# Simple implementation: Use the first 12 characters of AccessKey ID as account identifier
# In production, you should call:
# from alibabacloud_sts20150401.client import Client as StsClient
# sts_client = StsClient(config)
# response = sts_client.get_caller_identity()
# account_id = response.body.account_id
# For now, create a unique identifier from the AccessKey
account_id = access_key_id[:20] if access_key_id else "unknown"
account_arn = f"acs:ram::{account_id}:root"
identity = AlibabaCloudIdentityInfo(
account_id=account_id,
account_arn=account_arn,
)
logger.info(f"Account ID: {identity.account_id}")
return identity
except Exception as error:
logger.critical(f"Failed to get account identity: {error}")
raise AlibabaCloudAccountNotFoundError(str(error))
def _setup_regions(
self, region_ids: list = None, filter_regions: list = None
) -> list:
"""
Setup regions to audit
Args:
region_ids: Specific regions to audit (None = all regions)
filter_regions: Regions to exclude from audit
Returns:
list: Final list of regions to audit
"""
# Start with specified regions or all regions
if region_ids:
regions = [r for r in region_ids if r in ALIBABACLOUD_REGIONS]
logger.info(f"Auditing specified regions: {', '.join(regions)}")
else:
regions = ALIBABACLOUD_REGIONS.copy()
logger.info("Auditing all Alibaba Cloud regions")
# Apply filters
if filter_regions:
regions = [r for r in regions if r not in filter_regions]
logger.info(f"Excluded regions: {', '.join(filter_regions)}")
logger.info(f"Total regions to audit: {len(regions)}")
return regions
def _setup_mutelist(
self, mutelist_path: str = None, mutelist_content: dict = None
) -> AlibabaCloudMutelist:
"""
Setup mutelist for finding suppression
Args:
mutelist_path: Path to mutelist file
mutelist_content: Mutelist content dictionary
Returns:
AlibabaCloudMutelist: Configured mutelist instance
"""
try:
# Use default path if not provided
if not mutelist_path and not mutelist_content:
mutelist_path = get_default_mute_file_path(self._type)
mutelist = AlibabaCloudMutelist(
mutelist_path=mutelist_path,
mutelist_content=mutelist_content,
provider=self._type,
identity=self._identity,
)
logger.info("Mutelist loaded successfully")
return mutelist
except Exception as error:
logger.warning(f"Error loading mutelist: {error}")
# Return empty mutelist on error
return AlibabaCloudMutelist()
def print_credentials(self) -> None:
"""
Display Alibaba Cloud credentials and configuration in CLI
This method prints the current provider configuration including:
- Account ID
- Regions being audited
- Authentication method
"""
# Account information
report_lines = []
report_lines.append(
f"{Fore.CYAN}Account ID:{Style.RESET_ALL} {self._identity.account_id}"
)
if self._identity.user_name:
report_lines.append(
f"{Fore.CYAN}User:{Style.RESET_ALL} {self._identity.user_name}"
)
# Regions
region_count = len(self._regions)
regions_display = (
", ".join(self._regions[:5])
+ (f" ... (+{region_count - 5} more)" if region_count > 5 else "")
)
report_lines.append(
f"{Fore.CYAN}Regions ({region_count}):{Style.RESET_ALL} {regions_display}"
)
# Authentication method
auth_method = (
"STS Token" if self._session.credentials.security_token else "AccessKey"
)
report_lines.append(
f"{Fore.CYAN}Authentication:{Style.RESET_ALL} {auth_method}"
)
# Print formatted box
print_boxes(report_lines, "Alibaba Cloud Provider Configuration")
def test_connection(self) -> Connection:
"""
Test connection to Alibaba Cloud
Returns:
Connection: Connection test result with status and error (if any)
"""
try:
logger.info("Testing connection to Alibaba Cloud...")
# TODO: Implement actual connection test with a simple API call
# For example, call DescribeRegions or GetCallerIdentity
# from alibabacloud_ecs20140526.client import Client as EcsClient
#
# ecs_client = EcsClient(config)
# ecs_client.describe_regions()
logger.info("Connection test successful")
return Connection(is_connected=True)
except Exception as error:
logger.error(f"Connection test failed: {error}")
return Connection(is_connected=False, error=str(error))
def validate_arguments(self) -> None:
"""
Validate provider arguments
This method is called after initialization to ensure all arguments
and configurations are valid.
"""
# Validation is handled in the CLI arguments parser
# This method can be used for additional runtime validations
pass

View File

@@ -0,0 +1,45 @@
"""
Alibaba Cloud Provider Configuration
This module contains configuration constants for the Alibaba Cloud provider.
"""
# Default Alibaba Cloud regions
ALIBABACLOUD_REGIONS = [
"cn-hangzhou", # China (Hangzhou)
"cn-shanghai", # China (Shanghai)
"cn-qingdao", # China (Qingdao)
"cn-beijing", # China (Beijing)
"cn-zhangjiakou", # China (Zhangjiakou)
"cn-huhehaote", # China (Hohhot)
"cn-wulanchabu", # China (Ulanqab)
"cn-shenzhen", # China (Shenzhen)
"cn-heyuan", # China (Heyuan)
"cn-guangzhou", # China (Guangzhou)
"cn-chengdu", # China (Chengdu)
"cn-hongkong", # China (Hong Kong)
"ap-northeast-1", # Japan (Tokyo)
"ap-southeast-1", # Singapore
"ap-southeast-2", # Australia (Sydney)
"ap-southeast-3", # Malaysia (Kuala Lumpur)
"ap-southeast-5", # Indonesia (Jakarta)
"ap-southeast-6", # Philippines (Manila)
"ap-southeast-7", # Thailand (Bangkok)
"ap-south-1", # India (Mumbai)
"us-west-1", # US (Silicon Valley)
"us-east-1", # US (Virginia)
"eu-west-1", # UK (London)
"eu-central-1", # Germany (Frankfurt)
"me-east-1", # UAE (Dubai)
]
# Alibaba Cloud SDK configuration
ALIBABACLOUD_SDK_USER_AGENT = "Prowler"
ALIBABACLOUD_SDK_MAX_RETRIES = 3
ALIBABACLOUD_SDK_TIMEOUT = 30 # seconds
# Alibaba Cloud ARN format
ALIBABACLOUD_ARN_FORMAT = "acs:{service}:{region}:{account_id}:{resource}"
# Default RAM role session name
ALIBABACLOUD_RAM_SESSION_NAME = "ProwlerSession"

View File

@@ -0,0 +1,90 @@
"""
Alibaba Cloud Provider Exceptions
This module contains exception classes for the Alibaba Cloud provider.
"""
class AlibabaCloudException(Exception):
"""Base exception for Alibaba Cloud provider errors"""
pass
class AlibabaCloudAuthenticationError(AlibabaCloudException):
"""
AlibabaCloudAuthenticationError is raised when authentication fails
This can occur due to:
- Invalid AccessKey credentials
- Expired STS tokens
- Insufficient permissions
"""
def __init__(self, message="Authentication to Alibaba Cloud failed"):
super().__init__(message)
class AlibabaCloudSetUpSessionError(AlibabaCloudException):
"""
AlibabaCloudSetUpSessionError is raised when session setup fails
"""
def __init__(self, message="Failed to set up Alibaba Cloud session"):
super().__init__(message)
class AlibabaCloudAPIError(AlibabaCloudException):
"""
AlibabaCloudAPIError is raised when an API call fails
"""
def __init__(self, service: str, operation: str, error: str):
message = f"Alibaba Cloud API Error - Service: {service}, Operation: {operation}, Error: {error}"
super().__init__(message)
class AlibabaCloudNoCredentialsError(AlibabaCloudException):
"""
AlibabaCloudNoCredentialsError is raised when no credentials are found
"""
def __init__(self, message="No Alibaba Cloud credentials found"):
super().__init__(message)
class AlibabaCloudInvalidRegionError(AlibabaCloudException):
"""
AlibabaCloudInvalidRegionError is raised when an invalid region is specified
"""
def __init__(self, region: str):
message = f"Invalid Alibaba Cloud region: {region}"
super().__init__(message)
class AlibabaCloudAssumeRoleError(AlibabaCloudException):
"""
AlibabaCloudAssumeRoleError is raised when assuming a RAM role fails
"""
def __init__(self, role_arn: str, error: str):
message = f"Failed to assume RAM role {role_arn}: {error}"
super().__init__(message)
class AlibabaCloudInvalidAccessKeyError(AlibabaCloudException):
"""
AlibabaCloudInvalidAccessKeyError is raised when AccessKey credentials are invalid
"""
def __init__(self, message="Invalid Alibaba Cloud AccessKey credentials"):
super().__init__(message)
class AlibabaCloudAccountNotFoundError(AlibabaCloudException):
"""
AlibabaCloudAccountNotFoundError is raised when account information cannot be retrieved
"""
def __init__(self, message="Unable to retrieve Alibaba Cloud account information"):
super().__init__(message)
class AlibabaCloudConfigValidationError(AlibabaCloudException):
"""
AlibabaCloudConfigValidationError is raised when configuration validation fails
"""
def __init__(self, message="Alibaba Cloud configuration validation failed"):
super().__init__(message)

View File

@@ -0,0 +1,203 @@
"""
Alibaba Cloud Provider CLI Arguments
This module defines the command-line interface arguments for the Alibaba Cloud provider.
"""
import os
from argparse import ArgumentTypeError, Namespace
from prowler.providers.alibabacloud.config import ALIBABACLOUD_REGIONS
def init_parser(self):
"""
Initialize Alibaba Cloud provider CLI argument parser
This function creates the argument parser for Alibaba Cloud provider and defines
all the command-line arguments that can be used when auditing Alibaba Cloud.
Args:
self: The ProwlerArgumentParser instance
"""
# Create Alibaba Cloud subparser
alibabacloud_parser = self.subparsers.add_parser(
"alibabacloud",
parents=[self.common_providers_parser],
help="Alibaba Cloud Provider",
)
# Authentication group
alibabacloud_auth = alibabacloud_parser.add_argument_group(
"Alibaba Cloud Authentication"
)
alibabacloud_auth.add_argument(
"--access-key-id",
nargs="?",
default=os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID"),
help="Alibaba Cloud AccessKey ID (also reads from ALIBABA_CLOUD_ACCESS_KEY_ID environment variable)",
)
alibabacloud_auth.add_argument(
"--access-key-secret",
nargs="?",
default=os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
help="Alibaba Cloud AccessKey Secret (also reads from ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable)",
)
alibabacloud_auth.add_argument(
"--security-token",
nargs="?",
default=os.environ.get("ALIBABA_CLOUD_SECURITY_TOKEN"),
help="Alibaba Cloud STS Security Token for temporary credentials (also reads from ALIBABA_CLOUD_SECURITY_TOKEN environment variable)",
)
# RAM Role assumption group
alibabacloud_role = alibabacloud_parser.add_argument_group(
"Alibaba Cloud RAM Role Assumption"
)
alibabacloud_role.add_argument(
"--ram-role-arn",
nargs="?",
default=None,
help="RAM Role ARN to assume for the audit (format: acs:ram::account-id:role/role-name)",
)
alibabacloud_role.add_argument(
"--ram-session-name",
nargs="?",
default="ProwlerAuditSession",
help="Session name for RAM role assumption (default: ProwlerAuditSession)",
)
alibabacloud_role.add_argument(
"--ram-session-duration",
nargs="?",
type=int,
default=3600,
help="Session duration in seconds for RAM role assumption (900-43200, default: 3600)",
)
alibabacloud_role.add_argument(
"--ram-external-id",
nargs="?",
default=None,
help="External ID for RAM role assumption (optional)",
)
# Regions group
alibabacloud_regions = alibabacloud_parser.add_argument_group(
"Alibaba Cloud Regions"
)
alibabacloud_regions.add_argument(
"--region-id",
"--region-ids",
nargs="+",
default=[],
dest="region_ids",
help=f"Alibaba Cloud Region IDs to audit (space-separated). Available regions: {', '.join(ALIBABACLOUD_REGIONS[:10])}... (default: all regions)",
)
alibabacloud_regions.add_argument(
"--filter-region",
"--filter-regions",
nargs="+",
default=[],
dest="filter_regions",
help="Alibaba Cloud Region IDs to exclude from the audit (space-separated)",
)
# Resource filtering group
alibabacloud_resources = alibabacloud_parser.add_argument_group(
"Alibaba Cloud Resource Filtering"
)
alibabacloud_resources.add_argument(
"--resource-tags",
nargs="+",
default=[],
help="Filter resources by tags (format: key=value)",
)
alibabacloud_resources.add_argument(
"--resource-ids",
nargs="+",
default=[],
help="Specific resource IDs to audit (space-separated)",
)
def validate_arguments(arguments: Namespace) -> tuple[bool, str]:
"""
Validate Alibaba Cloud provider arguments
This function validates the command-line arguments provided for the Alibaba Cloud provider.
It checks for required credentials, validates region specifications, and ensures
configuration coherence.
Args:
arguments: Parsed command-line arguments
Returns:
tuple: (is_valid: bool, error_message: str)
- is_valid: True if arguments are valid, False otherwise
- error_message: Error description if invalid, empty string if valid
"""
# Check for required credentials
if not arguments.access_key_id or not arguments.access_key_secret:
return (
False,
"Alibaba Cloud AccessKey credentials are required. "
"Provide --access-key-id and --access-key-secret, "
"or set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.",
)
# Validate RAM role session duration
if arguments.ram_role_arn:
if not (900 <= arguments.ram_session_duration <= 43200):
return (
False,
f"RAM role session duration must be between 900 and 43200 seconds. "
f"Provided: {arguments.ram_session_duration}",
)
# Validate region IDs
if arguments.region_ids:
invalid_regions = [
region for region in arguments.region_ids
if region not in ALIBABACLOUD_REGIONS
]
if invalid_regions:
return (
False,
f"Invalid Alibaba Cloud region(s): {', '.join(invalid_regions)}. "
f"Valid regions are: {', '.join(ALIBABACLOUD_REGIONS)}",
)
# Validate filter regions
if arguments.filter_regions:
invalid_filter_regions = [
region for region in arguments.filter_regions
if region not in ALIBABACLOUD_REGIONS
]
if invalid_filter_regions:
return (
False,
f"Invalid Alibaba Cloud filter region(s): {', '.join(invalid_filter_regions)}. "
f"Valid regions are: {', '.join(ALIBABACLOUD_REGIONS)}",
)
# Validate resource tags format
if arguments.resource_tags:
for tag in arguments.resource_tags:
if "=" not in tag:
return (
False,
f"Invalid resource tag format: '{tag}'. Expected format: key=value",
)
# All validations passed
return (True, "")

View File

@@ -0,0 +1,13 @@
"""Utility classes for Alibaba Cloud checks"""
from dataclasses import dataclass
@dataclass
class GenericAlibabaCloudResource:
"""Generic resource for checks that don't have a specific resource type"""
id: str
name: str
arn: str
region: str

View File

@@ -0,0 +1,82 @@
"""
Alibaba Cloud Provider Mutelist
This module implements the mutelist functionality for suppressing findings
in Alibaba Cloud audits.
"""
from prowler.lib.mutelist.mutelist import Mutelist
class AlibabaCloudMutelist(Mutelist):
"""
AlibabaCloudMutelist handles finding suppression for Alibaba Cloud resources
This class extends the base Mutelist class to provide Alibaba Cloud-specific
mutelist functionality, allowing users to suppress specific findings based on
resource identifiers, regions, checks, or other criteria.
Example mutelist entry:
{
"Accounts": ["1234567890"],
"Checks": {
"ecs_*": {
"Regions": ["cn-hangzhou", "cn-shanghai"],
"Resources": ["i-abc123", "i-def456"]
}
}
}
"""
def __init__(
self,
mutelist_path: str = None,
mutelist_content: dict = None,
provider: str = "alibabacloud",
identity: dict = None,
):
"""
Initialize Alibaba Cloud mutelist
Args:
mutelist_path: Path to the mutelist file (YAML or JSON)
mutelist_content: Mutelist content as a dictionary
provider: Provider name (default: "alibabacloud")
identity: Alibaba Cloud identity information
"""
super().__init__(
mutelist_path=mutelist_path,
mutelist_content=mutelist_content,
)
self.identity = identity
self.provider = provider
def is_finding_muted(
self,
finding,
account_id: str = None,
region: str = None,
check_id: str = None,
resource_id: str = None,
) -> bool:
"""
Check if a finding should be muted based on mutelist rules
Args:
finding: The finding object to check
account_id: Alibaba Cloud account ID
region: Alibaba Cloud region ID
check_id: Check identifier
resource_id: Resource identifier
Returns:
bool: True if the finding is muted, False otherwise
"""
# Use the parent class implementation which handles the core logic
return super().is_muted(
account_uid=account_id or getattr(finding, "account_uid", None),
region=region or getattr(finding, "region", None),
check_id=check_id or getattr(finding, "check_metadata", {}).get("CheckID"),
resource_id=resource_id or getattr(finding, "resource_uid", None),
finding_tags=getattr(finding, "resource_tags", []),
)

View File

@@ -0,0 +1,146 @@
"""
Alibaba Cloud Service Base Class
This module provides the base class for all Alibaba Cloud service implementations.
"""
import threading
from prowler.lib.logger import logger
class AlibabaCloudService:
"""
Base class for Alibaba Cloud service implementations
This class provides common functionality for all Alibaba Cloud services, including:
- Regional client management
- Multi-threading support for API calls
- Error handling patterns
- Resource auditing metadata
Attributes:
provider: The Alibaba Cloud provider instance
service: Service name (e.g., "ecs", "oss", "ram")
account_id: Alibaba Cloud account ID
regions: List of regions to audit
audit_config: Audit configuration dictionary
regional_clients: Dictionary of regional SDK clients
"""
def __init__(self, service: str, provider):
"""
Initialize the Alibaba Cloud service
Args:
service: Service identifier (e.g., "ecs", "oss")
provider: AlibabaCloudProvider instance
"""
self.provider = provider
self.service = service
self.account_id = provider.identity.account_id
self.regions = provider._regions
self.audit_config = provider.audit_config
self.regional_clients = {}
logger.info(f"Initializing Alibaba Cloud {service.upper()} service")
def __threading_call__(self, call, iterator):
"""
Execute function calls in parallel using threading
This method is used to parallelize API calls across regions or resources
to improve audit performance.
Args:
call: Function to execute
iterator: Iterable of arguments to pass to the function
Example:
self.__threading_call__(self._describe_instances, self.regions)
"""
threads = []
for item in iterator:
thread = threading.Thread(target=call, args=(item,))
threads.append(thread)
thread.start()
# Wait for all threads to complete
for thread in threads:
thread.join()
def _create_regional_client(self, region: str, _endpoint_override: str = None):
"""
Create a regional Alibaba Cloud SDK client
This method should be overridden by service implementations to create
service-specific clients.
Args:
region: Region identifier
_endpoint_override: Optional endpoint override URL (unused in base)
Returns:
SDK client instance for the specified region
"""
raise NotImplementedError(
f"Service {self.service} must implement _create_regional_client()"
)
def _list_resources(self):
"""
List all resources for this service
This method should be overridden by service implementations to list
service-specific resources.
"""
raise NotImplementedError(
f"Service {self.service} must implement _list_resources()"
)
def _get_resource_details(self, resource):
"""
Get detailed information about a resource
This method can be overridden by service implementations to fetch
additional resource details.
Args:
resource: Resource identifier or object
"""
def _handle_api_error(self, error: Exception, operation: str, region: str = None):
"""
Handle Alibaba Cloud API errors with consistent logging
Args:
error: The exception that occurred
operation: The API operation that failed
region: The region where the error occurred (if applicable)
"""
region_info = f" in region {region}" if region else ""
logger.warning(
f"{self.service.upper()} {operation} failed{region_info}: {str(error)}"
)
def generate_resource_arn(
self, resource_type: str, resource_id: str, region: str = ""
) -> str:
"""
Generate Alibaba Cloud Resource Name (ARN) in ACS format
Format: acs:{service}:{region}:{account-id}:{resource-type}/{resource-id}
Args:
resource_type: Type of resource (e.g., "instance", "bucket")
resource_id: Resource identifier
region: Region identifier (optional for global resources)
Returns:
str: Formatted ARN string
Example:
arn = self.generate_resource_arn("instance", "i-abc123", "cn-hangzhou")
# Returns: "acs:ecs:cn-hangzhou:123456789:instance/i-abc123"
"""
return f"acs:{self.service}:{region}:{self.account_id}:{resource_type}/{resource_id}"

View File

@@ -0,0 +1,129 @@
"""
Alibaba Cloud Provider Models
This module contains data models for the Alibaba Cloud provider.
"""
from argparse import Namespace
from dataclasses import dataclass
from typing import Optional
from prowler.providers.common.models import ProviderOutputOptions
@dataclass
class AlibabaCloudIdentityInfo:
"""
AlibabaCloudIdentityInfo contains the Alibaba Cloud identity information
Attributes:
account_id: Alibaba Cloud account ID
account_arn: Alibaba Cloud account ARN
user_id: RAM user ID (optional)
user_name: RAM user name (optional)
account_name: Account alias/name (optional)
"""
account_id: str
account_arn: str
user_id: Optional[str] = None
user_name: Optional[str] = None
account_name: Optional[str] = None
@dataclass
class AlibabaCloudRegion:
"""
AlibabaCloudRegion contains region information
Attributes:
region_id: Region identifier (e.g., "cn-hangzhou")
local_name: Localized region name
region_endpoint: Region endpoint URL
"""
region_id: str
local_name: str
region_endpoint: Optional[str] = None
@dataclass
class AlibabaCloudCredentials:
"""
AlibabaCloudCredentials contains authentication credentials
Attributes:
access_key_id: AccessKey ID
access_key_secret: AccessKey Secret
security_token: STS security token (optional)
expiration: Token expiration timestamp (optional)
"""
access_key_id: str
access_key_secret: str
security_token: Optional[str] = None
expiration: Optional[str] = None
@dataclass
class AlibabaCloudAssumeRoleInfo:
"""
AlibabaCloudAssumeRoleInfo contains RAM role assumption information
Attributes:
role_arn: RAM role ARN to assume
role_session_name: Session name for the assumed role
external_id: External ID for role assumption (optional)
session_duration: Duration in seconds (900-43200, default 3600)
"""
role_arn: str
role_session_name: str
external_id: Optional[str] = None
session_duration: int = 3600
@dataclass
class AlibabaCloudSession:
"""
AlibabaCloudSession contains the session configuration
Attributes:
credentials: Alibaba Cloud credentials
region_id: Default region for the session
"""
credentials: AlibabaCloudCredentials
region_id: str = "cn-hangzhou"
class AlibabaCloudOutputOptions(ProviderOutputOptions):
"""
AlibabaCloudOutputOptions contains the output configuration for Alibaba Cloud
This class extends ProviderOutputOptions to provide Alibaba Cloud-specific
output filename generation based on the account ID.
"""
def __init__(self, arguments: Namespace, bulk_checks_metadata: dict, identity: AlibabaCloudIdentityInfo):
"""
Initialize Alibaba Cloud output options
Args:
arguments: Command-line arguments
bulk_checks_metadata: Metadata for all checks
identity: Alibaba Cloud identity information
"""
# Call parent class init
super().__init__(arguments, bulk_checks_metadata)
# Import here to avoid circular dependency
from prowler.config.config import output_file_timestamp
# Check if custom output filename was provided
if (
not hasattr(arguments, "output_filename")
or arguments.output_filename is None
):
# Use account ID for output filename
account_identifier = identity.account_id
self.output_filename = (
f"prowler-output-{account_identifier}-{output_file_timestamp}"
)
else:
self.output_filename = arguments.output_filename

View File

@@ -0,0 +1 @@
"""Alibaba Cloud ACK (Container Service for Kubernetes) module"""

View File

@@ -0,0 +1,6 @@
"""Alibaba Cloud ACK Client Singleton"""
from prowler.providers.alibabacloud.services.ack.ack_service import ACK
from prowler.providers.common.provider import Provider
ack_client = ACK(Provider.get_global_provider())

View File

@@ -0,0 +1 @@
"""ACK Audit Logging Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_audit_log",
"CheckTitle": "Check ACK Audit Logging",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK Audit Logging.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_audit_log(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have audit logging enabled."
if cluster.audit_log_enabled:
report.status = "PASS"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} has audit logging enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Encryption Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_encryption",
"CheckTitle": "Check ACK Encryption",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "high",
"ResourceType": "Cluster",
"Description": "Checks ACK Encryption.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_encryption(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} does not have encryption enabled."
)
if cluster.encryption_enabled:
report.status = "PASS"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} has encryption enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Network Policy Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_network_policy",
"CheckTitle": "Check ACK Network Policy",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK Network Policy.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,18 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_network_policy(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have network policy support enabled."
if cluster.network_policy_enabled:
report.status = "PASS"
report.status_extended = f"ACK cluster {cluster.cluster_name} has network policy support enabled."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Private Zone Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_private_zone",
"CheckTitle": "Check ACK Private Zone",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "low",
"ResourceType": "Cluster",
"Description": "Checks ACK Private Zone.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_private_zone(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} does not have PrivateZone enabled."
)
if cluster.private_zone_enabled:
report.status = "PASS"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} has PrivateZone enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Public Access Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_public_access",
"CheckTitle": "Check ACK Public Access",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "high",
"ResourceType": "Cluster",
"Description": "Checks ACK Public Access.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_public_access(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} API server is publicly accessible."
)
if not cluster.public_access:
report.status = "PASS"
report.status_extended = f"ACK cluster {cluster.cluster_name} API server is not publicly accessible."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK RBAC Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_rbac",
"CheckTitle": "Check ACK RBAC",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK RBAC.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_rbac(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} does not have RBAC enabled."
)
if cluster.rbac_enabled:
report.status = "PASS"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} has RBAC enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK Security Group Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_security_group",
"CheckTitle": "Check ACK Security Group",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK Security Group.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,18 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_security_group(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have a security group configured."
if cluster.security_group_id:
report.status = "PASS"
report.status_extended = f"ACK cluster {cluster.cluster_name} has security group {cluster.security_group_id} configured."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ACK VPC Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ack_cluster_vpc",
"CheckTitle": "Check ACK VPC",
"CheckType": [],
"ServiceName": "ack",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Cluster",
"Description": "Checks ACK VPC.",
"Risk": "",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
class ack_cluster_vpc(Check):
def execute(self):
findings = []
for cluster in ack_client.clusters.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=cluster
)
report.status = "FAIL"
report.status_extended = (
f"ACK cluster {cluster.cluster_name} is not deployed in a VPC."
)
if cluster.vpc_id:
report.status = "PASS"
report.status_extended = f"ACK cluster {cluster.cluster_name} is deployed in VPC {cluster.vpc_id}."
findings.append(report)
return findings

View File

@@ -0,0 +1,194 @@
"""Alibaba Cloud ACK Service"""
from dataclasses import dataclass
from prowler.lib.logger import logger
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
@dataclass
class Cluster:
"""ACK Cluster"""
cluster_id: str
cluster_name: str
arn: str
region: str
cluster_type: str = "Kubernetes"
state: str = "running"
public_access: bool = True # Will trigger check
private_zone_enabled: bool = False
network_policy_enabled: bool = False # Will trigger check
rbac_enabled: bool = True
encryption_enabled: bool = False # Will trigger check
audit_log_enabled: bool = False # Will trigger check
security_group_id: str = ""
vpc_id: str = ""
master_url: str = ""
def __post_init__(self):
pass
class ACK(AlibabaCloudService):
def __init__(self, provider):
super().__init__("ack", provider)
self.clusters = {}
logger.info("Collecting ACK clusters...")
self._describe_clusters()
logger.info(f"ACK service initialized - Clusters: {len(self.clusters)}")
def _describe_clusters(self):
for region in self.regions:
try:
from alibabacloud_cs20151215.client import Client as AckClient
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id=region,
)
if self.provider.session.credentials.security_token:
config.security_token = (
self.provider.session.credentials.security_token
)
# Create ACK client
client = AckClient(config)
# List clusters
response = client.describe_clusters_v1()
# Process clusters
if response.body and response.body.clusters:
for cluster_data in response.body.clusters:
cluster_id = cluster_data.cluster_id
arn = self.generate_resource_arn("cluster", cluster_id, region)
# Get detailed cluster info
try:
detail_response = client.describe_cluster_detail(cluster_id)
detail = (
detail_response.body
if detail_response.body
else cluster_data
)
# Check audit log
audit_log_enabled = False
if hasattr(detail, "parameters") and detail.parameters:
audit_log_enabled = (
detail.parameters.get("AuditLogEnabled", "false")
== "true"
)
# Check encryption
encryption_enabled = False
if hasattr(detail, "parameters") and detail.parameters:
encryption_enabled = (
detail.parameters.get("EncryptionEnabled", "false")
== "true"
)
# Check network policy
network_policy_enabled = False
if hasattr(detail, "parameters") and detail.parameters:
network_policy_enabled = (
detail.parameters.get("NetworkPlugin", "")
== "terway"
)
# Check RBAC
rbac_enabled = (
True # Default to true for modern ACK clusters
)
if hasattr(detail, "parameters") and detail.parameters:
rbac_enabled = (
detail.parameters.get("RBACEnabled", "true")
== "true"
)
# Check private zone
private_zone_enabled = False
if hasattr(detail, "private_zone", "false"):
private_zone_enabled = detail.private_zone
cluster = Cluster(
cluster_id=cluster_id,
cluster_name=(
cluster_data.name
if cluster_data.name
else cluster_id
),
arn=arn,
region=region,
cluster_type=(
cluster_data.cluster_type
if hasattr(cluster_data, "cluster_type")
else "Kubernetes"
),
state=(
cluster_data.state
if hasattr(cluster_data, "state")
else "running"
),
public_access=(
hasattr(cluster_data, "public_slb")
and cluster_data.public_slb
if hasattr(cluster_data, "public_slb")
else False
),
private_zone_enabled=private_zone_enabled,
network_policy_enabled=network_policy_enabled,
rbac_enabled=rbac_enabled,
encryption_enabled=encryption_enabled,
audit_log_enabled=audit_log_enabled,
security_group_id=(
cluster_data.security_group_id
if hasattr(cluster_data, "security_group_id")
else ""
),
vpc_id=(
cluster_data.vpc_id
if hasattr(cluster_data, "vpc_id")
else ""
),
master_url=(
cluster_data.master_url
if hasattr(cluster_data, "master_url")
else ""
),
)
self.clusters[arn] = cluster
logger.info(f"Found ACK cluster: {cluster_id} in {region}")
except Exception as detail_error:
logger.warning(
f"Could not get details for cluster {cluster_id}: {detail_error}"
)
# Use basic cluster data
cluster = Cluster(
cluster_id=cluster_id,
cluster_name=(
cluster_data.name
if cluster_data.name
else cluster_id
),
arn=arn,
region=region,
vpc_id=(
cluster_data.vpc_id
if hasattr(cluster_data, "vpc_id")
else ""
),
)
self.clusters[arn] = cluster
else:
logger.info(f"No ACK clusters found in {region}")
except Exception as error:
self._handle_api_error(error, "DescribeClusters", region)

View File

@@ -0,0 +1 @@
"""Alibaba Cloud ActionTrail module"""

View File

@@ -0,0 +1,6 @@
"""Alibaba Cloud ActionTrail Client Singleton"""
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_service import ActionTrail
from prowler.providers.common.provider import Provider
actiontrail_client = ActionTrail(Provider.get_global_provider())

View File

@@ -0,0 +1,148 @@
"""
Alibaba Cloud ActionTrail Service
This module provides the service class for Alibaba Cloud ActionTrail.
"""
from dataclasses import dataclass
from prowler.lib.logger import logger
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
@dataclass
class Trail:
"""ActionTrail Trail"""
name: str
arn: str
region: str
status: str = "Disabled" # Will trigger check
oss_bucket_name: str = ""
oss_key_prefix: str = ""
sls_project_arn: str = ""
sls_write_role_arn: str = ""
event_rw: str = "Write" # Should be "All"
trail_region: str = "All"
is_organization_trail: bool = False
mns_topic_arn: str = ""
def __post_init__(self):
pass
class ActionTrail(AlibabaCloudService):
"""
Alibaba Cloud ActionTrail service class
Handles collection of ActionTrail resources including trails and their configurations.
"""
def __init__(self, provider):
"""Initialize ActionTrail service"""
super().__init__("actiontrail", provider)
self.trails = {}
logger.info("Collecting ActionTrail trails...")
self._describe_trails()
logger.info(f"ActionTrail service initialized - Trails: {len(self.trails)}")
def _describe_trails(self):
"""Describe all ActionTrail trails"""
# ActionTrail is a global service, but we'll check it once
try:
from alibabacloud_actiontrail20200706 import models
from alibabacloud_actiontrail20200706.client import (
Client as ActionTrailClient,
)
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration (use cn-hangzhou as default region)
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id="cn-hangzhou",
)
if self.provider.session.credentials.security_token:
config.security_token = self.provider.session.credentials.security_token
# Create ActionTrail client
client = ActionTrailClient(config)
# List trails
request = models.ListTrailsRequest()
response = client.list_trails(request)
# Process trails
if response.body.trails:
for trail_data in response.body.trails:
trail_name = trail_data.name if trail_data.name else "unknown"
arn = self.generate_resource_arn("trail", trail_name, "")
# Get trail status
try:
status_request = models.GetTrailStatusRequest(name=trail_name)
status_response = client.get_trail_status(status_request)
status = (
"Enabled" if status_response.body.is_logging else "Disabled"
)
except Exception:
status = "Unknown"
trail = Trail(
name=trail_name,
arn=arn,
region="global",
status=status,
oss_bucket_name=(
trail_data.oss_bucket_name
if hasattr(trail_data, "oss_bucket_name")
else ""
),
oss_key_prefix=(
trail_data.oss_key_prefix
if hasattr(trail_data, "oss_key_prefix")
else ""
),
sls_project_arn=(
trail_data.sls_project_arn
if hasattr(trail_data, "sls_project_arn")
else ""
),
sls_write_role_arn=(
trail_data.sls_write_role_arn
if hasattr(trail_data, "sls_write_role_arn")
else ""
),
event_rw=(
trail_data.event_rw
if hasattr(trail_data, "event_rw")
else "Write"
),
trail_region=(
trail_data.trail_region
if hasattr(trail_data, "trail_region")
else "All"
),
is_organization_trail=(
trail_data.is_organization_trail
if hasattr(trail_data, "is_organization_trail")
else False
),
mns_topic_arn=(
trail_data.mns_topic_arn
if hasattr(trail_data, "mns_topic_arn")
else ""
),
)
self.trails[arn] = trail
logger.info(f"Found ActionTrail trail: {trail_name}")
else:
logger.info("No ActionTrail trails found")
except Exception as error:
self._handle_api_error(error, "DescribeTrails", "global")

View File

@@ -0,0 +1 @@
"""ActionTrail Trail Enabled Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "actiontrail_trail_enabled",
"CheckTitle": "Ensure at least one ActionTrail trail is enabled",
"CheckType": [],
"ServiceName": "actiontrail",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "high",
"ResourceType": "Trail",
"Description": "Ensures that at least one ActionTrail trail is enabled to log API activity for security auditing. ActionTrail records API calls and events for compliance, security analysis, and troubleshooting.",
"Risk": "Without ActionTrail enabled, there is no audit log of API activity, making it impossible to track changes, detect unauthorized access, or investigate security incidents. This can lead to compliance violations and delayed incident response.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ActionTrail/trail-enabled.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable at least one ActionTrail trail. Go to ActionTrail Console and create/enable a trail to log API activity across all regions.",
"Url": "https://www.alibabacloud.com/help/en/actiontrail/latest/create-a-trail-to-deliver-events-to-oss"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,41 @@
from dataclasses import dataclass
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_client import (
actiontrail_client,
)
@dataclass
class ActionTrailConfig:
id: str
name: str
arn: str
region: str
class actiontrail_trail_enabled(Check):
def execute(self):
findings = []
enabled_trails = [
trail
for trail in actiontrail_client.trails.values()
if trail.status == "Enabled"
]
config = ActionTrailConfig(
id="actiontrail-configuration",
name="ActionTrail Configuration",
arn=f"acs:actiontrail::{actiontrail_client.account_id}:configuration",
region="global",
)
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=config)
report.status = "FAIL"
report.status_extended = "No enabled ActionTrail trails found."
if len(enabled_trails) > 0:
trail_names = [trail.name for trail in enabled_trails]
report.status = "PASS"
report.status_extended = f"ActionTrail has {len(enabled_trails)} enabled trail(s): {', '.join(trail_names)}."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ActionTrail Trail Logs All Events Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "actiontrail_trail_logs_all_events",
"CheckTitle": "Ensure ActionTrail trails log all events (read and write)",
"CheckType": [],
"ServiceName": "actiontrail",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Trail",
"Description": "Ensures that ActionTrail trails are configured to log both read and write events. Logging all events provides complete audit coverage for security analysis and compliance.",
"Risk": "Logging only write events means read-only API calls are not recorded, potentially missing important security events such as unauthorized data access attempts or reconnaissance activities.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ActionTrail/trail-log-all-events.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Update ActionTrail trails to log all events. Go to ActionTrail Console, select the trail, and configure it to log both read and write events.",
"Url": "https://www.alibabacloud.com/help/en/actiontrail/latest/create-a-trail-to-deliver-events-to-oss"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_client import (
actiontrail_client,
)
class actiontrail_trail_logs_all_events(Check):
def execute(self):
findings = []
for trail in actiontrail_client.trails.values():
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=trail)
report.status = "FAIL"
report.status_extended = (
f"ActionTrail trail {trail.name} only logs {trail.event_rw} events."
)
if trail.event_rw == "All":
report.status = "PASS"
report.status_extended = (
f"ActionTrail trail {trail.name} logs all events."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""Alibaba Cloud ECS (Elastic Compute Service) module"""

View File

@@ -0,0 +1,11 @@
"""
Alibaba Cloud ECS Client Singleton
This module provides the singleton ECS client instance.
"""
from prowler.providers.alibabacloud.services.ecs.ecs_service import ECS
from prowler.providers.common.provider import Provider
# Initialize ECS client singleton
ecs_client = ECS(Provider.get_global_provider())

View File

@@ -0,0 +1,37 @@
{
"Provider": "alibabacloud",
"CheckID": "ecs_disk_encryption_enabled",
"CheckTitle": "Ensure ECS Disk Encryption is Enabled",
"CheckType": [
"Security",
"Encryption"
],
"ServiceName": "ecs",
"SubServiceName": "disk",
"ResourceIdTemplate": "arn:acs:ecs:region:account-id:disk/disk-id",
"Severity": "medium",
"ResourceType": "AlibabaCloud::ECS::Disk",
"Description": "Ensure that all ECS disks (system disks and data disks) are encrypted to protect data at rest. Disk encryption provides an additional layer of security by encrypting data stored on ECS instances using Alibaba Cloud KMS.",
"Risk": "Unencrypted disks can expose sensitive data if the physical storage media is compromised, if disk snapshots are inadvertently shared, or if unauthorized users gain access to the disk. Without encryption, data at rest is vulnerable to unauthorized access.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ECS/disk-encryption.html",
"Remediation": {
"Code": {
"CLI": "# Note: Encryption must be enabled when creating a disk. Existing disks cannot be encrypted in-place.\n# Create a snapshot of the unencrypted disk, then create a new encrypted disk from the snapshot:\naliyun ecs CreateSnapshot --DiskId <disk-id> --SnapshotName encrypted-snapshot\naliyun ecs CreateDisk --SnapshotId <snapshot-id> --Encrypted true --KMSKeyId <kms-key-id>",
"NativeIaC": "",
"Other": "For existing disks, you must create a snapshot, create a new encrypted disk from the snapshot, detach the old disk, attach the new encrypted disk, and then delete the old disk.",
"Terraform": "resource \"alicloud_disk\" \"encrypted_disk\" {\n disk_name = \"encrypted-disk\"\n size = 100\n category = \"cloud_essd\"\n encrypted = true\n kms_key_id = var.kms_key_id\n zone_id = \"cn-hangzhou-h\"\n}"
},
"Recommendation": {
"Text": "Enable disk encryption for all new ECS disks. For existing unencrypted disks, create encrypted copies using snapshots and KMS. Use customer-managed KMS keys for enhanced control over encryption keys. Regularly rotate encryption keys and audit key usage.",
"Url": "https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview"
}
},
"Categories": [
"encryption",
"data-protection"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "ECS disk encryption cannot be enabled on existing disks. You must create new encrypted disks or create encrypted copies from snapshots. Consider enabling default encryption at the account level to ensure all new disks are automatically encrypted.",
"Compliance": []
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
class ecs_disk_encryption_enabled(Check):
def execute(self):
findings = []
for disk in ecs_client.disks.values():
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=disk)
report.status = "FAIL"
report.status_extended = (
f"ECS disk {disk.name} ({disk.id}) is not encrypted."
)
if disk.encrypted:
report.status = "PASS"
report.status_extended = (
f"ECS disk {disk.name} ({disk.id}) is encrypted."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""ECS Instance Public IP Check"""

View File

@@ -0,0 +1,19 @@
{
"Provider": "alibabacloud",
"CheckID": "ecs_instance_public_ip",
"CheckTitle": "Ensure ECS instances do not have unnecessary public IPs",
"CheckType": [],
"ServiceName": "ecs",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Instance",
"Description": "Ensures ECS instances do not have unnecessary public IP addresses.",
"Risk": "Public IP addresses increase attack surface.",
"RelatedUrl": "",
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "Use EIP or NAT Gateway instead.", "Url": ""}},
"Categories": ["internet-exposed"],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
class ecs_instance_public_ip(Check):
def execute(self):
findings = []
for instance in ecs_client.instances.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=instance
)
report.status = "FAIL"
report.status_extended = (
f"ECS instance {instance.name} has public IP {instance.public_ip}."
)
if not instance.public_ip:
report.status = "PASS"
report.status_extended = (
f"ECS instance {instance.name} does not have a public IP address."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,37 @@
{
"Provider": "alibabacloud",
"CheckID": "ecs_instance_ssh_access_restricted",
"CheckTitle": "Check for Unrestricted SSH Access",
"CheckType": [
"Security",
"Network"
],
"ServiceName": "ecs",
"SubServiceName": "security_group",
"ResourceIdTemplate": "arn:acs:ecs:region:account-id:security-group/sg-id",
"Severity": "high",
"ResourceType": "AlibabaCloud::ECS::SecurityGroup",
"Description": "Ensure ECS security groups do not allow unrestricted SSH access (port 22) from the internet (0.0.0.0/0). Unrestricted SSH access increases the risk of unauthorized access, brute force attacks, and potential security breaches.",
"Risk": "Allowing unrestricted SSH access from the internet can expose instances to brute force attacks, unauthorized access attempts, and potential compromise. Attackers can scan for open SSH ports and attempt to gain unauthorized access to instances.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ECS/unrestricted-ssh-access.html",
"Remediation": {
"Code": {
"CLI": "aliyun ecs ModifySecurityGroupRule --SecurityGroupId <sg-id> --IpProtocol tcp --PortRange 22/22 --SourceCidrIp <your-ip>/32",
"NativeIaC": "",
"Other": "",
"Terraform": "resource \"alicloud_security_group_rule\" \"allow_ssh_from_specific_ip\" {\n type = \"ingress\"\n ip_protocol = \"tcp\"\n port_range = \"22/22\"\n security_group_id = alicloud_security_group.group.id\n cidr_ip = \"203.0.113.0/24\"\n}"
},
"Recommendation": {
"Text": "Restrict SSH access to specific trusted IP addresses or IP ranges. Implement a bastion host or VPN for secure remote access. Consider using Alibaba Cloud's Session Manager for instance access without opening SSH ports.",
"Url": "https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview"
}
},
"Categories": [
"internet-exposed",
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check examines security group rules to identify unrestricted SSH access. It is recommended to use the principle of least privilege and only allow SSH access from known, trusted IP addresses.",
"Compliance": []
}

View File

@@ -0,0 +1,28 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
class ecs_instance_ssh_access_restricted(Check):
def execute(self):
findings = []
for security_group in ecs_client.security_groups.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=security_group
)
has_unrestricted_ssh = False
for rule in security_group.rules:
if (
rule.get("direction") == "ingress"
and rule.get("protocol") == "tcp"
and "22" in rule.get("port_range", "")
and rule.get("source") == "0.0.0.0/0"
):
has_unrestricted_ssh = True
break
report.status = "FAIL"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) allows unrestricted SSH access."
if not has_unrestricted_ssh:
report.status = "PASS"
report.status_extended = f"Security group {security_group.name} ({security_group.id}) does not allow unrestricted SSH access."
findings.append(report)
return findings

View File

@@ -0,0 +1,496 @@
"""
Alibaba Cloud ECS Service
This module provides the service class for Alibaba Cloud Elastic Compute Service (ECS).
"""
from dataclasses import dataclass
from typing import Optional
from prowler.lib.logger import logger
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
@dataclass
class Instance:
"""
Represents an Alibaba Cloud ECS instance
Attributes:
id: Instance ID
name: Instance name
arn: Instance ARN
region: Region where the instance is located
status: Instance status (Running, Stopped, etc.)
instance_type: Instance type/specification
public_ip: Public IP address (if any)
private_ip: Private IP address
security_groups: List of security group IDs
vpc_id: VPC ID
zone_id: Availability zone ID
image_id: OS image ID
tags: Instance tags
created_time: Creation timestamp
expired_time: Expiration timestamp (for subscription instances)
"""
id: str
name: str
arn: str
region: str
status: str = ""
instance_type: str = ""
public_ip: Optional[str] = None
private_ip: str = ""
security_groups: list = None
vpc_id: str = ""
zone_id: str = ""
image_id: str = ""
tags: dict = None
created_time: str = ""
expired_time: str = ""
def __post_init__(self):
if self.security_groups is None:
self.security_groups = []
if self.tags is None:
self.tags = {}
@dataclass
class Disk:
"""
Represents an Alibaba Cloud ECS disk
Attributes:
id: Disk ID
name: Disk name
arn: Disk ARN
region: Region where the disk is located
disk_type: Type of disk (system, data)
category: Disk category (cloud_ssd, cloud_essd, etc.)
size: Disk size in GB
encrypted: Whether the disk is encrypted
kms_key_id: KMS key ID used for encryption (if encrypted)
status: Disk status (Available, In_use, etc.)
instance_id: Attached instance ID (if attached)
zone_id: Availability zone ID
tags: Disk tags
"""
id: str
name: str
arn: str
region: str
disk_type: str = ""
category: str = ""
size: int = 0
encrypted: bool = False
kms_key_id: Optional[str] = None
status: str = ""
instance_id: Optional[str] = None
zone_id: str = ""
tags: dict = None
def __post_init__(self):
if self.tags is None:
self.tags = {}
@dataclass
class SecurityGroup:
"""
Represents an Alibaba Cloud security group
Attributes:
id: Security group ID
name: Security group name
arn: Security group ARN
region: Region where the security group is located
vpc_id: VPC ID
description: Security group description
rules: List of security group rules
tags: Security group tags
"""
id: str
name: str
arn: str
region: str
vpc_id: str = ""
description: str = ""
rules: list = None
tags: dict = None
def __post_init__(self):
if self.rules is None:
self.rules = []
if self.tags is None:
self.tags = {}
class ECS(AlibabaCloudService):
"""
Alibaba Cloud ECS service class
This class handles the collection of ECS resources including instances,
disks, and security groups for auditing.
"""
def __init__(self, provider):
"""
Initialize ECS service
Args:
provider: AlibabaCloudProvider instance
"""
super().__init__("ecs", provider)
# Initialize resource dictionaries
self.instances = {}
self.disks = {}
self.security_groups = {}
# Collect ECS resources
logger.info("Collecting ECS instances...")
self._describe_instances()
logger.info("Collecting ECS disks...")
self._describe_disks()
logger.info("Collecting ECS security groups...")
self._describe_security_groups()
logger.info(
f"ECS service initialized - Instances: {len(self.instances)}, "
f"Disks: {len(self.disks)}, Security Groups: {len(self.security_groups)}"
)
def _describe_instances(self):
"""
Describe ECS instances across all regions
This method collects all ECS instances and their details.
"""
logger.info("Describing ECS instances across regions...")
for region in self.regions:
try:
from alibabacloud_ecs20140526 import models
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id=region,
)
# Add security token if present (for STS)
if self.provider.session.credentials.security_token:
config.security_token = (
self.provider.session.credentials.security_token
)
# Create ECS client
client = EcsClient(config)
# Describe instances
request = models.DescribeInstancesRequest(
page_size=100, region_id=region
)
response = client.describe_instances(request)
# Process instances
if response.body.instances and response.body.instances.instance:
for instance_data in response.body.instances.instance:
instance_id = instance_data.instance_id
arn = self.generate_resource_arn(
"instance", instance_id, region
)
# Get public IP
public_ip = None
if (
instance_data.public_ip_address
and instance_data.public_ip_address.ip_address
):
public_ip = (
instance_data.public_ip_address.ip_address[0]
if instance_data.public_ip_address.ip_address
else None
)
elif (
instance_data.eip_address
and instance_data.eip_address.ip_address
):
public_ip = instance_data.eip_address.ip_address
# Get private IP
private_ip = ""
if (
instance_data.vpc_attributes
and instance_data.vpc_attributes.private_ip_address
):
private_ip = (
instance_data.vpc_attributes.private_ip_address.ip_address[
0
]
if instance_data.vpc_attributes.private_ip_address.ip_address
else ""
)
# Get security groups
security_groups = []
if (
instance_data.security_group_ids
and instance_data.security_group_ids.security_group_id
):
security_groups = (
instance_data.security_group_ids.security_group_id
)
# Get VPC ID
vpc_id = (
instance_data.vpc_attributes.vpc_id
if instance_data.vpc_attributes
else ""
)
# Get tags
tags = {}
if instance_data.tags and instance_data.tags.tag:
for tag in instance_data.tags.tag:
tags[tag.tag_key] = tag.tag_value
instance = Instance(
id=instance_id,
name=instance_data.instance_name or instance_id,
arn=arn,
region=region,
status=instance_data.status,
instance_type=instance_data.instance_type,
public_ip=public_ip,
private_ip=private_ip,
security_groups=security_groups,
vpc_id=vpc_id,
zone_id=instance_data.zone_id,
image_id=instance_data.image_id,
tags=tags,
created_time=instance_data.creation_time,
expired_time=(
instance_data.expired_time
if hasattr(instance_data, "expired_time")
else ""
),
)
self.instances[arn] = instance
logger.info(f"Found ECS instance: {instance_id} in {region}")
else:
logger.info(f"No ECS instances found in {region}")
except Exception as error:
self._handle_api_error(error, "DescribeInstances", region)
def _describe_disks(self):
"""
Describe ECS disks across all regions
This method collects all ECS disks and their encryption status.
"""
logger.info("Describing ECS disks across regions...")
for region in self.regions:
try:
from alibabacloud_ecs20140526 import models
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id=region,
)
if self.provider.session.credentials.security_token:
config.security_token = (
self.provider.session.credentials.security_token
)
# Create ECS client
client = EcsClient(config)
# Describe disks
request = models.DescribeDisksRequest(page_size=100, region_id=region)
response = client.describe_disks(request)
# Process disks
if response.body.disks and response.body.disks.disk:
for disk_data in response.body.disks.disk:
disk_id = disk_data.disk_id
arn = self.generate_resource_arn("disk", disk_id, region)
# Get tags
tags = {}
if disk_data.tags and disk_data.tags.tag:
for tag in disk_data.tags.tag:
tags[tag.tag_key] = tag.tag_value
disk = Disk(
id=disk_id,
name=disk_data.disk_name or disk_id,
arn=arn,
region=region,
disk_type=disk_data.type if disk_data.type else "",
category=disk_data.category if disk_data.category else "",
size=disk_data.size if disk_data.size else 0,
encrypted=(
disk_data.encrypted
if hasattr(disk_data, "encrypted")
else False
),
kms_key_id=(
disk_data.kms_key_id
if hasattr(disk_data, "kms_key_id")
else None
),
status=disk_data.status if disk_data.status else "",
instance_id=(
disk_data.instance_id if disk_data.instance_id else None
),
zone_id=disk_data.zone_id if disk_data.zone_id else "",
tags=tags,
)
self.disks[arn] = disk
logger.info(f"Found ECS disk: {disk_id} in {region}")
else:
logger.info(f"No ECS disks found in {region}")
except Exception as error:
self._handle_api_error(error, "DescribeDisks", region)
def _describe_security_groups(self):
"""
Describe security groups and their rules
This method collects security groups and analyzes their rules.
"""
logger.info("Describing security groups across regions...")
for region in self.regions:
try:
from alibabacloud_ecs20140526 import models
from alibabacloud_ecs20140526.client import Client as EcsClient
from alibabacloud_tea_openapi import models as openapi_models
# Create client configuration
config = openapi_models.Config(
access_key_id=self.provider.session.credentials.access_key_id,
access_key_secret=self.provider.session.credentials.access_key_secret,
region_id=region,
)
if self.provider.session.credentials.security_token:
config.security_token = (
self.provider.session.credentials.security_token
)
# Create ECS client
client = EcsClient(config)
# Describe security groups
request = models.DescribeSecurityGroupsRequest(
page_size=100, region_id=region
)
response = client.describe_security_groups(request)
# Process security groups
if (
response.body.security_groups
and response.body.security_groups.security_group
):
for sg_data in response.body.security_groups.security_group:
sg_id = sg_data.security_group_id
arn = self.generate_resource_arn(
"security-group", sg_id, region
)
# Get tags
tags = {}
if sg_data.tags and sg_data.tags.tag:
for tag in sg_data.tags.tag:
tags[tag.tag_key] = tag.tag_value
# Get security group rules
rules = []
try:
rules_request = (
models.DescribeSecurityGroupAttributeRequest(
security_group_id=sg_id, region_id=region
)
)
rules_response = client.describe_security_group_attribute(
rules_request
)
# Process ingress rules
if (
rules_response.body.permissions
and rules_response.body.permissions.permission
):
for perm in rules_response.body.permissions.permission:
rule = {
"direction": (
perm.direction
if perm.direction
else "ingress"
),
"protocol": (
perm.ip_protocol if perm.ip_protocol else ""
),
"port_range": (
perm.port_range if perm.port_range else ""
),
"source": (
perm.source_cidr_ip
if hasattr(perm, "source_cidr_ip")
and perm.source_cidr_ip
else ""
),
"source_group_id": (
perm.source_group_id
if hasattr(perm, "source_group_id")
and perm.source_group_id
else ""
),
}
rules.append(rule)
except Exception as rules_error:
logger.warning(
f"Could not retrieve rules for security group {sg_id}: {rules_error}"
)
security_group = SecurityGroup(
id=sg_id,
name=sg_data.security_group_name or sg_id,
arn=arn,
region=region,
vpc_id=sg_data.vpc_id if sg_data.vpc_id else "",
description=(
sg_data.description if sg_data.description else ""
),
rules=rules,
tags=tags,
)
self.security_groups[arn] = security_group
logger.info(f"Found security group: {sg_id} in {region}")
else:
logger.info(f"No security groups found in {region}")
except Exception as error:
self._handle_api_error(error, "DescribeSecurityGroups", region)

View File

@@ -0,0 +1 @@
"""Alibaba Cloud OSS (Object Storage Service) module"""

View File

@@ -0,0 +1 @@
"""OSS Bucket CORS Not Overly Permissive Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_cors_not_overly_permissive",
"CheckTitle": "Ensure OSS bucket CORS rules are not overly permissive",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Bucket",
"Description": "Ensures that OSS bucket CORS (Cross-Origin Resource Sharing) rules are not overly permissive. Overly permissive CORS rules can allow unauthorized websites to access bucket resources.",
"Risk": "Overly permissive CORS rules allowing all origins (*) can enable malicious websites to access bucket resources, potentially leading to data leakage or unauthorized operations.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-cors.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Update CORS rules to allow only specific trusted origins instead of '*'. Go to OSS Console > Buckets, select the bucket, and configure CORS rules with specific allowed origins.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/cors"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,30 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_cors_not_overly_permissive(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
if not bucket.cors_rules or len(bucket.cors_rules) == 0:
report.status = "PASS"
report.status_extended = (
f"OSS bucket {bucket.name} does not have CORS rules configured."
)
else:
overly_permissive = False
for rule in bucket.cors_rules:
allowed_origins = rule.get("AllowedOrigin", [])
if "*" in allowed_origins:
overly_permissive = True
break
report.status = "FAIL"
report.status_extended = f"OSS bucket {bucket.name} has overly permissive CORS rules allowing all origins."
if not overly_permissive:
report.status = "PASS"
report.status_extended = f"OSS bucket {bucket.name} has appropriately restrictive CORS rules."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Default Encryption KMS Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_default_encryption_kms",
"CheckTitle": "Ensure OSS buckets use KMS for default encryption",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "low",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets use KMS (Key Management Service) for encryption instead of AES256. KMS provides better key management, rotation, and audit capabilities compared to AES256.",
"Risk": "Using AES256 encryption instead of KMS means missing out on centralized key management, automatic key rotation, and comprehensive audit logging capabilities.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-encryption-kms.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Update OSS bucket encryption to use KMS instead of AES256. Go to OSS Console > Buckets, select the bucket, and configure server-side encryption with KMS.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/server-side-encryption"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "KMS encryption may have additional costs compared to AES256."
}

View File

@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_default_encryption_kms(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
if bucket.encryption_enabled and bucket.encryption_algorithm:
if "KMS" in bucket.encryption_algorithm.upper():
report.status = "PASS"
report.status_extended = (
f"OSS bucket {bucket.name} uses KMS for encryption."
)
else:
report.status_extended = f"OSS bucket {bucket.name} uses {bucket.encryption_algorithm} for encryption."
else:
report.status_extended = (
f"OSS bucket {bucket.name} does not have encryption enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Encryption Enabled Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_encryption_enabled",
"CheckTitle": "Ensure OSS buckets have encryption enabled",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "high",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets have server-side encryption enabled to protect data at rest. Encryption protects sensitive data from unauthorized access.",
"Risk": "Unencrypted data at rest is vulnerable to unauthorized access if storage media is compromised. Encryption ensures data confidentiality even if physical access is obtained.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-encryption.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable server-side encryption for OSS buckets. Go to OSS Console > Buckets, select the bucket, and enable server-side encryption with AES256 or KMS.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/server-side-encryption"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_encryption_enabled(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
report.status_extended = (
f"OSS bucket {bucket.name} does not have encryption enabled."
)
if bucket.encryption_enabled:
report.status = "PASS"
report.status_extended = f"OSS bucket {bucket.name} has encryption enabled with {bucket.encryption_algorithm}."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Lifecycle Rules Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_lifecycle_rules",
"CheckTitle": "Ensure OSS buckets have lifecycle rules configured",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "low",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets have lifecycle rules configured to manage object lifecycle and optimize costs. Lifecycle rules automatically transition or delete objects based on age.",
"Risk": "Without lifecycle rules, old or unused objects remain in the bucket indefinitely, leading to unnecessary storage costs and difficulty managing data retention policies.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-lifecycle.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure lifecycle rules for OSS buckets. Go to OSS Console > Buckets, select the bucket, and create lifecycle rules to automatically transition or delete objects.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/lifecycle-rules"
}
},
"Categories": [
"trustworthy-ai"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_lifecycle_rules(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
report.status_extended = (
f"OSS bucket {bucket.name} does not have lifecycle rules configured."
)
if bucket.lifecycle_rules and len(bucket.lifecycle_rules) > 0:
report.status = "PASS"
report.status_extended = f"OSS bucket {bucket.name} has {len(bucket.lifecycle_rules)} lifecycle rule(s) configured."
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Logging Enabled Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_logging_enabled",
"CheckTitle": "Ensure OSS buckets have access logging enabled",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets have access logging enabled for audit and compliance purposes. Access logs provide detailed records of requests made to the bucket.",
"Risk": "Without access logging, it is difficult to track access patterns, detect unauthorized access attempts, or investigate security incidents. This can lead to compliance violations and delayed incident response.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-logging.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable access logging for OSS buckets. Go to OSS Console > Buckets, select the bucket, and configure logging with a target bucket for log storage.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/logging"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_logging_enabled(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
report.status_extended = (
f"OSS bucket {bucket.name} does not have access logging enabled."
)
if bucket.logging_enabled:
report.status = "PASS"
report.status_extended = (
f"OSS bucket {bucket.name} has access logging enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Public Access Blocked Check"""

View File

@@ -0,0 +1,33 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_public_access_blocked",
"CheckTitle": "Ensure OSS buckets block public access",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "critical",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets block public access to prevent unauthorized data exposure. Public buckets can be accessed by anyone on the internet, potentially exposing sensitive data.",
"Risk": "Public OSS buckets expose data to anyone on the internet, potentially leading to data breaches, unauthorized access to sensitive information, compliance violations, and reputational damage.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-public-access.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure OSS bucket ACL to block public access. Go to OSS Console > Buckets, select the bucket, and set ACL to 'Private'. Use bucket policies and RAM policies to grant access instead of public ACLs.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/bucket-acl"
}
},
"Categories": [
"encryption",
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,20 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_public_access_blocked(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
report.status_extended = f"OSS bucket {bucket.name} allows public access."
if not bucket.public_access:
report.status = "PASS"
report.status_extended = (
f"OSS bucket {bucket.name} blocks public access."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Referer Whitelist Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_referer_whitelist",
"CheckTitle": "Ensure OSS buckets have referer whitelist configured",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "low",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets have referer whitelist configured and do not allow empty referers. Referer whitelisting prevents hotlinking and unauthorized access from unknown sources.",
"Risk": "Without referer whitelist, buckets can be accessed from any source, potentially leading to bandwidth theft through hotlinking and unauthorized access to resources.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-referer.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Configure referer whitelist for OSS buckets and disable empty referers. Go to OSS Console > Buckets, select the bucket, and configure referer whitelist with trusted domains.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/hotlink-protection"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,34 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_referer_whitelist(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
if bucket.referer_config:
allow_empty = bucket.referer_config.get("AllowEmpty", True)
referer_list = bucket.referer_config.get("RefererList", [])
if allow_empty and (not referer_list or len(referer_list) == 0):
report.status_extended = f"OSS bucket {bucket.name} allows empty referers and has no referer whitelist."
elif allow_empty:
report.status_extended = (
f"OSS bucket {bucket.name} allows empty referers."
)
elif not referer_list or len(referer_list) == 0:
report.status_extended = (
f"OSS bucket {bucket.name} has no referer whitelist configured."
)
else:
report.status = "PASS"
report.status_extended = f"OSS bucket {bucket.name} has proper referer whitelist configuration."
else:
report.status_extended = (
f"OSS bucket {bucket.name} has no referer configuration."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Transfer Acceleration Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_transfer_acceleration",
"CheckTitle": "Check if OSS buckets have transfer acceleration enabled",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "informational",
"ResourceType": "Bucket",
"Description": "Checks if OSS buckets have transfer acceleration enabled for improved global data transfer performance. Transfer acceleration uses optimized network paths for faster uploads/downloads from distant locations.",
"Risk": "Without transfer acceleration, data transfers from distant geographic locations may be slower, impacting user experience and operational efficiency.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-transfer-acceleration.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable transfer acceleration for OSS buckets that serve global users. Go to OSS Console > Buckets, select the bucket, and enable transfer acceleration.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/transfer-acceleration"
}
},
"Categories": [
"trustworthy-ai"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This is an informational check. Transfer acceleration has additional costs."
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_transfer_acceleration(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
report.status_extended = (
f"OSS bucket {bucket.name} does not have transfer acceleration enabled."
)
if bucket.transfer_acceleration:
report.status = "PASS"
report.status_extended = (
f"OSS bucket {bucket.name} has transfer acceleration enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1 @@
"""OSS Bucket Versioning Enabled Check"""

View File

@@ -0,0 +1,32 @@
{
"Provider": "alibabacloud",
"CheckID": "oss_bucket_versioning_enabled",
"CheckTitle": "Ensure OSS buckets have versioning enabled",
"CheckType": [],
"ServiceName": "oss",
"SubServiceName": "",
"ResourceIdTemplate": "arn",
"Severity": "medium",
"ResourceType": "Bucket",
"Description": "Ensures that OSS buckets have versioning enabled to protect against accidental deletions and overwrites. Versioning allows recovery of objects from accidental deletion or overwrite.",
"Risk": "Without versioning, accidental deletion or overwrite of objects cannot be recovered, potentially leading to permanent data loss.",
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-versioning.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable versioning for OSS buckets. Go to OSS Console > Buckets, select the bucket, and enable versioning.",
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/versioning"
}
},
"Categories": [
"forensics-ready"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}

View File

@@ -0,0 +1,22 @@
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
class oss_bucket_versioning_enabled(Check):
def execute(self):
findings = []
for bucket in oss_client.buckets.values():
report = Check_Report_AlibabaCloud(
metadata=self.metadata(), resource=bucket
)
report.status = "FAIL"
report.status_extended = (
f"OSS bucket {bucket.name} does not have versioning enabled."
)
if bucket.versioning_enabled:
report.status = "PASS"
report.status_extended = (
f"OSS bucket {bucket.name} has versioning enabled."
)
findings.append(report)
return findings

View File

@@ -0,0 +1,6 @@
"""Alibaba Cloud OSS Client Singleton"""
from prowler.providers.alibabacloud.services.oss.oss_service import OSS
from prowler.providers.common.provider import Provider
oss_client = OSS(Provider.get_global_provider())

Some files were not shown because too many files have changed in this diff Show More