feat(awslambda): enrich Function model with inventory fields and add 3 security checks (#10381)

Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
This commit is contained in:
Sandiyo Christan
2026-03-26 15:03:39 +05:30
committed by GitHub
parent 2cf45c72b6
commit 834d1bca49
14 changed files with 818 additions and 3 deletions
+2 -2
View File
@@ -6,10 +6,10 @@ All notable changes to the **Prowler SDK** are documented in this file.
### 🚀 Added
- `apikeys_api_restricted_with_gemini_api` check for GCP provider [(#10280)](https://github.com/prowler-cloud/prowler/pull/10280)
- `gemini_api_disabled` check for GCP provider [(#10280)](https://github.com/prowler-cloud/prowler/pull/10280)
- `apikeys_api_restricted_with_gemini_api` and `gemini_api_disabled`checks for GCP provider [(#10280)](https://github.com/prowler-cloud/prowler/pull/10280)
- `cloudfront_distributions_logging_enabled` detects Standard Logging v2 via CloudWatch Log Delivery [(#10090)](https://github.com/prowler-cloud/prowler/pull/10090)
- `glue_etl_jobs_no_secrets_in_arguments` check for plaintext secrets in AWS Glue ETL job arguments [(#10368)](https://github.com/prowler-cloud/prowler/pull/10368)
- `awslambda_function_no_dead_letter_queue`, `awslambda_function_using_cross_account_layers`, and `awslambda_function_env_vars_not_encrypted_with_cmk` checks for AWS Lambda [(#10381)](https://github.com/prowler-cloud/prowler/pull/10381)
### 🔄 Changed
@@ -0,0 +1,41 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_env_vars_not_encrypted_with_cmk",
"CheckTitle": "Lambda function environment variables are encrypted with a customer-managed KMS key",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices",
"Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices"
],
"ServiceName": "awslambda",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AwsLambdaFunction",
"ResourceGroup": "serverless",
"Description": "**AWS Lambda function** environment variables are encrypted at rest using a **customer-managed KMS key (CMK)** rather than the default AWS-managed Lambda service key.\n\nThe presence of a `KMSKeyArn` on the function configuration indicates CMK-based encryption is active.",
"Risk": "Without a CMK, environment variables are encrypted with an AWS-managed key, removing **customer control** over rotation, auditing, and revocation.\n\nIf variables contain secrets or connection strings, loss of key control weakens **confidentiality** and can fail compliance requirements (PCI-DSS, HIPAA, FedRAMP) that mandate customer-controlled encryption.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/lambda/latest/dg/configuration-envvars.html#configuration-envvars-encryption",
"https://docs.aws.amazon.com/securityhub/latest/userguide/lambda-controls.html",
"https://docs.aws.amazon.com/kms/latest/developerguide/services-lambda.html"
],
"Remediation": {
"Code": {
"CLI": "aws lambda update-function-configuration --function-name <function-name> --kms-key-arn <cmk-arn>",
"NativeIaC": "```yaml\nAWSTemplateFormatVersion: '2010-09-09'\nResources:\n LambdaFunction:\n Type: AWS::Lambda::Function\n Properties:\n FunctionName: <example_resource_name>\n Role: <example_role_arn>\n Handler: index.handler\n Runtime: python3.12\n Code:\n S3Bucket: <example_code_bucket>\n S3Key: <example_code_key>\n KmsKeyArn: <cmk-arn>\n Environment:\n Variables:\n MY_CONFIG: <value>\n```",
"Other": "1. Create or identify a KMS CMK in the same region as the function\n2. Grant the Lambda execution role `kms:Decrypt` and `kms:GenerateDataKey` on the key\n3. In the Lambda console go to Configuration > Environment variables > Edit\n4. Under Encryption configuration, select your CMK\n5. Save — Lambda re-encrypts all environment variables with the chosen key",
"Terraform": "```hcl\nresource \"aws_kms_key\" \"lambda_env\" {\n description = \"Lambda env var encryption key\"\n enable_key_rotation = true\n deletion_window_in_days = 30\n}\n\nresource \"aws_lambda_function\" \"example\" {\n function_name = \"<example_resource_name>\"\n role = \"<example_role_arn>\"\n handler = \"index.handler\"\n runtime = \"python3.12\"\n filename = \"<example_package.zip>\"\n kms_key_arn = aws_kms_key.lambda_env.arn\n\n environment {\n variables = {\n MY_CONFIG = \"<value>\"\n }\n }\n}\n```"
},
"Recommendation": {
"Text": "Encrypt Lambda environment variables with a customer-managed KMS key to maintain full control over key lifecycle and access.\n- Create a dedicated KMS key per application or per function for blast-radius isolation\n- Enable **automatic key rotation** (`EnableKeyRotation: true`)\n- Grant only the Lambda execution role decrypt access via a key policy condition on `kms:ViaService`\n- Prefer **AWS Secrets Manager** or **SSM Parameter Store (SecureString)** for secrets — environment variables should hold non-secret configuration only",
"Url": "https://hub.prowler.com/check/awslambda_function_env_vars_not_encrypted_with_cmk"
}
},
"Categories": [
"encryption"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,28 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_client
class awslambda_function_env_vars_not_encrypted_with_cmk(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
if not function.environment:
report.status = "PASS"
report.status_extended = (
f"Lambda function {function.name} has no environment variables."
)
elif function.kms_key_arn:
report.status = "PASS"
report.status_extended = (
f"Lambda function {function.name} environment variables are "
f"encrypted with KMS key {function.kms_key_arn}."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Lambda function {function.name} has environment variables "
f"but they are not encrypted with a customer-managed KMS key."
)
findings.append(report)
return findings
@@ -0,0 +1,41 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_no_dead_letter_queue",
"CheckTitle": "Lambda function has a Dead Letter Queue configured",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices",
"Software and Configuration Checks/Industry and Regulatory Standards/AWS Foundational Security Best Practices"
],
"ServiceName": "awslambda",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "AwsLambdaFunction",
"ResourceGroup": "serverless",
"Description": "**AWS Lambda functions** have a **Dead Letter Queue (DLQ)** configured — an SQS queue or SNS topic that receives records of failed asynchronous invocations.\n\nWithout a DLQ, failed invocations are silently discarded after exhausting retries.",
"Risk": "Without a DLQ, failed asynchronous invocations are permanently lost. This harms **availability** by hiding processing failures, and weakens **integrity** by making it impossible to replay or audit unprocessed events.\n\nIn security-sensitive pipelines (e.g., audit log processors, alerting functions), silent failure can mask security events entirely.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html#invocation-dlq",
"https://docs.aws.amazon.com/securityhub/latest/userguide/lambda-controls.html#lambda-4",
"https://repost.aws/knowledge-center/lambda-dead-letter-queue"
],
"Remediation": {
"Code": {
"CLI": "aws lambda update-function-configuration --function-name <function-name> --dead-letter-config TargetArn=<sqs-queue-arn-or-sns-topic-arn>",
"NativeIaC": "```yaml\nAWSTemplateFormatVersion: '2010-09-09'\nResources:\n LambdaFunction:\n Type: AWS::Lambda::Function\n Properties:\n FunctionName: <example_resource_name>\n Role: <example_role_arn>\n Handler: index.handler\n Runtime: python3.12\n Code:\n S3Bucket: <example_code_bucket>\n S3Key: <example_code_key>\n DeadLetterConfig:\n TargetArn: <sqs-queue-arn-or-sns-topic-arn>\n```",
"Other": "1. Open the AWS Lambda console and select your function\n2. Go to Configuration > Asynchronous invocation\n3. Under Dead-letter queue service, select SQS or SNS\n4. Choose or create the target queue/topic\n5. Save changes",
"Terraform": "```hcl\nresource \"aws_lambda_function\" \"example\" {\n function_name = \"<example_resource_name>\"\n role = \"<example_role_arn>\"\n handler = \"index.handler\"\n runtime = \"python3.12\"\n filename = \"<example_package.zip>\"\n\n dead_letter_config {\n target_arn = \"<sqs-queue-arn-or-sns-topic-arn>\"\n }\n}\n```"
},
"Recommendation": {
"Text": "Configure a Dead Letter Queue for every Lambda function that handles asynchronous invocations.\n- Prefer an **SQS queue** as the DLQ target for retry and replay capability\n- Ensure the Lambda execution role has `sqs:SendMessage` permission on the DLQ\n- Monitor DLQ depth with a CloudWatch alarm to alert on processing failures",
"Url": "https://hub.prowler.com/check/awslambda_function_no_dead_letter_queue"
}
},
"Categories": [
"resilience"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,17 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_client
class awslambda_function_no_dead_letter_queue(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
if function.dead_letter_config:
report.status = "PASS"
report.status_extended = f"Lambda function {function.name} has a Dead Letter Queue configured at {function.dead_letter_config.target_arn}."
else:
report.status = "FAIL"
report.status_extended = f"Lambda function {function.name} does not have a Dead Letter Queue configured."
findings.append(report)
return findings
@@ -0,0 +1,41 @@
{
"Provider": "aws",
"CheckID": "awslambda_function_using_cross_account_layers",
"CheckTitle": "Lambda function does not use cross-account layers",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices",
"TTPs/Initial Access"
],
"ServiceName": "awslambda",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "AwsLambdaFunction",
"ResourceGroup": "serverless",
"Description": "**AWS Lambda functions** use only **layers published within the same AWS account**, rather than layers owned by external accounts.\n\nA Lambda layer bundles shared code or dependencies that are injected into the function execution environment at runtime.",
"Risk": "A layer from an external account is a **supply chain dependency outside your control**. If that account is compromised or the layer is updated maliciously, every consumer function executes attacker code with its IAM role — a direct **privilege escalation** and **lateral movement** path across all functions using that layer.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://docs.aws.amazon.com/lambda/latest/dg/configuration-layers.html",
"https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html",
"https://unit42.paloaltonetworks.com/lambda-layers-supply-chain/"
],
"Remediation": {
"Code": {
"CLI": "# Copy the cross-account layer version into your own account first:\naws lambda publish-layer-version --layer-name <layer-name> --zip-file fileb://<layer.zip>\n# Then update the function to use your own layer ARN:\naws lambda update-function-configuration --function-name <function-name> --layers <your-account-layer-arn>",
"NativeIaC": "```yaml\nAWSTemplateFormatVersion: '2010-09-09'\nResources:\n OwnedLayer:\n Type: AWS::Lambda::LayerVersion\n Properties:\n LayerName: <layer-name>\n Content:\n S3Bucket: <bucket>\n S3Key: <key>\n LambdaFunction:\n Type: AWS::Lambda::Function\n Properties:\n FunctionName: <example_resource_name>\n Role: <example_role_arn>\n Handler: index.handler\n Runtime: python3.12\n Code:\n S3Bucket: <example_code_bucket>\n S3Key: <example_code_key>\n Layers:\n - !Ref OwnedLayer\n```",
"Other": "1. Download the cross-account layer ZIP\n2. Publish the layer in your own account: Lambda > Layers > Create layer\n3. Update the function configuration to reference your layer ARN\n4. Remove the cross-account layer ARN from the function",
"Terraform": "```hcl\nresource \"aws_lambda_layer_version\" \"example\" {\n layer_name = \"<layer-name>\"\n filename = \"<layer.zip>\"\n}\n\nresource \"aws_lambda_function\" \"example\" {\n function_name = \"<example_resource_name>\"\n role = \"<example_role_arn>\"\n handler = \"index.handler\"\n runtime = \"python3.12\"\n filename = \"<example_package.zip>\"\n\n layers = [aws_lambda_layer_version.example.arn]\n}\n```"
},
"Recommendation": {
"Text": "Eliminate cross-account layer dependencies by hosting all layers in your own AWS account.\n- Audit all layers with `aws lambda get-function-configuration` and inspect `Layers[].Arn`\n- Extract the account ID from the ARN (field 5 in colon-split) and compare against your account\n- For approved vendor layers, pin to a specific immutable version ARN and review on each update\n- Enforce this via SCP: deny `lambda:UpdateFunctionConfiguration` when `lambda:Layer` ARN does not match your account ID",
"Url": "https://hub.prowler.com/check/awslambda_function_using_cross_account_layers"
}
},
"Categories": [
"software-supply-chain"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
@@ -0,0 +1,34 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.awslambda.awslambda_client import awslambda_client
class awslambda_function_using_cross_account_layers(Check):
def execute(self):
findings = []
for function in awslambda_client.functions.values():
report = Check_Report_AWS(metadata=self.metadata(), resource=function)
cross_account_layers = [
layer
for layer in function.layers
if layer.account_id != awslambda_client.audited_account
]
if not function.layers:
report.status = "PASS"
report.status_extended = (
f"Lambda function {function.name} does not use any layers."
)
elif cross_account_layers:
report.status = "FAIL"
layer_arns = ", ".join(layer.arn for layer in cross_account_layers)
report.status_extended = (
f"Lambda function {function.name} uses cross-account "
f"layer(s): {layer_arns}."
)
else:
report.status = "PASS"
report.status_extended = (
f"Lambda function {function.name} only uses layers "
f"from the same account ({awslambda_client.audited_account})."
)
findings.append(report)
return findings
@@ -23,6 +23,7 @@ class Lambda(AWSService):
self._list_tags_for_resource()
self.__threading_call__(self._get_policy)
self.__threading_call__(self._get_function_url_config)
self.__threading_call__(self._list_event_source_mappings)
def _list_functions(self, regional_client):
logger.info("Lambda - Listing Functions...")
@@ -54,6 +55,19 @@ class Lambda(AWSService):
"Variables"
)
self.functions[lambda_arn].environment = lambda_environment
if "KMSKeyArn" in function:
self.functions[lambda_arn].kms_key_arn = function[
"KMSKeyArn"
]
if "Layers" in function:
self.functions[lambda_arn].layers = [
Layer(arn=layer["Arn"]) for layer in function["Layers"]
]
dlq_arn = function.get("DeadLetterConfig", {}).get("TargetArn")
if dlq_arn:
self.functions[lambda_arn].dead_letter_config = (
DeadLetterConfig(target_arn=dlq_arn)
)
except Exception as error:
logger.error(
@@ -62,6 +76,33 @@ class Lambda(AWSService):
f" {error}"
)
def _list_event_source_mappings(self, regional_client):
logger.info("Lambda - Listing Event Source Mappings...")
try:
paginator = regional_client.get_paginator("list_event_source_mappings")
for page in paginator.paginate():
for mapping in page.get("EventSourceMappings", []):
function_arn = mapping.get("FunctionArn", "")
# Normalise to unqualified ARN (strip :qualifier suffix if present)
base_arn = ":".join(function_arn.split(":")[:7])
if base_arn not in self.functions:
continue
self.functions[base_arn].event_source_mappings.append(
EventSourceMapping(
uuid=mapping["UUID"],
event_source_arn=mapping.get("EventSourceArn", ""),
state=mapping.get("State", ""),
batch_size=mapping.get("BatchSize"),
starting_position=mapping.get("StartingPosition"),
)
)
except Exception as error:
logger.error(
f"{regional_client.region} --"
f" {error.__class__.__name__}[{error.__traceback__.tb_lineno}]:"
f" {error}"
)
def _get_function_code(self):
logger.info("Lambda - Getting Function Code...")
# Use a thread pool handle the queueing and execution of the _fetch_function_code tasks, up to max_workers tasks concurrently.
@@ -192,16 +233,42 @@ class URLConfig(BaseModel):
cors_config: URLConfigCORS
class Layer(BaseModel):
arn: str
@property
def account_id(self) -> str:
"""Extract the account ID from the layer ARN."""
parts = self.arn.split(":")
return parts[4] if len(parts) >= 5 else ""
class DeadLetterConfig(BaseModel):
target_arn: str
class EventSourceMapping(BaseModel):
uuid: str
event_source_arn: str
state: str
batch_size: Optional[int] = None
starting_position: Optional[str] = None
class Function(BaseModel):
name: str
arn: str
security_groups: list
runtime: Optional[str] = None
environment: dict = None
environment: Optional[dict] = None
region: str
policy: dict = {}
code: LambdaCode = None
url_config: URLConfig = None
vpc_id: Optional[str] = None
subnet_ids: Optional[set] = None
kms_key_arn: Optional[str] = None
layers: list[Layer] = []
dead_letter_config: Optional[DeadLetterConfig] = None
event_source_mappings: list[EventSourceMapping] = []
tags: Optional[list] = []
@@ -0,0 +1,197 @@
from json import dumps
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
ROLE_POLICY = dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
)
class Test_awslambda_function_env_vars_not_encrypted_with_cmk:
def test_no_functions(self):
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_client",
new=Lambda(aws_provider),
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_function_env_vars_not_encrypted_with_cmk import (
awslambda_function_env_vars_not_encrypted_with_cmk,
)
check = awslambda_function_env_vars_not_encrypted_with_cmk()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_function_no_env_vars(self):
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_arn = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=ROLE_POLICY,
)["Role"]["Arn"]
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_name = "test-fn-no-env"
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"file not used"},
)["FunctionArn"]
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_client",
new=Lambda(aws_provider),
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_function_env_vars_not_encrypted_with_cmk import (
awslambda_function_env_vars_not_encrypted_with_cmk,
)
check = awslambda_function_env_vars_not_encrypted_with_cmk()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "no environment variables" in result[0].status_extended
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
def test_function_env_vars_no_kms(self):
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_arn = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=ROLE_POLICY,
)["Role"]["Arn"]
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_name = "test-fn-env-no-kms"
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"file not used"},
Environment={"Variables": {"DB_HOST": "localhost"}},
)["FunctionArn"]
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_client",
new=Lambda(aws_provider),
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_function_env_vars_not_encrypted_with_cmk import (
awslambda_function_env_vars_not_encrypted_with_cmk,
)
check = awslambda_function_env_vars_not_encrypted_with_cmk()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert "customer-managed KMS key" in result[0].status_extended
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
def test_function_env_vars_with_cmk(self):
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_arn = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=ROLE_POLICY,
)["Role"]["Arn"]
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_name = "test-fn-env-with-kms"
key_arn = (
f"arn:aws:kms:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:key/test-key-id"
)
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"file not used"},
Environment={"Variables": {"DB_HOST": "localhost"}},
)["FunctionArn"]
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
lambda_service = Lambda(aws_provider)
# moto does not return KMSKeyArn in list_functions; inject it to test PASS branch.
lambda_service.functions[function_arn].kms_key_arn = key_arn
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_client",
new=lambda_service,
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_env_vars_not_encrypted_with_cmk.awslambda_function_env_vars_not_encrypted_with_cmk import (
awslambda_function_env_vars_not_encrypted_with_cmk,
)
check = awslambda_function_env_vars_not_encrypted_with_cmk()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert key_arn in result[0].status_extended
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].region == AWS_REGION_EU_WEST_1
@@ -0,0 +1,149 @@
from json import dumps
from unittest import mock
from boto3 import client
from moto import mock_aws
from prowler.providers.aws.services.awslambda.awslambda_service import DeadLetterConfig
from tests.providers.aws.utils import AWS_REGION_EU_WEST_1, set_mocked_aws_provider
ROLE_POLICY = dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
)
class Test_awslambda_function_no_dead_letter_queue:
def test_no_functions(self):
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_no_dead_letter_queue.awslambda_function_no_dead_letter_queue.awslambda_client",
new=Lambda(aws_provider),
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_no_dead_letter_queue.awslambda_function_no_dead_letter_queue import (
awslambda_function_no_dead_letter_queue,
)
check = awslambda_function_no_dead_letter_queue()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_function_without_dlq(self):
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_arn = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=ROLE_POLICY,
)["Role"]["Arn"]
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_name = "test-function-no-dlq"
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"file not used"},
)["FunctionArn"]
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_no_dead_letter_queue.awslambda_function_no_dead_letter_queue.awslambda_client",
new=Lambda(aws_provider),
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_no_dead_letter_queue.awslambda_function_no_dead_letter_queue import (
awslambda_function_no_dead_letter_queue,
)
check = awslambda_function_no_dead_letter_queue()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert function_name in result[0].status_extended
assert "Dead Letter Queue" in result[0].status_extended
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].region == AWS_REGION_EU_WEST_1
@mock_aws
def test_function_with_sqs_dlq(self):
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_arn = iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=ROLE_POLICY,
)["Role"]["Arn"]
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_name = "test-function-with-dlq"
queue_arn = f"arn:aws:sqs:{AWS_REGION_EU_WEST_1}:123456789012:test-dlq"
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"file not used"},
)["FunctionArn"]
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
lambda_service = Lambda(aws_provider)
# moto does not return DeadLetterConfig in list_functions;
# set it directly to test the PASS branch of the check logic.
lambda_service.functions[function_arn].dead_letter_config = DeadLetterConfig(
target_arn=queue_arn
)
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_no_dead_letter_queue.awslambda_function_no_dead_letter_queue.awslambda_client",
new=lambda_service,
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_no_dead_letter_queue.awslambda_function_no_dead_letter_queue import (
awslambda_function_no_dead_letter_queue,
)
check = awslambda_function_no_dead_letter_queue()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert function_name in result[0].status_extended
assert queue_arn in result[0].status_extended
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].region == AWS_REGION_EU_WEST_1
@@ -0,0 +1,200 @@
from json import dumps
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
ROLE_POLICY = dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
)
EXTERNAL_ACCOUNT = "999999999999"
def _create_role(iam_client):
return iam_client.create_role(
RoleName="test-role",
AssumeRolePolicyDocument=ROLE_POLICY,
)["Role"]["Arn"]
class Test_awslambda_function_using_cross_account_layers:
def test_no_functions(self):
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_using_cross_account_layers.awslambda_function_using_cross_account_layers.awslambda_client",
new=Lambda(aws_provider),
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_using_cross_account_layers.awslambda_function_using_cross_account_layers import (
awslambda_function_using_cross_account_layers,
)
check = awslambda_function_using_cross_account_layers()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_function_no_layers(self):
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_arn = _create_role(iam_client)
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_name = "test-fn-no-layers"
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"file not used"},
)["FunctionArn"]
from prowler.providers.aws.services.awslambda.awslambda_service import Lambda
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_using_cross_account_layers.awslambda_function_using_cross_account_layers.awslambda_client",
new=Lambda(aws_provider),
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_using_cross_account_layers.awslambda_function_using_cross_account_layers import (
awslambda_function_using_cross_account_layers,
)
check = awslambda_function_using_cross_account_layers()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert "does not use any layers" in result[0].status_extended
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
@mock_aws
def test_function_own_account_layer(self):
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_arn = _create_role(iam_client)
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_name = "test-fn-own-layer"
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"file not used"},
)["FunctionArn"]
from prowler.providers.aws.services.awslambda.awslambda_service import (
Lambda,
Layer,
)
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
lambda_service = Lambda(aws_provider)
# moto does not return Layers in list_functions; inject an own-account layer.
own_layer_arn = f"arn:aws:lambda:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:layer:my-layer:1"
lambda_service.functions[function_arn].layers = [Layer(arn=own_layer_arn)]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_using_cross_account_layers.awslambda_function_using_cross_account_layers.awslambda_client",
new=lambda_service,
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_using_cross_account_layers.awslambda_function_using_cross_account_layers import (
awslambda_function_using_cross_account_layers,
)
check = awslambda_function_using_cross_account_layers()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert AWS_ACCOUNT_NUMBER in result[0].status_extended
@mock_aws
def test_function_cross_account_layer(self):
"""Function uses a layer from an external account — FAIL."""
iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1)
role_arn = _create_role(iam_client)
lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1)
function_name = "test-fn-cross-layer"
function_arn = lambda_client.create_function(
FunctionName=function_name,
Runtime="python3.11",
Role=role_arn,
Handler="index.handler",
Code={"ZipFile": b"file not used"},
)["FunctionArn"]
from prowler.providers.aws.services.awslambda.awslambda_service import (
Lambda,
Layer,
)
cross_layer_arn = f"arn:aws:lambda:{AWS_REGION_EU_WEST_1}:{EXTERNAL_ACCOUNT}:layer:ext-layer:1"
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
lambda_service = Lambda(aws_provider)
# moto does not return Layers; inject a cross-account layer to test FAIL branch.
lambda_service.functions[function_arn].layers = [Layer(arn=cross_layer_arn)]
with (
mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
),
mock.patch(
"prowler.providers.aws.services.awslambda.awslambda_function_using_cross_account_layers.awslambda_function_using_cross_account_layers.awslambda_client",
new=lambda_service,
),
):
from prowler.providers.aws.services.awslambda.awslambda_function_using_cross_account_layers.awslambda_function_using_cross_account_layers import (
awslambda_function_using_cross_account_layers,
)
check = awslambda_function_using_cross_account_layers()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert cross_layer_arn in result[0].status_extended
assert result[0].resource_id == function_name
assert result[0].resource_arn == function_arn
assert result[0].region == AWS_REGION_EU_WEST_1