Compare commits

...

9 Commits

Author SHA1 Message Date
Hugo P.Brito 0f102a6b8d feat(sdk): apply resource limits to more AWS checks 2026-06-10 14:00:32 +02:00
Hugo P.Brito 2499e8b3ab fix(sdk): disable resource analysis limits by default
- Keep AWS resource analysis limits opt-in

- Document unlimited default behavior

- Update limit resolution tests
2026-06-10 13:09:58 +02:00
Hugo P.Brito 2376115ff1 feat(sdk): limit AWS resource analysis
- Scope resource scan limits to selected high-volume AWS paths

- Remove fail-driven finding prioritization semantics

- Add tests for bounded expensive AWS calls
2026-06-10 12:51:24 +02:00
Hugo P.Brito d4e19dca37 docs(sdk): document per-service resource scan limit config 2026-05-27 11:47:23 +02:00
Hugo P.Brito 67eb40494c feat(sdk): apply resource scan limit to CloudWatch log groups
- Lazy iter_log_groups generator deferring heavy log-event fetch
- Migrate the 4 cloudwatch_log_group_* checks to the shared limiter
- Keep log group tags eager (consumed by metric-filter checks)
2026-05-27 11:45:54 +02:00
Hugo P.Brito ec2c78180b feat(sdk): apply resource scan limit to Lambda functions
- Lazy iter_functions generator with on-demand per-function hydration
  of policy, URL config, tags and event source mappings
- Migrate the 12 awslambda_function_* checks to the shared limiter
2026-05-27 11:45:54 +02:00
Hugo P.Brito e8e6f2b5b4 feat(sdk): apply resource scan limit to EBS snapshots and Backup
- Lazy iter_recovery_points generator for Backup recovery points
- Lazy iter_snapshots with deferred public-status hydration for EBS
- Migrate EBS snapshot and Backup recovery point checks
2026-05-27 11:45:54 +02:00
Hugo P.Brito 1aa652d780 docs(sdk): changelog for per-service resource scan limit 2026-05-27 11:45:53 +02:00
Hugo P.Brito 2a24008d46 feat(sdk): fail-driven per-service AWS resource scan limit
- Add limited_findings helper and configurable per-service limit
- Lazy memoized ECS task definition and CodeArtifact package fetch
- Prioritize FAIL findings within the configurable limit
2026-05-27 11:44:08 +02:00
53 changed files with 1666 additions and 641 deletions
@@ -77,6 +77,25 @@ The following list includes all the AWS checks with configurable variables that
| `opensearch_service_domains_not_publicly_accessible` | `trusted_ips` | List of Strings |
### Resource Scan Limit
Some AWS services accumulate large numbers of resources (EBS snapshots, backup recovery points, CloudWatch log groups, Lambda functions, ECS task definitions, and CodeArtifact packages). Scanning every resource increases scan time, cost, API throttling, and finding volume. By default, Prowler scans every resource. Configure a positive resource scan limit to cap how many resources Prowler analyzes for these high-volume AWS resource paths, using the latest resources where AWS API ordering or resource timestamps support it. Otherwise, Prowler uses best-effort API order.
The global default applies to the supported resources below and is overridable per resource. The default value is `0`, which disables the limit and scans every resource. `0`, negative values, and `null` are unlimited; positive values enable limits. The limit applies to resources selected for analysis, not to findings; a selected resource may produce zero, one, or many findings.
Exact list API call reduction depends on each AWS API's ordering and pagination capabilities. When Prowler must enumerate candidates locally to select the latest resources, list calls may still read candidates, but expensive per-resource enrichment calls are bounded to the selected resources for the supported paths below.
| Value | Scope | Type |
|------------------------------------|---------------------------------------------------------|---------|
| `max_scanned_resources_per_service`| Global default for supported high-volume AWS resources (default `0`, disabled/unlimited) | Integer |
| `max_ebs_snapshots` | EBS snapshots (`ec2_ebs_*` checks) | Integer |
| `max_backup_recovery_points` | Backup recovery points (`backup_recovery_point_*`) | Integer |
| `max_cloudwatch_log_groups` | CloudWatch log groups (`cloudwatch_log_group_*`) | Integer |
| `max_lambda_functions` | Lambda functions (`awslambda_function_*`) | Integer |
| `max_ecs_task_definitions` | ECS task definitions (`ecs_task_definitions_*`) | Integer |
| `max_codeartifact_packages` | CodeArtifact packages (`codeartifact_packages_*`) | Integer |
## Azure
### Configurable Checks
@@ -181,6 +200,19 @@ aws:
# AWS Global Configuration
# aws.mute_non_default_regions --> Set to True to muted failed findings in non-default regions for AccessAnalyzer, GuardDuty, SecurityHub, DRS and Config
mute_non_default_regions: False
# AWS Resource Scan Limit Configuration
# Disabled by default: scan every resource unless a positive limit is configured.
# Findings are not capped. Set to 0 (or a negative value) to disable the limit.
# aws.max_scanned_resources_per_service --> global default for all services below
max_scanned_resources_per_service: 0
# Per-service overrides. Leave as null to fall back to the global default.
max_ebs_snapshots: null
max_backup_recovery_points: null
max_cloudwatch_log_groups: null
max_lambda_functions: null
max_ecs_task_definitions: null
max_codeartifact_packages: null
# If you want to mute failed findings only in specific regions, create a file with the following syntax and run it with `prowler aws -w mutelist.yaml`:
# Mutelist:
# Accounts:
+4
View File
@@ -10,6 +10,10 @@ All notable changes to the **Prowler SDK** are documented in this file.
- AWS AI Security Framework compliance for AWS provider [(#11353)](https://github.com/prowler-cloud/prowler/pull/11353)
- `storage_account_public_network_access_disabled` check for Azure provider and remapped the Azure CIS "Public Network Access is Disabled" requirements to it [(#11334)](https://github.com/prowler-cloud/prowler/pull/11334)
### 🔄 Changed
- AWS scans for EBS snapshots, Backup recovery points, CloudWatch log groups, Lambda functions, ECS task definitions, and CodeArtifact packages now support configurable resource analysis limits via `aws.max_scanned_resources_per_service`; limits are disabled by default and only positive values cap analyzed resources [(#11228)](https://github.com/prowler-cloud/prowler/pull/11228)
### 🐞 Fixed
- Compliance CSV row count now matches the UI per requirement by sourcing rows from the framework JSON's `requirement.Checks` instead of the stale `finding.compliance` snapshot [(#11370)](https://github.com/prowler-cloud/prowler/pull/11370)
+26
View File
@@ -3,6 +3,32 @@ aws:
# AWS Global Configuration
# aws.mute_non_default_regions --> Set to True to muted failed findings in non-default regions for AccessAnalyzer, GuardDuty, SecurityHub, DRS and Config
mute_non_default_regions: False
# AWS Resource Scan Limit Configuration
# Limits the number of resources scanned per service for services that can
# accumulate huge numbers of resources (EBS snapshots, backup recovery
# points, CloudWatch log groups, Lambda functions, ECS task definitions,
# CodeArtifact packages). Limits apply to resources analyzed, not findings:
# a selected resource can produce zero, one, or many findings. Where the AWS
# API supports server-side ordering the latest resources are scanned first;
# otherwise it is best-effort API order.
# Disabled by default: scan every resource unless a positive limit is configured.
# Set to 0 (or a negative value) to disable the limit (scan every resource).
# aws.max_scanned_resources_per_service --> global default for all services below
max_scanned_resources_per_service: 0
# Per-service overrides. Leave as null to fall back to the global default.
# aws.max_ebs_snapshots --> ec2_ebs_* checks (EBS snapshots)
max_ebs_snapshots: null
# aws.max_backup_recovery_points --> backup_recovery_point_* checks
max_backup_recovery_points: null
# aws.max_cloudwatch_log_groups --> cloudwatch_log_group_* checks
max_cloudwatch_log_groups: null
# aws.max_lambda_functions --> awslambda_function_* checks
max_lambda_functions: null
# aws.max_ecs_task_definitions --> ecs_task_definitions_* checks
max_ecs_task_definitions: null
# aws.max_codeartifact_packages --> codeartifact_packages_* checks
max_codeartifact_packages: null
# aws.disallowed_regions --> List of AWS regions to exclude from the scan.
# Also settable via the PROWLER_AWS_DISALLOWED_REGIONS environment variable or
# the --excluded-region CLI flag. Precedence: CLI > env var > config file.
+50
View File
@@ -0,0 +1,50 @@
"""Scoped resource scan limits for high-volume AWS resources.
Some AWS services accumulate huge numbers of resources (EBS snapshots, backup
recovery points, log groups, Lambda functions, ECS task definitions,
CodeArtifact packages). Scanning all of them causes API throttling, slow
scans, cost and noisy findings.
``get_resource_scan_limit`` resolves the configured number of resources to
analyze for a supported resource path. A limited resource can produce zero,
one, or many findings; findings are not capped or re-ordered here.
"""
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import Optional, TypeVar
GLOBAL_LIMIT_KEY = "max_scanned_resources_per_service"
T = TypeVar("T")
def get_resource_scan_limit(audit_config: dict, service_key: str) -> Optional[int]:
"""Resolve the resource scan limit for a service.
Precedence: per-service key (``service_key``) > global
``max_scanned_resources_per_service`` > unlimited.
A non-positive resolved value means **unlimited** (``None``), preserving
the legacy behavior as an explicit opt-out.
Args:
audit_config: The provider ``audit_config`` dictionary.
service_key: The per-service config key, e.g. ``max_lambda_functions``.
Returns:
The limit as a positive ``int``, or ``None`` for unlimited.
"""
value = audit_config.get(service_key)
if value is None:
value = audit_config.get(GLOBAL_LIMIT_KEY)
if value is None or value <= 0:
return None
return int(value)
def limit_resources(resources: Iterable[T], limit: Optional[int]) -> Iterator[T]:
"""Yield up to ``limit`` resources without changing resource order."""
if not limit or limit <= 0:
yield from resources
return
yield from islice(resources, limit)
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_
class awslambda_function_env_vars_not_encrypted_with_cmk(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
reports = []
for function in awslambda_client.iter_functions():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
if not function.environment:
report.status = "PASS"
@@ -24,5 +24,5 @@ class awslambda_function_env_vars_not_encrypted_with_cmk(Check):
f"Lambda function {function.name} has environment variables "
f"but they are not encrypted with a customer-managed KMS key."
)
findings.append(report)
return findings
reports.append(report)
return reports
@@ -6,8 +6,8 @@ from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_
class awslambda_function_inside_vpc(Check):
def execute(self) -> List[Check_Report_AWS]:
findings = []
for function_arn, function in awslambda_client.functions.items():
reports = []
for function in awslambda_client.iter_functions():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
report.status = "PASS"
@@ -18,13 +18,12 @@ class awslambda_function_inside_vpc(Check):
if not function.vpc_id:
awslambda_client.set_failed_check(
self.__class__.__name__,
function_arn,
function.arn,
)
report.status = "FAIL"
report.status_extended = (
f"Lambda function {function.name} is not inside a VPC"
)
findings.append(report)
return findings
reports.append(report)
return reports
@@ -7,8 +7,8 @@ from prowler.providers.aws.services.cloudtrail.cloudtrail_client import (
class awslambda_function_invoke_api_operations_cloudtrail_logging_enabled(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
reports = []
for function in awslambda_client.iter_functions():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
report.status = "FAIL"
@@ -49,6 +49,5 @@ class awslambda_function_invoke_api_operations_cloudtrail_logging_enabled(Check)
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is recorded by CloudTrail trail {trail.name}."
break
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_
class awslambda_function_no_dead_letter_queue(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
reports = []
for function in awslambda_client.iter_functions():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
if function.dead_letter_config:
report.status = "PASS"
@@ -13,5 +13,5 @@ class awslambda_function_no_dead_letter_queue(Check):
else:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} does not have a Dead Letter Queue configured."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -8,58 +8,59 @@ from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_
class awslambda_function_no_secrets_in_code(Check):
def execute(self):
findings = []
if awslambda_client.functions:
secrets_ignore_patterns = awslambda_client.audit_config.get(
"secrets_ignore_patterns", []
if not awslambda_client.functions:
return []
secrets_ignore_patterns = awslambda_client.audit_config.get(
"secrets_ignore_patterns", []
)
reports = []
for function, function_code in awslambda_client._get_function_code():
if not function_code:
continue
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
report.status = "PASS"
report.status_extended = (
f"No secrets found in Lambda function {function.name} code."
)
for function, function_code in awslambda_client._get_function_code():
if function_code:
report = Check_Report_AWS(
metadata=self.metadata(), resource=function
with tempfile.TemporaryDirectory() as tmp_dir_name:
function_code.code_zip.extractall(tmp_dir_name)
# List all files
files_in_zip = next(os.walk(tmp_dir_name))[2]
secrets_findings = []
for file in files_in_zip:
detect_secrets_output = detect_secrets_scan(
file=f"{tmp_dir_name}/{file}",
excluded_secrets=secrets_ignore_patterns,
detect_secrets_plugins=awslambda_client.audit_config.get(
"detect_secrets_plugins",
),
)
report.status = "PASS"
report.status_extended = (
f"No secrets found in Lambda function {function.name} code."
)
with tempfile.TemporaryDirectory() as tmp_dir_name:
function_code.code_zip.extractall(tmp_dir_name)
# List all files
files_in_zip = next(os.walk(tmp_dir_name))[2]
secrets_findings = []
for file in files_in_zip:
detect_secrets_output = detect_secrets_scan(
file=f"{tmp_dir_name}/{file}",
excluded_secrets=secrets_ignore_patterns,
detect_secrets_plugins=awslambda_client.audit_config.get(
"detect_secrets_plugins",
),
if detect_secrets_output:
for (
secret
) in (
detect_secrets_output
): # Appears that only 1 file is being scanned at a time, so could rework this
output_file_name = secret["filename"].replace(
f"{tmp_dir_name}/", ""
)
secrets_string = ", ".join(
[
f"{secret['type']} on line {secret['line_number']}"
for secret in detect_secrets_output
]
)
secrets_findings.append(
f"{output_file_name}: {secrets_string}"
)
if detect_secrets_output:
for (
secret
) in (
detect_secrets_output
): # Appears that only 1 file is being scanned at a time, so could rework this
output_file_name = secret["filename"].replace(
f"{tmp_dir_name}/", ""
)
secrets_string = ", ".join(
[
f"{secret['type']} on line {secret['line_number']}"
for secret in detect_secrets_output
]
)
secrets_findings.append(
f"{output_file_name}: {secrets_string}"
)
if secrets_findings:
final_output_string = "; ".join(secrets_findings)
report.status = "FAIL"
report.status_extended = f"Potential {'secrets' if len(secrets_findings) > 1 else 'secret'} found in Lambda function {function.name} code -> {final_output_string}."
if secrets_findings:
final_output_string = "; ".join(secrets_findings)
report.status = "FAIL"
report.status_extended = f"Potential {'secrets' if len(secrets_findings) > 1 else 'secret'} found in Lambda function {function.name} code -> {final_output_string}."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -7,11 +7,12 @@ from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_
class awslambda_function_no_secrets_in_variables(Check):
def execute(self):
findings = []
secrets_ignore_patterns = awslambda_client.audit_config.get(
"secrets_ignore_patterns", []
)
for function in awslambda_client.functions.values():
reports = []
for function in awslambda_client.iter_functions():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
report.status = "PASS"
@@ -40,6 +41,5 @@ class awslambda_function_no_secrets_in_variables(Check):
report.status = "FAIL"
report.status_extended = f"Potential secret found in Lambda function {function.name} variables -> {secrets_string}."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -5,8 +5,8 @@ from prowler.providers.aws.services.iam.lib.policy import is_policy_public
class awslambda_function_not_publicly_accessible(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
reports = []
for function in awslambda_client.iter_functions():
if function.policy is None:
continue
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
@@ -21,6 +21,5 @@ class awslambda_function_not_publicly_accessible(Check):
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} has a resource-based policy with public access."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,18 +4,18 @@ from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_
class awslambda_function_url_cors_policy(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
reports = []
for function in awslambda_client.iter_functions():
if not function.url_config:
continue
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
if function.url_config:
if "*" in function.url_config.cors_config.allow_origins:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} URL has a wide CORS configuration."
else:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} does not have a wide CORS configuration."
if "*" in function.url_config.cors_config.allow_origins:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} URL has a wide CORS configuration."
else:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} does not have a wide CORS configuration."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -5,18 +5,18 @@ from prowler.providers.aws.services.awslambda.awslambda_service import AuthType
class awslambda_function_url_public(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
reports = []
for function in awslambda_client.iter_functions():
if not function.url_config:
continue
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
if function.url_config:
if function.url_config.auth_type == AuthType.AWS_IAM:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} does not have a publicly accessible function URL."
else:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} has a publicly accessible function URL."
if function.url_config.auth_type == AuthType.AWS_IAM:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} does not have a publicly accessible function URL."
else:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} has a publicly accessible function URL."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_
class awslambda_function_using_cross_account_layers(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
reports = []
for function in awslambda_client.iter_functions():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
cross_account_layers = [
layer
@@ -30,5 +30,5 @@ class awslambda_function_using_cross_account_layers(Check):
f"Lambda function {function.name} only uses layers "
f"from the same account ({awslambda_client.audited_account})."
)
findings.append(report)
return findings
reports.append(report)
return reports
@@ -32,20 +32,20 @@ default_obsolete_lambda_runtimes = [
class awslambda_function_using_supported_runtimes(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
if function.runtime:
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
reports = []
for function in awslambda_client.iter_functions():
if not function.runtime:
continue
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
if function.runtime in awslambda_client.audit_config.get(
"obsolete_lambda_runtimes", default_obsolete_lambda_runtimes
):
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is obsolete."
else:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is supported."
if function.runtime in awslambda_client.audit_config.get(
"obsolete_lambda_runtimes", default_obsolete_lambda_runtimes
):
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is obsolete."
else:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is using {function.runtime} which is supported."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -8,38 +8,40 @@ from prowler.providers.aws.services.vpc.vpc_client import vpc_client
class awslambda_function_vpc_multi_az(Check):
def execute(self) -> list[Check_Report_AWS]:
findings = []
LAMBDA_MIN_AZS = awslambda_client.audit_config.get("lambda_min_azs", 2)
for function_arn, function in awslambda_client.functions.items():
reports = []
for function in awslambda_client.iter_functions():
# only proceed if check "awslambda_function_inside_vpc" did not run or did not FAIL to avoid to report that the function is not inside a VPC twice
if not awslambda_client.is_failed_check(
if awslambda_client.is_failed_check(
awslambda_function_inside_vpc.__name__,
function_arn,
function.arn,
):
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
continue
report.status = "FAIL"
report.status_extended = (
f"Lambda function {function.name} is not inside a VPC."
)
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
if function.vpc_id:
function_availability_zones = {
getattr(
vpc_client.vpc_subnets.get(subnet_id),
"availability_zone",
None,
)
for subnet_id in function.subnet_ids
if subnet_id in vpc_client.vpc_subnets
}
report.status = "FAIL"
report.status_extended = (
f"Lambda function {function.name} is not inside a VPC."
)
if len(function_availability_zones) >= LAMBDA_MIN_AZS:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is inside of VPC {function.vpc_id} that spans in at least {LAMBDA_MIN_AZS} AZs: {', '.join(function_availability_zones)}."
else:
report.status_extended = f"Lambda function {function.name} is inside of VPC {function.vpc_id} that spans only in {len(function_availability_zones)} AZs: {', '.join(function_availability_zones)}. Must span in at least {LAMBDA_MIN_AZS} AZs."
if function.vpc_id:
function_availability_zones = {
getattr(
vpc_client.vpc_subnets.get(subnet_id),
"availability_zone",
None,
)
for subnet_id in function.subnet_ids
if subnet_id in vpc_client.vpc_subnets
}
findings.append(report)
if len(function_availability_zones) >= LAMBDA_MIN_AZS:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} is inside of VPC {function.vpc_id} that spans in at least {LAMBDA_MIN_AZS} AZs: {', '.join(function_availability_zones)}."
else:
report.status_extended = f"Lambda function {function.name} is inside of VPC {function.vpc_id} that spans only in {len(function_availability_zones)} AZs: {', '.join(function_availability_zones)}. Must span in at least {LAMBDA_MIN_AZS} AZs."
return findings
reports.append(report)
return reports
@@ -9,6 +9,7 @@ import requests
from botocore.client import ClientError
from pydantic.v1 import BaseModel
from prowler.lib.check.resource_limit import get_resource_scan_limit, limit_resources
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
@@ -18,12 +19,17 @@ class Lambda(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
# functions is the memoization cache for the lazy iter_functions()
# generator. Functions are listed eagerly (a single paginated
# list_functions call per region, no per-function cost), but expensive
# per-function detail is hydrated on demand for selected functions.
self.functions = {}
self._functions_hydrated = set()
self._event_source_mappings_listed_functions = set()
self.function_limit = get_resource_scan_limit(
self.audit_config, "max_lambda_functions"
)
self.__threading_call__(self._list_functions)
self._list_tags_for_resource()
self.__threading_call__(self._get_policy)
self.__threading_call__(self._get_function_url_config)
self.__threading_call__(self._list_event_source_mappings)
def _list_functions(self, regional_client):
logger.info("Lambda - Listing Functions...")
@@ -48,6 +54,10 @@ class Lambda(AWSService):
subnet_ids=set(vpc_config.get("SubnetIds", [])),
region=regional_client.region,
)
if "LastModified" in function:
self.functions[lambda_arn].last_modified = function[
"LastModified"
]
if "Runtime" in function:
self.functions[lambda_arn].runtime = function["Runtime"]
if "Environment" in function:
@@ -76,18 +86,19 @@ class Lambda(AWSService):
f" {error}"
)
def _list_event_source_mappings(self, regional_client):
def _list_event_source_mappings(self, function):
logger.info("Lambda - Listing Event Source Mappings...")
try:
regional_client = self.regional_clients[function.region]
paginator = regional_client.get_paginator("list_event_source_mappings")
for page in paginator.paginate():
for page in paginator.paginate(FunctionName=function.name):
for mapping in page.get("EventSourceMappings", []):
function_arn = mapping.get("FunctionArn", "")
# Normalise to unqualified ARN (strip :qualifier suffix if present)
base_arn = ":".join(function_arn.split(":")[:7])
if base_arn not in self.functions:
if base_arn != function.arn:
continue
self.functions[base_arn].event_source_mappings.append(
function.event_source_mappings.append(
EventSourceMapping(
uuid=mapping["UUID"],
event_source_arn=mapping.get("EventSourceArn", ""),
@@ -110,7 +121,7 @@ class Lambda(AWSService):
self.thread_pool.submit(
self._fetch_function_code, function.name, function.region
): function
for function in self.functions.values()
for function in self.selected_functions()
}
for fetched_lambda_code in as_completed(lambda_functions_to_fetch):
@@ -143,75 +154,95 @@ class Lambda(AWSService):
)
raise
def _get_policy(self, regional_client):
def _get_policy(self, function):
logger.info("Lambda - Getting Policy...")
try:
for function in self.functions.values():
if function.region == regional_client.region:
try:
function_policy = regional_client.get_policy(
FunctionName=function.name
)
self.functions[function.arn].policy = json.loads(
function_policy["Policy"]
)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
self.functions[function.arn].policy = {}
regional_client = self.regional_clients[function.region]
try:
function_policy = regional_client.get_policy(FunctionName=function.name)
function.policy = json.loads(function_policy["Policy"])
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
function.policy = {}
except Exception as error:
logger.error(
f"{regional_client.region} --"
f"{function.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def _get_function_url_config(self, regional_client):
def _get_function_url_config(self, function):
logger.info("Lambda - Getting Function URL Config...")
try:
for function in self.functions.values():
if function.region == regional_client.region:
try:
function_url_config = regional_client.get_function_url_config(
FunctionName=function.name
)
if "Cors" in function_url_config:
allow_origins = function_url_config["Cors"]["AllowOrigins"]
else:
allow_origins = []
self.functions[function.arn].url_config = URLConfig(
auth_type=function_url_config["AuthType"],
url=function_url_config["FunctionUrl"],
cors_config=URLConfigCORS(allow_origins=allow_origins),
)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
self.functions[function.arn].url_config = None
regional_client = self.regional_clients[function.region]
try:
function_url_config = regional_client.get_function_url_config(
FunctionName=function.name
)
if "Cors" in function_url_config:
allow_origins = function_url_config["Cors"]["AllowOrigins"]
else:
allow_origins = []
function.url_config = URLConfig(
auth_type=function_url_config["AuthType"],
url=function_url_config["FunctionUrl"],
cors_config=URLConfigCORS(allow_origins=allow_origins),
)
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
function.url_config = None
except Exception as error:
logger.error(
f"{regional_client.region} --"
f"{function.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def _list_tags_for_resource(self):
def _list_tags_for_resource(self, function):
logger.info("Lambda - List Tags...")
try:
for function in self.functions.values():
try:
regional_client = self.regional_clients[function.region]
response = regional_client.list_tags(Resource=function.arn)["Tags"]
function.tags = [response]
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
function.tags = []
regional_client = self.regional_clients[function.region]
try:
response = regional_client.list_tags(Resource=function.arn)["Tags"]
function.tags = [response]
except ClientError as e:
if e.response["Error"]["Code"] == "ResourceNotFoundException":
function.tags = []
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{function.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def selected_functions(self):
return limit_resources(
sorted(
self.functions.values(),
key=lambda f: f.last_modified or "",
reverse=True,
),
self.function_limit,
)
def iter_functions(self):
"""Yield functions lazily, hydrating expensive per-function detail on demand.
``list_functions`` has no server-side ordering, so newest-first is
best-effort by ``LastModified``. Policy, URL config, tags and event
source mappings are fetched only for the functions the consumer
actually pulls, memoized per function ARN and shared across checks
(checks run sequentially, so no locking needed).
"""
for function in self.selected_functions():
if function.arn not in self._functions_hydrated:
if function.arn not in self._event_source_mappings_listed_functions:
self._list_event_source_mappings(function)
self._event_source_mappings_listed_functions.add(function.arn)
self._get_policy(function)
self._get_function_url_config(function)
self._list_tags_for_resource(function)
self._functions_hydrated.add(function.arn)
yield function
class LambdaCode(BaseModel):
location: str
@@ -259,6 +290,7 @@ class Function(BaseModel):
name: str
arn: str
security_groups: list
last_modified: Optional[str] = None
runtime: Optional[str] = None
environment: Optional[dict] = None
region: str
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.backup.backup_client import backup_client
class backup_recovery_point_encrypted(Check):
def execute(self):
findings = []
for recovery_point in backup_client.recovery_points:
reports = []
for recovery_point in backup_client.iter_recovery_points():
report = Check_Report_AWS(metadata=self.metadata(), resource=recovery_point)
report.region = recovery_point.backup_vault_region
report.status = "FAIL"
@@ -13,7 +13,5 @@ class backup_recovery_point_encrypted(Check):
if recovery_point.encrypted:
report.status = "PASS"
report.status_extended = f"Backup Recovery Point {recovery_point.id} for Backup Vault {recovery_point.backup_vault_name} is encrypted at rest."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,6 +4,7 @@ from typing import Optional
from botocore.client import ClientError
from pydantic.v1 import BaseModel
from prowler.lib.check.resource_limit import get_resource_scan_limit, limit_resources
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
@@ -27,9 +28,14 @@ class Backup(AWSService):
self.__threading_call__(self._list_backup_report_plans)
self.protected_resources = []
self.__threading_call__(self._list_backup_selections)
# recovery_points is the memoization cache for the lazy
# iter_recovery_points() generator; populated on demand so only the
# selected recovery points are tagged and analyzed.
self.recovery_points = []
self.__threading_call__(self._list_recovery_points)
self.__threading_call__(self._list_tags, self.recovery_points)
self._recovery_points_listed = False
self.recovery_point_limit = get_resource_scan_limit(
self.audit_config, "max_backup_recovery_points"
)
def _list_backup_vaults(self, regional_client):
logger.info("Backup - Listing Backup Vaults...")
@@ -183,38 +189,64 @@ class Backup(AWSService):
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _list_recovery_points(self, regional_client):
logger.info("Backup - Listing Recovery Points...")
def iter_recovery_points(self):
"""Yield recovery points lazily, memoized.
``list_recovery_points_by_backup_vault`` has no server-side ordering,
so candidates are enumerated and selected newest-first by
``CreationDate``. Tags are fetched only for selected recovery points
(checks run sequentially, so no locking needed).
"""
if self._recovery_points_listed:
yield from limit_resources(self.recovery_points, self.recovery_point_limit)
return
try:
if self.backup_vaults:
for backup_vault in self.backup_vaults:
paginator = regional_client.get_paginator(
"list_recovery_points_by_backup_vault"
)
for page in paginator.paginate(BackupVaultName=backup_vault.name):
for recovery_point in page.get("RecoveryPoints", []):
arn = recovery_point.get("RecoveryPointArn")
if arn:
self.recovery_points.append(
RecoveryPoint(
arn=arn,
id=arn.split(":")[-1],
backup_vault_name=backup_vault.name,
encrypted=recovery_point.get(
"IsEncrypted", False
),
backup_vault_region=backup_vault.region,
region=regional_client.region,
tags=[],
)
)
candidates = []
for backup_vault in self.backup_vaults or []:
regional_client = self.regional_clients[backup_vault.region]
paginator = regional_client.get_paginator(
"list_recovery_points_by_backup_vault"
)
for page in paginator.paginate(BackupVaultName=backup_vault.name):
for recovery_point in page.get("RecoveryPoints", []):
arn = recovery_point.get("RecoveryPointArn")
if not arn:
continue
candidates.append((backup_vault, recovery_point))
for backup_vault, recovery_point in limit_resources(
sorted(
candidates,
key=lambda candidate: (
candidate[1]["CreationDate"].timestamp()
if candidate[1].get("CreationDate")
else 0.0
),
reverse=True,
),
self.recovery_point_limit,
):
arn = recovery_point.get("RecoveryPointArn")
rp = RecoveryPoint(
arn=arn,
id=arn.split(":")[-1],
backup_vault_name=backup_vault.name,
encrypted=recovery_point.get("IsEncrypted", False),
creation_date=recovery_point.get("CreationDate"),
backup_vault_region=backup_vault.region,
region=backup_vault.region,
tags=[],
)
self._list_tags(rp)
self.recovery_points.append(rp)
yield rp
self._recovery_points_listed = True
except ClientError as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
@@ -256,4 +288,5 @@ class RecoveryPoint(BaseModel):
backup_vault_name: str
encrypted: bool
backup_vault_region: str
creation_date: Optional[datetime] = None
tags: Optional[list] = None
@@ -4,15 +4,16 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_group_kms_encryption_enabled(Check):
def execute(self):
findings = []
if logs_client.log_groups:
for log_group in logs_client.log_groups.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=log_group)
if log_group.kms_id:
report.status = "PASS"
report.status_extended = f"Log Group {log_group.name} does have AWS KMS key {log_group.kms_id} associated."
else:
report.status = "FAIL"
report.status_extended = f"Log Group {log_group.name} does not have AWS KMS keys associated."
findings.append(report)
return findings
reports = []
for log_group in logs_client.iter_log_groups():
report = Check_Report_AWS(metadata=self.metadata(), resource=log_group)
if log_group.kms_id:
report.status = "PASS"
report.status_extended = f"Log Group {log_group.name} does have AWS KMS key {log_group.kms_id} associated."
else:
report.status = "FAIL"
report.status_extended = (
f"Log Group {log_group.name} does not have AWS KMS keys associated."
)
reports.append(report)
return reports
@@ -10,97 +10,93 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_group_no_secrets_in_logs(Check):
def execute(self):
findings = []
if logs_client.log_groups:
secrets_ignore_patterns = logs_client.audit_config.get(
"secrets_ignore_patterns", []
)
for log_group in logs_client.log_groups.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=log_group)
report.status = "PASS"
report.status_extended = (
f"No secrets found in {log_group.name} log group."
)
log_group_secrets = []
if log_group.log_streams:
for log_stream_name in log_group.log_streams:
log_stream_secrets = {}
log_stream_data = "\n".join(
if not logs_client.log_groups:
return []
secrets_ignore_patterns = logs_client.audit_config.get(
"secrets_ignore_patterns", []
)
reports = []
for log_group in logs_client.iter_log_groups(with_events=True):
report = Check_Report_AWS(metadata=self.metadata(), resource=log_group)
report.status = "PASS"
report.status_extended = f"No secrets found in {log_group.name} log group."
log_group_secrets = []
if log_group.log_streams:
for log_stream_name in log_group.log_streams:
log_stream_secrets = {}
log_stream_data = "\n".join(
[
dumps(event["message"])
for event in log_group.log_streams[log_stream_name]
]
)
log_stream_secrets_output = detect_secrets_scan(
data=log_stream_data,
excluded_secrets=secrets_ignore_patterns,
detect_secrets_plugins=logs_client.audit_config.get(
"detect_secrets_plugins",
),
)
if log_stream_secrets_output:
for secret in log_stream_secrets_output:
flagged_event = log_group.log_streams[log_stream_name][
secret["line_number"] - 1
]
cloudwatch_timestamp = (
convert_to_cloudwatch_timestamp_format(
flagged_event["timestamp"]
)
)
if cloudwatch_timestamp not in log_stream_secrets.keys():
log_stream_secrets[cloudwatch_timestamp] = SecretsDict()
try:
log_event_data = dumps(
loads(flagged_event["message"]), indent=2
)
except Exception:
log_event_data = dumps(
flagged_event["message"], indent=2
)
if len(log_event_data.split("\n")) > 1:
# Can get more informative output if there is more than 1 line.
# Will rescan just this event to get the type of secret and the line number
event_detect_secrets_output = detect_secrets_scan(
data=log_event_data,
detect_secrets_plugins=logs_client.audit_config.get(
"detect_secrets_plugins"
),
)
if event_detect_secrets_output:
for secret in event_detect_secrets_output:
log_stream_secrets[
cloudwatch_timestamp
].add_secret(
secret["line_number"], secret["type"]
)
else:
log_stream_secrets[cloudwatch_timestamp].add_secret(
1, secret["type"]
)
if log_stream_secrets:
secrets_string = "; ".join(
[
dumps(event["message"])
for event in log_group.log_streams[log_stream_name]
f"at {timestamp} - {log_stream_secrets[timestamp].to_string()}"
for timestamp in log_stream_secrets
]
)
log_stream_secrets_output = detect_secrets_scan(
data=log_stream_data,
excluded_secrets=secrets_ignore_patterns,
detect_secrets_plugins=logs_client.audit_config.get(
"detect_secrets_plugins",
),
log_group_secrets.append(
f"in log stream {log_stream_name} {secrets_string}"
)
if log_stream_secrets_output:
for secret in log_stream_secrets_output:
flagged_event = log_group.log_streams[log_stream_name][
secret["line_number"] - 1
]
cloudwatch_timestamp = (
convert_to_cloudwatch_timestamp_format(
flagged_event["timestamp"]
)
)
if (
cloudwatch_timestamp
not in log_stream_secrets.keys()
):
log_stream_secrets[cloudwatch_timestamp] = (
SecretsDict()
)
try:
log_event_data = dumps(
loads(flagged_event["message"]), indent=2
)
except Exception:
log_event_data = dumps(
flagged_event["message"], indent=2
)
if len(log_event_data.split("\n")) > 1:
# Can get more informative output if there is more than 1 line.
# Will rescan just this event to get the type of secret and the line number
event_detect_secrets_output = detect_secrets_scan(
data=log_event_data,
detect_secrets_plugins=logs_client.audit_config.get(
"detect_secrets_plugins"
),
)
if event_detect_secrets_output:
for secret in event_detect_secrets_output:
log_stream_secrets[
cloudwatch_timestamp
].add_secret(
secret["line_number"], secret["type"]
)
else:
log_stream_secrets[cloudwatch_timestamp].add_secret(
1, secret["type"]
)
if log_stream_secrets:
secrets_string = "; ".join(
[
f"at {timestamp} - {log_stream_secrets[timestamp].to_string()}"
for timestamp in log_stream_secrets
]
)
log_group_secrets.append(
f"in log stream {log_stream_name} {secrets_string}"
)
if log_group_secrets:
secrets_string = "; ".join(log_group_secrets)
report.status = "FAIL"
report.status_extended = f"Potential secrets found in log group {log_group.name} {secrets_string}."
findings.append(report)
return findings
if log_group_secrets:
secrets_string = "; ".join(log_group_secrets)
report.status = "FAIL"
report.status_extended = f"Potential secrets found in log group {log_group.name} {secrets_string}."
reports.append(report)
return reports
class SecretsDict(dict):
@@ -5,40 +5,36 @@ from prowler.providers.aws.services.iam.lib.policy import is_policy_public
class cloudwatch_log_group_not_publicly_accessible(Check):
def execute(self):
findings = []
if logs_client.resource_policies is None or logs_client.log_groups is None:
return []
public_log_groups = []
if (
logs_client.resource_policies is not None
and logs_client.log_groups is not None
):
for resource_policies in logs_client.resource_policies.values():
if resource_policies is not None:
for resource_policy in resource_policies:
if is_policy_public(
resource_policy.policy, logs_client.audited_account
):
for statement in resource_policy.policy.get(
"Statement", []
):
public_resources = statement.get("Resource", [])
if isinstance(public_resources, str):
public_resources = [public_resources]
for resource in public_resources:
for log_group in logs_client.log_groups.values():
if log_group.arn in resource or resource == "*":
public_log_groups.append(log_group.arn)
for log_group in logs_client.log_groups.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=log_group)
report.status = "PASS"
for resource_policies in logs_client.resource_policies.values():
if resource_policies is not None:
for resource_policy in resource_policies:
if is_policy_public(
resource_policy.policy, logs_client.audited_account
):
for statement in resource_policy.policy.get("Statement", []):
public_resources = statement.get("Resource", [])
if isinstance(public_resources, str):
public_resources = [public_resources]
for resource in public_resources:
for log_group in logs_client.log_groups.values():
if log_group.arn in resource or resource == "*":
public_log_groups.append(log_group.arn)
reports = []
for log_group in logs_client.iter_log_groups():
report = Check_Report_AWS(metadata=self.metadata(), resource=log_group)
report.status = "PASS"
report.status_extended = (
f"Log Group {log_group.name} is not publicly accessible."
)
if log_group.arn in public_log_groups:
report.status = "FAIL"
report.status_extended = (
f"Log Group {log_group.name} is not publicly accessible."
f"Log Group {log_group.name} is publicly accessible."
)
if log_group.arn in public_log_groups:
report.status = "FAIL"
report.status_extended = (
f"Log Group {log_group.name} is publicly accessible."
)
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,26 +4,25 @@ from prowler.providers.aws.services.cloudwatch.logs_client import logs_client
class cloudwatch_log_group_retention_policy_specific_days_enabled(Check):
def execute(self):
findings = []
# log_group_retention_days, default: 365 days
specific_retention_days = logs_client.audit_config.get(
"log_group_retention_days", 365
)
if logs_client.log_groups:
for log_group in logs_client.log_groups.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=log_group)
if (
log_group.never_expire is False
and log_group.retention_days < specific_retention_days
):
report.status = "FAIL"
report.status_extended = f"Log Group {log_group.name} has less than {specific_retention_days} days retention period ({log_group.retention_days} days)."
reports = []
for log_group in logs_client.iter_log_groups():
report = Check_Report_AWS(metadata=self.metadata(), resource=log_group)
if (
log_group.never_expire is False
and log_group.retention_days < specific_retention_days
):
report.status = "FAIL"
report.status_extended = f"Log Group {log_group.name} has less than {specific_retention_days} days retention period ({log_group.retention_days} days)."
else:
report.status = "PASS"
if log_group.never_expire is True:
report.status_extended = f"Log Group {log_group.name} comply with {specific_retention_days} days retention period since it never expires."
else:
report.status = "PASS"
if log_group.never_expire is True:
report.status_extended = f"Log Group {log_group.name} comply with {specific_retention_days} days retention period since it never expires."
else:
report.status_extended = f"Log Group {log_group.name} comply with {specific_retention_days} days retention period since it has {log_group.retention_days} days."
findings.append(report)
return findings
report.status_extended = f"Log Group {log_group.name} comply with {specific_retention_days} days retention period since it has {log_group.retention_days} days."
reports.append(report)
return reports
@@ -5,6 +5,7 @@ from typing import Optional
from botocore.exceptions import ClientError
from pydantic.v1 import BaseModel
from prowler.lib.check.resource_limit import get_resource_scan_limit, limit_resources
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
@@ -83,24 +84,22 @@ class Logs(AWSService):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.log_group_arn_template = f"arn:{self.audited_partition}:logs:{self.region}:{self.audited_account}:log-group"
# log_groups is listed eagerly (it also feeds metric filters), but the
# expensive per-log-group hydration (tags, and log events for the
# no-secrets check) is deferred to iter_log_groups() so only selected
# log groups are enriched and analyzed.
self.log_groups = {}
self._log_groups_hydrated = set()
self.log_group_limit = get_resource_scan_limit(
self.audit_config, "max_cloudwatch_log_groups"
)
# The threshold for number of events to return per log group.
self.events_per_log_group_threshold = 1000
self.__threading_call__(self._describe_log_groups)
self.resource_policies = {}
self.__threading_call__(self._describe_resource_policies)
self.metric_filters = []
self.__threading_call__(self._describe_metric_filters)
if self.log_groups:
if (
"cloudwatch_log_group_no_secrets_in_logs"
in provider.audit_metadata.expected_checks
):
self.events_per_log_group_threshold = (
1000 # The threshold for number of events to return per log group.
)
self.__threading_call__(self._get_log_events)
self.__threading_call__(
self._list_tags_for_resource, self.log_groups.values()
)
def _describe_metric_filters(self, regional_client):
logger.info("CloudWatch Logs - Describing metric filters...")
@@ -123,6 +122,10 @@ class Logs(AWSService):
log_group = lg
break
if log_group and log_group.arn not in self._log_groups_hydrated:
self._list_tags_for_resource(log_group)
self._log_groups_hydrated.add(log_group.arn)
self.metric_filters.append(
MetricFilter(
arn=arn,
@@ -174,6 +177,7 @@ class Logs(AWSService):
retention_days=retention_days,
never_expire=never_expire,
kms_id=kms,
creation_time=log_group.get("creationTime"),
region=regional_client.region,
)
except ClientError as error:
@@ -192,37 +196,52 @@ class Logs(AWSService):
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def _get_log_events(self, regional_client):
regional_log_groups = [
log_group
for log_group in self.log_groups.values()
if log_group.region == regional_client.region
]
total_log_groups = len(regional_log_groups)
def _get_log_events(self, log_group):
logger.info(
f"CloudWatch Logs - Retrieving log events for {total_log_groups} log groups in {regional_client.region}..."
f"CloudWatch Logs - Retrieving log events for log group {log_group.name}..."
)
try:
for count, log_group in enumerate(regional_log_groups, start=1):
events = regional_client.filter_log_events(
logGroupName=log_group.name,
limit=self.events_per_log_group_threshold,
)["events"]
for event in events:
if event["logStreamName"] not in log_group.log_streams:
log_group.log_streams[event["logStreamName"]] = []
log_group.log_streams[event["logStreamName"]].append(event)
if count % 10 == 0:
logger.info(
f"CloudWatch Logs - Retrieved log events for {count}/{total_log_groups} log groups in {regional_client.region}..."
)
regional_client = self.regional_clients[log_group.region]
events = regional_client.filter_log_events(
logGroupName=log_group.name,
limit=self.events_per_log_group_threshold,
)["events"]
for event in events:
if event["logStreamName"] not in log_group.log_streams:
log_group.log_streams[event["logStreamName"]] = []
log_group.log_streams[event["logStreamName"]].append(event)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
f"{log_group.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
logger.info(
f"CloudWatch Logs - Finished retrieving log events in {regional_client.region}..."
)
def iter_log_groups(self, with_events: bool = False):
"""Yield log groups lazily, hydrating tags (and events) on demand.
``describe_log_groups`` has no server-side ordering, so newest-first
is best-effort by ``creationTime``. Tags, and log events for the
no-secrets check, are fetched only for the log groups the consumer
actually pulls, memoized per ARN and shared across checks (checks run
sequentially, so no locking needed).
"""
if not self.log_groups:
return
for log_group in limit_resources(
sorted(
self.log_groups.values(),
key=lambda lg: lg.creation_time or 0,
reverse=True,
),
self.log_group_limit,
):
if log_group.arn not in self._log_groups_hydrated:
self._list_tags_for_resource(log_group)
if with_events:
self._get_log_events(log_group)
self._log_groups_hydrated.add(log_group.arn)
elif with_events and not log_group.log_streams:
self._get_log_events(log_group)
yield log_group
def _describe_resource_policies(self, regional_client):
logger.info("CloudWatch Logs - Describing resource policies...")
@@ -292,6 +311,7 @@ class LogGroup(BaseModel):
retention_days: int
never_expire: bool
kms_id: Optional[str]
creation_time: Optional[int] = None
region: str
log_streams: dict[str, list[str]] = (
{}
@@ -10,27 +10,26 @@ from prowler.providers.aws.services.codeartifact.codeartifact_service import (
class codeartifact_packages_external_public_publishing_disabled(Check):
def execute(self):
findings = []
for repository in codeartifact_client.repositories.values():
for package in repository.packages:
report = Check_Report_AWS(metadata=self.metadata(), resource=repository)
report.resource_id = f"{repository.domain_name}/{package.name}"
report.resource_arn = f"{repository.arn}/{package.namespace + ':' if package.namespace else ''}{package.name}"
reports = []
for repository, package in codeartifact_client.iter_packages():
if package.latest_version.origin.origin_type not in (
OriginInformationValues.INTERNAL,
OriginInformationValues.UNKNOWN,
):
continue
if package.latest_version.origin.origin_type in (
OriginInformationValues.INTERNAL,
OriginInformationValues.UNKNOWN,
):
if (
package.origin_configuration.restrictions.upstream
== RestrictionValues.ALLOW
):
report.status = "FAIL"
report.status_extended = f"Internal package {package.name} is vulnerable to dependency confusion in repository {repository.domain_name}."
else:
report.status = "PASS"
report.status_extended = f"Internal package {package.name} is not vulnerable to dependency confusion in repository {repository.domain_name}."
report = Check_Report_AWS(metadata=self.metadata(), resource=repository)
report.resource_id = f"{repository.domain_name}/{package.name}"
report.resource_arn = f"{repository.arn}/{package.namespace + ':' if package.namespace else ''}{package.name}"
findings.append(report)
return findings
if (
package.origin_configuration.restrictions.upstream
== RestrictionValues.ALLOW
):
report.status = "FAIL"
report.status_extended = f"Internal package {package.name} is vulnerable to dependency confusion in repository {repository.domain_name}."
else:
report.status = "PASS"
report.status_extended = f"Internal package {package.name} is not vulnerable to dependency confusion in repository {repository.domain_name}."
reports.append(report)
return reports
@@ -1,9 +1,10 @@
from enum import Enum
from typing import Optional
from typing import Iterator, Optional, Tuple
from botocore.exceptions import ClientError
from pydantic.v1 import BaseModel
from prowler.lib.check.resource_limit import get_resource_scan_limit
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
@@ -15,8 +16,13 @@ class CodeArtifact(AWSService):
super().__init__(__class__.__name__, provider)
# repositories is a dictionary containing all the codeartifact service information
self.repositories = {}
# repository ARNs whose packages have been fully listed and memoized
# into repository.packages by the lazy iter_packages() generator
self._packages_listed = set()
self.package_limit = get_resource_scan_limit(
self.audit_config, "max_codeartifact_packages"
)
self.__threading_call__(self._list_repositories)
self.__threading_call__(self._list_packages)
self._list_tags_for_resource()
def _list_repositories(self, regional_client):
@@ -51,124 +57,129 @@ class CodeArtifact(AWSService):
f" {error}"
)
def _list_packages(self, regional_client):
logger.info("CodeArtifact - Listing Packages and retrieving information...")
for repository in self.repositories:
try:
if self.repositories[repository].region == regional_client.region:
list_packages_paginator = regional_client.get_paginator(
"list_packages"
)
list_packages_parameters = {
"domain": self.repositories[repository].domain_name,
"domainOwner": self.repositories[repository].domain_owner,
"repository": self.repositories[repository].name,
def _iter_repository_packages(self, repository) -> Iterator["Package"]:
"""Yield packages for a single repository, hydrating each one lazily.
Each package requires an extra ``list_package_versions`` call to
resolve its latest version, so producing them lazily lets the resource
limit stop before extra package version calls.
"""
regional_client = self.regional_clients[repository.region]
try:
list_packages_paginator = regional_client.get_paginator("list_packages")
list_packages_parameters = {
"domain": repository.domain_name,
"domainOwner": repository.domain_owner,
"repository": repository.name,
}
for page in list_packages_paginator.paginate(**list_packages_parameters):
for package in page["packages"]:
# Package information
package_format = package["format"]
package_namespace = package.get("namespace")
package_name = package["package"]
package_origin_configuration_restrictions_publish = package[
"originConfiguration"
]["restrictions"]["publish"]
package_origin_configuration_restrictions_upstream = package[
"originConfiguration"
]["restrictions"]["upstream"]
# Get Latest Package Version
list_package_versions_parameters = {
"domain": repository.domain_name,
"domainOwner": repository.domain_owner,
"repository": repository.name,
"format": package_format,
"package": package_name,
"sortBy": "PUBLISHED_TIME",
"maxResults": 1,
}
packages = []
for page in list_packages_paginator.paginate(
**list_packages_parameters
):
for package in page["packages"]:
# Package information
package_format = package["format"]
package_namespace = package.get("namespace")
package_name = package["package"]
package_origin_configuration_restrictions_publish = package[
"originConfiguration"
]["restrictions"]["publish"]
package_origin_configuration_restrictions_upstream = (
package["originConfiguration"]["restrictions"][
"upstream"
]
)
# Get Latest Package Version
if package_namespace:
latest_version_information = (
regional_client.list_package_versions(
domain=self.repositories[
repository
].domain_name,
domainOwner=self.repositories[
repository
].domain_owner,
repository=self.repositories[repository].name,
format=package_format,
namespace=package_namespace,
package=package_name,
sortBy="PUBLISHED_TIME",
maxResults=1,
)
)
else:
latest_version_information = (
regional_client.list_package_versions(
domain=self.repositories[
repository
].domain_name,
domainOwner=self.repositories[
repository
].domain_owner,
repository=self.repositories[repository].name,
format=package_format,
package=package_name,
sortBy="PUBLISHED_TIME",
maxResults=1,
)
)
latest_version = ""
latest_origin_type = "UNKNOWN"
latest_status = "Published"
if latest_version_information.get("versions"):
latest_version = latest_version_information["versions"][
0
].get("version")
latest_origin_type = (
latest_version_information["versions"][0]
.get("origin", {})
.get("originType", "UNKNOWN")
)
latest_status = latest_version_information["versions"][
0
].get("status", "Published")
packages.append(
Package(
name=package_name,
namespace=package_namespace,
format=package_format,
origin_configuration=OriginConfiguration(
restrictions=Restrictions(
publish=package_origin_configuration_restrictions_publish,
upstream=package_origin_configuration_restrictions_upstream,
)
),
latest_version=LatestPackageVersion(
version=latest_version,
status=latest_status,
origin=OriginInformation(
origin_type=latest_origin_type
),
),
)
)
# Save all the packages information
self.repositories[repository].packages = packages
except ClientError as error:
if error.response["Error"]["Code"] == "ResourceNotFoundException":
logger.warning(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
if package_namespace:
list_package_versions_parameters["namespace"] = (
package_namespace
)
latest_version_information = regional_client.list_package_versions(
**list_package_versions_parameters
)
continue
latest_version = ""
latest_origin_type = "UNKNOWN"
latest_status = "Published"
if latest_version_information.get("versions"):
latest_version = latest_version_information["versions"][0].get(
"version"
)
latest_origin_type = (
latest_version_information["versions"][0]
.get("origin", {})
.get("originType", "UNKNOWN")
)
latest_status = latest_version_information["versions"][0].get(
"status", "Published"
)
except Exception as error:
logger.error(
f"{regional_client.region} --"
yield Package(
name=package_name,
namespace=package_namespace,
format=package_format,
origin_configuration=OriginConfiguration(
restrictions=Restrictions(
publish=package_origin_configuration_restrictions_publish,
upstream=package_origin_configuration_restrictions_upstream,
)
),
latest_version=LatestPackageVersion(
version=latest_version,
status=latest_status,
origin=OriginInformation(origin_type=latest_origin_type),
),
)
except ClientError as error:
if error.response["Error"]["Code"] == "ResourceNotFoundException":
logger.warning(
f"{repository.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
else:
logger.error(
f"{repository.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
except Exception as error:
logger.error(
f"{repository.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def iter_packages(self) -> Iterator[Tuple["Repository", "Package"]]:
"""Yield ``(repository, package)`` pairs lazily, memoized per repository.
Packages already fetched are cached in ``repository.packages`` and
reused on subsequent passes (checks run sequentially, so no locking is
needed).
"""
yielded = 0
for repository in list(self.repositories.values()):
if repository.arn in self._packages_listed:
for package in repository.packages:
yield repository, package
yielded += 1
if self.package_limit and yielded >= self.package_limit:
return
continue
collected = []
for package in self._iter_repository_packages(repository):
collected.append(package)
repository.packages = collected
yield repository, package
yielded += 1
if self.package_limit and yielded >= self.package_limit:
self._packages_listed.add(repository.arn)
return
self._packages_listed.add(repository.arn)
def _list_tags_for_resource(self):
logger.info("CodeArtifact - List Tags...")
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.ec2.ec2_client import ec2_client
class ec2_ebs_public_snapshot(Check):
def execute(self):
findings = []
for snapshot in ec2_client.snapshots:
reports = []
for snapshot in ec2_client.iter_snapshots(determine_public=True):
report = Check_Report_AWS(metadata=self.metadata(), resource=snapshot)
report.status = "PASS"
report.status_extended = f"EBS Snapshot {snapshot.id} is not Public."
@@ -14,6 +14,5 @@ class ec2_ebs_public_snapshot(Check):
report.status_extended = (
f"EBS Snapshot {snapshot.id} is currently Public."
)
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,14 +4,13 @@ from prowler.providers.aws.services.ec2.ec2_client import ec2_client
class ec2_ebs_snapshots_encrypted(Check):
def execute(self):
findings = []
for snapshot in ec2_client.snapshots:
reports = []
for snapshot in ec2_client.iter_snapshots():
report = Check_Report_AWS(metadata=self.metadata(), resource=snapshot)
report.status = "PASS"
report.status_extended = f"EBS Snapshot {snapshot.id} is encrypted."
if not snapshot.encrypted:
report.status = "FAIL"
report.status_extended = f"EBS Snapshot {snapshot.id} is unencrypted."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -5,6 +5,7 @@ from typing import Optional, Union
from botocore.client import ClientError
from pydantic.v1 import BaseModel
from prowler.lib.check.resource_limit import get_resource_scan_limit, limit_resources
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
@@ -26,8 +27,13 @@ class EC2(AWSService):
self.snapshots = []
self.volumes_with_snapshots = {}
self.regions_with_snapshots = {}
# Snapshot IDs whose public status has already been hydrated by the
# lazy iter_snapshots() generator (memoization, sequential checks)
self._public_snapshots_determined = set()
self.snapshot_limit = get_resource_scan_limit(
self.audit_config, "max_ebs_snapshots"
)
self.__threading_call__(self._describe_snapshots)
self.__threading_call__(self._determine_public_snapshots, self.snapshots)
self.network_interfaces = {}
self.__threading_call__(self._describe_network_interfaces)
self.images = []
@@ -207,6 +213,7 @@ class EC2(AWSService):
arn=arn,
region=regional_client.region,
encrypted=snapshot.get("Encrypted", False),
start_time=snapshot.get("StartTime"),
tags=snapshot.get("Tags"),
volume=snapshot["VolumeId"],
)
@@ -243,6 +250,31 @@ class EC2(AWSService):
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
def iter_snapshots(self, determine_public: bool = False):
"""Yield snapshots newest-first (best-effort), hydrating public status lazily.
``self.snapshots`` is already listed eagerly (it also feeds
``volumes_with_snapshots``/``regions_with_snapshots`` used by other
checks). The expensive ``describe_snapshot_attribute`` call used to
determine public access is deferred here and only issued for selected
snapshots, memoized per snapshot id so it is shared across checks
(checks run sequentially, so no locking).
"""
for snapshot in limit_resources(
sorted(
self.snapshots,
key=lambda s: (s.start_time.timestamp() if s.start_time else 0.0),
reverse=True,
),
self.snapshot_limit,
):
if determine_public and snapshot.id not in (
self._public_snapshots_determined
):
self._determine_public_snapshots(snapshot)
self._public_snapshots_determined.add(snapshot.id)
yield snapshot
def _describe_network_interfaces(self, regional_client):
try:
# Get Network Interfaces with Public IPs
@@ -686,6 +718,7 @@ class Snapshot(BaseModel):
region: str
encrypted: bool
public: bool = False
start_time: Optional[datetime] = None
tags: Optional[list] = []
volume: Optional[str]
@@ -1,8 +1,10 @@
from datetime import datetime
from re import sub
from typing import Optional
from typing import Iterator, Optional
from pydantic.v1 import BaseModel
from prowler.lib.check.resource_limit import get_resource_scan_limit, limit_resources
from prowler.lib.logger import logger
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService
@@ -12,39 +14,72 @@ class ECS(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
# task_definitions is the memoization cache for the lazy
# iter_task_definitions() generator; it is populated on demand so only
# the selected task definitions are described and analyzed.
self.task_definitions = {}
self._task_definition_arns = None
self.task_definition_limit = get_resource_scan_limit(
self.audit_config, "max_ecs_task_definitions"
)
self.services = {}
self.clusters = {}
self.task_sets = {}
self.__threading_call__(self._list_task_definitions)
self.__threading_call__(
self._describe_task_definition, self.task_definitions.values()
)
self.__threading_call__(self._list_clusters)
self.__threading_call__(self._describe_clusters, self.clusters.values())
self.__threading_call__(self._describe_services, self.clusters.values())
def _list_task_definitions(self, regional_client):
def _list_task_definition_arns(self) -> list:
"""List task definition ARNs newest-first, memoized.
Uses the ``list_task_definitions`` server-side ``sort=DESC`` so the
latest revisions are scanned first across all regions.
"""
if self._task_definition_arns is not None:
return self._task_definition_arns
logger.info("ECS - Listing Task Definitions...")
try:
list_ecs_paginator = regional_client.get_paginator("list_task_definitions")
for page in list_ecs_paginator.paginate():
for task_definition in page["taskDefinitionArns"]:
if not self.audit_resources or (
is_resource_filtered(task_definition, self.audit_resources)
):
self.task_definitions[task_definition] = TaskDefinition(
# we want the family name without the revision
name=sub(":.*", "", task_definition.split("/")[-1]),
arn=task_definition,
revision=task_definition.split(":")[-1],
region=regional_client.region,
environment_variables=[],
)
except Exception as error:
logger.error(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
arns = []
for region, regional_client in self.regional_clients.items():
try:
list_ecs_paginator = regional_client.get_paginator(
"list_task_definitions"
)
for page in list_ecs_paginator.paginate(sort="DESC"):
for task_definition in page["taskDefinitionArns"]:
if not self.audit_resources or (
is_resource_filtered(task_definition, self.audit_resources)
):
arns.append((task_definition, region))
except Exception as error:
logger.error(
f"{region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
self._task_definition_arns = arns
return arns
def iter_task_definitions(self) -> Iterator["TaskDefinition"]:
"""Yield task definitions lazily, describing each one on demand.
Resources already fetched are memoized in ``self.task_definitions`` and
reused across checks (checks run sequentially, so no locking is needed).
The configured resource limit bounds ``describe_task_definition`` calls.
"""
for arn, region in limit_resources(
self._list_task_definition_arns(), self.task_definition_limit
):
task_definition = self.task_definitions.get(arn)
if task_definition is None:
task_definition = TaskDefinition(
# we want the family name without the revision
name=sub(":.*", "", arn.split("/")[-1]),
arn=arn,
revision=arn.split(":")[-1],
region=region,
environment_variables=[],
)
self._describe_task_definition(task_definition)
self.task_definitions[arn] = task_definition
yield task_definition
def _describe_task_definition(self, task_definition):
logger.info("ECS - Describing Task Definition...")
@@ -84,6 +119,9 @@ class ECS(AWSService):
)
)
task_definition.pid_mode = response["taskDefinition"].get("pidMode", "")
task_definition.registered_at = response["taskDefinition"].get(
"registeredAt"
)
task_definition.tags = response.get("tags")
task_definition.network_mode = response["taskDefinition"].get(
"networkMode", "bridge"
@@ -208,6 +246,7 @@ class TaskDefinition(BaseModel):
region: str
container_definitions: list[ContainerDefinition] = []
pid_mode: Optional[str]
registered_at: Optional[datetime] = None
tags: Optional[list] = []
network_mode: Optional[str]
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.ecs.ecs_client import ecs_client
class ecs_task_definitions_containers_readonly_access(Check):
def execute(self):
findings = []
for task_definition in ecs_client.task_definitions.values():
reports = []
for task_definition in ecs_client.iter_task_definitions():
report = Check_Report_AWS(
metadata=self.metadata(), resource=task_definition
)
@@ -21,6 +21,5 @@ class ecs_task_definitions_containers_readonly_access(Check):
if failed_containers:
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has containers with write access to the root filesystem: {', '.join(failed_containers)}"
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.ecs.ecs_client import ecs_client
class ecs_task_definitions_host_namespace_not_shared(Check):
def execute(self):
findings = []
for task_definition in ecs_client.task_definitions.values():
reports = []
for task_definition in ecs_client.iter_task_definitions():
report = Check_Report_AWS(
metadata=self.metadata(), resource=task_definition
)
@@ -15,5 +15,5 @@ class ecs_task_definitions_host_namespace_not_shared(Check):
if task_definition.pid_mode == "host":
report.status = "FAIL"
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} is configured to share a host's process namespace with its containers."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.ecs.ecs_client import ecs_client
class ecs_task_definitions_host_networking_mode_users(Check):
def execute(self):
findings = []
for task_definition in ecs_client.task_definitions.values():
reports = []
for task_definition in ecs_client.iter_task_definitions():
report = Check_Report_AWS(
metadata=self.metadata(), resource=task_definition
)
@@ -25,5 +25,5 @@ class ecs_task_definitions_host_networking_mode_users(Check):
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has containers with host network mode and non-privileged containers running as root or with no user specified: {', '.join(failed_containers)}"
else:
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has host network mode but no containers running as root or with no user specified."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.ecs.ecs_client import ecs_client
class ecs_task_definitions_logging_block_mode(Check):
def execute(self):
findings = []
for task_definition in ecs_client.task_definitions.values():
reports = []
for task_definition in ecs_client.iter_task_definitions():
report = Check_Report_AWS(
metadata=self.metadata(), resource=task_definition
)
@@ -24,6 +24,7 @@ class ecs_task_definitions_logging_block_mode(Check):
if failed_containers:
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} running with logging set to blocking mode on containers: {', '.join(failed_containers)}"
# Only task definitions with logging-enabled containers are reported
if containers > 0:
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.ecs.ecs_client import ecs_client
class ecs_task_definitions_logging_enabled(Check):
def execute(self):
findings = []
for task_definition in ecs_client.task_definitions.values():
reports = []
for task_definition in ecs_client.iter_task_definitions():
report = Check_Report_AWS(
metadata=self.metadata(), resource=task_definition
)
@@ -21,5 +21,5 @@ class ecs_task_definitions_logging_enabled(Check):
if failed_containers:
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has containers running with no logging configuration: {', '.join(failed_containers)}"
findings.append(report)
return findings
reports.append(report)
return reports
@@ -7,11 +7,12 @@ from prowler.providers.aws.services.ecs.ecs_client import ecs_client
class ecs_task_definitions_no_environment_secrets(Check):
def execute(self):
findings = []
secrets_ignore_patterns = ecs_client.audit_config.get(
"secrets_ignore_patterns", []
)
for task_definition in ecs_client.task_definitions.values():
reports = []
for task_definition in ecs_client.iter_task_definitions():
report = Check_Report_AWS(
metadata=self.metadata(), resource=task_definition
)
@@ -58,6 +59,5 @@ class ecs_task_definitions_no_environment_secrets(Check):
)
else:
report.status_extended = f"No secrets found in variables of ECS task definition {task_definition.name} with revision {task_definition.revision}."
findings.append(report)
return findings
reports.append(report)
return reports
@@ -4,8 +4,8 @@ from prowler.providers.aws.services.ecs.ecs_client import ecs_client
class ecs_task_definitions_no_privileged_containers(Check):
def execute(self):
findings = []
for task_definition in ecs_client.task_definitions.values():
reports = []
for task_definition in ecs_client.iter_task_definitions():
report = Check_Report_AWS(
metadata=self.metadata(), resource=task_definition
)
@@ -20,5 +20,5 @@ class ecs_task_definitions_no_privileged_containers(Check):
if failed_containers:
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has privileged containers: {', '.join(failed_containers)}"
findings.append(report)
return findings
reports.append(report)
return reports
+71
View File
@@ -0,0 +1,71 @@
from prowler.lib.check.resource_limit import get_resource_scan_limit, limit_resources
class Test_limit_resources:
def test_no_limit_returns_all_in_order(self):
resources = ["PASS", "FAIL", "PASS"]
result = list(limit_resources(iter(resources), None))
assert result == ["PASS", "FAIL", "PASS"]
def test_limit_zero_or_negative_is_unlimited(self):
resources = list(range(5))
assert list(limit_resources(iter(resources), 0)) == resources
assert list(limit_resources(iter(resources), -3)) == resources
def test_positive_limit_stops_after_selected_resources(self):
pulled = []
def gen():
for i in range(1000):
pulled.append(i)
yield i
result = list(limit_resources(gen(), 100))
assert result == list(range(100))
assert len(pulled) == 100
def test_does_not_reorder_or_inspect_resource_status(self):
resources = ["PASS", "FAIL", "PASS", "FAIL"]
result = list(limit_resources(iter(resources), 3))
assert result == ["PASS", "FAIL", "PASS"]
class Test_get_resource_scan_limit:
def test_per_service_override_wins(self):
config = {
"max_scanned_resources_per_service": 100,
"max_ecs_task_definitions": 25,
}
assert get_resource_scan_limit(config, "max_ecs_task_definitions") == 25
def test_falls_back_to_global_default(self):
config = {"max_scanned_resources_per_service": 50}
assert get_resource_scan_limit(config, "max_ecs_task_definitions") == 50
def test_default_is_unlimited_when_unset(self):
assert get_resource_scan_limit({}, "max_ecs_task_definitions") is None
def test_null_per_service_override_falls_back_to_unlimited_global_default(self):
config = {"max_ecs_task_definitions": None}
assert get_resource_scan_limit(config, "max_ecs_task_definitions") is None
def test_non_positive_means_unlimited(self):
assert (
get_resource_scan_limit(
{"max_scanned_resources_per_service": 0}, "max_lambda_functions"
)
is None
)
assert (
get_resource_scan_limit(
{"max_lambda_functions": -1}, "max_lambda_functions"
)
is None
)
@@ -30,6 +30,10 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
@mock_aws
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.functions = {}
from prowler.providers.aws.services.cloudtrail.cloudtrail_service import (
@@ -68,6 +72,10 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
def test_lambda_not_recorded_by_cloudtrail(self):
# Lambda Client
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -136,6 +144,10 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
def test_lambda_recorded_by_cloudtrail_classic_event_selector(self):
# Lambda Client
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -216,6 +228,10 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
def test_lambda_recorded_by_cloudtrail_advanced_event_selector(self):
# Lambda Client
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -299,6 +315,10 @@ class Test_awslambda_function_invoke_api_operations_cloudtrail_logging_enabled:
def test_all_lambdas_recorded_by_cloudtrail(self):
# Lambda Client
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = "arn:aws:lambda"
@@ -11,6 +11,10 @@ from tests.providers.aws.utils import (
class Test_awslambda_function_no_secrets_in_variables:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.functions = {}
lambda_client.audit_config = {"secrets_ignore_patterns": []}
@@ -36,6 +40,10 @@ class Test_awslambda_function_no_secrets_in_variables:
def test_function_no_variables(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -84,6 +92,10 @@ class Test_awslambda_function_no_secrets_in_variables:
def test_function_secrets_in_keyword(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -132,6 +144,10 @@ class Test_awslambda_function_no_secrets_in_variables:
def test_function_secrets_in_keyword_and_variable(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -180,6 +196,10 @@ class Test_awslambda_function_no_secrets_in_variables:
def test_function_secrets_in_variables_telegram_token(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -226,6 +246,10 @@ class Test_awslambda_function_no_secrets_in_variables:
def test_function_no_secrets_in_variables(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -278,6 +278,9 @@ class Test_awslambda_function_not_publicly_accessible:
def test_function_public_with_canonical(self):
lambda_client = mock.MagicMock
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.audited_account = AWS_ACCOUNT_NUMBER
lambda_client.audit_config = {}
function_name = "test-lambda"
@@ -518,6 +521,9 @@ class Test_awslambda_function_not_publicly_accessible:
def test_function_could_be_invoked_by_specific_aws_account(self):
lambda_client = mock.MagicMock
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.audited_account = AWS_ACCOUNT_NUMBER
lambda_client.audit_config = {}
function_name = "test-lambda"
@@ -583,6 +589,9 @@ class Test_awslambda_function_not_publicly_accessible:
def test_function_could_be_invoked_by_specific_other_aws_account(self):
lambda_client = mock.MagicMock
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.audited_account = AWS_ACCOUNT_NUMBER
lambda_client.audit_config = {}
function_name = "test-lambda"
@@ -648,6 +657,9 @@ class Test_awslambda_function_not_publicly_accessible:
def test_function_public_policy_with_several_statements(self):
lambda_client = mock.MagicMock
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.audited_account = AWS_ACCOUNT_NUMBER
lambda_client.audit_config = {}
function_name = "test-lambda"
@@ -19,6 +19,10 @@ from tests.providers.aws.utils import (
class Test_awslambda_function_url_cors_policy:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.functions = {}
with (
@@ -43,6 +47,10 @@ class Test_awslambda_function_url_cors_policy:
def test_function_cors_asterisk(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -92,6 +100,10 @@ class Test_awslambda_function_url_cors_policy:
def test_function_cors_not_wide(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -141,6 +153,10 @@ class Test_awslambda_function_url_cors_policy:
def test_function_cors_wide_with_two_origins(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -16,6 +16,10 @@ from tests.providers.aws.utils import (
class Test_awslambda_function_url_public:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.functions = {}
with (
@@ -40,6 +44,10 @@ class Test_awslambda_function_url_public:
def test_function_public_url(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -89,6 +97,10 @@ class Test_awslambda_function_url_public:
def test_function_private_url(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -11,6 +11,10 @@ from tests.providers.aws.utils import (
class Test_awslambda_function_using_supported_runtimes:
def test_no_functions(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
lambda_client.functions = {}
with (
@@ -35,6 +39,10 @@ class Test_awslambda_function_using_supported_runtimes:
def test_function_obsolete_runtime(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "nodejs4.3"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -106,6 +114,10 @@ class Test_awslambda_function_using_supported_runtimes:
def test_function_supported_runtime(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_runtime = "python3.9"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
@@ -177,6 +189,10 @@ class Test_awslambda_function_using_supported_runtimes:
def test_function_no_runtime(self):
lambda_client = mock.MagicMock
lambda_client.audit_config = {}
lambda_client.iter_functions = lambda: iter(
list(lambda_client.functions.values())
)
function_name = "test-lambda"
function_arn = f"arn:aws:lambda:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}"
lambda_client.functions = {
@@ -9,7 +9,11 @@ import mock
from boto3 import client, resource
from moto import mock_aws
from prowler.providers.aws.services.awslambda.awslambda_service import AuthType, Lambda
from prowler.providers.aws.services.awslambda.awslambda_service import (
AuthType,
Function,
Lambda,
)
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
@@ -85,6 +89,58 @@ class Test_Lambda_Service:
awslambda = Lambda(set_mocked_aws_provider([AWS_REGION_US_EAST_1]))
assert awslambda.service == "lambda"
def test_function_limit_bounds_per_function_hydration_to_latest_selected(self):
awslambda = Lambda.__new__(Lambda)
awslambda.functions = {
"old": Function(
name="old",
arn="old",
security_groups=[],
last_modified="2024-01-01T00:00:00.000+0000",
region=AWS_REGION_EU_WEST_1,
),
"new": Function(
name="new",
arn="new",
security_groups=[],
last_modified="2024-01-02T00:00:00.000+0000",
region=AWS_REGION_EU_WEST_1,
),
}
awslambda.function_limit = 1
awslambda._functions_hydrated = set()
awslambda._event_source_mappings_listed_functions = set()
awslambda.regional_clients = {AWS_REGION_EU_WEST_1: object()}
event_source_calls = []
policy_calls = []
url_calls = []
tag_calls = []
def list_event_source_mappings(function):
event_source_calls.append(function.name)
def get_policy(function):
policy_calls.append(function.name)
def get_function_url_config(function):
url_calls.append(function.name)
def list_tags_for_resource(function):
tag_calls.append(function.name)
awslambda._list_event_source_mappings = list_event_source_mappings
awslambda._get_policy = get_policy
awslambda._get_function_url_config = get_function_url_config
awslambda._list_tags_for_resource = list_tags_for_resource
functions = list(awslambda.iter_functions())
assert [function.name for function in functions] == ["new"]
assert event_source_calls == ["new"]
assert policy_calls == ["new"]
assert url_calls == ["new"]
assert tag_calls == ["new"]
@mock_aws
def test_list_functions(self):
# Create IAM Lambda Role
@@ -196,6 +252,9 @@ class Test_Lambda_Service:
)
assert awslambda.functions
assert len(awslambda.functions) == 2
# Policy, URL config and tags are hydrated lazily on iteration
assert awslambda.functions[lambda_arn_1].policy == {}
list(awslambda.iter_functions())
# Lambda 1
assert awslambda.functions[lambda_arn_1].name == lambda_name_1
assert awslambda.functions[lambda_arn_1].arn == lambda_arn_1
@@ -253,3 +312,66 @@ class Test_Lambda_Service:
f"{tmp_dir_name}/{files_in_zip[0]}", "r"
) as lambda_code_file:
assert lambda_code_file.read() == LAMBDA_FUNCTION_CODE
@mock_aws
def test_iter_functions_limits_enriched_functions(self):
lambda_client = client("lambda", region_name=AWS_REGION_US_EAST_1)
iam_client = client("iam", region_name=AWS_REGION_US_EAST_1)
iam_role = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument="{}",
)["Role"]["Arn"]
for name in ("function-1", "function-2"):
lambda_client.create_function(
FunctionName=name,
Runtime="python3.7",
Role=iam_role,
Handler="lambda_function.lambda_handler",
Code={"ZipFile": create_zip_file().read()},
PackageType="ZIP",
)
awslambda = Lambda(
set_mocked_aws_provider(
audited_regions=[AWS_REGION_US_EAST_1],
audit_config={"max_lambda_functions": 1},
)
)
functions = list(awslambda.iter_functions())
assert len(functions) == 1
assert len(awslambda._functions_hydrated) == 1
@mock_aws
def test_get_function_code_fetches_only_selected_functions(self):
lambda_client = client("lambda", region_name=AWS_REGION_US_EAST_1)
iam_client = client("iam", region_name=AWS_REGION_US_EAST_1)
iam_role = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument="{}",
)["Role"]["Arn"]
for name in ("function-1", "function-2"):
lambda_client.create_function(
FunctionName=name,
Runtime="python3.7",
Role=iam_role,
Handler="lambda_function.lambda_handler",
Code={"ZipFile": create_zip_file().read()},
PackageType="ZIP",
)
awslambda = Lambda(
set_mocked_aws_provider(
audited_regions=[AWS_REGION_US_EAST_1],
audit_config={"max_lambda_functions": 1},
)
)
fetched = []
def fetch_function_code(function_name, _function_region):
fetched.append(function_name)
return mock.MagicMock()
awslambda._fetch_function_code = fetch_function_code
assert len(list(awslambda._get_function_code())) == 1
assert len(fetched) == 1
@@ -1,11 +1,12 @@
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import patch
import botocore
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.backup.backup_service import Backup
from prowler.providers.aws.services.backup.backup_service import Backup, BackupVault
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
@@ -283,6 +284,14 @@ class TestBackupService:
def test_list_recovery_points(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
backup = Backup(aws_provider)
# Recovery points are fetched lazily via iter_recovery_points()
assert backup.recovery_points == []
recovery_points = list(backup.iter_recovery_points())
assert len(recovery_points) == 1
# Memoized: a second pass reuses the cache
assert list(backup.iter_recovery_points()) == recovery_points
assert len(backup.recovery_points) == 1
assert (
backup.recovery_points[0].arn
@@ -292,3 +301,95 @@ class TestBackupService:
assert backup.recovery_points[0].backup_vault_region == "eu-west-1"
assert backup.recovery_points[0].tags == []
assert backup.recovery_points[0].encrypted is True
def test_recovery_point_limit_bounds_tag_calls_to_selected_points(self):
class FakePaginator:
def paginate(self, **kwargs):
return [
{
"RecoveryPoints": [
{
"RecoveryPointArn": "arn:aws:backup:eu-west-1:123456789012:recovery-point:new",
"IsEncrypted": True,
"CreationDate": datetime(2024, 1, 2),
},
{
"RecoveryPointArn": "arn:aws:backup:eu-west-1:123456789012:recovery-point:old",
"IsEncrypted": True,
"CreationDate": datetime(2024, 1, 1),
},
]
}
]
class FakeBackupClient:
def __init__(self):
self.tag_calls = []
def get_paginator(self, name):
assert name == "list_recovery_points_by_backup_vault"
return FakePaginator()
def list_tags(self, **kwargs):
self.tag_calls.append(kwargs["ResourceArn"])
return {"Tags": {}}
regional_client = FakeBackupClient()
backup = Backup.__new__(Backup)
backup.backup_vaults = [
BackupVault(
arn="arn:aws:backup:eu-west-1:123456789012:backup-vault:vault",
name="vault",
region=AWS_REGION_EU_WEST_1,
encryption="",
recovery_points=2,
locked=False,
)
]
backup.recovery_points = []
backup._recovery_points_listed = False
backup.recovery_point_limit = 1
backup.regional_clients = {AWS_REGION_EU_WEST_1: regional_client}
recovery_points = list(backup.iter_recovery_points())
assert [rp.id for rp in recovery_points] == ["new"]
assert regional_client.tag_calls == [
"arn:aws:backup:eu-west-1:123456789012:recovery-point:new"
]
def test_iter_recovery_points_limits_tagged_resources(self):
backup = Backup.__new__(Backup)
backup.recovery_point_limit = 2
backup.recovery_points = []
backup._recovery_points_listed = False
backup.backup_vaults = [SimpleNamespace(name="vault", region="eu-west-1")]
class Paginator:
def paginate(self, **_kwargs):
return [
{
"RecoveryPoints": [
{
"RecoveryPointArn": f"arn:aws:backup:eu-west-1:123456789012:recovery-point:{i}",
"IsEncrypted": True,
}
for i in range(3)
]
}
]
backup.regional_clients = {
"eu-west-1": SimpleNamespace(get_paginator=lambda _: Paginator())
}
tagged = []
def list_tags(recovery_point):
tagged.append(recovery_point.arn)
backup._list_tags = list_tags
recovery_points = list(backup.iter_recovery_points())
assert len(recovery_points) == 2
assert len(tagged) == 2
@@ -3,6 +3,7 @@ from moto import mock_aws
from prowler.providers.aws.services.cloudwatch.cloudwatch_service import (
CloudWatch,
LogGroup,
Logs,
)
from tests.providers.aws.utils import (
@@ -190,6 +191,8 @@ class Test_CloudWatch_Service:
assert logs.log_groups[arn].kms_id == "test_kms_id"
assert not logs.log_groups[arn].never_expire
assert logs.log_groups[arn].region == AWS_REGION_US_EAST_1
assert logs.log_groups[arn].tags == []
list(logs.iter_log_groups())
assert logs.log_groups[arn].tags == [{}]
@mock_aws
@@ -215,4 +218,44 @@ class Test_CloudWatch_Service:
assert logs.log_groups[arn].retention_days == 9999
assert logs.log_groups[arn].kms_id == "test_kms_id"
assert logs.log_groups[arn].region == AWS_REGION_US_EAST_1
assert logs.log_groups[arn].tags == [{}]
assert logs.log_groups[arn].tags == []
def test_iter_log_groups_limits_enriched_resources(self):
class FakeLogsClient:
def __init__(self):
self.filter_calls = []
def filter_log_events(self, **kwargs):
self.filter_calls.append(kwargs["logGroupName"])
return {"events": []}
regional_client = FakeLogsClient()
logs = Logs.__new__(Logs)
logs.log_group_limit = 1
logs._log_groups_hydrated = set()
logs.regional_clients = {AWS_REGION_US_EAST_1: regional_client}
logs.events_per_log_group_threshold = 1000
logs.log_groups = {
f"arn:{i}": LogGroup(
arn=f"arn:{i}",
name=f"log-{i}",
retention_days=30,
never_expire=False,
kms_id=None,
creation_time=i,
region=AWS_REGION_US_EAST_1,
)
for i in range(3)
}
tagged = []
def list_tags(log_group):
tagged.append(log_group.arn)
logs._list_tags_for_resource = list_tags
log_groups = list(logs.iter_log_groups(with_events=True))
assert [log_group.arn for log_group in log_groups] == ["arn:2"]
assert tagged == ["arn:2"]
assert regional_client.filter_calls == ["log-2"]
@@ -16,9 +16,20 @@ from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER
AWS_REGION = "eu-west-1"
def _mock_iter_packages(client):
"""Emulate CodeArtifact.iter_packages() over hand-built repositories."""
for repository in client.repositories.values():
for package in repository.packages:
yield repository, package
class Test_codeartifact_packages_external_public_publishing_disabled:
def test_no_repositories(self):
codeartifact_client = mock.MagicMock
codeartifact_client.audit_config = {}
codeartifact_client.iter_packages = lambda: _mock_iter_packages(
codeartifact_client
)
codeartifact_client.repositories = {}
with (
mock.patch(
@@ -42,6 +53,10 @@ class Test_codeartifact_packages_external_public_publishing_disabled:
def test_repository_without_packages(self):
codeartifact_client = mock.MagicMock
codeartifact_client.audit_config = {}
codeartifact_client.iter_packages = lambda: _mock_iter_packages(
codeartifact_client
)
codeartifact_client.repositories = {
"test-repository": Repository(
name="test-repository",
@@ -74,6 +89,10 @@ class Test_codeartifact_packages_external_public_publishing_disabled:
def test_repository_package_public_publishing_origin_internal(self):
codeartifact_client = mock.MagicMock
codeartifact_client.audit_config = {}
codeartifact_client.iter_packages = lambda: _mock_iter_packages(
codeartifact_client
)
package_name = "test-package"
package_namespace = "test-namespace"
repository_arn = f"arn:aws:codebuild:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:repository/test-repository"
@@ -140,6 +159,10 @@ class Test_codeartifact_packages_external_public_publishing_disabled:
def test_repository_package_private_publishing_origin_internal(self):
codeartifact_client = mock.MagicMock
codeartifact_client.audit_config = {}
codeartifact_client.iter_packages = lambda: _mock_iter_packages(
codeartifact_client
)
package_name = "test-package"
package_namespace = "test-namespace"
repository_arn = f"arn:aws:codebuild:{AWS_REGION}:{AWS_ACCOUNT_NUMBER}:repository/test-repository"
@@ -1,3 +1,4 @@
from types import SimpleNamespace
from unittest.mock import patch
import botocore
@@ -6,6 +7,7 @@ from prowler.providers.aws.services.codeartifact.codeartifact_service import (
CodeArtifact,
LatestPackageVersionStatus,
OriginInformationValues,
Repository,
RestrictionValues,
)
from tests.providers.aws.utils import (
@@ -158,6 +160,13 @@ class Test_CodeArtifact_Service:
== AWS_REGION_EU_WEST_1
)
# Packages are fetched lazily via iter_packages()
assert codeartifact.repositories[TEST_REPOSITORY_ARN].packages == []
pairs = list(codeartifact.iter_packages())
assert len(pairs) == 1
assert pairs[0][0].arn == TEST_REPOSITORY_ARN
assert codeartifact.repositories[
f"arn:aws:codebuild:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:repository/test-repository"
].packages
@@ -208,6 +217,96 @@ class Test_CodeArtifact_Service:
== OriginInformationValues.INTERNAL
)
def test_package_limit_bounds_package_version_lookups_to_selected_packages(self):
class FakePaginator:
def paginate(self, **kwargs):
return [
{
"packages": [
{
"format": "pypi",
"package": "first-package",
"originConfiguration": {
"restrictions": {
"publish": "ALLOW",
"upstream": "ALLOW",
}
},
},
{
"format": "pypi",
"package": "second-package",
"originConfiguration": {
"restrictions": {
"publish": "ALLOW",
"upstream": "ALLOW",
}
},
},
]
}
]
class FakeCodeArtifactClient:
def __init__(self):
self.version_calls = []
def get_paginator(self, name):
assert name == "list_packages"
return FakePaginator()
def list_package_versions(self, **kwargs):
self.version_calls.append(kwargs["package"])
return {
"versions": [
{
"version": "1.0.0",
"status": "Published",
"origin": {"originType": "INTERNAL"},
}
]
}
regional_client = FakeCodeArtifactClient()
codeartifact = CodeArtifact.__new__(CodeArtifact)
codeartifact.repositories = {
TEST_REPOSITORY_ARN: Repository(
name="test-repository",
arn=TEST_REPOSITORY_ARN,
domain_name="test-domain",
domain_owner=AWS_ACCOUNT_NUMBER,
region=AWS_REGION_EU_WEST_1,
)
}
codeartifact._packages_listed = set()
codeartifact.package_limit = 1
codeartifact.regional_clients = {AWS_REGION_EU_WEST_1: regional_client}
pairs = list(codeartifact.iter_packages())
assert [package.name for _, package in pairs] == ["first-package"]
assert regional_client.version_calls == ["first-package"]
def test_iter_packages_limits_version_enrichment(self):
codeartifact = CodeArtifact.__new__(CodeArtifact)
codeartifact.package_limit = 2
codeartifact._packages_listed = set()
repository = SimpleNamespace(arn="repo", packages=[])
codeartifact.repositories = {repository.arn: repository}
enriched = []
def iter_repository_packages(repository):
for index in range(3):
enriched.append(index)
yield SimpleNamespace(name=f"package-{index}")
codeartifact._iter_repository_packages = iter_repository_packages
packages = list(codeartifact.iter_packages())
assert [package.name for _, package in packages] == ["package-0", "package-1"]
assert enriched == [0, 1]
def mock_make_api_call_no_namespace(self, operation_name, kwarg):
"""Mock for packages without a namespace to exercise the else branch"""
@@ -287,6 +386,8 @@ class Test_CodeArtifact_Service_No_Namespace:
set_mocked_aws_provider([AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1])
)
assert codeartifact.repositories[TEST_REPOSITORY_ARN].packages == []
assert len(list(codeartifact.iter_packages())) == 1
assert len(codeartifact.repositories[TEST_REPOSITORY_ARN].packages) == 1
package = codeartifact.repositories[TEST_REPOSITORY_ARN].packages[0]
@@ -29,7 +29,8 @@ class Test_ec2_ebs_public_snapshot:
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
audit_config={"max_ebs_snapshots": 0},
)
with (
@@ -70,7 +71,8 @@ class Test_ec2_ebs_public_snapshot:
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
audit_config={"max_ebs_snapshots": 0},
)
with (
@@ -120,7 +122,8 @@ class Test_ec2_ebs_public_snapshot:
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
audit_config={"max_ebs_snapshots": 0},
)
with (
@@ -29,7 +29,8 @@ class Test_ec2_ebs_snapshots_encrypted:
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
audit_config={"max_ebs_snapshots": 0},
)
with (
@@ -63,7 +64,8 @@ class Test_ec2_ebs_snapshots_encrypted:
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
audit_config={"max_ebs_snapshots": 0},
)
with (
@@ -113,7 +115,8 @@ class Test_ec2_ebs_snapshots_encrypted:
from prowler.providers.aws.services.ec2.ec2_service import EC2
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1],
audit_config={"max_ebs_snapshots": 0},
)
with (
@@ -11,7 +11,7 @@ from freezegun import freeze_time
from moto import mock_aws
from prowler.config.config import encoding_format_utf_8
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.ec2.ec2_service import EC2, Snapshot
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
@@ -103,6 +103,44 @@ class Test_EC2_Service:
ec2 = EC2(aws_provider)
assert ec2.audited_account == AWS_ACCOUNT_NUMBER
def test_snapshot_limit_bounds_public_attribute_calls_to_latest_selected(self):
class FakeEC2Client:
def __init__(self):
self.calls = []
def describe_snapshot_attribute(self, **kwargs):
self.calls.append(kwargs["SnapshotId"])
return {"CreateVolumePermissions": []}
regional_client = FakeEC2Client()
ec2 = EC2.__new__(EC2)
ec2.snapshots = [
Snapshot(
id="snap-old",
arn="arn:aws:ec2:eu-west-1:123456789012:snapshot/snap-old",
region=AWS_REGION_EU_WEST_1,
encrypted=True,
start_time=datetime(2024, 1, 1),
volume="vol-old",
),
Snapshot(
id="snap-new",
arn="arn:aws:ec2:eu-west-1:123456789012:snapshot/snap-new",
region=AWS_REGION_EU_WEST_1,
encrypted=True,
start_time=datetime(2024, 1, 2),
volume="vol-new",
),
]
ec2.snapshot_limit = 1
ec2._public_snapshots_determined = set()
ec2.regional_clients = {AWS_REGION_EU_WEST_1: regional_client}
snapshots = list(ec2.iter_snapshots(determine_public=True))
assert [snapshot.id for snapshot in snapshots] == ["snap-new"]
assert regional_client.calls == ["snap-new"]
# Test EC2 Describe Instances
@mock_aws
@freeze_time(MOCK_DATETIME)
@@ -333,9 +371,12 @@ class Test_EC2_Service:
[AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1]
)
ec2 = EC2(aws_provider)
ec2.snapshot_limit = None
assert snapshot_id in str(ec2.snapshots)
for snapshot in ec2.snapshots:
# Public status is hydrated lazily via iter_snapshots(determine_public=True)
snapshots = list(ec2.iter_snapshots(determine_public=True))
assert snapshot_id in str(snapshots)
for snapshot in snapshots:
if snapshot.id == snapshot_id:
assert re.match(r"snap-[0-9a-z]{8}", snapshot.id)
assert (
@@ -346,6 +387,27 @@ class Test_EC2_Service:
assert not snapshot.encrypted
assert snapshot.public
@mock_aws
def test_iter_snapshots_limits_public_status_hydration(self):
ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1)
ec2_resource = resource("ec2", region_name=AWS_REGION_US_EAST_1)
volume_id = ec2_resource.create_volume(
AvailabilityZone="us-east-1a",
Size=80,
VolumeType="gp2",
).id
for _ in range(3):
ec2_client.create_snapshot(VolumeId=volume_id)
aws_provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1], audit_config={"max_ebs_snapshots": 1}
)
ec2 = EC2(aws_provider)
snapshots = list(ec2.iter_snapshots(determine_public=True))
assert len(snapshots) == 1
assert len(ec2._public_snapshots_determined) == 1
# Test EC2 Instance User Data
@mock_aws
def test_get_instance_user_data(self):
@@ -139,7 +139,7 @@ class Test_ECS_Service:
ecs = ECS(aws_provider)
assert ecs.session.__class__.__name__ == "Session"
# Test list ECS task definitions
# Task definitions are now fetched lazily via iter_task_definitions()
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_list_task_definitions(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
@@ -147,6 +147,12 @@ class Test_ECS_Service:
task_arn = "arn:aws:ecs:eu-west-1:123456789012:task-definition/test_cluster_1/test_ecs_task:1"
# Nothing fetched until the generator is consumed
assert ecs.task_definitions == {}
task_definitions = list(ecs.iter_task_definitions())
assert len(task_definitions) == 1
assert len(ecs.task_definitions) == 1
assert ecs.task_definitions[task_arn].name == "test_ecs_task"
assert ecs.task_definitions[task_arn].arn == task_arn
@@ -161,6 +167,8 @@ class Test_ECS_Service:
task_arn = "arn:aws:ecs:eu-west-1:123456789012:task-definition/test_cluster_1/test_ecs_task:1"
list(ecs.iter_task_definitions())
assert len(ecs.task_definitions) == 1
assert ecs.task_definitions[task_arn].name == "test_ecs_task"
assert ecs.task_definitions[task_arn].arn == task_arn
@@ -201,6 +209,127 @@ class Test_ECS_Service:
.readonly_rootfilesystem
)
def test_iter_task_definitions_is_lazy_and_memoized(self):
describe_calls = []
list_calls = []
def counting_make_api_call(self, operation_name, kwarg):
if operation_name == "ListTaskDefinitions":
list_calls.append(kwarg)
return {
"taskDefinitionArns": [
f"arn:aws:ecs:eu-west-1:123456789012:task-definition/fam:{i}"
for i in (3, 2, 1)
]
}
if operation_name == "DescribeTaskDefinition":
describe_calls.append(kwarg["taskDefinition"])
return {
"taskDefinition": {
"containerDefinitions": [],
"networkMode": "bridge",
"pidMode": "",
"tags": [],
}
}
return make_api_call(self, operation_name, kwarg)
with patch(
"botocore.client.BaseClient._make_api_call", new=counting_make_api_call
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
gen = ecs.iter_task_definitions()
first = next(gen)
# Lazy: only the first task definition has been described so far
assert first.revision == "3"
assert list_calls == [{"sort": "DESC"}]
assert len(describe_calls) == 1
# Drain the rest
rest = list(gen)
assert [td.revision for td in rest] == ["2", "1"]
assert len(describe_calls) == 3
# Memoized: a second full pass does not describe anything again
assert len(list(ecs.iter_task_definitions())) == 3
assert len(describe_calls) == 3
def test_iter_task_definitions_limits_described_resources(self):
describe_calls = []
def counting_make_api_call(self, operation_name, kwarg):
if operation_name == "ListTaskDefinitions":
return {
"taskDefinitionArns": [
f"arn:aws:ecs:eu-west-1:123456789012:task-definition/fam:{i}"
for i in (3, 2, 1)
]
}
if operation_name == "DescribeTaskDefinition":
describe_calls.append(kwarg["taskDefinition"])
return {
"taskDefinition": {
"containerDefinitions": [],
"networkMode": "bridge",
"pidMode": "",
"tags": [],
}
}
return make_api_call(self, operation_name, kwarg)
with patch(
"botocore.client.BaseClient._make_api_call", new=counting_make_api_call
):
aws_provider = set_mocked_aws_provider(
[AWS_REGION_EU_WEST_1], audit_config={"max_ecs_task_definitions": 2}
)
ecs = ECS(aws_provider)
task_definitions = list(ecs.iter_task_definitions())
assert [td.revision for td in task_definitions] == ["3", "2"]
assert len(describe_calls) == 2
def test_task_definition_limit_bounds_describe_calls(self):
describe_calls = []
def counting_make_api_call(self, operation_name, kwarg):
if operation_name == "ListTaskDefinitions":
return {
"taskDefinitionArns": [
f"arn:aws:ecs:eu-west-1:123456789012:task-definition/fam:{i}"
for i in (3, 2, 1)
]
}
if operation_name == "DescribeTaskDefinition":
describe_calls.append(kwarg["taskDefinition"])
return {
"taskDefinition": {
"containerDefinitions": [],
"networkMode": "bridge",
"pidMode": "",
"tags": [],
}
}
return mock_make_api_call(self, operation_name, kwarg)
with patch(
"botocore.client.BaseClient._make_api_call", new=counting_make_api_call
):
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
ecs = ECS(aws_provider)
ecs.task_definition_limit = 1
task_definitions = list(ecs.iter_task_definitions())
assert [td.revision for td in task_definitions] == ["3"]
assert describe_calls == [
"arn:aws:ecs:eu-west-1:123456789012:task-definition/fam:3"
]
# Test list ECS clusters
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_list_clusters(self):