feat(aws): add fixers for threat detection checks (#7085)

This commit is contained in:
Sergio Garcia
2025-03-03 14:20:23 +01:00
committed by GitHub
parent bbeef0299f
commit 48c2c8567c
11 changed files with 232 additions and 6 deletions

View File

@@ -308,6 +308,8 @@ aws:
"ListFoundationModelAgreementOffers", # Lists available agreement offers for accessing foundation models (List).
"ListFoundationModels", # Lists the available foundation models in Bedrock (List).
"ListProvisionedModelThroughputs", # Lists the provisioned throughput for previously created models (List).
"SearchAgreements", # Searches for agreements based on specified criteria (List).
"AcceptAgreementRequest", # Accepts a request for an agreement to use a foundation model (Write).
]
# AWS RDS Configuration

View File

@@ -358,6 +358,15 @@ def run_fixer(check_findings: list) -> int:
print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
elif "resource_arn" in fixer.__code__.co_varnames:
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} Resource {finding.resource_arn}... "
)
if fixer(resource_arn=finding.resource_arn):
fixed_findings += 1
print(f"\t{Fore.GREEN}DONE{Style.RESET_ALL}")
else:
print(f"\t{Fore.RED}ERROR{Style.RESET_ALL}")
else:
print(
f"\t{orange_color}FIXING{Style.RESET_ALL} Resource {finding.resource_id}... "

View File

@@ -166,7 +166,7 @@ class cloudtrail_threat_detection_enumeration(Check):
report.resource_id = aws_identity_arn.split("/")[-1]
report.resource_arn = aws_identity_arn
report.status = "FAIL"
report.status_extended = f"Potential enumeration attack detected from AWS {aws_identity_type} {aws_identity_arn.split('/')[-1]} with an threshold of {identity_threshold}."
report.status_extended = f"Potential enumeration attack detected from AWS {aws_identity_type} {aws_identity_arn.split('/')[-1]} with a threshold of {identity_threshold}."
findings.append(report)
if not found_potential_enumeration:
report = Check_Report_AWS(

View File

@@ -0,0 +1,71 @@
import json
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_arn: str) -> bool:
"""
Restricts access to a compromised AWS entity by attaching a deny-all inline policy to the user or role.
Requires the following permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:PutUserPolicy",
"iam:PutRolePolicy",
],
"Resource": "*"
}
]
}
Args:
resource_arn (str): The ARN of the compromised AWS entity (IAM User or Role).
Returns:
bool: True if the fix was applied successfully, False otherwise.
"""
try:
if ":user/" in resource_arn:
entity_type = "user"
entity_name = resource_arn.split("/")[-1]
elif ":role/" in resource_arn:
entity_type = "role"
entity_name = resource_arn.split("/")[-1]
else:
return False
deny_policy = {
"Version": "2012-10-17",
"Statement": [{"Effect": "Deny", "Action": "*", "Resource": "*"}],
}
policy_name = "DenyAllAccess"
if entity_type == "user":
iam_client.client.put_user_policy(
UserName=entity_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(deny_policy),
)
logger.info(f"Applied Deny policy to user {entity_name}")
elif entity_type == "role":
iam_client.client.put_role_policy(
RoleName=entity_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(deny_policy),
)
logger.info(f"Applied Deny policy to role {entity_name}")
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -18,6 +18,8 @@ default_threat_detection_llm_jacking_actions = [
"ListFoundationModelAgreementOffers",
"ListFoundationModels",
"ListProvisionedModelThroughputs",
"SearchAgreements",
"AcceptAgreementRequest",
]
@@ -88,7 +90,7 @@ class cloudtrail_threat_detection_llm_jacking(Check):
report.resource_id = aws_identity_arn.split("/")[-1]
report.resource_arn = aws_identity_arn
report.status = "FAIL"
report.status_extended = f"Potential LLM Jacking attack detected from AWS {aws_identity_type} {aws_identity_arn.split('/')[-1]} with an threshold of {identity_threshold}."
report.status_extended = f"Potential LLM Jacking attack detected from AWS {aws_identity_type} {aws_identity_arn.split('/')[-1]} with a threshold of {identity_threshold}."
findings.append(report)
if not found_potential_llm_jacking:
report = Check_Report_AWS(

View File

@@ -0,0 +1,71 @@
import json
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_arn: str) -> bool:
"""
Restricts access to a compromised AWS entity by attaching a deny-all inline policy to the user or role.
Requires the following permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:PutUserPolicy",
"iam:PutRolePolicy",
],
"Resource": "*"
}
]
}
Args:
resource_arn (str): The ARN of the compromised AWS entity (IAM User or Role).
Returns:
bool: True if the fix was applied successfully, False otherwise.
"""
try:
if ":user/" in resource_arn:
entity_type = "user"
entity_name = resource_arn.split("/")[-1]
elif ":role/" in resource_arn:
entity_type = "role"
entity_name = resource_arn.split("/")[-1]
else:
return False
deny_policy = {
"Version": "2012-10-17",
"Statement": [{"Effect": "Deny", "Action": "*", "Resource": "*"}],
}
policy_name = "DenyAllAccess"
if entity_type == "user":
iam_client.client.put_user_policy(
UserName=entity_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(deny_policy),
)
logger.info(f"Applied Deny policy to user {entity_name}")
elif entity_type == "role":
iam_client.client.put_role_policy(
RoleName=entity_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(deny_policy),
)
logger.info(f"Applied Deny policy to role {entity_name}")
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -129,7 +129,7 @@ class cloudtrail_threat_detection_privilege_escalation(Check):
report.resource_id = aws_identity_arn.split("/")[-1]
report.resource_arn = aws_identity_arn
report.status = "FAIL"
report.status_extended = f"Potential privilege escalation attack detected from AWS {aws_identity_type} {aws_identity_arn.split('/')[-1]} with an threshold of {identity_threshold}."
report.status_extended = f"Potential privilege escalation attack detected from AWS {aws_identity_type} {aws_identity_arn.split('/')[-1]} with a threshold of {identity_threshold}."
findings.append(report)
if not found_potential_privilege_escalation:
report = Check_Report_AWS(

View File

@@ -0,0 +1,71 @@
import json
from prowler.lib.logger import logger
from prowler.providers.aws.services.iam.iam_client import iam_client
def fixer(resource_arn: str) -> bool:
"""
Restricts access to a compromised AWS entity by attaching a deny-all inline policy to the user or role.
Requires the following permissions:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iam:PutUserPolicy",
"iam:PutRolePolicy",
],
"Resource": "*"
}
]
}
Args:
resource_arn (str): The ARN of the compromised AWS entity (IAM User or Role).
Returns:
bool: True if the fix was applied successfully, False otherwise.
"""
try:
if ":user/" in resource_arn:
entity_type = "user"
entity_name = resource_arn.split("/")[-1]
elif ":role/" in resource_arn:
entity_type = "role"
entity_name = resource_arn.split("/")[-1]
else:
return False
deny_policy = {
"Version": "2012-10-17",
"Statement": [{"Effect": "Deny", "Action": "*", "Resource": "*"}],
}
policy_name = "DenyAllAccess"
if entity_type == "user":
iam_client.client.put_user_policy(
UserName=entity_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(deny_policy),
)
logger.info(f"Applied Deny policy to user {entity_name}")
elif entity_type == "role":
iam_client.client.put_role_policy(
RoleName=entity_name,
PolicyName=policy_name,
PolicyDocument=json.dumps(deny_policy),
)
logger.info(f"Applied Deny policy to role {entity_name}")
return True
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return False

View File

@@ -176,7 +176,7 @@ class Test_cloudtrail_threat_detection_enumeration:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Potential enumeration attack detected from AWS IAMUser Attacker with an threshold of 1.0."
== "Potential enumeration attack detected from AWS IAMUser Attacker with a threshold of 1.0."
)
assert result[0].resource_id == "Attacker"
assert result[0].region == AWS_REGION_US_EAST_1

View File

@@ -173,7 +173,7 @@ class Test_cloudtrail_threat_detection_llm_jacking:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Potential LLM Jacking attack detected from AWS IAMUser Attacker with an threshold of 1.0."
== "Potential LLM Jacking attack detected from AWS IAMUser Attacker with a threshold of 1.0."
)
assert result[0].resource_id == "Attacker"
assert result[0].region == AWS_REGION_US_EAST_1

View File

@@ -175,7 +175,7 @@ class Test_cloudtrail_threat_detection_privilege_escalation:
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== "Potential privilege escalation attack detected from AWS IAMUser Attacker with an threshold of 1.0."
== "Potential privilege escalation attack detected from AWS IAMUser Attacker with a threshold of 1.0."
)
assert result[0].resource_id == "Attacker"
assert result[0].region == AWS_REGION_US_EAST_1