Compare commits

...

16 Commits

Author SHA1 Message Date
Prowler Bot a70f0652b6 fix(ui): hide line numbers in CLI command remediation block (#11061)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-06 15:06:00 +01:00
Prowler Bot fae4fbc0ae fix: PR number in changelog entry for #10529 (#11058)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-06 11:56:21 +01:00
Prowler Bot bbe45ed708 fix(oci): scan identity in known valid region (#11056)
Co-authored-by: rchotacode <32524742+rchotacode@users.noreply.github.com>
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-06 11:44:26 +01:00
Prowler Bot 6b6d22bb31 chore(api): Bump version to v1.26.3 (#10996)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-05 10:45:57 +02:00
Prowler Bot a3b4f94368 chore(sdk): Bump version to v5.25.3 (#10994)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-05 10:45:35 +02:00
Prowler Bot 178cdb1b57 chore(ui): Bump version to v5.25.3 (#10995)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-05 10:44:56 +02:00
Prowler Bot d58343e11f chore(changelog): prepare for v5.25.2 (#10992)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-05 08:51:50 +02:00
Prowler Bot 952ca2d505 fix(sdk): cover CNAME → dangling S3 in route53 takeover check (#10990)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-05 08:47:00 +02:00
Prowler Bot 9de9a26821 fix(k8s): match RBAC rules by apiGroup, not just core (#10988)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
2026-05-04 19:59:31 +02:00
Prowler Bot e4da9741b2 fix(timeline): Return a compact actor name from CloudTrail events (#10987)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-04 19:41:36 +02:00
Prowler Bot 35e867e4f5 fix(k8s): deduplicate RBAC findings by unique subject (#10984)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-04 18:22:56 +02:00
Prowler Bot 0719f69828 fix(ui): compliance card layout polish (#10977)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-04 15:53:58 +01:00
Prowler Bot b7ee0ce9b1 fix(ui): clean up findings expanded resource row layout (#10973)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-04 14:59:06 +01:00
Prowler Bot 53f6cb52cb chore(ui): Bump version to v5.25.2 (#10941)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-29 18:44:39 +02:00
Prowler Bot 429c5f6789 chore(sdk): Bump version to v5.25.2 (#10943)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-29 18:44:30 +02:00
Prowler Bot 592bc4a944 chore(api): Bump version to v1.26.2 (#10942)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-04-29 18:44:19 +02:00
32 changed files with 1075 additions and 365 deletions
+1 -1
View File
@@ -145,7 +145,7 @@ SENTRY_RELEASE=local
NEXT_PUBLIC_SENTRY_ENVIRONMENT=${SENTRY_ENVIRONMENT}
#### Prowler release version ####
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.25.1
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.25.3
# Social login credentials
SOCIAL_GOOGLE_OAUTH_CALLBACK_URL="${AUTH_URL}/api/auth/callback/google"
+1 -1
View File
@@ -50,7 +50,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.26.1"
version = "1.26.3"
[project.scripts]
celery = "src.backend.config.settings.celery"
+1 -1
View File
@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.26.1
version: 1.26.3
description: |-
Prowler API specification.
+1 -1
View File
@@ -422,7 +422,7 @@ class SchemaView(SpectacularAPIView):
def get(self, request, *args, **kwargs):
spectacular_settings.TITLE = "Prowler API"
spectacular_settings.VERSION = "1.26.1"
spectacular_settings.VERSION = "1.26.3"
spectacular_settings.DESCRIPTION = (
"Prowler API specification.\n\nThis file is auto-generated."
)
+19
View File
@@ -2,6 +2,25 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.25.3] (Prowler UNRELEASED)
### 🐞 Fixed
- Oracle cloud identity scans now scan known or supplied regions to better support non ashburn tenancies [(#10529)](https://github.com/prowler-cloud/prowler/pull/10529)
---
## [5.25.2] (Prowler v5.25.2)
### 🐞 Fixed
- `route53_dangling_ip_subdomain_takeover` now also flags `CNAME` records pointing to S3 website endpoints whose buckets are missing from the account [(#10920)](https://github.com/prowler-cloud/prowler/pull/10920)
- Duplicate Kubernetes RBAC findings when the same User or Group subject appeared in multiple ClusterRoleBindings [(#10242)](https://github.com/prowler-cloud/prowler/pull/10242)
- Match K8s RBAC rules by `apiGroup` [(#10969)](https://github.com/prowler-cloud/prowler/pull/10969)
- Return a compact actor name from CloudTrail `userIdentity` events [(#10986)](https://github.com/prowler-cloud/prowler/pull/10986)
---
## [5.25.1] (Prowler v5.25.1)
### 🐞 Fixed
+1 -1
View File
@@ -48,7 +48,7 @@ class _MutableTimestamp:
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
prowler_version = "5.25.1"
prowler_version = "5.25.3"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
@@ -221,27 +221,12 @@ class CloudTrailTimeline(TimelineService):
@staticmethod
def _extract_actor(user_identity: Dict[str, Any]) -> str:
"""Extract a human-readable actor name from CloudTrail userIdentity."""
# Try ARN first - most reliable
"""Return a compact actor name from CloudTrail userIdentity.
For ARNs, returns the resource portion (everything after the last
`:`) — e.g. `user/alice`, `assumed-role/MyRole/session-name`,
`root`. The full ARN is preserved separately in `actor_uid`.
"""
if arn := user_identity.get("arn"):
if "/" in arn:
parts = arn.split("/")
# For assumed-role, return the role name (second-to-last part)
if "assumed-role" in arn and len(parts) >= 2:
return parts[-2]
return parts[-1]
return arn.split(":")[-1]
# Fall back to userName
if username := user_identity.get("userName"):
return username
# Fall back to principalId
if principal_id := user_identity.get("principalId"):
return principal_id
# For service-invoked actions
if invoking_service := user_identity.get("invokedBy"):
return invoking_service
return "Unknown"
return arn.rsplit(":", 1)[-1]
return user_identity.get("invokedBy") or "Unknown"
@@ -1,7 +1,7 @@
{
"Provider": "aws",
"CheckID": "route53_dangling_ip_subdomain_takeover",
"CheckTitle": "Route53 A record does not point to a dangling IP address",
"CheckTitle": "Route53 record does not point to a dangling AWS resource",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices/Network Reachability",
"TTPs/Initial Access",
@@ -13,13 +13,14 @@
"Severity": "high",
"ResourceType": "AwsRoute53HostedZone",
"ResourceGroup": "network",
"Description": "**Route 53 `A` records** (non-alias) that use literal IPs are evaluated for **public AWS addresses** not currently assigned to resources in the account. Entries that match AWS ranges yet lack ownership are identified as potential **dangling IP targets**.",
"Risk": "**Dangling DNS `A` records** pointing to released AWS IPs enable **subdomain takeover**. An attacker who later obtains that IP can:\n- Redirect or alter content (integrity)\n- Capture credentials/cookies (confidentiality)\n- Disrupt or impersonate services (availability)",
"Description": "**Route 53 records** are evaluated for two **subdomain takeover** vectors: (1) non-alias **`A` records** using literal IPs in **public AWS ranges** that are not assigned to resources in the account (released EIPs/ENI public IPs); and (2) non-alias **`CNAME` records** targeting an **S3 website endpoint** (`*.s3-website[.-]<region>.amazonaws.com`) whose bucket no longer exists in the account.",
"Risk": "**Dangling DNS records** pointing to released AWS resources enable **subdomain takeover**. An attacker who later claims the IP — or registers an S3 bucket with the same name in any AWS account — can:\n- Redirect or alter content (integrity)\n- Capture credentials/cookies (confidentiality)\n- Disrupt or impersonate services (availability)",
"RelatedUrl": "",
"AdditionalURLs": [
"https://support.icompaas.com/support/solutions/articles/62000233461-ensure-route53-records-contains-dangling-ips-",
"https://www.trendmicro.com/trendaivisiononecloudriskmanagement/knowledge-base/aws/Route53/dangling-dns-records.html",
"https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resource-record-sets-deleting.html"
"https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/resource-record-sets-deleting.html",
"https://docs.aws.amazon.com/AmazonS3/latest/userguide/WebsiteEndpoints.html"
],
"Remediation": {
"Code": {
@@ -29,7 +30,7 @@
"Terraform": "```hcl\n# Terraform: convert A record to Alias to avoid dangling public IPs\nresource \"aws_route53_record\" \"<example_resource_name>\" {\n zone_id = \"<example_resource_id>\"\n name = \"<example_resource_name>\"\n type = \"A\"\n\n alias { # CRITICAL: Alias to AWS resource (no direct IP)\n name = \"<ALIAS_TARGET_DNS_NAME>\" # e.g., dualstack.<alb>.amazonaws.com\n zone_id = \"<ALIAS_TARGET_HOSTED_ZONE_ID>\"\n evaluate_target_health = false\n }\n}\n```"
},
"Recommendation": {
"Text": "Remove or update any record that points to an unassigned IP. Avoid hard-coding AWS public IPs in `A` records; use **aliases/CNAMEs** to managed endpoints. Enforce **asset lifecycle** decommissioning, routine DNS-asset reconciliation, and **change control** with monitoring to prevent and detect drift.",
"Text": "Remove or update any record that points to an unowned AWS resource: unassigned public IPs in `A` records and S3 website endpoints in `CNAME` records whose bucket has been deleted. Avoid hard-coding AWS public IPs in `A` records; prefer **aliases** to managed endpoints (ALB, CloudFront, S3) and delete CNAMEs as soon as the backing bucket is removed. Enforce **asset lifecycle** decommissioning, routine DNS-asset reconciliation, and **change control** with monitoring to prevent and detect drift.",
"Url": "https://hub.prowler.com/check/route53_dangling_ip_subdomain_takeover"
}
},
@@ -1,3 +1,4 @@
import re
from ipaddress import ip_address
import awsipranges
@@ -6,6 +7,14 @@ from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.lib.utils.utils import validate_ip_address
from prowler.providers.aws.services.ec2.ec2_client import ec2_client
from prowler.providers.aws.services.route53.route53_client import route53_client
from prowler.providers.aws.services.s3.s3_client import s3_client
# S3 website endpoint formats:
# <bucket>.s3-website-<region>.amazonaws.com (legacy, dash)
# <bucket>.s3-website.<region>.amazonaws.com (newer, dot)
S3_WEBSITE_ENDPOINT_REGEX = re.compile(
r"^(?P<bucket>[^.]+(?:\.[^.]+)*)\.s3-website[.-](?P<region>[a-z0-9-]+)\.amazonaws\.com\.?$"
)
class route53_dangling_ip_subdomain_takeover(Check):
@@ -24,11 +33,14 @@ class route53_dangling_ip_subdomain_takeover(Check):
if ni.association and ni.association.get("PublicIp"):
public_ips.append(ni.association.get("PublicIp"))
owned_bucket_names = {bucket.name for bucket in s3_client.buckets.values()}
for record_set in route53_client.record_sets:
# Check only A records and avoid aliases (only need to check IPs not AWS Resources)
hosted_zone = route53_client.hosted_zones[record_set.hosted_zone_id]
# A records: dangling-IP path (released EIPs / unowned AWS IPs)
if record_set.type == "A" and not record_set.is_alias:
for record in record_set.records:
# Check if record is an IP Address
if validate_ip_address(record):
report = Check_Report_AWS(
metadata=self.metadata(), resource=record_set
@@ -36,25 +48,45 @@ class route53_dangling_ip_subdomain_takeover(Check):
report.resource_id = (
f"{record_set.hosted_zone_id}/{record_set.name}/{record}"
)
report.resource_arn = route53_client.hosted_zones[
record_set.hosted_zone_id
].arn
report.resource_tags = route53_client.hosted_zones[
record_set.hosted_zone_id
].tags
report.resource_arn = hosted_zone.arn
report.resource_tags = hosted_zone.tags
report.status = "PASS"
report.status_extended = f"Route53 record {record} (name: {record_set.name}) in Hosted Zone {route53_client.hosted_zones[record_set.hosted_zone_id].name} is not a dangling IP."
report.status_extended = f"Route53 record {record} (name: {record_set.name}) in Hosted Zone {hosted_zone.name} is not a dangling IP."
# If Public IP check if it is in the AWS Account
if (
not ip_address(record).is_private
and record not in public_ips
):
report.status_extended = f"Route53 record {record} (name: {record_set.name}) in Hosted Zone {route53_client.hosted_zones[record_set.hosted_zone_id].name} does not belong to AWS and it is not a dangling IP."
report.status_extended = f"Route53 record {record} (name: {record_set.name}) in Hosted Zone {hosted_zone.name} does not belong to AWS and it is not a dangling IP."
# Check if potential dangling IP is within AWS Ranges
aws_ip_ranges = awsipranges.get_ranges()
if aws_ip_ranges.get(record):
report.status = "FAIL"
report.status_extended = f"Route53 record {record} (name: {record_set.name}) in Hosted Zone {route53_client.hosted_zones[record_set.hosted_zone_id].name} is a dangling IP which can lead to a subdomain takeover attack."
report.status_extended = f"Route53 record {record} (name: {record_set.name}) in Hosted Zone {hosted_zone.name} is a dangling IP which can lead to a subdomain takeover attack."
findings.append(report)
# CNAME records: dangling S3 website endpoint
# (deleted bucket whose name can be re-registered by anyone)
elif record_set.type == "CNAME" and not record_set.is_alias:
for record in record_set.records:
match = S3_WEBSITE_ENDPOINT_REGEX.match(record.lower())
if not match:
continue
bucket_name = match.group("bucket")
report = Check_Report_AWS(
metadata=self.metadata(), resource=record_set
)
report.resource_id = (
f"{record_set.hosted_zone_id}/{record_set.name}/{record}"
)
report.resource_arn = hosted_zone.arn
report.resource_tags = hosted_zone.tags
if bucket_name in owned_bucket_names:
report.status = "PASS"
report.status_extended = f"Route53 CNAME {record_set.name} in Hosted Zone {hosted_zone.name} points to S3 website endpoint of bucket {bucket_name} which exists in the account."
else:
report.status = "FAIL"
report.status_extended = f"Route53 CNAME {record_set.name} in Hosted Zone {hosted_zone.name} points to S3 website endpoint of bucket {bucket_name} which does not exist in the account and can lead to a subdomain takeover attack."
findings.append(report)
return findings
@@ -1,36 +1,37 @@
def is_rule_allowing_permissions(rules, resources, verbs):
def is_rule_allowing_permissions(rules, resources, verbs, api_groups=("",)):
"""
Check Kubernetes role permissions.
Check whether any RBAC rule grants the specified verbs on the specified
resources within the specified API groups.
This function takes in Kubernetes role rules, resources, and verbs,
and checks if any of the rules grant permissions on the specified
resources with the specified verbs.
A rule matches when its `apiGroups` includes any of `api_groups` (or "*"),
its `resources` includes any of `resources` (or "*"), and its `verbs`
includes any of `verbs` (or "*").
Args:
rules (List[Rule]): The list of Kubernetes role rules.
resources (List[str]): The list of resources to check permissions for.
verbs (List[str]): The list of verbs to check permissions for.
rules (List[Rule]): RBAC rules from a Role or ClusterRole.
resources (List[str]): Resources (or sub-resources) to check.
verbs (List[str]): Verbs to check.
api_groups (Iterable[str]): API groups the resources live in. Defaults
to ("",), the core API group, which matches the most common case.
Pass an explicit value for resources outside the core group, e.g.
("admissionregistration.k8s.io",) for webhook configurations.
Returns:
bool: True if any of the rules grant permissions, False otherwise.
bool: True if any rule grants the permission, False otherwise.
"""
if rules:
# Iterate through each rule in the list of rules
for rule in rules:
# Ensure apiGroups are relevant ("" or "v1" for secrets)
if rule.apiGroups and all(api not in ["", "v1"] for api in rule.apiGroups):
continue # Skip rules with unrelated apiGroups
# Check if the rule has resources, verbs, and matches any of the specified resources and verbs
if (
rule.resources
and (
any(resource in rule.resources for resource in resources)
or "*" in rule.resources
)
and rule.verbs
and (any(verb in rule.verbs for verb in verbs) or "*" in rule.verbs)
):
# If the rule matches, return True
return True
# If no rule matches, return False
if not rules:
return False
for rule in rules:
rule_api_groups = rule.apiGroups or [""]
if not (
any(g in rule_api_groups for g in api_groups) or "*" in rule_api_groups
):
continue
if (
rule.resources
and (any(r in rule.resources for r in resources) or "*" in rule.resources)
and rule.verbs
and (any(v in rule.verbs for v in verbs) or "*" in rule.verbs)
):
return True
return False
@@ -6,29 +6,40 @@ from prowler.providers.kubernetes.services.rbac.rbac_client import rbac_client
verbs = ["update", "patch"]
resources = ["certificatesigningrequests/approval"]
api_groups = ["certificates.k8s.io"]
class rbac_minimize_csr_approval_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to update the CSR approval sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to update the CSR approval sub-resource."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["nodes/proxy"]
class rbac_minimize_node_proxy_subresource_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to the node proxy sub-resource."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to the node proxy sub-resource."
break
findings.append(report)
return findings
@@ -11,21 +11,32 @@ resources = ["persistentvolumes"]
class rbac_minimize_pv_creation_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Check each ClusterRoleBinding for access to create PersistentVolumes
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create PersistentVolumes."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create PersistentVolumes."
break
findings.append(report)
return findings
@@ -11,20 +11,32 @@ resources = ["serviceaccounts/token"]
class rbac_minimize_service_account_token_creation(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create service account tokens."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(cr.rules, resources, verbs):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create service account tokens."
break
findings.append(report)
return findings
@@ -9,29 +9,40 @@ resources = [
"mutatingwebhookconfigurations",
]
verbs = ["create", "update", "delete"]
api_groups = ["admissionregistration.k8s.io"]
class rbac_minimize_webhook_config_access(Check):
def execute(self) -> Check_Report_Kubernetes:
findings = []
# Collect unique subjects and the ClusterRole names bound to them
subjects_bound_roles = {}
for crb in rbac_client.cluster_role_bindings.values():
for subject in crb.subjects:
# CIS benchmarks scope these checks to human identities only
if subject.kind in ["User", "Group"]:
report = Check_Report_Kubernetes(
metadata=self.metadata(), resource=subject
)
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for cr in rbac_client.cluster_roles.values():
if cr.metadata.name == crb.roleRef.name:
if is_rule_allowing_permissions(
cr.rules,
resources,
verbs,
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
key = (subject.kind, subject.name, subject.namespace)
if key not in subjects_bound_roles:
subjects_bound_roles[key] = (subject, set())
subjects_bound_roles[key][1].add(crb.roleRef.name)
cluster_roles_by_name = {
cr.metadata.name: cr for cr in rbac_client.cluster_roles.values()
}
for _, (subject, role_names) in subjects_bound_roles.items():
report = Check_Report_Kubernetes(metadata=self.metadata(), resource=subject)
report.resource_name = f"{subject.kind}:{subject.name}"
report.resource_id = f"{subject.kind}/{subject.name}"
report.status = "PASS"
report.status_extended = f"User or group '{subject.name}' does not have access to create, update, or delete webhook configurations."
for role_name in role_names:
cr = cluster_roles_by_name.get(role_name)
if cr and is_rule_allowing_permissions(
cr.rules, resources, verbs, api_groups
):
report.status = "FAIL"
report.status_extended = f"User or group '{subject.name}' has access to create, update, or delete webhook configurations."
break
findings.append(report)
return findings
@@ -66,6 +66,7 @@ class OraclecloudProvider(Provider):
_compartments: list = []
_mutelist: OCIMutelist
audit_metadata: Audit_Metadata
_home_region: str = "us-ashburn-1"
def __init__(
self,
@@ -160,6 +161,13 @@ class OraclecloudProvider(Provider):
# Get regions
self._regions = self.get_regions_to_audit(region)
self._home_region = None
if self._regions:
self._home_region = next(
(region.key for region in self._regions if region.is_home_region),
self._regions[0].key,
)
logger.info(f"Home region is: {self._home_region}")
# Get compartments
self._compartments = self.get_compartments_to_audit(
@@ -217,6 +225,10 @@ class OraclecloudProvider(Provider):
def regions(self):
return self._regions
@property
def home_region(self):
return self._home_region
@property
def compartments(self):
return self._compartments
@@ -1,6 +1,7 @@
"""OCI Identity Service Module."""
from datetime import datetime
from threading import Lock
from typing import Optional
import oci
@@ -26,6 +27,7 @@ class Identity(OCIService):
self.policies = []
self.dynamic_groups = []
self.domains = []
self._domains_lock = Lock()
self.password_policy = None
self.root_compartment_resources = []
self.active_non_root_compartments = []
@@ -61,8 +63,8 @@ class Identity(OCIService):
regional_client: Regional OCI client
"""
try:
# Identity is a global service, use home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for global users
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -312,7 +314,8 @@ class Identity(OCIService):
def __list_groups__(self, regional_client):
"""List all IAM groups."""
try:
if regional_client.region not in self.provider.identity.region:
# Only use one region for global groups
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -355,7 +358,8 @@ class Identity(OCIService):
def __list_policies__(self, regional_client):
"""List all IAM policies."""
try:
if regional_client.region not in self.provider.identity.region:
# Only use one region for global policies
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -399,8 +403,8 @@ class Identity(OCIService):
def __list_dynamic_groups__(self, regional_client):
"""List all dynamic groups in the tenancy."""
try:
# Dynamic groups are only in the home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for global dynamic groups
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -447,10 +451,6 @@ class Identity(OCIService):
def __list_domains__(self, regional_client):
"""List all identity domains."""
try:
# Domains are only in the home region
if regional_client.region not in self.provider.identity.region:
return
identity_client = self.__get_client__(regional_client.region)
logger.info("Identity - Listing Identity Domains...")
@@ -458,6 +458,7 @@ class Identity(OCIService):
try:
# List all domains in the tenancy
for compartment in self.audited_compartments:
domains = oci.pagination.list_call_get_all_results(
identity_client.list_domains,
compartment_id=compartment.id,
@@ -465,20 +466,38 @@ class Identity(OCIService):
).data
for domain in domains:
self.domains.append(
IdentityDomain(
id=domain.id,
display_name=domain.display_name,
description=domain.description or "",
url=domain.url,
home_region=domain.home_region,
compartment_id=compartment.id,
lifecycle_state=domain.lifecycle_state,
time_created=domain.time_created,
region=regional_client.region,
password_policies=[],
# Threads run __list_domains__ concurrently per
# region; serialize the dedupe-then-append so two
# regions returning the same domain cannot race
# past each other and produce duplicates or lose
# the home-region preference.
with self._domains_lock:
existing = next(
(d for d in self.domains if d.id == domain.id),
None,
)
if existing is not None:
# Prefer the entry from the domain's home region
if domain.home_region == regional_client.region:
self.domains.remove(existing)
else:
continue
self.domains.append(
IdentityDomain(
id=domain.id,
display_name=domain.display_name,
description=domain.description or "",
url=domain.url,
home_region=domain.home_region,
compartment_id=compartment.id,
lifecycle_state=domain.lifecycle_state,
time_created=domain.time_created,
region=regional_client.region,
password_policies=[],
)
)
)
except Exception as error:
logger.error(
@@ -493,8 +512,8 @@ class Identity(OCIService):
def __list_domain_password_policies__(self, regional_client):
"""List password policies for all identity domains."""
try:
# Password policies are only in the home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for all domain scan
if regional_client.region != self.provider.home_region:
return
logger.info("Identity - Listing Domain Password Policies...")
@@ -551,7 +570,8 @@ class Identity(OCIService):
def __get_password_policy__(self, regional_client):
"""Get the password policy for the tenancy."""
try:
if regional_client.region not in self.provider.identity.region:
# Only use one region for global password policies
if regional_client.region != self.provider.home_region:
return
identity_client = self.__get_client__(regional_client.region)
@@ -578,8 +598,8 @@ class Identity(OCIService):
def __search_root_compartment_resources__(self, regional_client):
"""Search for resources in the root compartment using OCI Resource Search."""
try:
# Search is a global service, use home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for global search
if regional_client.region != self.provider.home_region:
return
logger.info("Identity - Searching for resources in root compartment...")
@@ -626,10 +646,9 @@ class Identity(OCIService):
def __search_active_non_root_compartments__(self, regional_client):
"""Search for active non-root compartments using OCI Resource Search."""
try:
# Search is a global service, use home region
if regional_client.region not in self.provider.identity.region:
# Only use one region for global search
if regional_client.region != self.provider.home_region:
return
logger.info("Identity - Searching for active non-root compartments...")
# Create search client using the helper method for proper authentication
+1 -1
View File
@@ -95,7 +95,7 @@ maintainers = [{name = "Prowler Engineering", email = "engineering@prowler.com"}
name = "prowler"
readme = "README.md"
requires-python = ">=3.10,<3.13"
version = "5.25.1"
version = "5.25.3"
[project.scripts]
prowler = "prowler.__main__:prowler"
@@ -100,7 +100,7 @@ class TestCloudTrailTimeline:
assert len(result) == 1
assert result[0]["event_name"] == "RunInstances"
assert result[0]["actor"] == "admin"
assert result[0]["actor"] == "user/admin"
assert result[0]["source_ip_address"] == "203.0.113.1"
def test_get_resource_timeline_with_resource_uid(
@@ -304,14 +304,28 @@ class TestExtractActor:
"arn": "arn:aws:iam::123456789012:user/alice",
"userName": "alice",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "alice"
assert CloudTrailTimeline._extract_actor(user_identity) == "user/alice"
def test_extract_actor_assumed_role(self):
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/MyRole/session-name",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "MyRole"
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "assumed-role/MyRole/session-name"
)
def test_extract_actor_assumed_role_sso(self):
"""SSO sessions store the user identity in the session name."""
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/AWSReservedSSO_AdministratorAccess_abcdef1234567890/user@example.com",
}
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "assumed-role/AWSReservedSSO_AdministratorAccess_abcdef1234567890/user@example.com"
)
def test_extract_actor_root(self):
user_identity = {"type": "Root", "arn": "arn:aws:iam::123456789012:root"}
@@ -327,21 +341,33 @@ class TestExtractActor:
== "elasticloadbalancing.amazonaws.com"
)
def test_extract_actor_fallback_to_principal_id(self):
user_identity = {"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
assert (
CloudTrailTimeline._extract_actor(user_identity) == "AROAEXAMPLEID:session"
)
def test_extract_actor_unknown(self):
assert CloudTrailTimeline._extract_actor({}) == "Unknown"
def test_extract_actor_username_only_returns_unknown(self):
"""When userIdentity carries only userName/principalId (no arn or
invokedBy), we deliberately return "Unknown" — we rely on the ARN
from the upstream service for the actor."""
assert (
CloudTrailTimeline._extract_actor({"type": "IAMUser", "userName": "alice"})
== "Unknown"
)
assert (
CloudTrailTimeline._extract_actor(
{"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
)
== "Unknown"
)
def test_extract_actor_federated_user(self):
user_identity = {
"type": "FederatedUser",
"arn": "arn:aws:sts::123456789012:federated-user/developer",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "developer"
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "federated-user/developer"
)
class TestParseEvent:
@@ -380,7 +406,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["event_source"] == "ec2.amazonaws.com"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
assert result["actor_uid"] == "arn:aws:iam::123456789012:user/admin"
assert result["actor_type"] == "IAMUser"
@@ -424,7 +450,10 @@ class TestParseEvent:
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": {
"userIdentity": {"type": "IAMUser", "userName": "admin"},
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/admin",
},
},
}
timeline = CloudTrailTimeline(session=mock_session)
@@ -432,7 +461,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
def test_parse_event_missing_event_id(self, mock_session):
"""Test parsing event without EventId returns None (event_id is required)."""
@@ -506,7 +535,7 @@ class TestParseEvent:
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
assert result["actor"] == "user/admin"
# actor_type should be None when not present in userIdentity
assert result["actor_type"] is None
@@ -4,6 +4,7 @@ from boto3 import client, resource
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_REGION_EU_WEST_1,
AWS_REGION_US_EAST_1,
AWS_REGION_US_WEST_2,
set_mocked_aws_provider,
@@ -502,3 +503,276 @@ class Test_route53_dangling_ip_subdomain_takeover:
result[0].status_extended
== f"Route53 record {record_ip} (name: {record_set_name}) in Hosted Zone {HOSTED_ZONE_NAME} is not a dangling IP."
)
@mock_aws
def test_hosted_zone_cname_to_existing_s3_website_bucket(self):
bucket_name = "my-static-site"
s3 = client("s3", region_name=AWS_REGION_US_EAST_1)
s3.create_bucket(Bucket=bucket_name)
conn = client("route53", region_name=AWS_REGION_US_EAST_1)
zone_id = conn.create_hosted_zone(
Name=HOSTED_ZONE_NAME, CallerReference=str(hash("foo"))
)["HostedZone"]["Id"]
record_set_name = "www.testdns.aws.com."
cname_target = f"{bucket_name}.s3-website-us-east-1.amazonaws.com"
conn.change_resource_record_sets(
HostedZoneId=zone_id,
ChangeBatch={
"Changes": [
{
"Action": "CREATE",
"ResourceRecordSet": {
"Name": record_set_name,
"Type": "CNAME",
"TTL": 60,
"ResourceRecords": [{"Value": cname_target}],
},
}
]
},
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.route53.route53_service import Route53
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.route53_client",
new=Route53(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.ec2_client",
new=EC2(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.s3_client",
new=S3(aws_provider),
):
from prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover import (
route53_dangling_ip_subdomain_takeover,
)
check = route53_dangling_ip_subdomain_takeover()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Route53 CNAME {record_set_name} in Hosted Zone {HOSTED_ZONE_NAME} points to S3 website endpoint of bucket {bucket_name} which exists in the account."
)
assert (
result[0].resource_id
== zone_id.replace("/hostedzone/", "")
+ "/"
+ record_set_name
+ "/"
+ cname_target
)
assert (
result[0].resource_arn
== f"arn:{aws_provider.identity.partition}:route53:::hostedzone/{zone_id.replace('/hostedzone/', '')}"
)
@mock_aws
def test_hosted_zone_cname_to_dangling_s3_website_bucket(self):
# Bucket name referenced by the CNAME is NOT created in the account
# (simulates a deleted bucket whose name is now claimable by anyone)
missing_bucket = "deleted-static-site"
conn = client("route53", region_name=AWS_REGION_US_EAST_1)
zone_id = conn.create_hosted_zone(
Name=HOSTED_ZONE_NAME, CallerReference=str(hash("foo"))
)["HostedZone"]["Id"]
record_set_name = "www.testdns.aws.com."
cname_target = f"{missing_bucket}.s3-website-us-east-1.amazonaws.com"
conn.change_resource_record_sets(
HostedZoneId=zone_id,
ChangeBatch={
"Changes": [
{
"Action": "CREATE",
"ResourceRecordSet": {
"Name": record_set_name,
"Type": "CNAME",
"TTL": 60,
"ResourceRecords": [{"Value": cname_target}],
},
}
]
},
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.route53.route53_service import Route53
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.route53_client",
new=Route53(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.ec2_client",
new=EC2(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.s3_client",
new=S3(aws_provider),
):
from prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover import (
route53_dangling_ip_subdomain_takeover,
)
check = route53_dangling_ip_subdomain_takeover()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Route53 CNAME {record_set_name} in Hosted Zone {HOSTED_ZONE_NAME} points to S3 website endpoint of bucket {missing_bucket} which does not exist in the account and can lead to a subdomain takeover attack."
)
assert (
result[0].resource_id
== zone_id.replace("/hostedzone/", "")
+ "/"
+ record_set_name
+ "/"
+ cname_target
)
@mock_aws
def test_hosted_zone_cname_to_dangling_s3_website_bucket_dot_format(self):
# Newer regions use the dot-style endpoint:
# <bucket>.s3-website.<region>.amazonaws.com
missing_bucket = "deleted-eu-site"
conn = client("route53", region_name=AWS_REGION_US_EAST_1)
zone_id = conn.create_hosted_zone(
Name=HOSTED_ZONE_NAME, CallerReference=str(hash("foo"))
)["HostedZone"]["Id"]
record_set_name = "eu.testdns.aws.com."
cname_target = (
f"{missing_bucket}.s3-website.{AWS_REGION_EU_WEST_1}.amazonaws.com"
)
conn.change_resource_record_sets(
HostedZoneId=zone_id,
ChangeBatch={
"Changes": [
{
"Action": "CREATE",
"ResourceRecordSet": {
"Name": record_set_name,
"Type": "CNAME",
"TTL": 60,
"ResourceRecords": [{"Value": cname_target}],
},
}
]
},
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.route53.route53_service import Route53
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.route53_client",
new=Route53(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.ec2_client",
new=EC2(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.s3_client",
new=S3(aws_provider),
):
from prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover import (
route53_dangling_ip_subdomain_takeover,
)
check = route53_dangling_ip_subdomain_takeover()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert missing_bucket in result[0].status_extended
@mock_aws
def test_hosted_zone_cname_to_non_s3_target_is_ignored(self):
# CNAMEs that do not target an S3 website endpoint must not yield a finding
conn = client("route53", region_name=AWS_REGION_US_EAST_1)
zone_id = conn.create_hosted_zone(
Name=HOSTED_ZONE_NAME, CallerReference=str(hash("foo"))
)["HostedZone"]["Id"]
conn.change_resource_record_sets(
HostedZoneId=zone_id,
ChangeBatch={
"Changes": [
{
"Action": "CREATE",
"ResourceRecordSet": {
"Name": "blog.testdns.aws.com.",
"Type": "CNAME",
"TTL": 60,
"ResourceRecords": [{"Value": "external-host.example.com"}],
},
}
]
},
)
from prowler.providers.aws.services.ec2.ec2_service import EC2
from prowler.providers.aws.services.route53.route53_service import Route53
from prowler.providers.aws.services.s3.s3_service import S3
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.route53_client",
new=Route53(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.ec2_client",
new=EC2(aws_provider),
):
with mock.patch(
"prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover.s3_client",
new=S3(aws_provider),
):
from prowler.providers.aws.services.route53.route53_dangling_ip_subdomain_takeover.route53_dangling_ip_subdomain_takeover import (
route53_dangling_ip_subdomain_takeover,
)
check = route53_dangling_ip_subdomain_takeover()
result = check.execute()
assert len(result) == 0
@@ -6,90 +6,92 @@ from prowler.providers.kubernetes.services.rbac.rbac_service import Rule
class TestCheckRolePermissions:
def test_is_rule_allowing_permissions(self):
# Define some sample rules, resources, and verbs for testing
rules = [
# Rule 1: Allows 'get' and 'list' on 'pods' and 'services'
Rule(resources=["pods", "services"], verbs=["get", "list"]),
# Rule 2: Allows 'create' and 'delete' on 'deployments'
Rule(resources=["deployments"], verbs=["create", "delete"]),
]
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert is_rule_allowing_permissions(rules, resources, verbs)
assert is_rule_allowing_permissions(
rules, ["pods", "deployments"], ["get", "create"]
)
def test_no_permissions(self):
# Test when there are no rules
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions([], ["pods"], ["get"])
def test_no_matching_rules(self):
# Test when there are rules, but none match the specified resources and verbs
rules = [
Rule(resources=["services"], verbs=["get", "list"]),
Rule(resources=["pods"], verbs=["create", "delete"]),
]
resources = ["deployments", "configmaps"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions(
rules, ["deployments", "configmaps"], ["get", "create"]
)
def test_empty_rules(self):
# Test when the rules list is empty
rules = []
resources = ["pods", "deployments"]
verbs = ["get", "create"]
assert not is_rule_allowing_permissions(rules, resources, verbs)
assert not is_rule_allowing_permissions([], ["pods"], ["get"])
def test_empty_resources_and_verbs(self):
# Test when resources and verbs are empty lists
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
]
resources = []
verbs = []
assert not is_rule_allowing_permissions(rules, resources, verbs)
rules = [Rule(resources=["pods"], verbs=["get"])]
assert not is_rule_allowing_permissions(rules, [], [])
def test_matching_rule_with_empty_resources_or_verbs(self):
# Test when a rule matches, but either resources or verbs are empty
rules = [Rule(resources=["pods"], verbs=["get"])]
assert not is_rule_allowing_permissions(rules, [], ["get"])
assert not is_rule_allowing_permissions(rules, ["pods"], [])
def test_rule_with_non_matching_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["apps"])]
assert not is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_rule_with_matching_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_default_api_group_is_core(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
def test_rule_with_empty_api_groups_does_not_match_non_core_request(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=None)]
assert not is_rule_allowing_permissions(
rules, ["pods"], ["get"], ["admissionregistration.k8s.io"]
)
def test_non_core_rule_does_not_match_without_api_groups_argument(self):
rules = [
Rule(resources=["pods"], verbs=["get"]),
Rule(resources=["services"], verbs=["list"]),
Rule(
resources=["validatingwebhookconfigurations"],
verbs=["create"],
apiGroups=["admissionregistration.k8s.io"],
)
]
resources = []
verbs = ["get"]
assert not is_rule_allowing_permissions(
rules, ["validatingwebhookconfigurations"], ["create"]
)
assert not is_rule_allowing_permissions(rules, resources, verbs)
resources = ["pods"]
verbs = []
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_ignored_api_groups(self):
# Test when a rule has apiGroups that are not relevant
def test_explicit_non_core_api_group(self):
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["test"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
Rule(
resources=["validatingwebhookconfigurations"],
verbs=["create"],
apiGroups=["admissionregistration.k8s.io"],
)
]
resources = ["pods"]
verbs = ["get"]
assert is_rule_allowing_permissions(
rules,
["validatingwebhookconfigurations"],
["create"],
["admissionregistration.k8s.io"],
)
assert not is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_wildcard_api_group(self):
rules = [Rule(resources=["pods"], verbs=["get"], apiGroups=["*"])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
assert is_rule_allowing_permissions(rules, ["pods"], ["get"], ["apps"])
def test_rule_with_relevant_api_groups(self):
# Test when a rule has apiGroups that are relevant
rules = [
Rule(resources=["pods"], verbs=["get"], apiGroups=["", "v1"]),
Rule(resources=["services"], verbs=["list"], apiGroups=["test2"]),
]
resources = ["pods"]
verbs = ["get"]
def test_rule_with_wildcard_resources(self):
rules = [Rule(resources=["*"], verbs=["get"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
assert is_rule_allowing_permissions(rules, resources, verbs)
def test_rule_with_wildcard_verbs(self):
rules = [Rule(resources=["pods"], verbs=["*"], apiGroups=[""])]
assert is_rule_allowing_permissions(rules, ["pods"], ["get"])
@@ -37,6 +37,7 @@ def set_mocked_oraclecloud_provider(
signer=MagicMock(),
profile="DEFAULT",
)
provider.home_region = region
# Mock identity
provider.identity = OCIIdentityInfo(
@@ -6,7 +6,7 @@ from prowler.providers.oraclecloud.exceptions.exceptions import (
OCIAuthenticationError,
OCIInvalidConfigError,
)
from prowler.providers.oraclecloud.models import OCISession
from prowler.providers.oraclecloud.models import OCIIdentityInfo, OCIRegion, OCISession
from prowler.providers.oraclecloud.oraclecloud_provider import OraclecloudProvider
@@ -199,3 +199,59 @@ MIIEpQIBAAKCAQEA0Z3VS5JJcds3xfn/ygWyF8n0sMcD/QHWCJ7yGSEtLN2T
)
assert connection.is_connected is True
class TestOraclecloudProviderInit:
"""Tests for OraclecloudProvider initialization"""
def test_init_with_region_set_populates_provider_state(self):
mock_session = OCISession(
config={"region": "us-ashburn-1"}, signer=None, profile="DEFAULT"
)
mock_identity = OCIIdentityInfo(
tenancy_id="ocid1.tenancy.oc1..aaaaaaaexample",
tenancy_name="test-tenancy",
user_id="ocid1.user.oc1..aaaaaaaexample",
region="us-ashburn-1",
profile="DEFAULT",
audited_regions=set(),
audited_compartments=[],
)
mock_regions = [
OCIRegion(key="us-phoenix-1", name="us-phoenix-1", is_home_region=False),
OCIRegion(key="us-ashburn-1", name="us-ashburn-1", is_home_region=True),
]
mock_compartments = ["ocid1.compartment.oc1..aaaaaaaexample"]
with (
patch(
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.setup_session",
return_value=mock_session,
) as mock_setup_session,
patch(
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.set_identity",
return_value=mock_identity,
),
patch(
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_regions_to_audit",
return_value=mock_regions,
),
patch(
"prowler.providers.oraclecloud.oraclecloud_provider.OraclecloudProvider.get_compartments_to_audit",
return_value=mock_compartments,
),
patch(
"prowler.providers.common.provider.Provider.set_global_provider"
) as mock_set_global,
):
provider = OraclecloudProvider(
region={"us-ashburn-1"},
config_content={"dummy": True},
mutelist_content={"Accounts": {}},
)
assert mock_setup_session.call_args.kwargs["region"] == "us-ashburn-1"
assert provider.session == mock_session
assert provider.identity == mock_identity
assert provider.regions == mock_regions
assert provider.compartments == mock_compartments
assert provider.home_region == "us-ashburn-1"
mock_set_global.assert_called_once_with(provider)
@@ -1,4 +1,7 @@
from unittest.mock import patch
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from threading import Lock
from unittest.mock import MagicMock, patch
from tests.providers.oraclecloud.oci_fixtures import set_mocked_oraclecloud_provider
@@ -28,3 +31,184 @@ class TestIdentityService:
# Verify service name
assert identity_client.service == "identity"
assert identity_client.provider == oraclecloud_provider
def test_list_domains_passwords_skipped_outside_home(self):
"""Domains should be skipped when not in home region."""
with patch(
"prowler.providers.oraclecloud.services.identity.identity_service.Identity.__init__",
return_value=None,
):
from prowler.providers.oraclecloud.services.identity.identity_service import (
Identity,
)
identity_client = Identity(None)
identity_client.service = "identity"
identity_client.provider = set_mocked_oraclecloud_provider()
identity_client.provider._home_region = "us-ashburn-1"
identity_client.audited_compartments = [
MagicMock(id="ocid1.compartment.oc1..aaaaaaaexample")
]
identity_client.domains = []
identity_client._domains_lock = Lock()
identity_client.session_signer = None
identity_client.session_config = None
regional_client_ash = MagicMock()
regional_client_ash.region = "us-ashburn-1"
regional_client_chi = MagicMock()
regional_client_chi.region = "us-chicago-1"
policy = MagicMock()
policy.id = "123"
policy.name = "Test Policy"
policy.description = "This is a test policy"
policy.min_length = 8
policy.password_expires_after = 90
policy.num_passwords_in_history = 5
policy.password_expire_warning = 7
policy.min_password_age = 1
domains = []
for region in ["us-phoenix-1", "us-ashburn-1", "us-chicago-1"]:
domain = MagicMock()
domain.id = (
"ocid1.domain.oc1.iad.aaaaaaaaexampleuniqueID"
if region == "us-chicago-1"
else "ocid1.domain.oc1.iad.aaaaaaaaexampleuniqueID2"
)
domain.display_name = "exampledomain"
domain.description = "example"
domain.url = "https://idcs-example.identity.oraclecloud.com"
domain.home_region = region
domain.region = "us-ashburn-1"
domain.lifecycle_state = "ACTIVE"
domain.time_created = datetime.now()
domains.append(domain)
with (
patch(
"prowler.providers.oraclecloud.services.identity.identity_service.Identity.__get_client__",
return_value=MagicMock(),
),
patch(
"prowler.providers.oraclecloud.services.identity.identity_service.oci.pagination.list_call_get_all_results",
return_value=MagicMock(data=domains),
),
patch(
"prowler.providers.oraclecloud.services.identity.identity_service.oci.identity_domains.IdentityDomainsClient",
return_value=MagicMock(
list_password_policies=lambda: MagicMock(
data=MagicMock(resources=[policy])
)
),
),
):
identity_client.__list_domains__(regional_client_ash)
identity_client.__list_domains__(regional_client_chi)
identity_client.__list_domain_password_policies__(regional_client_ash)
identity_client.__list_domain_password_policies__(regional_client_chi)
assert (
len(identity_client.domains) == 2
and any(
domain.home_region == "us-ashburn-1"
and domain.region == "us-ashburn-1"
for domain in identity_client.domains
)
and any(
domain.home_region == "us-chicago-1"
and domain.region == "us-chicago-1"
for domain in identity_client.domains
)
and all(len(d.password_policies) == 1 for d in identity_client.domains)
)
def test_list_domains_concurrent_dedupes_and_prefers_home_region(self):
"""__list_domains__ runs across regions in parallel; the dedupe
must stay correct under concurrent calls (no duplicates, home
region wins)."""
with patch(
"prowler.providers.oraclecloud.services.identity.identity_service.Identity.__init__",
return_value=None,
):
from prowler.providers.oraclecloud.services.identity.identity_service import (
Identity,
)
identity_client = Identity(None)
identity_client.service = "identity"
identity_client.provider = set_mocked_oraclecloud_provider()
identity_client.audited_compartments = [
MagicMock(id="ocid1.compartment.oc1..aaaaaaaexample")
]
identity_client.domains = []
identity_client._domains_lock = Lock()
identity_client.session_signer = None
identity_client.session_config = None
regions = [
"us-ashburn-1",
"us-chicago-1",
"us-phoenix-1",
"eu-frankfurt-1",
]
home_region_by_domain = {
"ocid1.domain.oc1..domainA": "us-ashburn-1",
"ocid1.domain.oc1..domainB": "us-chicago-1",
"ocid1.domain.oc1..domainC": "eu-frankfurt-1",
}
# Each region returns the same set of domains (every domain
# is visible from every region; only one of those regions is
# actually the domain's home region).
def make_domains_for_region(_region):
ds = []
for domain_id, home_region in home_region_by_domain.items():
d = MagicMock()
d.id = domain_id
d.display_name = f"name-{domain_id}"
d.description = ""
d.url = "https://example.identity.oraclecloud.com"
d.home_region = home_region
d.lifecycle_state = "ACTIVE"
d.time_created = datetime.now()
ds.append(d)
return MagicMock(data=ds)
regional_clients = []
for region in regions:
rc = MagicMock()
rc.region = region
regional_clients.append(rc)
with (
patch(
"prowler.providers.oraclecloud.services.identity.identity_service.Identity.__get_client__",
return_value=MagicMock(),
),
patch(
"prowler.providers.oraclecloud.services.identity.identity_service.oci.pagination.list_call_get_all_results",
side_effect=lambda _list_call, compartment_id, lifecycle_state: make_domains_for_region(
compartment_id
),
),
):
# Run several iterations to make any race more likely
# to surface; with the lock removed this loop fails
# frequently with duplicates.
for _ in range(20):
identity_client.domains = []
with ThreadPoolExecutor(
max_workers=len(regional_clients)
) as executor:
futures = [
executor.submit(identity_client.__list_domains__, rc)
for rc in regional_clients
]
for f in futures:
f.result()
assert len(identity_client.domains) == len(home_region_by_domain)
by_id = {d.id: d for d in identity_client.domains}
for domain_id, home_region in home_region_by_domain.items():
assert by_id[domain_id].region == home_region
assert by_id[domain_id].home_region == home_region
+17
View File
@@ -2,6 +2,23 @@
All notable changes to the **Prowler UI** are documented in this file.
## [1.25.3] (Prowler UNRELEASED)
### 🐞 Fixed
- CLI command in the finding drawer no longer renders the line-number gutter, matching the original styled block while removing the leading `1` [(#11059)](https://github.com/prowler-cloud/prowler/pull/11059)
---
## [1.25.2] (Prowler v5.25.2)
### 🔄 Changed
- Compliance cards: progress bar now spans the full card width, the passing-requirements caption sits beside the framework logo under the title, and the ISO 27001 logo asset is recentered within its tile [(#10939)](https://github.com/prowler-cloud/prowler/pull/10939)
- Findings expanded resource rows now drop the redundant cube icons, render Service and Region with the same compact label style as Last seen and Failing for, and reorder columns to Status, Resource, Provider, Severity, then field labels [(#10949)](https://github.com/prowler-cloud/prowler/pull/10949)
---
## [1.25.1] (Prowler v5.25.1)
### 🐞 Fixed
+41 -39
View File
@@ -119,49 +119,33 @@ export const ComplianceCard: React.FC<ComplianceCardProps> = ({
/>
</div>
<CardContent className="p-0">
<div className="flex w-full flex-col gap-3 sm:flex-row sm:items-start">
<div className="flex shrink-0 items-center sm:flex-col sm:items-start sm:gap-2">
<div className="flex w-full flex-col gap-3">
<div className="flex items-center gap-3 pr-9">
{getComplianceIcon(title) && (
<Image
src={getComplianceIcon(title)}
alt={`${title} logo`}
className="h-10 w-10 min-w-10 self-start rounded-md border border-gray-300 bg-white object-contain p-1"
/>
<div className="flex h-10 w-10 min-w-10 shrink-0 items-center justify-center rounded-md border border-gray-300 bg-white">
<Image
src={getComplianceIcon(title)}
alt={`${title} logo`}
width={32}
height={32}
className="h-8 w-8 object-contain"
/>
</div>
)}
</div>
<div className="flex w-full min-w-0 flex-col gap-3">
<Tooltip>
<TooltipTrigger asChild>
<h4 className="text-small truncate pr-9 leading-5 font-bold">
<div className="flex min-w-0 flex-1 flex-col">
<Tooltip>
<TooltipTrigger asChild>
<h4 className="text-small truncate leading-5 font-bold">
{formatTitle(title)}
{version ? ` - ${version}` : ""}
</h4>
</TooltipTrigger>
<TooltipContent>
{formatTitle(title)}
{version ? ` - ${version}` : ""}
</h4>
</TooltipTrigger>
<TooltipContent>
{formatTitle(title)}
{version ? ` - ${version}` : ""}
</TooltipContent>
</Tooltip>
<div className="flex flex-col gap-2">
<div className="flex items-center justify-between gap-3 text-xs">
<span className="text-text-neutral-secondary font-medium tracking-wider">
Score:
</span>
<span className="text-text-neutral-secondary">
{ratingPercentage}%
</span>
</div>
<Progress
aria-label="Compliance score"
value={ratingPercentage}
className="border-border-neutral-secondary h-2.5 border drop-shadow-sm"
indicatorClassName={getScoreIndicatorClass(
getRatingVariant(ratingPercentage),
)}
/>
</div>
<div className="flex flex-col gap-3 sm:flex-row sm:items-center sm:justify-between">
<small className="min-w-0">
</TooltipContent>
</Tooltip>
<small className="truncate">
<span className="mr-1 text-xs font-semibold">
{passingRequirements} / {totalRequirements}
</span>
@@ -169,6 +153,24 @@ export const ComplianceCard: React.FC<ComplianceCardProps> = ({
</small>
</div>
</div>
<div className="flex flex-col gap-2">
<div className="flex items-center justify-between gap-3 text-xs">
<span className="text-text-neutral-secondary font-medium tracking-wider">
Score:
</span>
<span className="text-text-neutral-secondary">
{ratingPercentage}%
</span>
</div>
<Progress
aria-label="Compliance score"
value={ratingPercentage}
className="border-border-neutral-secondary h-2.5 border drop-shadow-sm"
indicatorClassName={getScoreIndicatorClass(
getRatingVariant(ratingPercentage),
)}
/>
</div>
</div>
</CardContent>
</Card>
@@ -1,7 +1,7 @@
"use client";
import { ColumnDef, Row, RowSelectionState } from "@tanstack/react-table";
import { Container, CornerDownRight, VolumeOff, VolumeX } from "lucide-react";
import { CornerDownRight, VolumeOff, VolumeX } from "lucide-react";
import { useContext, useState } from "react";
import { MuteFindingsModal } from "@/components/findings/mute-findings-modal";
@@ -203,23 +203,6 @@ export function getColumnFindingResources({
enableSorting: false,
enableHiding: false,
},
// Resource — name + uid (EntityInfo with resource icon)
{
id: "resource",
header: ({ column }) => (
<DataTableColumnHeader column={column} title="Resource" />
),
cell: ({ row }) => (
<div className="max-w-[240px]">
<EntityInfo
nameIcon={<Container className="size-4" />}
entityAlias={row.original.resourceName}
entityId={row.original.resourceUid}
/>
</div>
),
enableSorting: false,
},
// Status
{
id: "status",
@@ -233,29 +216,35 @@ export function getColumnFindingResources({
},
enableSorting: false,
},
// Service
// Resource — name + uid
{
id: "service",
id: "resource",
header: ({ column }) => (
<DataTableColumnHeader column={column} title="Service" />
<DataTableColumnHeader column={column} title="Resource" />
),
cell: ({ row }) => (
<p className="text-text-neutral-primary max-w-[100px] truncate text-sm">
{row.original.service}
</p>
<div className="max-w-[240px]">
<EntityInfo
entityAlias={row.original.resourceName}
entityId={row.original.resourceUid}
/>
</div>
),
enableSorting: false,
},
// Region
// Provider — alias + uid (same style as Resource)
{
id: "region",
id: "provider",
header: ({ column }) => (
<DataTableColumnHeader column={column} title="Region" />
<DataTableColumnHeader column={column} title="Provider" />
),
cell: ({ row }) => (
<p className="text-text-neutral-primary max-w-[120px] truncate text-sm">
{row.original.region}
</p>
<div className="max-w-[240px]">
<EntityInfo
entityAlias={row.original.providerAlias}
entityId={row.original.providerUid}
/>
</div>
),
enableSorting: false,
},
@@ -268,20 +257,29 @@ export function getColumnFindingResources({
cell: ({ row }) => <SeverityBadge severity={row.original.severity} />,
enableSorting: false,
},
// Account — alias + uid (EntityInfo with provider logo)
// Service
{
id: "account",
id: "service",
header: ({ column }) => (
<DataTableColumnHeader column={column} title="Account" />
<DataTableColumnHeader column={column} title="Service" />
),
cell: ({ row }) => (
<div className="max-w-[240px]">
<EntityInfo
cloudProvider={row.original.providerType}
entityAlias={row.original.providerAlias}
entityId={row.original.providerUid}
/>
</div>
<InfoField label="Service" variant="compact">
{row.original.service || "-"}
</InfoField>
),
enableSorting: false,
},
// Region
{
id: "region",
header: ({ column }) => (
<DataTableColumnHeader column={column} title="Region" />
),
cell: ({ row }) => (
<InfoField label="Region" variant="compact">
{row.original.region || "-"}
</InfoField>
),
enableSorting: false,
},
@@ -70,27 +70,23 @@ function ResourceSkeletonRow({
<div className="bg-bg-input-primary border-border-input-primary size-5 rounded-sm border shadow-[0_1px_2px_0_rgba(0,0,0,0.1)]" />
</div>
</TableCell>
{/* Resource: icon + name + uid */}
<TableCell className={cellClassName}>
<div className="flex items-center gap-2">
<Skeleton className="size-4 rounded" />
<div className="space-y-1.5">
<Skeleton className="h-4 w-32 rounded" />
<Skeleton className="h-3.5 w-20 rounded" />
</div>
</div>
</TableCell>
{/* Status */}
<TableCell className={cellClassName}>
<Skeleton className="h-6 w-11 rounded-md" />
</TableCell>
{/* Service */}
{/* Resource: name + uid */}
<TableCell className={cellClassName}>
<Skeleton className="h-4.5 w-16 rounded" />
<div className="space-y-1.5">
<Skeleton className="h-4 w-32 rounded" />
<Skeleton className="h-3.5 w-20 rounded" />
</div>
</TableCell>
{/* Region */}
{/* Provider: alias + uid */}
<TableCell className={cellClassName}>
<Skeleton className="h-4.5 w-20 rounded" />
<div className="space-y-1.5">
<Skeleton className="h-4 w-24 rounded" />
<Skeleton className="h-3.5 w-16 rounded" />
</div>
</TableCell>
{/* Severity */}
<TableCell className={cellClassName}>
@@ -99,15 +95,13 @@ function ResourceSkeletonRow({
<Skeleton className="h-4.5 w-12 rounded" />
</div>
</TableCell>
{/* Account: provider icon + alias + uid */}
{/* Service */}
<TableCell className={cellClassName}>
<div className="flex items-center gap-2">
<Skeleton className="size-4 rounded" />
<div className="space-y-1.5">
<Skeleton className="h-4 w-24 rounded" />
<Skeleton className="h-3.5 w-16 rounded" />
</div>
</div>
<Skeleton className="h-4.5 w-16 rounded" />
</TableCell>
{/* Region */}
<TableCell className={cellClassName}>
<Skeleton className="h-4.5 w-20 rounded" />
</TableCell>
{/* Last seen */}
<TableCell className={cellClassName}>
@@ -219,21 +219,25 @@ vi.mock("@/components/shared/query-code-editor", () => ({
language,
value,
copyValue,
showLineNumbers = true,
}: {
ariaLabel: string;
language?: string;
value: string;
copyValue?: string;
showLineNumbers?: boolean;
}) => (
<div
data-testid="query-code-editor"
data-aria-label={ariaLabel}
data-language={language}
data-show-line-numbers={String(showLineNumbers)}
>
<span>{ariaLabel}</span>
<span>{value}</span>
<button
type="button"
aria-label={`Copy ${ariaLabel}`}
onClick={() => mockClipboardWriteText(copyValue ?? value)}
>
Copy editor code
@@ -255,7 +259,22 @@ vi.mock("@/components/icons/services/IconServices", () => ({
}));
vi.mock("@/components/ui/code-snippet/code-snippet", () => ({
CodeSnippet: ({ value }: { value: string }) => <span>{value}</span>,
CodeSnippet: ({
value,
formatter,
ariaLabel = "Copy to clipboard",
}: {
value: string;
formatter?: (value: string) => string;
ariaLabel?: string;
}) => (
<div data-testid="code-snippet">
<span>{formatter ? formatter(value) : value}</span>
<button type="button" onClick={() => mockClipboardWriteText(value)}>
{ariaLabel}
</button>
</div>
),
}));
vi.mock("@/components/ui/custom/custom-link", () => ({
@@ -592,7 +611,7 @@ describe("ResourceDetailDrawerContent — Fix 2: Remediation heading labels", ()
expect(allText).toContain("CLI Command");
});
it("should render remediation snippets with the shared code editor and copy CLI without the visual prompt", async () => {
it("should render CLI remediation in the code editor without line numbers and copy without the visual prompt", async () => {
// Given
const user = userEvent.setup();
render(
@@ -612,17 +631,19 @@ describe("ResourceDetailDrawerContent — Fix 2: Remediation heading labels", ()
// When
const editors = screen.getAllByTestId("query-code-editor");
await user.click(
within(editors[0]).getByRole("button", { name: "Copy editor code" }),
);
await user.click(screen.getByRole("button", { name: "Copy CLI Command" }));
// Then
expect(editors).toHaveLength(3);
expect(editors[0]).toHaveAttribute("data-aria-label", "CLI Command");
expect(editors[0]).toHaveAttribute("data-show-line-numbers", "false");
expect(editors[1]).toHaveAttribute("data-show-line-numbers", "true");
expect(editors[2]).toHaveAttribute("data-show-line-numbers", "true");
expect(mockClipboardWriteText).toHaveBeenCalledWith("aws s3 ...");
expect(screen.getByText("$ aws s3 ...")).toBeInTheDocument();
});
it("should pass syntax highlighting languages to each remediation editor", () => {
it("should pass syntax highlighting languages to all remediation editors", () => {
// Given
render(
<ResourceDetailDrawerContent
@@ -643,10 +664,12 @@ describe("ResourceDetailDrawerContent — Fix 2: Remediation heading labels", ()
const editors = screen.getAllByTestId("query-code-editor");
// Then
expect(editors).toHaveLength(3);
expect(editors[0]).toHaveAttribute("data-language", "shell");
expect(editors[1]).toHaveAttribute("data-language", "hcl");
expect(editors[2]).toHaveAttribute("data-language", "yaml");
expect(editors[0]).toHaveAttribute("data-aria-label", "CLI Command");
expect(editors[1]).toHaveAttribute("data-aria-label", "Terraform");
expect(editors[2]).toHaveAttribute("data-aria-label", "CloudFormation");
});
});
@@ -120,11 +120,13 @@ function renderRemediationCodeBlock({
value,
copyValue,
language = QUERY_EDITOR_LANGUAGE.PLAIN_TEXT,
showLineNumbers = true,
}: {
label: string;
value: string;
copyValue?: string;
language?: QueryEditorLanguage;
showLineNumbers?: boolean;
}) {
return (
<QueryCodeEditor
@@ -135,6 +137,7 @@ function renderRemediationCodeBlock({
editable={false}
minHeight={96}
showCopyButton
showLineNumbers={showLineNumbers}
onChange={() => {}}
/>
);
@@ -889,6 +892,7 @@ export function ResourceDetailDrawerContent({
copyValue: stripCodeFences(
checkMeta.remediation.code.cli,
),
showLineNumbers: false,
})}
</div>
)}
File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 139 KiB

After

Width:  |  Height:  |  Size: 139 KiB

+5 -5
View File
@@ -10,8 +10,6 @@ import { EditorState } from "@codemirror/state";
import { tags } from "@lezer/highlight";
import CodeMirror, {
EditorView,
highlightActiveLineGutter,
lineNumbers,
placeholder as codeEditorPlaceholder,
} from "@uiw/react-codemirror";
import { Check, Copy } from "lucide-react";
@@ -1177,6 +1175,7 @@ interface QueryCodeEditorProps
editable?: boolean;
minHeight?: number;
showCopyButton?: boolean;
showLineNumbers?: boolean;
onChange: (value: string) => void;
onBlur?: () => void;
}
@@ -1195,6 +1194,7 @@ export const QueryCodeEditor = ({
editable = true,
minHeight = 320,
showCopyButton = false,
showLineNumbers = true,
onChange,
onBlur,
...props
@@ -1208,8 +1208,6 @@ export const QueryCodeEditor = ({
: lightHighlightStyle;
const extensions = [
lineNumbers(),
highlightActiveLineGutter(),
EditorView.lineWrapping,
codeEditorPlaceholder(placeholder ?? ""),
EditorView.contentAttributes.of({
@@ -1260,6 +1258,7 @@ export const QueryCodeEditor = ({
<div
data-testid="query-code-editor"
data-language={language}
data-show-line-numbers={String(showLineNumbers)}
className={cn(
"border-border-neutral-secondary bg-bg-neutral-primary overflow-hidden rounded-xl border",
invalid && "border-border-error-primary",
@@ -1307,8 +1306,9 @@ export const QueryCodeEditor = ({
basicSetup={{
foldGutter: false,
highlightActiveLine: false,
highlightActiveLineGutter: false,
highlightActiveLineGutter: showLineNumbers,
searchKeymap: false,
lineNumbers: showLineNumbers,
}}
editable={editable}
onChange={onChange}