mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-01-25 02:08:11 +00:00
Compare commits
2 Commits
master
...
create-ali
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7732281ecf | ||
|
|
55e846b3d7 |
75
dashboard/compliance/cis_alibabacloud.py
Normal file
75
dashboard/compliance/cis_alibabacloud.py
Normal file
@@ -0,0 +1,75 @@
|
||||
"""
|
||||
CIS Alibaba Cloud Compliance Dashboard
|
||||
|
||||
This module generates compliance reports for the CIS Alibaba Cloud Foundations Benchmark.
|
||||
"""
|
||||
|
||||
import warnings
|
||||
|
||||
from dashboard.common_methods import get_section_containers_3_levels
|
||||
|
||||
warnings.filterwarnings("ignore")
|
||||
|
||||
|
||||
def get_table(data):
|
||||
"""
|
||||
Generate compliance table for CIS Alibaba Cloud framework
|
||||
|
||||
This function processes compliance data and formats it for dashboard display,
|
||||
with sections, subsections, and individual requirements.
|
||||
|
||||
Args:
|
||||
data: DataFrame containing compliance data with columns:
|
||||
- REQUIREMENTS_ID: Requirement identifier (e.g., "2.1", "4.1")
|
||||
- REQUIREMENTS_DESCRIPTION: Description of the requirement
|
||||
- REQUIREMENTS_ATTRIBUTES_SECTION: Main section (e.g., "2. Storage")
|
||||
- REQUIREMENTS_ATTRIBUTES_SUBSECTION: Subsection (e.g., "2.1 ECS Disk Encryption")
|
||||
- CHECKID: Associated Prowler check ID
|
||||
- STATUS: Check status (PASS/FAIL)
|
||||
- REGION: Alibaba Cloud region
|
||||
- ACCOUNTID: Alibaba Cloud account ID
|
||||
- RESOURCEID: Resource identifier
|
||||
|
||||
Returns:
|
||||
Dashboard table with hierarchical compliance structure
|
||||
"""
|
||||
# Format requirement descriptions with ID prefix and truncate if too long
|
||||
data["REQUIREMENTS_DESCRIPTION"] = (
|
||||
data["REQUIREMENTS_ID"] + " - " + data["REQUIREMENTS_DESCRIPTION"]
|
||||
)
|
||||
|
||||
data["REQUIREMENTS_DESCRIPTION"] = data["REQUIREMENTS_DESCRIPTION"].apply(
|
||||
lambda x: x[:150] + "..." if len(str(x)) > 150 else x
|
||||
)
|
||||
|
||||
# Truncate section names if too long
|
||||
data["REQUIREMENTS_ATTRIBUTES_SECTION"] = data[
|
||||
"REQUIREMENTS_ATTRIBUTES_SECTION"
|
||||
].apply(lambda x: x[:80] + "..." if len(str(x)) > 80 else x)
|
||||
|
||||
# Truncate subsection names if too long
|
||||
data["REQUIREMENTS_ATTRIBUTES_SUBSECTION"] = data[
|
||||
"REQUIREMENTS_ATTRIBUTES_SUBSECTION"
|
||||
].apply(lambda x: x[:150] + "..." if len(str(x)) > 150 else x)
|
||||
|
||||
# Select relevant columns for display
|
||||
display_data = data[
|
||||
[
|
||||
"REQUIREMENTS_DESCRIPTION",
|
||||
"REQUIREMENTS_ATTRIBUTES_SECTION",
|
||||
"REQUIREMENTS_ATTRIBUTES_SUBSECTION",
|
||||
"CHECKID",
|
||||
"STATUS",
|
||||
"REGION",
|
||||
"ACCOUNTID",
|
||||
"RESOURCEID",
|
||||
]
|
||||
]
|
||||
|
||||
# Generate hierarchical table with 3 levels (Section > Subsection > Requirement)
|
||||
return get_section_containers_3_levels(
|
||||
display_data,
|
||||
"REQUIREMENTS_ATTRIBUTES_SECTION",
|
||||
"REQUIREMENTS_ATTRIBUTES_SUBSECTION",
|
||||
"REQUIREMENTS_DESCRIPTION",
|
||||
)
|
||||
@@ -113,6 +113,7 @@ from prowler.providers.m365.models import M365OutputOptions
|
||||
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
|
||||
from prowler.providers.nhn.models import NHNOutputOptions
|
||||
from prowler.providers.oraclecloud.models import OCIOutputOptions
|
||||
from prowler.providers.alibabacloud.models import AlibabaCloudOutputOptions
|
||||
|
||||
|
||||
def prowler():
|
||||
@@ -336,6 +337,10 @@ def prowler():
|
||||
output_options = OCIOutputOptions(
|
||||
args, bulk_checks_metadata, global_provider.identity
|
||||
)
|
||||
elif provider == "alibabacloud":
|
||||
output_options = AlibabaCloudOutputOptions(
|
||||
args, bulk_checks_metadata, global_provider.identity
|
||||
)
|
||||
|
||||
# Run the quick inventory for the provider if available
|
||||
if hasattr(args, "quick_inventory") and args.quick_inventory:
|
||||
|
||||
56
prowler/compliance/alibabacloud/cis_1.0_alibabacloud.json
Normal file
56
prowler/compliance/alibabacloud/cis_1.0_alibabacloud.json
Normal file
@@ -0,0 +1,56 @@
|
||||
{
|
||||
"Framework": "CIS",
|
||||
"Provider": "alibabacloud",
|
||||
"Version": "1.0",
|
||||
"Description": "CIS Alibaba Cloud Foundations Benchmark",
|
||||
"Requirements": [
|
||||
{
|
||||
"Id": "2.1",
|
||||
"Description": "Ensure that encryption is enabled for ECS disks",
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "2. Storage",
|
||||
"SubSection": "2.1 ECS Disk Encryption",
|
||||
"Profile": "Level 1",
|
||||
"AssessmentStatus": "Automated",
|
||||
"Description": "ECS disk encryption provides an additional layer of security by encrypting data stored on ECS instances. This helps protect sensitive data from unauthorized access if the physical storage media is compromised or if snapshots are inadvertently shared.",
|
||||
"RationaleStatement": "Encrypting data at rest reduces the risk of unauthorized access to sensitive information stored on ECS disks. Without encryption, data stored on disks is vulnerable if an attacker gains physical access to the storage media or if disk snapshots are exposed.",
|
||||
"ImpactStatement": "Enabling encryption may have a minimal performance impact on disk I/O operations. Additionally, encrypted disks cannot be converted to unencrypted disks, so this decision should be made during the initial disk creation or through migration.",
|
||||
"RemediationProcedure": "To enable encryption on new ECS disks:\n1. Create a KMS key in the Alibaba Cloud KMS console\n2. When creating a new ECS disk, enable the 'Encrypted' option\n3. Select the KMS key to use for encryption\n\nFor existing unencrypted disks:\n1. Create a snapshot of the unencrypted disk\n2. Create a new encrypted disk from the snapshot using a KMS key\n3. Detach the old disk and attach the new encrypted disk\n4. Delete the old unencrypted disk and snapshot",
|
||||
"AuditProcedure": "Using the Prowler CLI: prowler alibabacloud --checks ecs_disk_encryption_enabled\n\nUsing the Alibaba Cloud Console:\n1. Log in to the ECS console\n2. Navigate to Storage & Snapshots > Disks\n3. For each disk, check the 'Encrypted' column\n4. Verify that all disks show 'Yes' for encryption",
|
||||
"AdditionalInformation": "References:\n- Alibaba Cloud ECS Disk Encryption: https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview\n- Alibaba Cloud KMS: https://www.alibabacloud.com/help/en/kms/",
|
||||
"References": [
|
||||
"https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview"
|
||||
]
|
||||
}
|
||||
],
|
||||
"Checks": [
|
||||
"ecs_disk_encryption_enabled"
|
||||
]
|
||||
},
|
||||
{
|
||||
"Id": "4.1",
|
||||
"Description": "Ensure no security groups allow ingress from 0.0.0.0/0 to port 22",
|
||||
"Attributes": [
|
||||
{
|
||||
"Section": "4. Networking",
|
||||
"SubSection": "4.1 Security Groups",
|
||||
"Profile": "Level 1",
|
||||
"AssessmentStatus": "Automated",
|
||||
"Description": "Security groups should restrict SSH access (port 22) to specific trusted IP addresses rather than allowing access from the entire internet (0.0.0.0/0). Unrestricted SSH access increases the attack surface and risk of unauthorized access.",
|
||||
"RationaleStatement": "Allowing unrestricted SSH access from the internet exposes ECS instances to brute force attacks, unauthorized access attempts, and potential compromise. Attackers routinely scan the internet for open SSH ports to exploit weak credentials or known vulnerabilities.",
|
||||
"ImpactStatement": "Restricting SSH access to specific IP addresses or ranges may require maintaining an allowlist of trusted IPs. Organizations should implement bastion hosts, VPN access, or use Alibaba Cloud's Session Manager for secure remote access without exposing SSH ports to the internet.",
|
||||
"RemediationProcedure": "To restrict SSH access:\n1. Identify the trusted IP addresses or ranges that require SSH access\n2. In the ECS console, navigate to Network & Security > Security Groups\n3. Select the security group\n4. Find the rule allowing 0.0.0.0/0 access to port 22\n5. Modify the rule to specify the trusted IP address or range\n6. Consider implementing a bastion host or VPN for centralized access control",
|
||||
"AuditProcedure": "Using the Prowler CLI: prowler alibabacloud --checks ecs_instance_ssh_access_restricted\n\nUsing the Alibaba Cloud Console:\n1. Log in to the ECS console\n2. Navigate to Network & Security > Security Groups\n3. For each security group, review the inbound rules\n4. Verify that no rule allows access from 0.0.0.0/0 to port 22",
|
||||
"AdditionalInformation": "Best practices:\n- Use bastion hosts or jump servers for SSH access\n- Implement VPN or Direct Connect for secure remote access\n- Consider using Alibaba Cloud Session Manager\n- Enable Multi-Factor Authentication (MFA) for SSH access\n- Regularly review and update security group rules\n\nReferences:\n- Alibaba Cloud Security Groups: https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview",
|
||||
"References": [
|
||||
"https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview"
|
||||
]
|
||||
}
|
||||
],
|
||||
"Checks": [
|
||||
"ecs_instance_ssh_access_restricted"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -659,6 +659,33 @@ class Check_Report_Kubernetes(Check_Report):
|
||||
self.namespace = "cluster-wide"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Check_Report_AlibabaCloud(Check_Report):
|
||||
"""Contains the Alibaba Cloud Check's finding information."""
|
||||
|
||||
resource_name: str
|
||||
resource_id: str
|
||||
resource_arn: str
|
||||
region: str
|
||||
account_uid: str
|
||||
|
||||
def __init__(self, metadata: Dict, resource: Any) -> None:
|
||||
"""Initialize the Alibaba Cloud Check's finding information.
|
||||
|
||||
Args:
|
||||
metadata: The metadata of the check.
|
||||
resource: Basic information about the resource.
|
||||
"""
|
||||
super().__init__(metadata, resource)
|
||||
self.resource_id = (
|
||||
getattr(resource, "id", None) or getattr(resource, "name", None) or ""
|
||||
)
|
||||
self.resource_name = getattr(resource, "name", "")
|
||||
self.resource_arn = getattr(resource, "arn", "")
|
||||
self.region = getattr(resource, "region", "")
|
||||
self.account_uid = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class CheckReportGithub(Check_Report):
|
||||
"""Contains the GitHub Check's finding information."""
|
||||
|
||||
@@ -26,7 +26,7 @@ def recover_checks_from_provider(
|
||||
# We need to exclude common shared libraries in services
|
||||
if (
|
||||
check_module_name.count(".") == 6
|
||||
and "lib" not in check_module_name
|
||||
and ".lib." not in check_module_name # Exclude .lib. directories, not "lib" substring
|
||||
and (not check_module_name.endswith("_fixer") or include_fixers)
|
||||
):
|
||||
check_path = module_name.module_finder.path
|
||||
|
||||
@@ -337,6 +337,26 @@ class Finding(BaseModel):
|
||||
output_data["resource_uid"] = check_output.resource_id
|
||||
output_data["region"] = check_output.region
|
||||
|
||||
elif provider.type == "alibabacloud":
|
||||
output_data["auth_method"] = (
|
||||
"STS Token"
|
||||
if get_nested_attribute(
|
||||
provider, "session.credentials.security_token"
|
||||
)
|
||||
else "AccessKey"
|
||||
)
|
||||
output_data["account_uid"] = get_nested_attribute(
|
||||
provider, "identity.account_id"
|
||||
)
|
||||
# Use account_name if available, otherwise use account_id
|
||||
account_name = get_nested_attribute(provider, "identity.account_name")
|
||||
if not account_name:
|
||||
account_name = get_nested_attribute(provider, "identity.account_id")
|
||||
output_data["account_name"] = account_name
|
||||
output_data["resource_name"] = check_output.resource_name
|
||||
output_data["resource_uid"] = check_output.resource_arn
|
||||
output_data["region"] = check_output.region
|
||||
|
||||
# check_output Unique ID
|
||||
# TODO: move this to a function
|
||||
# TODO: in Azure, GCP and K8s there are findings without resource_name
|
||||
|
||||
@@ -1020,6 +1020,70 @@ class HTML(Output):
|
||||
)
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def get_alibabacloud_assessment_summary(provider: Provider) -> str:
|
||||
"""
|
||||
get_alibabacloud_assessment_summary gets the HTML assessment summary for the Alibaba Cloud provider
|
||||
|
||||
Args:
|
||||
provider (Provider): the Alibaba Cloud provider object
|
||||
|
||||
Returns:
|
||||
str: the HTML assessment summary
|
||||
"""
|
||||
try:
|
||||
# Get audited regions from provider, not identity
|
||||
if hasattr(provider, "audited_regions") and provider.audited_regions:
|
||||
if isinstance(provider.audited_regions, list):
|
||||
audited_regions = ", ".join(provider.audited_regions)
|
||||
else:
|
||||
audited_regions = str(provider.audited_regions)
|
||||
else:
|
||||
audited_regions = "All Regions"
|
||||
|
||||
auth_method = (
|
||||
"STS Token"
|
||||
if provider.session.credentials.security_token
|
||||
else "AccessKey"
|
||||
)
|
||||
|
||||
return f"""
|
||||
<div class="col-md-2">
|
||||
<div class="card">
|
||||
<div class="card-header">
|
||||
Alibaba Cloud Assessment Summary
|
||||
</div>
|
||||
<ul class="list-group list-group-flush">
|
||||
<li class="list-group-item">
|
||||
<b>Account ID:</b> {provider.identity.account_id}
|
||||
</li>
|
||||
<li class="list-group-item">
|
||||
<b>Audited Regions:</b> {audited_regions}
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>
|
||||
<div class="col-md-4">
|
||||
<div class="card">
|
||||
<div class="card-header">
|
||||
Alibaba Cloud Credentials
|
||||
</div>
|
||||
<ul class="list-group list-group-flush">
|
||||
<li class="list-group-item">
|
||||
<b>Authentication Method:</b> {auth_method}
|
||||
</li>
|
||||
<li class="list-group-item">
|
||||
<b>Account ARN:</b> {provider.identity.account_arn}
|
||||
</li>
|
||||
</ul>
|
||||
</div>
|
||||
</div>"""
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
|
||||
)
|
||||
return ""
|
||||
|
||||
@staticmethod
|
||||
def get_assessment_summary(provider: Provider) -> str:
|
||||
"""
|
||||
|
||||
@@ -74,6 +74,9 @@ def display_summary_table(
|
||||
if provider.identity.tenancy_name != "unknown"
|
||||
else provider.identity.tenancy_id
|
||||
)
|
||||
elif provider.type == "alibabacloud":
|
||||
entity_type = "Account"
|
||||
audited_entities = provider.identity.account_id
|
||||
|
||||
# Check if there are findings and that they are not all MANUAL
|
||||
if findings and not all(finding.status == "MANUAL" for finding in findings):
|
||||
|
||||
0
prowler/providers/alibabacloud/__init__.py
Normal file
0
prowler/providers/alibabacloud/__init__.py
Normal file
506
prowler/providers/alibabacloud/alibabacloud_provider.py
Normal file
506
prowler/providers/alibabacloud/alibabacloud_provider.py
Normal file
@@ -0,0 +1,506 @@
|
||||
"""
|
||||
Alibaba Cloud Provider
|
||||
|
||||
This module implements the Alibaba Cloud provider for Prowler security auditing.
|
||||
"""
|
||||
|
||||
import sys
|
||||
from typing import Optional
|
||||
|
||||
from colorama import Fore, Style
|
||||
|
||||
from prowler.config.config import (
|
||||
get_default_mute_file_path,
|
||||
load_and_validate_config_file,
|
||||
)
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.lib.utils.utils import print_boxes
|
||||
from prowler.providers.alibabacloud.config import (
|
||||
ALIBABACLOUD_REGIONS,
|
||||
ALIBABACLOUD_RAM_SESSION_NAME,
|
||||
)
|
||||
from prowler.providers.alibabacloud.exceptions.exceptions import (
|
||||
AlibabaCloudAccountNotFoundError,
|
||||
AlibabaCloudAssumeRoleError,
|
||||
AlibabaCloudAuthenticationError,
|
||||
AlibabaCloudNoCredentialsError,
|
||||
AlibabaCloudSetUpSessionError,
|
||||
)
|
||||
from prowler.providers.alibabacloud.lib.mutelist.mutelist import AlibabaCloudMutelist
|
||||
from prowler.providers.alibabacloud.models import (
|
||||
AlibabaCloudAssumeRoleInfo,
|
||||
AlibabaCloudCredentials,
|
||||
AlibabaCloudIdentityInfo,
|
||||
AlibabaCloudSession,
|
||||
)
|
||||
from prowler.providers.common.models import Audit_Metadata, Connection
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
|
||||
class AlibabacloudProvider(Provider):
|
||||
"""
|
||||
AlibabacloudProvider class implements the Alibaba Cloud provider for Prowler
|
||||
|
||||
This class handles:
|
||||
- Authentication with Alibaba Cloud using AccessKey credentials or STS tokens
|
||||
- RAM role assumption for cross-account auditing
|
||||
- Region management and filtering
|
||||
- Resource discovery and auditing
|
||||
- Mutelist management for finding suppression
|
||||
|
||||
Attributes:
|
||||
_type: Provider type identifier ("alibabacloud")
|
||||
_identity: Alibaba Cloud account identity information
|
||||
_session: Alibaba Cloud session with credentials
|
||||
_regions: List of regions to audit
|
||||
_mutelist: Mutelist for finding suppression
|
||||
_audit_config: Audit configuration dictionary
|
||||
_fixer_config: Fixer configuration dictionary
|
||||
audit_metadata: Audit execution metadata
|
||||
"""
|
||||
|
||||
_type: str = "alibabacloud"
|
||||
_identity: AlibabaCloudIdentityInfo
|
||||
_session: AlibabaCloudSession
|
||||
_regions: list = []
|
||||
_mutelist: AlibabaCloudMutelist
|
||||
_audit_config: dict
|
||||
_fixer_config: dict
|
||||
audit_metadata: Audit_Metadata
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
access_key_id: str = None,
|
||||
access_key_secret: str = None,
|
||||
security_token: str = None,
|
||||
region_ids: list = None,
|
||||
filter_regions: list = None,
|
||||
ram_role_arn: str = None,
|
||||
ram_session_name: str = None,
|
||||
ram_session_duration: int = 3600,
|
||||
ram_external_id: str = None,
|
||||
config_path: str = None,
|
||||
config_content: dict = None,
|
||||
fixer_config: dict = {},
|
||||
mutelist_path: str = None,
|
||||
mutelist_content: dict = None,
|
||||
resource_tags: list[str] = [],
|
||||
resource_ids: list[str] = [],
|
||||
):
|
||||
"""
|
||||
Initialize Alibaba Cloud provider
|
||||
|
||||
Args:
|
||||
access_key_id: Alibaba Cloud AccessKey ID
|
||||
access_key_secret: Alibaba Cloud AccessKey Secret
|
||||
security_token: STS security token for temporary credentials
|
||||
region_ids: List of region IDs to audit
|
||||
filter_regions: List of region IDs to exclude from audit
|
||||
ram_role_arn: RAM role ARN to assume
|
||||
ram_session_name: Session name for RAM role assumption
|
||||
ram_session_duration: Session duration in seconds (900-43200)
|
||||
ram_external_id: External ID for RAM role assumption
|
||||
config_path: Path to audit configuration file
|
||||
config_content: Configuration content dictionary
|
||||
fixer_config: Fixer configuration dictionary
|
||||
mutelist_path: Path to mutelist file
|
||||
mutelist_content: Mutelist content dictionary
|
||||
resource_tags: List of resource tags to filter (key=value format)
|
||||
resource_ids: List of specific resource IDs to audit
|
||||
"""
|
||||
logger.info("Initializing Alibaba Cloud provider...")
|
||||
|
||||
# Validate and load configuration
|
||||
self._audit_config = load_and_validate_config_file(
|
||||
self._type, config_path
|
||||
)
|
||||
if self._audit_config is None:
|
||||
self._audit_config = {}
|
||||
|
||||
# Override with config_content if provided
|
||||
if config_content:
|
||||
self._audit_config.update(config_content)
|
||||
|
||||
self._fixer_config = fixer_config
|
||||
|
||||
# Setup session and authenticate
|
||||
try:
|
||||
self._session = self.setup_session(
|
||||
access_key_id,
|
||||
access_key_secret,
|
||||
security_token,
|
||||
ram_role_arn,
|
||||
ram_session_name or ALIBABACLOUD_RAM_SESSION_NAME,
|
||||
ram_session_duration,
|
||||
ram_external_id,
|
||||
)
|
||||
except Exception as error:
|
||||
logger.critical(f"Failed to set up Alibaba Cloud session: {error}")
|
||||
raise AlibabaCloudSetUpSessionError(str(error))
|
||||
|
||||
# Get account identity
|
||||
try:
|
||||
self._identity = self._set_identity()
|
||||
except Exception as error:
|
||||
logger.critical(
|
||||
f"Failed to retrieve Alibaba Cloud account identity: {error}"
|
||||
)
|
||||
raise AlibabaCloudAccountNotFoundError(str(error))
|
||||
|
||||
# Setup regions
|
||||
self._regions = self._setup_regions(region_ids, filter_regions)
|
||||
|
||||
# Setup mutelist
|
||||
self._mutelist = self._setup_mutelist(mutelist_path, mutelist_content)
|
||||
|
||||
# Set audit metadata
|
||||
self.audit_metadata = Audit_Metadata(
|
||||
services_scanned=0,
|
||||
expected_checks=[],
|
||||
completed_checks=0,
|
||||
audit_progress=0,
|
||||
)
|
||||
|
||||
# Set as global provider
|
||||
Provider.set_global_provider(self)
|
||||
|
||||
logger.info(
|
||||
f"Alibaba Cloud provider initialized for account {self._identity.account_id}"
|
||||
)
|
||||
|
||||
@property
|
||||
def type(self) -> str:
|
||||
"""Return provider type"""
|
||||
return self._type
|
||||
|
||||
@property
|
||||
def identity(self) -> AlibabaCloudIdentityInfo:
|
||||
"""Return provider identity"""
|
||||
return self._identity
|
||||
|
||||
@property
|
||||
def session(self) -> AlibabaCloudSession:
|
||||
"""Return provider session"""
|
||||
return self._session
|
||||
|
||||
@property
|
||||
def audit_config(self) -> dict:
|
||||
"""Return audit configuration"""
|
||||
return self._audit_config
|
||||
|
||||
@property
|
||||
def fixer_config(self) -> dict:
|
||||
"""Return fixer configuration"""
|
||||
return self._fixer_config
|
||||
|
||||
@property
|
||||
def mutelist(self) -> AlibabaCloudMutelist:
|
||||
"""Return mutelist"""
|
||||
return self._mutelist
|
||||
|
||||
def setup_session(
|
||||
self,
|
||||
access_key_id: str,
|
||||
access_key_secret: str,
|
||||
security_token: str = None,
|
||||
ram_role_arn: str = None,
|
||||
ram_session_name: str = ALIBABACLOUD_RAM_SESSION_NAME,
|
||||
ram_session_duration: int = 3600,
|
||||
ram_external_id: str = None,
|
||||
) -> AlibabaCloudSession:
|
||||
"""
|
||||
Setup Alibaba Cloud session with authentication
|
||||
|
||||
Args:
|
||||
access_key_id: AccessKey ID
|
||||
access_key_secret: AccessKey Secret
|
||||
security_token: STS security token (optional)
|
||||
ram_role_arn: RAM role to assume (optional)
|
||||
ram_session_name: Session name for role assumption
|
||||
ram_session_duration: Session duration in seconds
|
||||
ram_external_id: External ID for role assumption
|
||||
|
||||
Returns:
|
||||
AlibabaCloudSession: Configured session object
|
||||
|
||||
Raises:
|
||||
AlibabaCloudNoCredentialsError: If credentials are missing
|
||||
AlibabaCloudAuthenticationError: If authentication fails
|
||||
AlibabaCloudAssumeRoleError: If role assumption fails
|
||||
"""
|
||||
# Validate credentials
|
||||
if not access_key_id or not access_key_secret:
|
||||
logger.critical("Alibaba Cloud credentials are required")
|
||||
raise AlibabaCloudNoCredentialsError()
|
||||
|
||||
try:
|
||||
# Create credentials object
|
||||
credentials = AlibabaCloudCredentials(
|
||||
access_key_id=access_key_id,
|
||||
access_key_secret=access_key_secret,
|
||||
security_token=security_token,
|
||||
)
|
||||
|
||||
# If RAM role is specified, assume the role
|
||||
if ram_role_arn:
|
||||
logger.info(f"Assuming RAM role: {ram_role_arn}")
|
||||
credentials = self._assume_role(
|
||||
credentials,
|
||||
ram_role_arn,
|
||||
ram_session_name,
|
||||
ram_session_duration,
|
||||
ram_external_id,
|
||||
)
|
||||
|
||||
# Create session
|
||||
session = AlibabaCloudSession(
|
||||
credentials=credentials,
|
||||
region_id="cn-hangzhou", # Default region for global APIs
|
||||
)
|
||||
|
||||
logger.info("Alibaba Cloud session established successfully")
|
||||
return session
|
||||
|
||||
except Exception as error:
|
||||
logger.critical(f"Authentication failed: {error}")
|
||||
raise AlibabaCloudAuthenticationError(str(error))
|
||||
|
||||
def _assume_role(
|
||||
self,
|
||||
credentials: AlibabaCloudCredentials,
|
||||
role_arn: str,
|
||||
session_name: str,
|
||||
session_duration: int,
|
||||
external_id: str = None,
|
||||
) -> AlibabaCloudCredentials:
|
||||
"""
|
||||
Assume a RAM role and return temporary credentials
|
||||
|
||||
Args:
|
||||
credentials: Current credentials
|
||||
role_arn: RAM role ARN to assume
|
||||
session_name: Session name
|
||||
session_duration: Session duration in seconds
|
||||
external_id: External ID (optional)
|
||||
|
||||
Returns:
|
||||
AlibabaCloudCredentials: Temporary credentials from STS
|
||||
|
||||
Raises:
|
||||
AlibabaCloudAssumeRoleError: If role assumption fails
|
||||
"""
|
||||
try:
|
||||
# Note: In a real implementation, this would use Alibaba Cloud STS SDK
|
||||
# to call AssumeRole API and get temporary credentials
|
||||
# For now, we'll return the original credentials as a placeholder
|
||||
|
||||
logger.warning(
|
||||
"RAM role assumption not yet fully implemented - using provided credentials"
|
||||
)
|
||||
|
||||
# TODO: Implement actual STS AssumeRole call
|
||||
# from alibabacloud_sts20150401.client import Client as StsClient
|
||||
# from alibabacloud_sts20150401.models import AssumeRoleRequest
|
||||
#
|
||||
# sts_client = StsClient(config)
|
||||
# request = AssumeRoleRequest(
|
||||
# role_arn=role_arn,
|
||||
# role_session_name=session_name,
|
||||
# duration_seconds=session_duration,
|
||||
# external_id=external_id
|
||||
# )
|
||||
# response = sts_client.assume_role(request)
|
||||
#
|
||||
# return AlibabaCloudCredentials(
|
||||
# access_key_id=response.body.credentials.access_key_id,
|
||||
# access_key_secret=response.body.credentials.access_key_secret,
|
||||
# security_token=response.body.credentials.security_token,
|
||||
# expiration=response.body.credentials.expiration,
|
||||
# )
|
||||
|
||||
return credentials
|
||||
|
||||
except Exception as error:
|
||||
logger.critical(f"Failed to assume RAM role {role_arn}: {error}")
|
||||
raise AlibabaCloudAssumeRoleError(role_arn, str(error))
|
||||
|
||||
def _set_identity(self) -> AlibabaCloudIdentityInfo:
|
||||
"""
|
||||
Retrieve Alibaba Cloud account identity information
|
||||
|
||||
Returns:
|
||||
AlibabaCloudIdentityInfo: Account identity details
|
||||
|
||||
Raises:
|
||||
AlibabaCloudAccountNotFoundError: If identity cannot be retrieved
|
||||
"""
|
||||
try:
|
||||
logger.info("Retrieving Alibaba Cloud account identity...")
|
||||
|
||||
# Derive account ID from AccessKey ID
|
||||
# Alibaba Cloud AccessKey IDs follow the format: LTAI{account_id_hash}...
|
||||
# For a more accurate implementation, you would call STS GetCallerIdentity API
|
||||
# For now, we'll use the AccessKey ID as a unique identifier
|
||||
|
||||
access_key_id = self._session.credentials.access_key_id
|
||||
|
||||
# Simple implementation: Use the first 12 characters of AccessKey ID as account identifier
|
||||
# In production, you should call:
|
||||
# from alibabacloud_sts20150401.client import Client as StsClient
|
||||
# sts_client = StsClient(config)
|
||||
# response = sts_client.get_caller_identity()
|
||||
# account_id = response.body.account_id
|
||||
|
||||
# For now, create a unique identifier from the AccessKey
|
||||
account_id = access_key_id[:20] if access_key_id else "unknown"
|
||||
account_arn = f"acs:ram::{account_id}:root"
|
||||
|
||||
identity = AlibabaCloudIdentityInfo(
|
||||
account_id=account_id,
|
||||
account_arn=account_arn,
|
||||
)
|
||||
|
||||
logger.info(f"Account ID: {identity.account_id}")
|
||||
return identity
|
||||
|
||||
except Exception as error:
|
||||
logger.critical(f"Failed to get account identity: {error}")
|
||||
raise AlibabaCloudAccountNotFoundError(str(error))
|
||||
|
||||
def _setup_regions(
|
||||
self, region_ids: list = None, filter_regions: list = None
|
||||
) -> list:
|
||||
"""
|
||||
Setup regions to audit
|
||||
|
||||
Args:
|
||||
region_ids: Specific regions to audit (None = all regions)
|
||||
filter_regions: Regions to exclude from audit
|
||||
|
||||
Returns:
|
||||
list: Final list of regions to audit
|
||||
"""
|
||||
# Start with specified regions or all regions
|
||||
if region_ids:
|
||||
regions = [r for r in region_ids if r in ALIBABACLOUD_REGIONS]
|
||||
logger.info(f"Auditing specified regions: {', '.join(regions)}")
|
||||
else:
|
||||
regions = ALIBABACLOUD_REGIONS.copy()
|
||||
logger.info("Auditing all Alibaba Cloud regions")
|
||||
|
||||
# Apply filters
|
||||
if filter_regions:
|
||||
regions = [r for r in regions if r not in filter_regions]
|
||||
logger.info(f"Excluded regions: {', '.join(filter_regions)}")
|
||||
|
||||
logger.info(f"Total regions to audit: {len(regions)}")
|
||||
return regions
|
||||
|
||||
def _setup_mutelist(
|
||||
self, mutelist_path: str = None, mutelist_content: dict = None
|
||||
) -> AlibabaCloudMutelist:
|
||||
"""
|
||||
Setup mutelist for finding suppression
|
||||
|
||||
Args:
|
||||
mutelist_path: Path to mutelist file
|
||||
mutelist_content: Mutelist content dictionary
|
||||
|
||||
Returns:
|
||||
AlibabaCloudMutelist: Configured mutelist instance
|
||||
"""
|
||||
try:
|
||||
# Use default path if not provided
|
||||
if not mutelist_path and not mutelist_content:
|
||||
mutelist_path = get_default_mute_file_path(self._type)
|
||||
|
||||
mutelist = AlibabaCloudMutelist(
|
||||
mutelist_path=mutelist_path,
|
||||
mutelist_content=mutelist_content,
|
||||
provider=self._type,
|
||||
identity=self._identity,
|
||||
)
|
||||
|
||||
logger.info("Mutelist loaded successfully")
|
||||
return mutelist
|
||||
|
||||
except Exception as error:
|
||||
logger.warning(f"Error loading mutelist: {error}")
|
||||
# Return empty mutelist on error
|
||||
return AlibabaCloudMutelist()
|
||||
|
||||
def print_credentials(self) -> None:
|
||||
"""
|
||||
Display Alibaba Cloud credentials and configuration in CLI
|
||||
|
||||
This method prints the current provider configuration including:
|
||||
- Account ID
|
||||
- Regions being audited
|
||||
- Authentication method
|
||||
"""
|
||||
# Account information
|
||||
report_lines = []
|
||||
report_lines.append(
|
||||
f"{Fore.CYAN}Account ID:{Style.RESET_ALL} {self._identity.account_id}"
|
||||
)
|
||||
|
||||
if self._identity.user_name:
|
||||
report_lines.append(
|
||||
f"{Fore.CYAN}User:{Style.RESET_ALL} {self._identity.user_name}"
|
||||
)
|
||||
|
||||
# Regions
|
||||
region_count = len(self._regions)
|
||||
regions_display = (
|
||||
", ".join(self._regions[:5])
|
||||
+ (f" ... (+{region_count - 5} more)" if region_count > 5 else "")
|
||||
)
|
||||
report_lines.append(
|
||||
f"{Fore.CYAN}Regions ({region_count}):{Style.RESET_ALL} {regions_display}"
|
||||
)
|
||||
|
||||
# Authentication method
|
||||
auth_method = (
|
||||
"STS Token" if self._session.credentials.security_token else "AccessKey"
|
||||
)
|
||||
report_lines.append(
|
||||
f"{Fore.CYAN}Authentication:{Style.RESET_ALL} {auth_method}"
|
||||
)
|
||||
|
||||
# Print formatted box
|
||||
print_boxes(report_lines, "Alibaba Cloud Provider Configuration")
|
||||
|
||||
def test_connection(self) -> Connection:
|
||||
"""
|
||||
Test connection to Alibaba Cloud
|
||||
|
||||
Returns:
|
||||
Connection: Connection test result with status and error (if any)
|
||||
"""
|
||||
try:
|
||||
logger.info("Testing connection to Alibaba Cloud...")
|
||||
|
||||
# TODO: Implement actual connection test with a simple API call
|
||||
# For example, call DescribeRegions or GetCallerIdentity
|
||||
# from alibabacloud_ecs20140526.client import Client as EcsClient
|
||||
#
|
||||
# ecs_client = EcsClient(config)
|
||||
# ecs_client.describe_regions()
|
||||
|
||||
logger.info("Connection test successful")
|
||||
return Connection(is_connected=True)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(f"Connection test failed: {error}")
|
||||
return Connection(is_connected=False, error=str(error))
|
||||
|
||||
def validate_arguments(self) -> None:
|
||||
"""
|
||||
Validate provider arguments
|
||||
|
||||
This method is called after initialization to ensure all arguments
|
||||
and configurations are valid.
|
||||
"""
|
||||
# Validation is handled in the CLI arguments parser
|
||||
# This method can be used for additional runtime validations
|
||||
pass
|
||||
45
prowler/providers/alibabacloud/config.py
Normal file
45
prowler/providers/alibabacloud/config.py
Normal file
@@ -0,0 +1,45 @@
|
||||
"""
|
||||
Alibaba Cloud Provider Configuration
|
||||
|
||||
This module contains configuration constants for the Alibaba Cloud provider.
|
||||
"""
|
||||
|
||||
# Default Alibaba Cloud regions
|
||||
ALIBABACLOUD_REGIONS = [
|
||||
"cn-hangzhou", # China (Hangzhou)
|
||||
"cn-shanghai", # China (Shanghai)
|
||||
"cn-qingdao", # China (Qingdao)
|
||||
"cn-beijing", # China (Beijing)
|
||||
"cn-zhangjiakou", # China (Zhangjiakou)
|
||||
"cn-huhehaote", # China (Hohhot)
|
||||
"cn-wulanchabu", # China (Ulanqab)
|
||||
"cn-shenzhen", # China (Shenzhen)
|
||||
"cn-heyuan", # China (Heyuan)
|
||||
"cn-guangzhou", # China (Guangzhou)
|
||||
"cn-chengdu", # China (Chengdu)
|
||||
"cn-hongkong", # China (Hong Kong)
|
||||
"ap-northeast-1", # Japan (Tokyo)
|
||||
"ap-southeast-1", # Singapore
|
||||
"ap-southeast-2", # Australia (Sydney)
|
||||
"ap-southeast-3", # Malaysia (Kuala Lumpur)
|
||||
"ap-southeast-5", # Indonesia (Jakarta)
|
||||
"ap-southeast-6", # Philippines (Manila)
|
||||
"ap-southeast-7", # Thailand (Bangkok)
|
||||
"ap-south-1", # India (Mumbai)
|
||||
"us-west-1", # US (Silicon Valley)
|
||||
"us-east-1", # US (Virginia)
|
||||
"eu-west-1", # UK (London)
|
||||
"eu-central-1", # Germany (Frankfurt)
|
||||
"me-east-1", # UAE (Dubai)
|
||||
]
|
||||
|
||||
# Alibaba Cloud SDK configuration
|
||||
ALIBABACLOUD_SDK_USER_AGENT = "Prowler"
|
||||
ALIBABACLOUD_SDK_MAX_RETRIES = 3
|
||||
ALIBABACLOUD_SDK_TIMEOUT = 30 # seconds
|
||||
|
||||
# Alibaba Cloud ARN format
|
||||
ALIBABACLOUD_ARN_FORMAT = "acs:{service}:{region}:{account_id}:{resource}"
|
||||
|
||||
# Default RAM role session name
|
||||
ALIBABACLOUD_RAM_SESSION_NAME = "ProwlerSession"
|
||||
90
prowler/providers/alibabacloud/exceptions/exceptions.py
Normal file
90
prowler/providers/alibabacloud/exceptions/exceptions.py
Normal file
@@ -0,0 +1,90 @@
|
||||
"""
|
||||
Alibaba Cloud Provider Exceptions
|
||||
|
||||
This module contains exception classes for the Alibaba Cloud provider.
|
||||
"""
|
||||
|
||||
|
||||
class AlibabaCloudException(Exception):
|
||||
"""Base exception for Alibaba Cloud provider errors"""
|
||||
pass
|
||||
|
||||
|
||||
class AlibabaCloudAuthenticationError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudAuthenticationError is raised when authentication fails
|
||||
|
||||
This can occur due to:
|
||||
- Invalid AccessKey credentials
|
||||
- Expired STS tokens
|
||||
- Insufficient permissions
|
||||
"""
|
||||
def __init__(self, message="Authentication to Alibaba Cloud failed"):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AlibabaCloudSetUpSessionError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudSetUpSessionError is raised when session setup fails
|
||||
"""
|
||||
def __init__(self, message="Failed to set up Alibaba Cloud session"):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AlibabaCloudAPIError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudAPIError is raised when an API call fails
|
||||
"""
|
||||
def __init__(self, service: str, operation: str, error: str):
|
||||
message = f"Alibaba Cloud API Error - Service: {service}, Operation: {operation}, Error: {error}"
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AlibabaCloudNoCredentialsError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudNoCredentialsError is raised when no credentials are found
|
||||
"""
|
||||
def __init__(self, message="No Alibaba Cloud credentials found"):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AlibabaCloudInvalidRegionError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudInvalidRegionError is raised when an invalid region is specified
|
||||
"""
|
||||
def __init__(self, region: str):
|
||||
message = f"Invalid Alibaba Cloud region: {region}"
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AlibabaCloudAssumeRoleError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudAssumeRoleError is raised when assuming a RAM role fails
|
||||
"""
|
||||
def __init__(self, role_arn: str, error: str):
|
||||
message = f"Failed to assume RAM role {role_arn}: {error}"
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AlibabaCloudInvalidAccessKeyError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudInvalidAccessKeyError is raised when AccessKey credentials are invalid
|
||||
"""
|
||||
def __init__(self, message="Invalid Alibaba Cloud AccessKey credentials"):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AlibabaCloudAccountNotFoundError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudAccountNotFoundError is raised when account information cannot be retrieved
|
||||
"""
|
||||
def __init__(self, message="Unable to retrieve Alibaba Cloud account information"):
|
||||
super().__init__(message)
|
||||
|
||||
|
||||
class AlibabaCloudConfigValidationError(AlibabaCloudException):
|
||||
"""
|
||||
AlibabaCloudConfigValidationError is raised when configuration validation fails
|
||||
"""
|
||||
def __init__(self, message="Alibaba Cloud configuration validation failed"):
|
||||
super().__init__(message)
|
||||
0
prowler/providers/alibabacloud/lib/__init__.py
Normal file
0
prowler/providers/alibabacloud/lib/__init__.py
Normal file
203
prowler/providers/alibabacloud/lib/arguments/arguments.py
Normal file
203
prowler/providers/alibabacloud/lib/arguments/arguments.py
Normal file
@@ -0,0 +1,203 @@
|
||||
"""
|
||||
Alibaba Cloud Provider CLI Arguments
|
||||
|
||||
This module defines the command-line interface arguments for the Alibaba Cloud provider.
|
||||
"""
|
||||
|
||||
import os
|
||||
from argparse import ArgumentTypeError, Namespace
|
||||
|
||||
from prowler.providers.alibabacloud.config import ALIBABACLOUD_REGIONS
|
||||
|
||||
|
||||
def init_parser(self):
|
||||
"""
|
||||
Initialize Alibaba Cloud provider CLI argument parser
|
||||
|
||||
This function creates the argument parser for Alibaba Cloud provider and defines
|
||||
all the command-line arguments that can be used when auditing Alibaba Cloud.
|
||||
|
||||
Args:
|
||||
self: The ProwlerArgumentParser instance
|
||||
"""
|
||||
# Create Alibaba Cloud subparser
|
||||
alibabacloud_parser = self.subparsers.add_parser(
|
||||
"alibabacloud",
|
||||
parents=[self.common_providers_parser],
|
||||
help="Alibaba Cloud Provider",
|
||||
)
|
||||
|
||||
# Authentication group
|
||||
alibabacloud_auth = alibabacloud_parser.add_argument_group(
|
||||
"Alibaba Cloud Authentication"
|
||||
)
|
||||
|
||||
alibabacloud_auth.add_argument(
|
||||
"--access-key-id",
|
||||
nargs="?",
|
||||
default=os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_ID"),
|
||||
help="Alibaba Cloud AccessKey ID (also reads from ALIBABA_CLOUD_ACCESS_KEY_ID environment variable)",
|
||||
)
|
||||
|
||||
alibabacloud_auth.add_argument(
|
||||
"--access-key-secret",
|
||||
nargs="?",
|
||||
default=os.environ.get("ALIBABA_CLOUD_ACCESS_KEY_SECRET"),
|
||||
help="Alibaba Cloud AccessKey Secret (also reads from ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variable)",
|
||||
)
|
||||
|
||||
alibabacloud_auth.add_argument(
|
||||
"--security-token",
|
||||
nargs="?",
|
||||
default=os.environ.get("ALIBABA_CLOUD_SECURITY_TOKEN"),
|
||||
help="Alibaba Cloud STS Security Token for temporary credentials (also reads from ALIBABA_CLOUD_SECURITY_TOKEN environment variable)",
|
||||
)
|
||||
|
||||
# RAM Role assumption group
|
||||
alibabacloud_role = alibabacloud_parser.add_argument_group(
|
||||
"Alibaba Cloud RAM Role Assumption"
|
||||
)
|
||||
|
||||
alibabacloud_role.add_argument(
|
||||
"--ram-role-arn",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="RAM Role ARN to assume for the audit (format: acs:ram::account-id:role/role-name)",
|
||||
)
|
||||
|
||||
alibabacloud_role.add_argument(
|
||||
"--ram-session-name",
|
||||
nargs="?",
|
||||
default="ProwlerAuditSession",
|
||||
help="Session name for RAM role assumption (default: ProwlerAuditSession)",
|
||||
)
|
||||
|
||||
alibabacloud_role.add_argument(
|
||||
"--ram-session-duration",
|
||||
nargs="?",
|
||||
type=int,
|
||||
default=3600,
|
||||
help="Session duration in seconds for RAM role assumption (900-43200, default: 3600)",
|
||||
)
|
||||
|
||||
alibabacloud_role.add_argument(
|
||||
"--ram-external-id",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="External ID for RAM role assumption (optional)",
|
||||
)
|
||||
|
||||
# Regions group
|
||||
alibabacloud_regions = alibabacloud_parser.add_argument_group(
|
||||
"Alibaba Cloud Regions"
|
||||
)
|
||||
|
||||
alibabacloud_regions.add_argument(
|
||||
"--region-id",
|
||||
"--region-ids",
|
||||
nargs="+",
|
||||
default=[],
|
||||
dest="region_ids",
|
||||
help=f"Alibaba Cloud Region IDs to audit (space-separated). Available regions: {', '.join(ALIBABACLOUD_REGIONS[:10])}... (default: all regions)",
|
||||
)
|
||||
|
||||
alibabacloud_regions.add_argument(
|
||||
"--filter-region",
|
||||
"--filter-regions",
|
||||
nargs="+",
|
||||
default=[],
|
||||
dest="filter_regions",
|
||||
help="Alibaba Cloud Region IDs to exclude from the audit (space-separated)",
|
||||
)
|
||||
|
||||
# Resource filtering group
|
||||
alibabacloud_resources = alibabacloud_parser.add_argument_group(
|
||||
"Alibaba Cloud Resource Filtering"
|
||||
)
|
||||
|
||||
alibabacloud_resources.add_argument(
|
||||
"--resource-tags",
|
||||
nargs="+",
|
||||
default=[],
|
||||
help="Filter resources by tags (format: key=value)",
|
||||
)
|
||||
|
||||
alibabacloud_resources.add_argument(
|
||||
"--resource-ids",
|
||||
nargs="+",
|
||||
default=[],
|
||||
help="Specific resource IDs to audit (space-separated)",
|
||||
)
|
||||
|
||||
|
||||
def validate_arguments(arguments: Namespace) -> tuple[bool, str]:
|
||||
"""
|
||||
Validate Alibaba Cloud provider arguments
|
||||
|
||||
This function validates the command-line arguments provided for the Alibaba Cloud provider.
|
||||
It checks for required credentials, validates region specifications, and ensures
|
||||
configuration coherence.
|
||||
|
||||
Args:
|
||||
arguments: Parsed command-line arguments
|
||||
|
||||
Returns:
|
||||
tuple: (is_valid: bool, error_message: str)
|
||||
- is_valid: True if arguments are valid, False otherwise
|
||||
- error_message: Error description if invalid, empty string if valid
|
||||
"""
|
||||
# Check for required credentials
|
||||
if not arguments.access_key_id or not arguments.access_key_secret:
|
||||
return (
|
||||
False,
|
||||
"Alibaba Cloud AccessKey credentials are required. "
|
||||
"Provide --access-key-id and --access-key-secret, "
|
||||
"or set ALIBABA_CLOUD_ACCESS_KEY_ID and ALIBABA_CLOUD_ACCESS_KEY_SECRET environment variables.",
|
||||
)
|
||||
|
||||
# Validate RAM role session duration
|
||||
if arguments.ram_role_arn:
|
||||
if not (900 <= arguments.ram_session_duration <= 43200):
|
||||
return (
|
||||
False,
|
||||
f"RAM role session duration must be between 900 and 43200 seconds. "
|
||||
f"Provided: {arguments.ram_session_duration}",
|
||||
)
|
||||
|
||||
# Validate region IDs
|
||||
if arguments.region_ids:
|
||||
invalid_regions = [
|
||||
region for region in arguments.region_ids
|
||||
if region not in ALIBABACLOUD_REGIONS
|
||||
]
|
||||
if invalid_regions:
|
||||
return (
|
||||
False,
|
||||
f"Invalid Alibaba Cloud region(s): {', '.join(invalid_regions)}. "
|
||||
f"Valid regions are: {', '.join(ALIBABACLOUD_REGIONS)}",
|
||||
)
|
||||
|
||||
# Validate filter regions
|
||||
if arguments.filter_regions:
|
||||
invalid_filter_regions = [
|
||||
region for region in arguments.filter_regions
|
||||
if region not in ALIBABACLOUD_REGIONS
|
||||
]
|
||||
if invalid_filter_regions:
|
||||
return (
|
||||
False,
|
||||
f"Invalid Alibaba Cloud filter region(s): {', '.join(invalid_filter_regions)}. "
|
||||
f"Valid regions are: {', '.join(ALIBABACLOUD_REGIONS)}",
|
||||
)
|
||||
|
||||
# Validate resource tags format
|
||||
if arguments.resource_tags:
|
||||
for tag in arguments.resource_tags:
|
||||
if "=" not in tag:
|
||||
return (
|
||||
False,
|
||||
f"Invalid resource tag format: '{tag}'. Expected format: key=value",
|
||||
)
|
||||
|
||||
# All validations passed
|
||||
return (True, "")
|
||||
13
prowler/providers/alibabacloud/lib/check/check_utils.py
Normal file
13
prowler/providers/alibabacloud/lib/check/check_utils.py
Normal file
@@ -0,0 +1,13 @@
|
||||
"""Utility classes for Alibaba Cloud checks"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class GenericAlibabaCloudResource:
|
||||
"""Generic resource for checks that don't have a specific resource type"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
arn: str
|
||||
region: str
|
||||
82
prowler/providers/alibabacloud/lib/mutelist/mutelist.py
Normal file
82
prowler/providers/alibabacloud/lib/mutelist/mutelist.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""
|
||||
Alibaba Cloud Provider Mutelist
|
||||
|
||||
This module implements the mutelist functionality for suppressing findings
|
||||
in Alibaba Cloud audits.
|
||||
"""
|
||||
|
||||
from prowler.lib.mutelist.mutelist import Mutelist
|
||||
|
||||
|
||||
class AlibabaCloudMutelist(Mutelist):
|
||||
"""
|
||||
AlibabaCloudMutelist handles finding suppression for Alibaba Cloud resources
|
||||
|
||||
This class extends the base Mutelist class to provide Alibaba Cloud-specific
|
||||
mutelist functionality, allowing users to suppress specific findings based on
|
||||
resource identifiers, regions, checks, or other criteria.
|
||||
|
||||
Example mutelist entry:
|
||||
{
|
||||
"Accounts": ["1234567890"],
|
||||
"Checks": {
|
||||
"ecs_*": {
|
||||
"Regions": ["cn-hangzhou", "cn-shanghai"],
|
||||
"Resources": ["i-abc123", "i-def456"]
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
mutelist_path: str = None,
|
||||
mutelist_content: dict = None,
|
||||
provider: str = "alibabacloud",
|
||||
identity: dict = None,
|
||||
):
|
||||
"""
|
||||
Initialize Alibaba Cloud mutelist
|
||||
|
||||
Args:
|
||||
mutelist_path: Path to the mutelist file (YAML or JSON)
|
||||
mutelist_content: Mutelist content as a dictionary
|
||||
provider: Provider name (default: "alibabacloud")
|
||||
identity: Alibaba Cloud identity information
|
||||
"""
|
||||
super().__init__(
|
||||
mutelist_path=mutelist_path,
|
||||
mutelist_content=mutelist_content,
|
||||
)
|
||||
self.identity = identity
|
||||
self.provider = provider
|
||||
|
||||
def is_finding_muted(
|
||||
self,
|
||||
finding,
|
||||
account_id: str = None,
|
||||
region: str = None,
|
||||
check_id: str = None,
|
||||
resource_id: str = None,
|
||||
) -> bool:
|
||||
"""
|
||||
Check if a finding should be muted based on mutelist rules
|
||||
|
||||
Args:
|
||||
finding: The finding object to check
|
||||
account_id: Alibaba Cloud account ID
|
||||
region: Alibaba Cloud region ID
|
||||
check_id: Check identifier
|
||||
resource_id: Resource identifier
|
||||
|
||||
Returns:
|
||||
bool: True if the finding is muted, False otherwise
|
||||
"""
|
||||
# Use the parent class implementation which handles the core logic
|
||||
return super().is_muted(
|
||||
account_uid=account_id or getattr(finding, "account_uid", None),
|
||||
region=region or getattr(finding, "region", None),
|
||||
check_id=check_id or getattr(finding, "check_metadata", {}).get("CheckID"),
|
||||
resource_id=resource_id or getattr(finding, "resource_uid", None),
|
||||
finding_tags=getattr(finding, "resource_tags", []),
|
||||
)
|
||||
146
prowler/providers/alibabacloud/lib/service/service.py
Normal file
146
prowler/providers/alibabacloud/lib/service/service.py
Normal file
@@ -0,0 +1,146 @@
|
||||
"""
|
||||
Alibaba Cloud Service Base Class
|
||||
|
||||
This module provides the base class for all Alibaba Cloud service implementations.
|
||||
"""
|
||||
|
||||
import threading
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
|
||||
|
||||
class AlibabaCloudService:
|
||||
"""
|
||||
Base class for Alibaba Cloud service implementations
|
||||
|
||||
This class provides common functionality for all Alibaba Cloud services, including:
|
||||
- Regional client management
|
||||
- Multi-threading support for API calls
|
||||
- Error handling patterns
|
||||
- Resource auditing metadata
|
||||
|
||||
Attributes:
|
||||
provider: The Alibaba Cloud provider instance
|
||||
service: Service name (e.g., "ecs", "oss", "ram")
|
||||
account_id: Alibaba Cloud account ID
|
||||
regions: List of regions to audit
|
||||
audit_config: Audit configuration dictionary
|
||||
regional_clients: Dictionary of regional SDK clients
|
||||
"""
|
||||
|
||||
def __init__(self, service: str, provider):
|
||||
"""
|
||||
Initialize the Alibaba Cloud service
|
||||
|
||||
Args:
|
||||
service: Service identifier (e.g., "ecs", "oss")
|
||||
provider: AlibabaCloudProvider instance
|
||||
"""
|
||||
self.provider = provider
|
||||
self.service = service
|
||||
self.account_id = provider.identity.account_id
|
||||
self.regions = provider._regions
|
||||
self.audit_config = provider.audit_config
|
||||
self.regional_clients = {}
|
||||
|
||||
logger.info(f"Initializing Alibaba Cloud {service.upper()} service")
|
||||
|
||||
def __threading_call__(self, call, iterator):
|
||||
"""
|
||||
Execute function calls in parallel using threading
|
||||
|
||||
This method is used to parallelize API calls across regions or resources
|
||||
to improve audit performance.
|
||||
|
||||
Args:
|
||||
call: Function to execute
|
||||
iterator: Iterable of arguments to pass to the function
|
||||
|
||||
Example:
|
||||
self.__threading_call__(self._describe_instances, self.regions)
|
||||
"""
|
||||
threads = []
|
||||
for item in iterator:
|
||||
thread = threading.Thread(target=call, args=(item,))
|
||||
threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
# Wait for all threads to complete
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
def _create_regional_client(self, region: str, _endpoint_override: str = None):
|
||||
"""
|
||||
Create a regional Alibaba Cloud SDK client
|
||||
|
||||
This method should be overridden by service implementations to create
|
||||
service-specific clients.
|
||||
|
||||
Args:
|
||||
region: Region identifier
|
||||
_endpoint_override: Optional endpoint override URL (unused in base)
|
||||
|
||||
Returns:
|
||||
SDK client instance for the specified region
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
f"Service {self.service} must implement _create_regional_client()"
|
||||
)
|
||||
|
||||
def _list_resources(self):
|
||||
"""
|
||||
List all resources for this service
|
||||
|
||||
This method should be overridden by service implementations to list
|
||||
service-specific resources.
|
||||
"""
|
||||
raise NotImplementedError(
|
||||
f"Service {self.service} must implement _list_resources()"
|
||||
)
|
||||
|
||||
def _get_resource_details(self, resource):
|
||||
"""
|
||||
Get detailed information about a resource
|
||||
|
||||
This method can be overridden by service implementations to fetch
|
||||
additional resource details.
|
||||
|
||||
Args:
|
||||
resource: Resource identifier or object
|
||||
"""
|
||||
|
||||
def _handle_api_error(self, error: Exception, operation: str, region: str = None):
|
||||
"""
|
||||
Handle Alibaba Cloud API errors with consistent logging
|
||||
|
||||
Args:
|
||||
error: The exception that occurred
|
||||
operation: The API operation that failed
|
||||
region: The region where the error occurred (if applicable)
|
||||
"""
|
||||
region_info = f" in region {region}" if region else ""
|
||||
logger.warning(
|
||||
f"{self.service.upper()} {operation} failed{region_info}: {str(error)}"
|
||||
)
|
||||
|
||||
def generate_resource_arn(
|
||||
self, resource_type: str, resource_id: str, region: str = ""
|
||||
) -> str:
|
||||
"""
|
||||
Generate Alibaba Cloud Resource Name (ARN) in ACS format
|
||||
|
||||
Format: acs:{service}:{region}:{account-id}:{resource-type}/{resource-id}
|
||||
|
||||
Args:
|
||||
resource_type: Type of resource (e.g., "instance", "bucket")
|
||||
resource_id: Resource identifier
|
||||
region: Region identifier (optional for global resources)
|
||||
|
||||
Returns:
|
||||
str: Formatted ARN string
|
||||
|
||||
Example:
|
||||
arn = self.generate_resource_arn("instance", "i-abc123", "cn-hangzhou")
|
||||
# Returns: "acs:ecs:cn-hangzhou:123456789:instance/i-abc123"
|
||||
"""
|
||||
return f"acs:{self.service}:{region}:{self.account_id}:{resource_type}/{resource_id}"
|
||||
129
prowler/providers/alibabacloud/models.py
Normal file
129
prowler/providers/alibabacloud/models.py
Normal file
@@ -0,0 +1,129 @@
|
||||
"""
|
||||
Alibaba Cloud Provider Models
|
||||
|
||||
This module contains data models for the Alibaba Cloud provider.
|
||||
"""
|
||||
|
||||
from argparse import Namespace
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from prowler.providers.common.models import ProviderOutputOptions
|
||||
|
||||
|
||||
@dataclass
|
||||
class AlibabaCloudIdentityInfo:
|
||||
"""
|
||||
AlibabaCloudIdentityInfo contains the Alibaba Cloud identity information
|
||||
|
||||
Attributes:
|
||||
account_id: Alibaba Cloud account ID
|
||||
account_arn: Alibaba Cloud account ARN
|
||||
user_id: RAM user ID (optional)
|
||||
user_name: RAM user name (optional)
|
||||
account_name: Account alias/name (optional)
|
||||
"""
|
||||
account_id: str
|
||||
account_arn: str
|
||||
user_id: Optional[str] = None
|
||||
user_name: Optional[str] = None
|
||||
account_name: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class AlibabaCloudRegion:
|
||||
"""
|
||||
AlibabaCloudRegion contains region information
|
||||
|
||||
Attributes:
|
||||
region_id: Region identifier (e.g., "cn-hangzhou")
|
||||
local_name: Localized region name
|
||||
region_endpoint: Region endpoint URL
|
||||
"""
|
||||
region_id: str
|
||||
local_name: str
|
||||
region_endpoint: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class AlibabaCloudCredentials:
|
||||
"""
|
||||
AlibabaCloudCredentials contains authentication credentials
|
||||
|
||||
Attributes:
|
||||
access_key_id: AccessKey ID
|
||||
access_key_secret: AccessKey Secret
|
||||
security_token: STS security token (optional)
|
||||
expiration: Token expiration timestamp (optional)
|
||||
"""
|
||||
access_key_id: str
|
||||
access_key_secret: str
|
||||
security_token: Optional[str] = None
|
||||
expiration: Optional[str] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class AlibabaCloudAssumeRoleInfo:
|
||||
"""
|
||||
AlibabaCloudAssumeRoleInfo contains RAM role assumption information
|
||||
|
||||
Attributes:
|
||||
role_arn: RAM role ARN to assume
|
||||
role_session_name: Session name for the assumed role
|
||||
external_id: External ID for role assumption (optional)
|
||||
session_duration: Duration in seconds (900-43200, default 3600)
|
||||
"""
|
||||
role_arn: str
|
||||
role_session_name: str
|
||||
external_id: Optional[str] = None
|
||||
session_duration: int = 3600
|
||||
|
||||
|
||||
@dataclass
|
||||
class AlibabaCloudSession:
|
||||
"""
|
||||
AlibabaCloudSession contains the session configuration
|
||||
|
||||
Attributes:
|
||||
credentials: Alibaba Cloud credentials
|
||||
region_id: Default region for the session
|
||||
"""
|
||||
credentials: AlibabaCloudCredentials
|
||||
region_id: str = "cn-hangzhou"
|
||||
|
||||
|
||||
class AlibabaCloudOutputOptions(ProviderOutputOptions):
|
||||
"""
|
||||
AlibabaCloudOutputOptions contains the output configuration for Alibaba Cloud
|
||||
|
||||
This class extends ProviderOutputOptions to provide Alibaba Cloud-specific
|
||||
output filename generation based on the account ID.
|
||||
"""
|
||||
|
||||
def __init__(self, arguments: Namespace, bulk_checks_metadata: dict, identity: AlibabaCloudIdentityInfo):
|
||||
"""
|
||||
Initialize Alibaba Cloud output options
|
||||
|
||||
Args:
|
||||
arguments: Command-line arguments
|
||||
bulk_checks_metadata: Metadata for all checks
|
||||
identity: Alibaba Cloud identity information
|
||||
"""
|
||||
# Call parent class init
|
||||
super().__init__(arguments, bulk_checks_metadata)
|
||||
|
||||
# Import here to avoid circular dependency
|
||||
from prowler.config.config import output_file_timestamp
|
||||
|
||||
# Check if custom output filename was provided
|
||||
if (
|
||||
not hasattr(arguments, "output_filename")
|
||||
or arguments.output_filename is None
|
||||
):
|
||||
# Use account ID for output filename
|
||||
account_identifier = identity.account_id
|
||||
self.output_filename = (
|
||||
f"prowler-output-{account_identifier}-{output_file_timestamp}"
|
||||
)
|
||||
else:
|
||||
self.output_filename = arguments.output_filename
|
||||
0
prowler/providers/alibabacloud/services/__init__.py
Normal file
0
prowler/providers/alibabacloud/services/__init__.py
Normal file
1
prowler/providers/alibabacloud/services/ack/__init__.py
Normal file
1
prowler/providers/alibabacloud/services/ack/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Alibaba Cloud ACK (Container Service for Kubernetes) module"""
|
||||
@@ -0,0 +1,6 @@
|
||||
"""Alibaba Cloud ACK Client Singleton"""
|
||||
|
||||
from prowler.providers.alibabacloud.services.ack.ack_service import ACK
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
ack_client = ACK(Provider.get_global_provider())
|
||||
@@ -0,0 +1 @@
|
||||
"""ACK Audit Logging Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ack_cluster_audit_log",
|
||||
"CheckTitle": "Check ACK Audit Logging",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ack",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Cluster",
|
||||
"Description": "Checks ACK Audit Logging.",
|
||||
"Risk": "",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
|
||||
|
||||
|
||||
class ack_cluster_audit_log(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for cluster in ack_client.clusters.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=cluster
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have audit logging enabled."
|
||||
if cluster.audit_log_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} has audit logging enabled."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ACK Encryption Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ack_cluster_encryption",
|
||||
"CheckTitle": "Check ACK Encryption",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ack",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "high",
|
||||
"ResourceType": "Cluster",
|
||||
"Description": "Checks ACK Encryption.",
|
||||
"Risk": "",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
|
||||
|
||||
|
||||
class ack_cluster_encryption(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for cluster in ack_client.clusters.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=cluster
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} does not have encryption enabled."
|
||||
)
|
||||
if cluster.encryption_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} has encryption enabled."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ACK Network Policy Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ack_cluster_network_policy",
|
||||
"CheckTitle": "Check ACK Network Policy",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ack",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Cluster",
|
||||
"Description": "Checks ACK Network Policy.",
|
||||
"Risk": "",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
|
||||
|
||||
|
||||
class ack_cluster_network_policy(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for cluster in ack_client.clusters.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=cluster
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have network policy support enabled."
|
||||
if cluster.network_policy_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"ACK cluster {cluster.cluster_name} has network policy support enabled."
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ACK Private Zone Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ack_cluster_private_zone",
|
||||
"CheckTitle": "Check ACK Private Zone",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ack",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "low",
|
||||
"ResourceType": "Cluster",
|
||||
"Description": "Checks ACK Private Zone.",
|
||||
"Risk": "",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
|
||||
|
||||
|
||||
class ack_cluster_private_zone(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for cluster in ack_client.clusters.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=cluster
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} does not have PrivateZone enabled."
|
||||
)
|
||||
if cluster.private_zone_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} has PrivateZone enabled."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ACK Public Access Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ack_cluster_public_access",
|
||||
"CheckTitle": "Check ACK Public Access",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ack",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "high",
|
||||
"ResourceType": "Cluster",
|
||||
"Description": "Checks ACK Public Access.",
|
||||
"Risk": "",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
|
||||
|
||||
|
||||
class ack_cluster_public_access(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for cluster in ack_client.clusters.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=cluster
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} API server is publicly accessible."
|
||||
)
|
||||
if not cluster.public_access:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"ACK cluster {cluster.cluster_name} API server is not publicly accessible."
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ACK RBAC Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ack_cluster_rbac",
|
||||
"CheckTitle": "Check ACK RBAC",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ack",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Cluster",
|
||||
"Description": "Checks ACK RBAC.",
|
||||
"Risk": "",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
|
||||
|
||||
|
||||
class ack_cluster_rbac(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for cluster in ack_client.clusters.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=cluster
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} does not have RBAC enabled."
|
||||
)
|
||||
if cluster.rbac_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} has RBAC enabled."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ACK Security Group Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ack_cluster_security_group",
|
||||
"CheckTitle": "Check ACK Security Group",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ack",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Cluster",
|
||||
"Description": "Checks ACK Security Group.",
|
||||
"Risk": "",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
|
||||
|
||||
|
||||
class ack_cluster_security_group(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for cluster in ack_client.clusters.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=cluster
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"ACK cluster {cluster.cluster_name} does not have a security group configured."
|
||||
if cluster.security_group_id:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"ACK cluster {cluster.cluster_name} has security group {cluster.security_group_id} configured."
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ACK VPC Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ack_cluster_vpc",
|
||||
"CheckTitle": "Check ACK VPC",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ack",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Cluster",
|
||||
"Description": "Checks ACK VPC.",
|
||||
"Risk": "",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "", "Url": ""}},
|
||||
"Categories": [],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ack.ack_client import ack_client
|
||||
|
||||
|
||||
class ack_cluster_vpc(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for cluster in ack_client.clusters.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=cluster
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"ACK cluster {cluster.cluster_name} is not deployed in a VPC."
|
||||
)
|
||||
if cluster.vpc_id:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"ACK cluster {cluster.cluster_name} is deployed in VPC {cluster.vpc_id}."
|
||||
findings.append(report)
|
||||
return findings
|
||||
194
prowler/providers/alibabacloud/services/ack/ack_service.py
Normal file
194
prowler/providers/alibabacloud/services/ack/ack_service.py
Normal file
@@ -0,0 +1,194 @@
|
||||
"""Alibaba Cloud ACK Service"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
|
||||
|
||||
|
||||
@dataclass
|
||||
class Cluster:
|
||||
"""ACK Cluster"""
|
||||
|
||||
cluster_id: str
|
||||
cluster_name: str
|
||||
arn: str
|
||||
region: str
|
||||
cluster_type: str = "Kubernetes"
|
||||
state: str = "running"
|
||||
public_access: bool = True # Will trigger check
|
||||
private_zone_enabled: bool = False
|
||||
network_policy_enabled: bool = False # Will trigger check
|
||||
rbac_enabled: bool = True
|
||||
encryption_enabled: bool = False # Will trigger check
|
||||
audit_log_enabled: bool = False # Will trigger check
|
||||
security_group_id: str = ""
|
||||
vpc_id: str = ""
|
||||
master_url: str = ""
|
||||
|
||||
def __post_init__(self):
|
||||
pass
|
||||
|
||||
|
||||
class ACK(AlibabaCloudService):
|
||||
def __init__(self, provider):
|
||||
super().__init__("ack", provider)
|
||||
self.clusters = {}
|
||||
logger.info("Collecting ACK clusters...")
|
||||
self._describe_clusters()
|
||||
logger.info(f"ACK service initialized - Clusters: {len(self.clusters)}")
|
||||
|
||||
def _describe_clusters(self):
|
||||
for region in self.regions:
|
||||
try:
|
||||
from alibabacloud_cs20151215.client import Client as AckClient
|
||||
from alibabacloud_tea_openapi import models as openapi_models
|
||||
|
||||
# Create client configuration
|
||||
config = openapi_models.Config(
|
||||
access_key_id=self.provider.session.credentials.access_key_id,
|
||||
access_key_secret=self.provider.session.credentials.access_key_secret,
|
||||
region_id=region,
|
||||
)
|
||||
|
||||
if self.provider.session.credentials.security_token:
|
||||
config.security_token = (
|
||||
self.provider.session.credentials.security_token
|
||||
)
|
||||
|
||||
# Create ACK client
|
||||
client = AckClient(config)
|
||||
|
||||
# List clusters
|
||||
response = client.describe_clusters_v1()
|
||||
|
||||
# Process clusters
|
||||
if response.body and response.body.clusters:
|
||||
for cluster_data in response.body.clusters:
|
||||
cluster_id = cluster_data.cluster_id
|
||||
arn = self.generate_resource_arn("cluster", cluster_id, region)
|
||||
|
||||
# Get detailed cluster info
|
||||
try:
|
||||
detail_response = client.describe_cluster_detail(cluster_id)
|
||||
detail = (
|
||||
detail_response.body
|
||||
if detail_response.body
|
||||
else cluster_data
|
||||
)
|
||||
|
||||
# Check audit log
|
||||
audit_log_enabled = False
|
||||
if hasattr(detail, "parameters") and detail.parameters:
|
||||
audit_log_enabled = (
|
||||
detail.parameters.get("AuditLogEnabled", "false")
|
||||
== "true"
|
||||
)
|
||||
|
||||
# Check encryption
|
||||
encryption_enabled = False
|
||||
if hasattr(detail, "parameters") and detail.parameters:
|
||||
encryption_enabled = (
|
||||
detail.parameters.get("EncryptionEnabled", "false")
|
||||
== "true"
|
||||
)
|
||||
|
||||
# Check network policy
|
||||
network_policy_enabled = False
|
||||
if hasattr(detail, "parameters") and detail.parameters:
|
||||
network_policy_enabled = (
|
||||
detail.parameters.get("NetworkPlugin", "")
|
||||
== "terway"
|
||||
)
|
||||
|
||||
# Check RBAC
|
||||
rbac_enabled = (
|
||||
True # Default to true for modern ACK clusters
|
||||
)
|
||||
if hasattr(detail, "parameters") and detail.parameters:
|
||||
rbac_enabled = (
|
||||
detail.parameters.get("RBACEnabled", "true")
|
||||
== "true"
|
||||
)
|
||||
|
||||
# Check private zone
|
||||
private_zone_enabled = False
|
||||
if hasattr(detail, "private_zone", "false"):
|
||||
private_zone_enabled = detail.private_zone
|
||||
|
||||
cluster = Cluster(
|
||||
cluster_id=cluster_id,
|
||||
cluster_name=(
|
||||
cluster_data.name
|
||||
if cluster_data.name
|
||||
else cluster_id
|
||||
),
|
||||
arn=arn,
|
||||
region=region,
|
||||
cluster_type=(
|
||||
cluster_data.cluster_type
|
||||
if hasattr(cluster_data, "cluster_type")
|
||||
else "Kubernetes"
|
||||
),
|
||||
state=(
|
||||
cluster_data.state
|
||||
if hasattr(cluster_data, "state")
|
||||
else "running"
|
||||
),
|
||||
public_access=(
|
||||
hasattr(cluster_data, "public_slb")
|
||||
and cluster_data.public_slb
|
||||
if hasattr(cluster_data, "public_slb")
|
||||
else False
|
||||
),
|
||||
private_zone_enabled=private_zone_enabled,
|
||||
network_policy_enabled=network_policy_enabled,
|
||||
rbac_enabled=rbac_enabled,
|
||||
encryption_enabled=encryption_enabled,
|
||||
audit_log_enabled=audit_log_enabled,
|
||||
security_group_id=(
|
||||
cluster_data.security_group_id
|
||||
if hasattr(cluster_data, "security_group_id")
|
||||
else ""
|
||||
),
|
||||
vpc_id=(
|
||||
cluster_data.vpc_id
|
||||
if hasattr(cluster_data, "vpc_id")
|
||||
else ""
|
||||
),
|
||||
master_url=(
|
||||
cluster_data.master_url
|
||||
if hasattr(cluster_data, "master_url")
|
||||
else ""
|
||||
),
|
||||
)
|
||||
|
||||
self.clusters[arn] = cluster
|
||||
logger.info(f"Found ACK cluster: {cluster_id} in {region}")
|
||||
|
||||
except Exception as detail_error:
|
||||
logger.warning(
|
||||
f"Could not get details for cluster {cluster_id}: {detail_error}"
|
||||
)
|
||||
# Use basic cluster data
|
||||
cluster = Cluster(
|
||||
cluster_id=cluster_id,
|
||||
cluster_name=(
|
||||
cluster_data.name
|
||||
if cluster_data.name
|
||||
else cluster_id
|
||||
),
|
||||
arn=arn,
|
||||
region=region,
|
||||
vpc_id=(
|
||||
cluster_data.vpc_id
|
||||
if hasattr(cluster_data, "vpc_id")
|
||||
else ""
|
||||
),
|
||||
)
|
||||
self.clusters[arn] = cluster
|
||||
else:
|
||||
logger.info(f"No ACK clusters found in {region}")
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(error, "DescribeClusters", region)
|
||||
@@ -0,0 +1 @@
|
||||
"""Alibaba Cloud ActionTrail module"""
|
||||
@@ -0,0 +1,6 @@
|
||||
"""Alibaba Cloud ActionTrail Client Singleton"""
|
||||
|
||||
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_service import ActionTrail
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
actiontrail_client = ActionTrail(Provider.get_global_provider())
|
||||
@@ -0,0 +1,148 @@
|
||||
"""
|
||||
Alibaba Cloud ActionTrail Service
|
||||
|
||||
This module provides the service class for Alibaba Cloud ActionTrail.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
|
||||
|
||||
|
||||
@dataclass
|
||||
class Trail:
|
||||
"""ActionTrail Trail"""
|
||||
|
||||
name: str
|
||||
arn: str
|
||||
region: str
|
||||
status: str = "Disabled" # Will trigger check
|
||||
oss_bucket_name: str = ""
|
||||
oss_key_prefix: str = ""
|
||||
sls_project_arn: str = ""
|
||||
sls_write_role_arn: str = ""
|
||||
event_rw: str = "Write" # Should be "All"
|
||||
trail_region: str = "All"
|
||||
is_organization_trail: bool = False
|
||||
mns_topic_arn: str = ""
|
||||
|
||||
def __post_init__(self):
|
||||
pass
|
||||
|
||||
|
||||
class ActionTrail(AlibabaCloudService):
|
||||
"""
|
||||
Alibaba Cloud ActionTrail service class
|
||||
|
||||
Handles collection of ActionTrail resources including trails and their configurations.
|
||||
"""
|
||||
|
||||
def __init__(self, provider):
|
||||
"""Initialize ActionTrail service"""
|
||||
super().__init__("actiontrail", provider)
|
||||
|
||||
self.trails = {}
|
||||
|
||||
logger.info("Collecting ActionTrail trails...")
|
||||
self._describe_trails()
|
||||
|
||||
logger.info(f"ActionTrail service initialized - Trails: {len(self.trails)}")
|
||||
|
||||
def _describe_trails(self):
|
||||
"""Describe all ActionTrail trails"""
|
||||
# ActionTrail is a global service, but we'll check it once
|
||||
try:
|
||||
from alibabacloud_actiontrail20200706 import models
|
||||
from alibabacloud_actiontrail20200706.client import (
|
||||
Client as ActionTrailClient,
|
||||
)
|
||||
from alibabacloud_tea_openapi import models as openapi_models
|
||||
|
||||
# Create client configuration (use cn-hangzhou as default region)
|
||||
config = openapi_models.Config(
|
||||
access_key_id=self.provider.session.credentials.access_key_id,
|
||||
access_key_secret=self.provider.session.credentials.access_key_secret,
|
||||
region_id="cn-hangzhou",
|
||||
)
|
||||
|
||||
if self.provider.session.credentials.security_token:
|
||||
config.security_token = self.provider.session.credentials.security_token
|
||||
|
||||
# Create ActionTrail client
|
||||
client = ActionTrailClient(config)
|
||||
|
||||
# List trails
|
||||
request = models.ListTrailsRequest()
|
||||
response = client.list_trails(request)
|
||||
|
||||
# Process trails
|
||||
if response.body.trails:
|
||||
for trail_data in response.body.trails:
|
||||
trail_name = trail_data.name if trail_data.name else "unknown"
|
||||
arn = self.generate_resource_arn("trail", trail_name, "")
|
||||
|
||||
# Get trail status
|
||||
try:
|
||||
status_request = models.GetTrailStatusRequest(name=trail_name)
|
||||
status_response = client.get_trail_status(status_request)
|
||||
status = (
|
||||
"Enabled" if status_response.body.is_logging else "Disabled"
|
||||
)
|
||||
except Exception:
|
||||
status = "Unknown"
|
||||
|
||||
trail = Trail(
|
||||
name=trail_name,
|
||||
arn=arn,
|
||||
region="global",
|
||||
status=status,
|
||||
oss_bucket_name=(
|
||||
trail_data.oss_bucket_name
|
||||
if hasattr(trail_data, "oss_bucket_name")
|
||||
else ""
|
||||
),
|
||||
oss_key_prefix=(
|
||||
trail_data.oss_key_prefix
|
||||
if hasattr(trail_data, "oss_key_prefix")
|
||||
else ""
|
||||
),
|
||||
sls_project_arn=(
|
||||
trail_data.sls_project_arn
|
||||
if hasattr(trail_data, "sls_project_arn")
|
||||
else ""
|
||||
),
|
||||
sls_write_role_arn=(
|
||||
trail_data.sls_write_role_arn
|
||||
if hasattr(trail_data, "sls_write_role_arn")
|
||||
else ""
|
||||
),
|
||||
event_rw=(
|
||||
trail_data.event_rw
|
||||
if hasattr(trail_data, "event_rw")
|
||||
else "Write"
|
||||
),
|
||||
trail_region=(
|
||||
trail_data.trail_region
|
||||
if hasattr(trail_data, "trail_region")
|
||||
else "All"
|
||||
),
|
||||
is_organization_trail=(
|
||||
trail_data.is_organization_trail
|
||||
if hasattr(trail_data, "is_organization_trail")
|
||||
else False
|
||||
),
|
||||
mns_topic_arn=(
|
||||
trail_data.mns_topic_arn
|
||||
if hasattr(trail_data, "mns_topic_arn")
|
||||
else ""
|
||||
),
|
||||
)
|
||||
|
||||
self.trails[arn] = trail
|
||||
logger.info(f"Found ActionTrail trail: {trail_name}")
|
||||
else:
|
||||
logger.info("No ActionTrail trails found")
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(error, "DescribeTrails", "global")
|
||||
@@ -0,0 +1 @@
|
||||
"""ActionTrail Trail Enabled Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "actiontrail_trail_enabled",
|
||||
"CheckTitle": "Ensure at least one ActionTrail trail is enabled",
|
||||
"CheckType": [],
|
||||
"ServiceName": "actiontrail",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "high",
|
||||
"ResourceType": "Trail",
|
||||
"Description": "Ensures that at least one ActionTrail trail is enabled to log API activity for security auditing. ActionTrail records API calls and events for compliance, security analysis, and troubleshooting.",
|
||||
"Risk": "Without ActionTrail enabled, there is no audit log of API activity, making it impossible to track changes, detect unauthorized access, or investigate security incidents. This can lead to compliance violations and delayed incident response.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ActionTrail/trail-enabled.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable at least one ActionTrail trail. Go to ActionTrail Console and create/enable a trail to log API activity across all regions.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/actiontrail/latest/create-a-trail-to-deliver-events-to-oss"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"logging"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
from dataclasses import dataclass
|
||||
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_client import (
|
||||
actiontrail_client,
|
||||
)
|
||||
|
||||
|
||||
@dataclass
|
||||
class ActionTrailConfig:
|
||||
id: str
|
||||
name: str
|
||||
arn: str
|
||||
region: str
|
||||
|
||||
|
||||
class actiontrail_trail_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
enabled_trails = [
|
||||
trail
|
||||
for trail in actiontrail_client.trails.values()
|
||||
if trail.status == "Enabled"
|
||||
]
|
||||
|
||||
config = ActionTrailConfig(
|
||||
id="actiontrail-configuration",
|
||||
name="ActionTrail Configuration",
|
||||
arn=f"acs:actiontrail::{actiontrail_client.account_id}:configuration",
|
||||
region="global",
|
||||
)
|
||||
|
||||
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=config)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = "No enabled ActionTrail trails found."
|
||||
if len(enabled_trails) > 0:
|
||||
trail_names = [trail.name for trail in enabled_trails]
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"ActionTrail has {len(enabled_trails)} enabled trail(s): {', '.join(trail_names)}."
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ActionTrail Trail Logs All Events Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "actiontrail_trail_logs_all_events",
|
||||
"CheckTitle": "Ensure ActionTrail trails log all events (read and write)",
|
||||
"CheckType": [],
|
||||
"ServiceName": "actiontrail",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Trail",
|
||||
"Description": "Ensures that ActionTrail trails are configured to log both read and write events. Logging all events provides complete audit coverage for security analysis and compliance.",
|
||||
"Risk": "Logging only write events means read-only API calls are not recorded, potentially missing important security events such as unauthorized data access attempts or reconnaissance activities.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ActionTrail/trail-log-all-events.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Update ActionTrail trails to log all events. Go to ActionTrail Console, select the trail, and configure it to log both read and write events.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/actiontrail/latest/create-a-trail-to-deliver-events-to-oss"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"logging"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.actiontrail.actiontrail_client import (
|
||||
actiontrail_client,
|
||||
)
|
||||
|
||||
|
||||
class actiontrail_trail_logs_all_events(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for trail in actiontrail_client.trails.values():
|
||||
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=trail)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"ActionTrail trail {trail.name} only logs {trail.event_rw} events."
|
||||
)
|
||||
if trail.event_rw == "All":
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"ActionTrail trail {trail.name} logs all events."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
1
prowler/providers/alibabacloud/services/ecs/__init__.py
Normal file
1
prowler/providers/alibabacloud/services/ecs/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Alibaba Cloud ECS (Elastic Compute Service) module"""
|
||||
11
prowler/providers/alibabacloud/services/ecs/ecs_client.py
Normal file
11
prowler/providers/alibabacloud/services/ecs/ecs_client.py
Normal file
@@ -0,0 +1,11 @@
|
||||
"""
|
||||
Alibaba Cloud ECS Client Singleton
|
||||
|
||||
This module provides the singleton ECS client instance.
|
||||
"""
|
||||
|
||||
from prowler.providers.alibabacloud.services.ecs.ecs_service import ECS
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
# Initialize ECS client singleton
|
||||
ecs_client = ECS(Provider.get_global_provider())
|
||||
@@ -0,0 +1,37 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ecs_disk_encryption_enabled",
|
||||
"CheckTitle": "Ensure ECS Disk Encryption is Enabled",
|
||||
"CheckType": [
|
||||
"Security",
|
||||
"Encryption"
|
||||
],
|
||||
"ServiceName": "ecs",
|
||||
"SubServiceName": "disk",
|
||||
"ResourceIdTemplate": "arn:acs:ecs:region:account-id:disk/disk-id",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "AlibabaCloud::ECS::Disk",
|
||||
"Description": "Ensure that all ECS disks (system disks and data disks) are encrypted to protect data at rest. Disk encryption provides an additional layer of security by encrypting data stored on ECS instances using Alibaba Cloud KMS.",
|
||||
"Risk": "Unencrypted disks can expose sensitive data if the physical storage media is compromised, if disk snapshots are inadvertently shared, or if unauthorized users gain access to the disk. Without encryption, data at rest is vulnerable to unauthorized access.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ECS/disk-encryption.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "# Note: Encryption must be enabled when creating a disk. Existing disks cannot be encrypted in-place.\n# Create a snapshot of the unencrypted disk, then create a new encrypted disk from the snapshot:\naliyun ecs CreateSnapshot --DiskId <disk-id> --SnapshotName encrypted-snapshot\naliyun ecs CreateDisk --SnapshotId <snapshot-id> --Encrypted true --KMSKeyId <kms-key-id>",
|
||||
"NativeIaC": "",
|
||||
"Other": "For existing disks, you must create a snapshot, create a new encrypted disk from the snapshot, detach the old disk, attach the new encrypted disk, and then delete the old disk.",
|
||||
"Terraform": "resource \"alicloud_disk\" \"encrypted_disk\" {\n disk_name = \"encrypted-disk\"\n size = 100\n category = \"cloud_essd\"\n encrypted = true\n kms_key_id = var.kms_key_id\n zone_id = \"cn-hangzhou-h\"\n}"
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable disk encryption for all new ECS disks. For existing unencrypted disks, create encrypted copies using snapshots and KMS. Use customer-managed KMS keys for enhanced control over encryption keys. Regularly rotate encryption keys and audit key usage.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/ecs/user-guide/encryption-overview"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"encryption",
|
||||
"data-protection"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "ECS disk encryption cannot be enabled on existing disks. You must create new encrypted disks or create encrypted copies from snapshots. Consider enabling default encryption at the account level to ensure all new disks are automatically encrypted.",
|
||||
"Compliance": []
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
|
||||
|
||||
|
||||
class ecs_disk_encryption_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for disk in ecs_client.disks.values():
|
||||
report = Check_Report_AlibabaCloud(metadata=self.metadata(), resource=disk)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"ECS disk {disk.name} ({disk.id}) is not encrypted."
|
||||
)
|
||||
if disk.encrypted:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"ECS disk {disk.name} ({disk.id}) is encrypted."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""ECS Instance Public IP Check"""
|
||||
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ecs_instance_public_ip",
|
||||
"CheckTitle": "Ensure ECS instances do not have unnecessary public IPs",
|
||||
"CheckType": [],
|
||||
"ServiceName": "ecs",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Instance",
|
||||
"Description": "Ensures ECS instances do not have unnecessary public IP addresses.",
|
||||
"Risk": "Public IP addresses increase attack surface.",
|
||||
"RelatedUrl": "",
|
||||
"Remediation": {"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""}, "Recommendation": {"Text": "Use EIP or NAT Gateway instead.", "Url": ""}},
|
||||
"Categories": ["internet-exposed"],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
|
||||
|
||||
|
||||
class ecs_instance_public_ip(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for instance in ecs_client.instances.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=instance
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"ECS instance {instance.name} has public IP {instance.public_ip}."
|
||||
)
|
||||
if not instance.public_ip:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"ECS instance {instance.name} does not have a public IP address."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1,37 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "ecs_instance_ssh_access_restricted",
|
||||
"CheckTitle": "Check for Unrestricted SSH Access",
|
||||
"CheckType": [
|
||||
"Security",
|
||||
"Network"
|
||||
],
|
||||
"ServiceName": "ecs",
|
||||
"SubServiceName": "security_group",
|
||||
"ResourceIdTemplate": "arn:acs:ecs:region:account-id:security-group/sg-id",
|
||||
"Severity": "high",
|
||||
"ResourceType": "AlibabaCloud::ECS::SecurityGroup",
|
||||
"Description": "Ensure ECS security groups do not allow unrestricted SSH access (port 22) from the internet (0.0.0.0/0). Unrestricted SSH access increases the risk of unauthorized access, brute force attacks, and potential security breaches.",
|
||||
"Risk": "Allowing unrestricted SSH access from the internet can expose instances to brute force attacks, unauthorized access attempts, and potential compromise. Attackers can scan for open SSH ports and attempt to gain unauthorized access to instances.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/ECS/unrestricted-ssh-access.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "aliyun ecs ModifySecurityGroupRule --SecurityGroupId <sg-id> --IpProtocol tcp --PortRange 22/22 --SourceCidrIp <your-ip>/32",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": "resource \"alicloud_security_group_rule\" \"allow_ssh_from_specific_ip\" {\n type = \"ingress\"\n ip_protocol = \"tcp\"\n port_range = \"22/22\"\n security_group_id = alicloud_security_group.group.id\n cidr_ip = \"203.0.113.0/24\"\n}"
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Restrict SSH access to specific trusted IP addresses or IP ranges. Implement a bastion host or VPN for secure remote access. Consider using Alibaba Cloud's Session Manager for instance access without opening SSH ports.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/ecs/user-guide/security-groups-overview"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"internet-exposed",
|
||||
"encryption"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "This check examines security group rules to identify unrestricted SSH access. It is recommended to use the principle of least privilege and only allow SSH access from known, trusted IP addresses.",
|
||||
"Compliance": []
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.ecs.ecs_client import ecs_client
|
||||
|
||||
|
||||
class ecs_instance_ssh_access_restricted(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for security_group in ecs_client.security_groups.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=security_group
|
||||
)
|
||||
has_unrestricted_ssh = False
|
||||
for rule in security_group.rules:
|
||||
if (
|
||||
rule.get("direction") == "ingress"
|
||||
and rule.get("protocol") == "tcp"
|
||||
and "22" in rule.get("port_range", "")
|
||||
and rule.get("source") == "0.0.0.0/0"
|
||||
):
|
||||
has_unrestricted_ssh = True
|
||||
break
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"Security group {security_group.name} ({security_group.id}) allows unrestricted SSH access."
|
||||
if not has_unrestricted_ssh:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"Security group {security_group.name} ({security_group.id}) does not allow unrestricted SSH access."
|
||||
findings.append(report)
|
||||
return findings
|
||||
496
prowler/providers/alibabacloud/services/ecs/ecs_service.py
Normal file
496
prowler/providers/alibabacloud/services/ecs/ecs_service.py
Normal file
@@ -0,0 +1,496 @@
|
||||
"""
|
||||
Alibaba Cloud ECS Service
|
||||
|
||||
This module provides the service class for Alibaba Cloud Elastic Compute Service (ECS).
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from prowler.lib.logger import logger
|
||||
from prowler.providers.alibabacloud.lib.service.service import AlibabaCloudService
|
||||
|
||||
|
||||
@dataclass
|
||||
class Instance:
|
||||
"""
|
||||
Represents an Alibaba Cloud ECS instance
|
||||
|
||||
Attributes:
|
||||
id: Instance ID
|
||||
name: Instance name
|
||||
arn: Instance ARN
|
||||
region: Region where the instance is located
|
||||
status: Instance status (Running, Stopped, etc.)
|
||||
instance_type: Instance type/specification
|
||||
public_ip: Public IP address (if any)
|
||||
private_ip: Private IP address
|
||||
security_groups: List of security group IDs
|
||||
vpc_id: VPC ID
|
||||
zone_id: Availability zone ID
|
||||
image_id: OS image ID
|
||||
tags: Instance tags
|
||||
created_time: Creation timestamp
|
||||
expired_time: Expiration timestamp (for subscription instances)
|
||||
"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
arn: str
|
||||
region: str
|
||||
status: str = ""
|
||||
instance_type: str = ""
|
||||
public_ip: Optional[str] = None
|
||||
private_ip: str = ""
|
||||
security_groups: list = None
|
||||
vpc_id: str = ""
|
||||
zone_id: str = ""
|
||||
image_id: str = ""
|
||||
tags: dict = None
|
||||
created_time: str = ""
|
||||
expired_time: str = ""
|
||||
|
||||
def __post_init__(self):
|
||||
if self.security_groups is None:
|
||||
self.security_groups = []
|
||||
if self.tags is None:
|
||||
self.tags = {}
|
||||
|
||||
|
||||
@dataclass
|
||||
class Disk:
|
||||
"""
|
||||
Represents an Alibaba Cloud ECS disk
|
||||
|
||||
Attributes:
|
||||
id: Disk ID
|
||||
name: Disk name
|
||||
arn: Disk ARN
|
||||
region: Region where the disk is located
|
||||
disk_type: Type of disk (system, data)
|
||||
category: Disk category (cloud_ssd, cloud_essd, etc.)
|
||||
size: Disk size in GB
|
||||
encrypted: Whether the disk is encrypted
|
||||
kms_key_id: KMS key ID used for encryption (if encrypted)
|
||||
status: Disk status (Available, In_use, etc.)
|
||||
instance_id: Attached instance ID (if attached)
|
||||
zone_id: Availability zone ID
|
||||
tags: Disk tags
|
||||
"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
arn: str
|
||||
region: str
|
||||
disk_type: str = ""
|
||||
category: str = ""
|
||||
size: int = 0
|
||||
encrypted: bool = False
|
||||
kms_key_id: Optional[str] = None
|
||||
status: str = ""
|
||||
instance_id: Optional[str] = None
|
||||
zone_id: str = ""
|
||||
tags: dict = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.tags is None:
|
||||
self.tags = {}
|
||||
|
||||
|
||||
@dataclass
|
||||
class SecurityGroup:
|
||||
"""
|
||||
Represents an Alibaba Cloud security group
|
||||
|
||||
Attributes:
|
||||
id: Security group ID
|
||||
name: Security group name
|
||||
arn: Security group ARN
|
||||
region: Region where the security group is located
|
||||
vpc_id: VPC ID
|
||||
description: Security group description
|
||||
rules: List of security group rules
|
||||
tags: Security group tags
|
||||
"""
|
||||
|
||||
id: str
|
||||
name: str
|
||||
arn: str
|
||||
region: str
|
||||
vpc_id: str = ""
|
||||
description: str = ""
|
||||
rules: list = None
|
||||
tags: dict = None
|
||||
|
||||
def __post_init__(self):
|
||||
if self.rules is None:
|
||||
self.rules = []
|
||||
if self.tags is None:
|
||||
self.tags = {}
|
||||
|
||||
|
||||
class ECS(AlibabaCloudService):
|
||||
"""
|
||||
Alibaba Cloud ECS service class
|
||||
|
||||
This class handles the collection of ECS resources including instances,
|
||||
disks, and security groups for auditing.
|
||||
"""
|
||||
|
||||
def __init__(self, provider):
|
||||
"""
|
||||
Initialize ECS service
|
||||
|
||||
Args:
|
||||
provider: AlibabaCloudProvider instance
|
||||
"""
|
||||
super().__init__("ecs", provider)
|
||||
|
||||
# Initialize resource dictionaries
|
||||
self.instances = {}
|
||||
self.disks = {}
|
||||
self.security_groups = {}
|
||||
|
||||
# Collect ECS resources
|
||||
logger.info("Collecting ECS instances...")
|
||||
self._describe_instances()
|
||||
|
||||
logger.info("Collecting ECS disks...")
|
||||
self._describe_disks()
|
||||
|
||||
logger.info("Collecting ECS security groups...")
|
||||
self._describe_security_groups()
|
||||
|
||||
logger.info(
|
||||
f"ECS service initialized - Instances: {len(self.instances)}, "
|
||||
f"Disks: {len(self.disks)}, Security Groups: {len(self.security_groups)}"
|
||||
)
|
||||
|
||||
def _describe_instances(self):
|
||||
"""
|
||||
Describe ECS instances across all regions
|
||||
|
||||
This method collects all ECS instances and their details.
|
||||
"""
|
||||
logger.info("Describing ECS instances across regions...")
|
||||
|
||||
for region in self.regions:
|
||||
try:
|
||||
from alibabacloud_ecs20140526 import models
|
||||
from alibabacloud_ecs20140526.client import Client as EcsClient
|
||||
from alibabacloud_tea_openapi import models as openapi_models
|
||||
|
||||
# Create client configuration
|
||||
config = openapi_models.Config(
|
||||
access_key_id=self.provider.session.credentials.access_key_id,
|
||||
access_key_secret=self.provider.session.credentials.access_key_secret,
|
||||
region_id=region,
|
||||
)
|
||||
|
||||
# Add security token if present (for STS)
|
||||
if self.provider.session.credentials.security_token:
|
||||
config.security_token = (
|
||||
self.provider.session.credentials.security_token
|
||||
)
|
||||
|
||||
# Create ECS client
|
||||
client = EcsClient(config)
|
||||
|
||||
# Describe instances
|
||||
request = models.DescribeInstancesRequest(
|
||||
page_size=100, region_id=region
|
||||
)
|
||||
response = client.describe_instances(request)
|
||||
|
||||
# Process instances
|
||||
if response.body.instances and response.body.instances.instance:
|
||||
for instance_data in response.body.instances.instance:
|
||||
instance_id = instance_data.instance_id
|
||||
arn = self.generate_resource_arn(
|
||||
"instance", instance_id, region
|
||||
)
|
||||
|
||||
# Get public IP
|
||||
public_ip = None
|
||||
if (
|
||||
instance_data.public_ip_address
|
||||
and instance_data.public_ip_address.ip_address
|
||||
):
|
||||
public_ip = (
|
||||
instance_data.public_ip_address.ip_address[0]
|
||||
if instance_data.public_ip_address.ip_address
|
||||
else None
|
||||
)
|
||||
elif (
|
||||
instance_data.eip_address
|
||||
and instance_data.eip_address.ip_address
|
||||
):
|
||||
public_ip = instance_data.eip_address.ip_address
|
||||
|
||||
# Get private IP
|
||||
private_ip = ""
|
||||
if (
|
||||
instance_data.vpc_attributes
|
||||
and instance_data.vpc_attributes.private_ip_address
|
||||
):
|
||||
private_ip = (
|
||||
instance_data.vpc_attributes.private_ip_address.ip_address[
|
||||
0
|
||||
]
|
||||
if instance_data.vpc_attributes.private_ip_address.ip_address
|
||||
else ""
|
||||
)
|
||||
|
||||
# Get security groups
|
||||
security_groups = []
|
||||
if (
|
||||
instance_data.security_group_ids
|
||||
and instance_data.security_group_ids.security_group_id
|
||||
):
|
||||
security_groups = (
|
||||
instance_data.security_group_ids.security_group_id
|
||||
)
|
||||
|
||||
# Get VPC ID
|
||||
vpc_id = (
|
||||
instance_data.vpc_attributes.vpc_id
|
||||
if instance_data.vpc_attributes
|
||||
else ""
|
||||
)
|
||||
|
||||
# Get tags
|
||||
tags = {}
|
||||
if instance_data.tags and instance_data.tags.tag:
|
||||
for tag in instance_data.tags.tag:
|
||||
tags[tag.tag_key] = tag.tag_value
|
||||
|
||||
instance = Instance(
|
||||
id=instance_id,
|
||||
name=instance_data.instance_name or instance_id,
|
||||
arn=arn,
|
||||
region=region,
|
||||
status=instance_data.status,
|
||||
instance_type=instance_data.instance_type,
|
||||
public_ip=public_ip,
|
||||
private_ip=private_ip,
|
||||
security_groups=security_groups,
|
||||
vpc_id=vpc_id,
|
||||
zone_id=instance_data.zone_id,
|
||||
image_id=instance_data.image_id,
|
||||
tags=tags,
|
||||
created_time=instance_data.creation_time,
|
||||
expired_time=(
|
||||
instance_data.expired_time
|
||||
if hasattr(instance_data, "expired_time")
|
||||
else ""
|
||||
),
|
||||
)
|
||||
|
||||
self.instances[arn] = instance
|
||||
logger.info(f"Found ECS instance: {instance_id} in {region}")
|
||||
else:
|
||||
logger.info(f"No ECS instances found in {region}")
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(error, "DescribeInstances", region)
|
||||
|
||||
def _describe_disks(self):
|
||||
"""
|
||||
Describe ECS disks across all regions
|
||||
|
||||
This method collects all ECS disks and their encryption status.
|
||||
"""
|
||||
logger.info("Describing ECS disks across regions...")
|
||||
|
||||
for region in self.regions:
|
||||
try:
|
||||
from alibabacloud_ecs20140526 import models
|
||||
from alibabacloud_ecs20140526.client import Client as EcsClient
|
||||
from alibabacloud_tea_openapi import models as openapi_models
|
||||
|
||||
# Create client configuration
|
||||
config = openapi_models.Config(
|
||||
access_key_id=self.provider.session.credentials.access_key_id,
|
||||
access_key_secret=self.provider.session.credentials.access_key_secret,
|
||||
region_id=region,
|
||||
)
|
||||
|
||||
if self.provider.session.credentials.security_token:
|
||||
config.security_token = (
|
||||
self.provider.session.credentials.security_token
|
||||
)
|
||||
|
||||
# Create ECS client
|
||||
client = EcsClient(config)
|
||||
|
||||
# Describe disks
|
||||
request = models.DescribeDisksRequest(page_size=100, region_id=region)
|
||||
response = client.describe_disks(request)
|
||||
|
||||
# Process disks
|
||||
if response.body.disks and response.body.disks.disk:
|
||||
for disk_data in response.body.disks.disk:
|
||||
disk_id = disk_data.disk_id
|
||||
arn = self.generate_resource_arn("disk", disk_id, region)
|
||||
|
||||
# Get tags
|
||||
tags = {}
|
||||
if disk_data.tags and disk_data.tags.tag:
|
||||
for tag in disk_data.tags.tag:
|
||||
tags[tag.tag_key] = tag.tag_value
|
||||
|
||||
disk = Disk(
|
||||
id=disk_id,
|
||||
name=disk_data.disk_name or disk_id,
|
||||
arn=arn,
|
||||
region=region,
|
||||
disk_type=disk_data.type if disk_data.type else "",
|
||||
category=disk_data.category if disk_data.category else "",
|
||||
size=disk_data.size if disk_data.size else 0,
|
||||
encrypted=(
|
||||
disk_data.encrypted
|
||||
if hasattr(disk_data, "encrypted")
|
||||
else False
|
||||
),
|
||||
kms_key_id=(
|
||||
disk_data.kms_key_id
|
||||
if hasattr(disk_data, "kms_key_id")
|
||||
else None
|
||||
),
|
||||
status=disk_data.status if disk_data.status else "",
|
||||
instance_id=(
|
||||
disk_data.instance_id if disk_data.instance_id else None
|
||||
),
|
||||
zone_id=disk_data.zone_id if disk_data.zone_id else "",
|
||||
tags=tags,
|
||||
)
|
||||
|
||||
self.disks[arn] = disk
|
||||
logger.info(f"Found ECS disk: {disk_id} in {region}")
|
||||
else:
|
||||
logger.info(f"No ECS disks found in {region}")
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(error, "DescribeDisks", region)
|
||||
|
||||
def _describe_security_groups(self):
|
||||
"""
|
||||
Describe security groups and their rules
|
||||
|
||||
This method collects security groups and analyzes their rules.
|
||||
"""
|
||||
logger.info("Describing security groups across regions...")
|
||||
|
||||
for region in self.regions:
|
||||
try:
|
||||
from alibabacloud_ecs20140526 import models
|
||||
from alibabacloud_ecs20140526.client import Client as EcsClient
|
||||
from alibabacloud_tea_openapi import models as openapi_models
|
||||
|
||||
# Create client configuration
|
||||
config = openapi_models.Config(
|
||||
access_key_id=self.provider.session.credentials.access_key_id,
|
||||
access_key_secret=self.provider.session.credentials.access_key_secret,
|
||||
region_id=region,
|
||||
)
|
||||
|
||||
if self.provider.session.credentials.security_token:
|
||||
config.security_token = (
|
||||
self.provider.session.credentials.security_token
|
||||
)
|
||||
|
||||
# Create ECS client
|
||||
client = EcsClient(config)
|
||||
|
||||
# Describe security groups
|
||||
request = models.DescribeSecurityGroupsRequest(
|
||||
page_size=100, region_id=region
|
||||
)
|
||||
response = client.describe_security_groups(request)
|
||||
|
||||
# Process security groups
|
||||
if (
|
||||
response.body.security_groups
|
||||
and response.body.security_groups.security_group
|
||||
):
|
||||
for sg_data in response.body.security_groups.security_group:
|
||||
sg_id = sg_data.security_group_id
|
||||
arn = self.generate_resource_arn(
|
||||
"security-group", sg_id, region
|
||||
)
|
||||
|
||||
# Get tags
|
||||
tags = {}
|
||||
if sg_data.tags and sg_data.tags.tag:
|
||||
for tag in sg_data.tags.tag:
|
||||
tags[tag.tag_key] = tag.tag_value
|
||||
|
||||
# Get security group rules
|
||||
rules = []
|
||||
try:
|
||||
rules_request = (
|
||||
models.DescribeSecurityGroupAttributeRequest(
|
||||
security_group_id=sg_id, region_id=region
|
||||
)
|
||||
)
|
||||
rules_response = client.describe_security_group_attribute(
|
||||
rules_request
|
||||
)
|
||||
|
||||
# Process ingress rules
|
||||
if (
|
||||
rules_response.body.permissions
|
||||
and rules_response.body.permissions.permission
|
||||
):
|
||||
for perm in rules_response.body.permissions.permission:
|
||||
rule = {
|
||||
"direction": (
|
||||
perm.direction
|
||||
if perm.direction
|
||||
else "ingress"
|
||||
),
|
||||
"protocol": (
|
||||
perm.ip_protocol if perm.ip_protocol else ""
|
||||
),
|
||||
"port_range": (
|
||||
perm.port_range if perm.port_range else ""
|
||||
),
|
||||
"source": (
|
||||
perm.source_cidr_ip
|
||||
if hasattr(perm, "source_cidr_ip")
|
||||
and perm.source_cidr_ip
|
||||
else ""
|
||||
),
|
||||
"source_group_id": (
|
||||
perm.source_group_id
|
||||
if hasattr(perm, "source_group_id")
|
||||
and perm.source_group_id
|
||||
else ""
|
||||
),
|
||||
}
|
||||
rules.append(rule)
|
||||
except Exception as rules_error:
|
||||
logger.warning(
|
||||
f"Could not retrieve rules for security group {sg_id}: {rules_error}"
|
||||
)
|
||||
|
||||
security_group = SecurityGroup(
|
||||
id=sg_id,
|
||||
name=sg_data.security_group_name or sg_id,
|
||||
arn=arn,
|
||||
region=region,
|
||||
vpc_id=sg_data.vpc_id if sg_data.vpc_id else "",
|
||||
description=(
|
||||
sg_data.description if sg_data.description else ""
|
||||
),
|
||||
rules=rules,
|
||||
tags=tags,
|
||||
)
|
||||
|
||||
self.security_groups[arn] = security_group
|
||||
logger.info(f"Found security group: {sg_id} in {region}")
|
||||
else:
|
||||
logger.info(f"No security groups found in {region}")
|
||||
|
||||
except Exception as error:
|
||||
self._handle_api_error(error, "DescribeSecurityGroups", region)
|
||||
1
prowler/providers/alibabacloud/services/oss/__init__.py
Normal file
1
prowler/providers/alibabacloud/services/oss/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
"""Alibaba Cloud OSS (Object Storage Service) module"""
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket CORS Not Overly Permissive Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_cors_not_overly_permissive",
|
||||
"CheckTitle": "Ensure OSS bucket CORS rules are not overly permissive",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Ensures that OSS bucket CORS (Cross-Origin Resource Sharing) rules are not overly permissive. Overly permissive CORS rules can allow unauthorized websites to access bucket resources.",
|
||||
"Risk": "Overly permissive CORS rules allowing all origins (*) can enable malicious websites to access bucket resources, potentially leading to data leakage or unauthorized operations.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-cors.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Update CORS rules to allow only specific trusted origins instead of '*'. Go to OSS Console > Buckets, select the bucket, and configure CORS rules with specific allowed origins.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/cors"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"internet-exposed"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_cors_not_overly_permissive(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
if not bucket.cors_rules or len(bucket.cors_rules) == 0:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} does not have CORS rules configured."
|
||||
)
|
||||
else:
|
||||
overly_permissive = False
|
||||
for rule in bucket.cors_rules:
|
||||
allowed_origins = rule.get("AllowedOrigin", [])
|
||||
if "*" in allowed_origins:
|
||||
overly_permissive = True
|
||||
break
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"OSS bucket {bucket.name} has overly permissive CORS rules allowing all origins."
|
||||
if not overly_permissive:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"OSS bucket {bucket.name} has appropriately restrictive CORS rules."
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket Default Encryption KMS Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_default_encryption_kms",
|
||||
"CheckTitle": "Ensure OSS buckets use KMS for default encryption",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "low",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Ensures that OSS buckets use KMS (Key Management Service) for encryption instead of AES256. KMS provides better key management, rotation, and audit capabilities compared to AES256.",
|
||||
"Risk": "Using AES256 encryption instead of KMS means missing out on centralized key management, automatic key rotation, and comprehensive audit logging capabilities.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-encryption-kms.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Update OSS bucket encryption to use KMS instead of AES256. Go to OSS Console > Buckets, select the bucket, and configure server-side encryption with KMS.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/server-side-encryption"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"encryption"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "KMS encryption may have additional costs compared to AES256."
|
||||
}
|
||||
@@ -0,0 +1,26 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_default_encryption_kms(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
report.status = "FAIL"
|
||||
if bucket.encryption_enabled and bucket.encryption_algorithm:
|
||||
if "KMS" in bucket.encryption_algorithm.upper():
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} uses KMS for encryption."
|
||||
)
|
||||
else:
|
||||
report.status_extended = f"OSS bucket {bucket.name} uses {bucket.encryption_algorithm} for encryption."
|
||||
else:
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} does not have encryption enabled."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket Encryption Enabled Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_encryption_enabled",
|
||||
"CheckTitle": "Ensure OSS buckets have encryption enabled",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "high",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Ensures that OSS buckets have server-side encryption enabled to protect data at rest. Encryption protects sensitive data from unauthorized access.",
|
||||
"Risk": "Unencrypted data at rest is vulnerable to unauthorized access if storage media is compromised. Encryption ensures data confidentiality even if physical access is obtained.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-encryption.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable server-side encryption for OSS buckets. Go to OSS Console > Buckets, select the bucket, and enable server-side encryption with AES256 or KMS.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/server-side-encryption"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"encryption"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_encryption_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} does not have encryption enabled."
|
||||
)
|
||||
if bucket.encryption_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"OSS bucket {bucket.name} has encryption enabled with {bucket.encryption_algorithm}."
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket Lifecycle Rules Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_lifecycle_rules",
|
||||
"CheckTitle": "Ensure OSS buckets have lifecycle rules configured",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "low",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Ensures that OSS buckets have lifecycle rules configured to manage object lifecycle and optimize costs. Lifecycle rules automatically transition or delete objects based on age.",
|
||||
"Risk": "Without lifecycle rules, old or unused objects remain in the bucket indefinitely, leading to unnecessary storage costs and difficulty managing data retention policies.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-lifecycle.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Configure lifecycle rules for OSS buckets. Go to OSS Console > Buckets, select the bucket, and create lifecycle rules to automatically transition or delete objects.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/lifecycle-rules"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"trustworthy-ai"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_lifecycle_rules(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} does not have lifecycle rules configured."
|
||||
)
|
||||
if bucket.lifecycle_rules and len(bucket.lifecycle_rules) > 0:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"OSS bucket {bucket.name} has {len(bucket.lifecycle_rules)} lifecycle rule(s) configured."
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket Logging Enabled Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_logging_enabled",
|
||||
"CheckTitle": "Ensure OSS buckets have access logging enabled",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Ensures that OSS buckets have access logging enabled for audit and compliance purposes. Access logs provide detailed records of requests made to the bucket.",
|
||||
"Risk": "Without access logging, it is difficult to track access patterns, detect unauthorized access attempts, or investigate security incidents. This can lead to compliance violations and delayed incident response.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-logging.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable access logging for OSS buckets. Go to OSS Console > Buckets, select the bucket, and configure logging with a target bucket for log storage.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/logging"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"logging"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_logging_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} does not have access logging enabled."
|
||||
)
|
||||
if bucket.logging_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} has access logging enabled."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket Public Access Blocked Check"""
|
||||
@@ -0,0 +1,33 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_public_access_blocked",
|
||||
"CheckTitle": "Ensure OSS buckets block public access",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "critical",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Ensures that OSS buckets block public access to prevent unauthorized data exposure. Public buckets can be accessed by anyone on the internet, potentially exposing sensitive data.",
|
||||
"Risk": "Public OSS buckets expose data to anyone on the internet, potentially leading to data breaches, unauthorized access to sensitive information, compliance violations, and reputational damage.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-public-access.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Configure OSS bucket ACL to block public access. Go to OSS Console > Buckets, select the bucket, and set ACL to 'Private'. Use bucket policies and RAM policies to grant access instead of public ACLs.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/bucket-acl"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"encryption",
|
||||
"internet-exposed"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,20 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_public_access_blocked(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = f"OSS bucket {bucket.name} allows public access."
|
||||
if not bucket.public_access:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} blocks public access."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket Referer Whitelist Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_referer_whitelist",
|
||||
"CheckTitle": "Ensure OSS buckets have referer whitelist configured",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "low",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Ensures that OSS buckets have referer whitelist configured and do not allow empty referers. Referer whitelisting prevents hotlinking and unauthorized access from unknown sources.",
|
||||
"Risk": "Without referer whitelist, buckets can be accessed from any source, potentially leading to bandwidth theft through hotlinking and unauthorized access to resources.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-referer.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Configure referer whitelist for OSS buckets and disable empty referers. Go to OSS Console > Buckets, select the bucket, and configure referer whitelist with trusted domains.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/hotlink-protection"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"internet-exposed"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,34 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_referer_whitelist(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
report.status = "FAIL"
|
||||
if bucket.referer_config:
|
||||
allow_empty = bucket.referer_config.get("AllowEmpty", True)
|
||||
referer_list = bucket.referer_config.get("RefererList", [])
|
||||
if allow_empty and (not referer_list or len(referer_list) == 0):
|
||||
report.status_extended = f"OSS bucket {bucket.name} allows empty referers and has no referer whitelist."
|
||||
elif allow_empty:
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} allows empty referers."
|
||||
)
|
||||
elif not referer_list or len(referer_list) == 0:
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} has no referer whitelist configured."
|
||||
)
|
||||
else:
|
||||
report.status = "PASS"
|
||||
report.status_extended = f"OSS bucket {bucket.name} has proper referer whitelist configuration."
|
||||
else:
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} has no referer configuration."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket Transfer Acceleration Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_transfer_acceleration",
|
||||
"CheckTitle": "Check if OSS buckets have transfer acceleration enabled",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "informational",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Checks if OSS buckets have transfer acceleration enabled for improved global data transfer performance. Transfer acceleration uses optimized network paths for faster uploads/downloads from distant locations.",
|
||||
"Risk": "Without transfer acceleration, data transfers from distant geographic locations may be slower, impacting user experience and operational efficiency.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-transfer-acceleration.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable transfer acceleration for OSS buckets that serve global users. Go to OSS Console > Buckets, select the bucket, and enable transfer acceleration.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/transfer-acceleration"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"trustworthy-ai"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": "This is an informational check. Transfer acceleration has additional costs."
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_transfer_acceleration(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} does not have transfer acceleration enabled."
|
||||
)
|
||||
if bucket.transfer_acceleration:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} has transfer acceleration enabled."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1 @@
|
||||
"""OSS Bucket Versioning Enabled Check"""
|
||||
@@ -0,0 +1,32 @@
|
||||
{
|
||||
"Provider": "alibabacloud",
|
||||
"CheckID": "oss_bucket_versioning_enabled",
|
||||
"CheckTitle": "Ensure OSS buckets have versioning enabled",
|
||||
"CheckType": [],
|
||||
"ServiceName": "oss",
|
||||
"SubServiceName": "",
|
||||
"ResourceIdTemplate": "arn",
|
||||
"Severity": "medium",
|
||||
"ResourceType": "Bucket",
|
||||
"Description": "Ensures that OSS buckets have versioning enabled to protect against accidental deletions and overwrites. Versioning allows recovery of objects from accidental deletion or overwrite.",
|
||||
"Risk": "Without versioning, accidental deletion or overwrite of objects cannot be recovered, potentially leading to permanent data loss.",
|
||||
"RelatedUrl": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/alibaba-cloud/OSS/bucket-versioning.html",
|
||||
"Remediation": {
|
||||
"Code": {
|
||||
"CLI": "",
|
||||
"NativeIaC": "",
|
||||
"Other": "",
|
||||
"Terraform": ""
|
||||
},
|
||||
"Recommendation": {
|
||||
"Text": "Enable versioning for OSS buckets. Go to OSS Console > Buckets, select the bucket, and enable versioning.",
|
||||
"Url": "https://www.alibabacloud.com/help/en/oss/user-guide/versioning"
|
||||
}
|
||||
},
|
||||
"Categories": [
|
||||
"forensics-ready"
|
||||
],
|
||||
"DependsOn": [],
|
||||
"RelatedTo": [],
|
||||
"Notes": ""
|
||||
}
|
||||
@@ -0,0 +1,22 @@
|
||||
from prowler.lib.check.models import Check, Check_Report_AlibabaCloud
|
||||
from prowler.providers.alibabacloud.services.oss.oss_client import oss_client
|
||||
|
||||
|
||||
class oss_bucket_versioning_enabled(Check):
|
||||
def execute(self):
|
||||
findings = []
|
||||
for bucket in oss_client.buckets.values():
|
||||
report = Check_Report_AlibabaCloud(
|
||||
metadata=self.metadata(), resource=bucket
|
||||
)
|
||||
report.status = "FAIL"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} does not have versioning enabled."
|
||||
)
|
||||
if bucket.versioning_enabled:
|
||||
report.status = "PASS"
|
||||
report.status_extended = (
|
||||
f"OSS bucket {bucket.name} has versioning enabled."
|
||||
)
|
||||
findings.append(report)
|
||||
return findings
|
||||
@@ -0,0 +1,6 @@
|
||||
"""Alibaba Cloud OSS Client Singleton"""
|
||||
|
||||
from prowler.providers.alibabacloud.services.oss.oss_service import OSS
|
||||
from prowler.providers.common.provider import Provider
|
||||
|
||||
oss_client = OSS(Provider.get_global_provider())
|
||||
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user