Compare commits

..

14 Commits

Author SHA1 Message Date
Prowler Bot aa729a9d2d chore(release): Bump versions to v5.28.2 (#11369)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-26 16:49:06 +02:00
Prowler Bot d086a624a0 fix(ui): honor page size select in compliance req findings (#11368)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-26 15:42:32 +02:00
Prowler Bot a7c2b6cbce fix(mcp_server): preserve authorization header in HTTP mode (#11367)
Co-authored-by: Rubén De la Torre Vico <ruben@prowler.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-26 15:31:30 +02:00
Prowler Bot 5da5848509 chore: SDK changelog v5.28.1 (#11364)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-26 12:19:54 +02:00
Prowler Bot 1a397d1024 fix(ui): avoid report preflight timeouts (#11362)
Co-authored-by: Alan Buscaglia <gentlemanprogramming@gmail.com>
Co-authored-by: Adrián Jesús Peña Rodríguez <adrianjpr@gmail.com>
2026-05-26 12:05:03 +02:00
Prowler Bot d9c849bed0 fix(az-m365): asyncio.run() in Azure/M365 Celery worker event (#11361)
Co-authored-by: Pedro Martín <pedromarting3@gmail.com>
2026-05-26 11:50:28 +02:00
Prowler Bot a33c301fcc fix(gcp): match enable-oslogin metadata case-insensitively (#11359)
Co-authored-by: Aline Almeida <aline@tuplita.ai>
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-26 10:42:17 +02:00
Prowler Bot e65bf81bf8 fix(azure): require all SMB channel encryption algorithms to be secure (storage_smb_channel_encryption_with_secure_algorithm) (#11354)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
2026-05-25 18:37:45 +02:00
Prowler Bot ea419b49d8 chore: changelog v5.28.1 (#11348)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-25 10:20:54 +02:00
Prowler Bot 5900d2314a chore(ui): add changelog for scan report fix (#11339)
Co-authored-by: Alan Buscaglia <gentlemanprogramming@gmail.com>
2026-05-22 15:12:48 +02:00
Prowler Bot 3116352931 fix(ui): stream scan report downloads (#11337)
Co-authored-by: Alan Buscaglia <gentlemanprogramming@gmail.com>
2026-05-22 15:05:40 +02:00
Prowler Bot d54bf452ca perf(api): speed up finding-groups endpoint for finding-level filters (#11336)
Co-authored-by: Josema Camacho <josema@prowler.com>
2026-05-22 14:17:49 +02:00
Prowler Bot 8d8f551664 chore(release): Bump versions to v5.28.1 (#11333)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-22 13:34:34 +02:00
Prowler Bot ae961e5065 chore(api): Update prowler dependency to v5.28 for release 5.28.0 (#11331)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-22 12:34:42 +02:00
116 changed files with 513 additions and 5685 deletions
+1 -1
View File
@@ -145,7 +145,7 @@ SENTRY_RELEASE=local
NEXT_PUBLIC_SENTRY_ENVIRONMENT=${SENTRY_ENVIRONMENT}
#### Prowler release version ####
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.29.0
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.28.2
# Social login credentials
SOCIAL_GOOGLE_OAUTH_CALLBACK_URL="${AUTH_URL}/api/auth/callback/google"
-8
View File
@@ -2,14 +2,6 @@
All notable changes to the **Prowler API** are documented in this file.
## [1.30.0] (Prowler UNRELEASED)
### 🔄 Changed
- Scan finding ingestion: bulk-resolve `Resource`/`ResourceTag` rows, replace per-mapping `SELECT FOR UPDATE` with deferred `ResourceTagMapping.bulk_create(ignore_conflicts=True)`, wrap each micro-batch in a single `rls_transaction`, and raise `SCAN_DB_BATCH_SIZE` to 1000 [(#11249)](https://github.com/prowler-cloud/prowler/pull/11249)
---
## [1.29.1] (Prowler v5.28.1)
### 🐞 Fixed
+2 -2
View File
@@ -43,7 +43,7 @@ dependencies = [
"defusedxml==0.7.1",
"gunicorn==23.0.0",
"lxml==6.1.0",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@v5.28",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (==1.3.0)",
"sentry-sdk[django] (==2.56.0)",
@@ -68,7 +68,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.30.0"
version = "1.29.2"
[tool.uv]
# Transitive pins matching master to avoid silent drift; bump deliberately.
+1 -1
View File
@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.30.0
version: 1.29.2
description: |-
Prowler API specification.
+287 -470
View File
@@ -42,6 +42,7 @@ from api.db_utils import (
SET_CONFIG_QUERY,
psycopg_connection,
rls_transaction,
update_objects_in_batches,
)
from api.exceptions import ProviderConnectionError
from api.models import (
@@ -58,7 +59,6 @@ from api.models import (
ResourceFindingMapping,
ResourceScanSummary,
ResourceTag,
ResourceTagMapping,
Scan,
ScanCategorySummary,
ScanGroupSummary,
@@ -97,16 +97,8 @@ COMPLIANCE_REQUIREMENT_COPY_COLUMNS = (
)
# Controls how many findings we process per micro-batch before flushing to DB writes
FINDINGS_MICRO_BATCH_SIZE = env.int("DJANGO_FINDINGS_MICRO_BATCH_SIZE", default=3000)
# Controls how many rows each ORM bulk_create/bulk_update call sends to Postgres.
SCAN_DB_BATCH_SIZE = env.int("DJANGO_SCAN_DB_BATCH_SIZE", default=1000)
# Throttle scan progress persistence: minimum progress delta (fraction 0-1)
# between two persisted progress updates.
PROGRESS_THROTTLE_DELTA = env.float("DJANGO_SCAN_PROGRESS_THROTTLE_DELTA", default=0.01)
# Throttle scan progress persistence: maximum seconds without persisting progress
# regardless of delta (so slow checks still show progress in the UI).
PROGRESS_THROTTLE_SECONDS = env.float(
"DJANGO_SCAN_PROGRESS_THROTTLE_SECONDS", default=10.0
)
# Controls how many rows each ORM bulk_create/bulk_update call sends to Postgres
SCAN_DB_BATCH_SIZE = env.int("DJANGO_SCAN_DB_BATCH_SIZE", default=500)
ATTACK_SURFACE_PROVIDER_COMPATIBILITY = {
"internet-exposed": None, # Compatible with all providers
@@ -536,26 +528,16 @@ def _process_finding_micro_batch(
"""
# Accumulate objects for bulk operations
findings_to_create = []
mappings_to_create = []
dirty_resources = {}
resources_with_new_tag_mappings: set[str] = set()
resource_denormalized_data = [] # (finding_instance, resource_instance) pairs
tag_mappings_to_create: list[ResourceTagMapping] = []
skipped_findings_count = 0 # Track findings skipped due to UID length
# Separate findings into those persistable (uid <= 300) and over-limit.
# Resources/tags ARE still resolved for over-limit findings to preserve the
# original behavior (resources are persisted even when their finding is dropped).
non_null_findings = [f for f in findings_batch if f is not None]
persistable_findings = [f for f in non_null_findings if len(f.uid) <= 300]
skipped_findings_count = len(non_null_findings) - len(persistable_findings)
none_count = len(findings_batch) - len(non_null_findings)
if none_count:
logger.error(
f"{none_count} None finding(s) detected on scan {scan_instance.id}."
)
# Prefetch last statuses for all persistable findings in this batch (read replica)
finding_uids = [f.uid for f in persistable_findings]
# Prefetch last statuses for all findings in this batch
# TEMPORARY WORKAROUND: Filter out UIDs > 300 chars to avoid query errors
finding_uids = [
f.uid for f in findings_batch if f is not None and len(f.uid) <= 300
]
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
last_statuses = {
item["uid"]: (item["status"], item["first_seen_at"])
@@ -566,411 +548,281 @@ def _process_finding_micro_batch(
.order_by("uid", "-inserted_at")
.distinct("uid")
}
# Update cache
for uid, data in last_statuses.items():
if uid not in last_status_cache:
last_status_cache[uid] = data
# All DB writes for this micro-batch run inside ONE rls_transaction,
# with deadlock-retry at micro-batch granularity instead of per-finding.
for attempt in range(CELERY_DEADLOCK_ATTEMPTS):
try:
with rls_transaction(tenant_id):
# 1) Pre-resolve Resources in bulk
# Collect all uids referenced by this batch that are not in cache yet.
# NOTE: we intentionally include empty-string uids here. The SDK
# explicitly emits findings with `resource_uid=""` for some flows
# (IaC scans, some Azure/GCP/K8s checks). The original
# `get_or_create` behavior was to create/share a Resource with
# uid="" for these findings rather than dropping them. Preserve
# that behavior; do NOT filter by truthiness.
batch_resource_uids: set[str] = set()
for f in non_null_findings:
if f.resource_uid not in resource_cache:
batch_resource_uids.add(f.resource_uid)
# Process each finding in the batch
for finding in findings_batch:
if finding is None:
logger.error(f"None finding detected on scan {scan_instance.id}.")
continue
if batch_resource_uids:
existing_resources = {
r.uid: r
for r in Resource.objects.filter(
tenant_id=tenant_id,
provider_id=provider_instance.id,
uid__in=batch_resource_uids,
)
}
missing_uids = batch_resource_uids - existing_resources.keys()
if missing_uids:
# Build defaults from the first finding referencing each uid.
first_finding_per_uid: dict[str, ProwlerFinding] = {}
for f in non_null_findings:
if f.resource_uid in missing_uids:
first_finding_per_uid.setdefault(f.resource_uid, f)
resources_to_create = []
for uid in missing_uids:
f = first_finding_per_uid[uid]
check_metadata = f.get_metadata()
group = check_metadata.get("resourcegroup") or None
resources_to_create.append(
Resource(
tenant_id=tenant_id,
provider=provider_instance,
uid=uid,
region=f.region,
service=f.service_name,
type=f.resource_type,
name=f.resource_name,
groups=[group] if group else None,
)
)
Resource.objects.bulk_create(
resources_to_create,
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "provider_id", "uid"],
)
# Re-fetch to obtain instances we just created AND any
# created concurrently by another scan against the same provider.
existing_resources.update(
{
r.uid: r
for r in Resource.objects.filter(
tenant_id=tenant_id,
provider_id=provider_instance.id,
uid__in=missing_uids,
)
}
)
for uid, r in existing_resources.items():
resource_cache[uid] = r
resource_failed_findings_cache.setdefault(uid, 0)
# 2) Pre-resolve ResourceTags in bulk
batch_tag_kv: set[tuple[str, str]] = set()
for f in non_null_findings:
for k, v in f.resource_tags.items():
if (k, v) not in tag_cache:
batch_tag_kv.add((k, v))
if batch_tag_kv:
keys_to_query = {k for k, _ in batch_tag_kv}
existing_tags = {
(t.key, t.value): t
for t in ResourceTag.objects.filter(
tenant_id=tenant_id, key__in=keys_to_query
)
if (t.key, t.value) in batch_tag_kv
}
missing_kv = batch_tag_kv - existing_tags.keys()
if missing_kv:
ResourceTag.objects.bulk_create(
[
ResourceTag(tenant_id=tenant_id, key=k, value=v)
for k, v in missing_kv
],
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "key", "value"],
)
existing_tags.update(
{
(t.key, t.value): t
for t in ResourceTag.objects.filter(
tenant_id=tenant_id,
key__in={k for k, _ in missing_kv},
)
if (t.key, t.value) in missing_kv
}
)
tag_cache.update(existing_tags)
# 3) Per-finding in-memory processing
for finding in non_null_findings:
# Process resource with deadlock retry
for attempt in range(CELERY_DEADLOCK_ATTEMPTS):
try:
with rls_transaction(tenant_id):
resource_uid = finding.resource_uid
resource_instance = resource_cache.get(resource_uid)
if resource_instance is None:
# Should be unreachable after the pre-resolve step. Defensive log.
logger.error(
f"Resource {resource_uid} missing from cache after pre-resolve "
f"on scan {scan_instance.id}; skipping finding."
)
continue
# Detect resource field changes (defer save until end-of-batch bulk_update).
check_metadata = finding.get_metadata()
group = check_metadata.get("resourcegroup") or None
updated = False
if finding.region and resource_instance.region != finding.region:
resource_instance.region = finding.region
updated = True
if resource_instance.service != finding.service_name:
resource_instance.service = finding.service_name
updated = True
if resource_instance.type != finding.resource_type:
resource_instance.type = finding.resource_type
updated = True
if resource_instance.metadata != finding.resource_metadata:
resource_instance.metadata = json.dumps(
finding.resource_metadata, cls=CustomEncoder
)
updated = True
if resource_instance.details != finding.resource_details:
resource_instance.details = finding.resource_details
updated = True
if resource_instance.partition != finding.partition:
resource_instance.partition = finding.partition
updated = True
if group and (
not resource_instance.groups
or group not in resource_instance.groups
):
resource_instance.groups = (resource_instance.groups or []) + [
group
]
updated = True
if updated:
dirty_resources[resource_uid] = resource_instance
# Accumulate ResourceTagMapping rows; bulk_create at end of block.
for k, v in finding.resource_tags.items():
tag_instance = tag_cache.get((k, v))
if tag_instance is None:
# Should not happen after pre-resolve; skip defensively.
continue
tag_mappings_to_create.append(
ResourceTagMapping(
tenant_id=tenant_id,
resource=resource_instance,
tag=tag_instance,
)
)
unique_resources.add(
(resource_instance.uid, resource_instance.region)
)
# TEMPORARY WORKAROUND: Skip findings with UID > 300 chars
# TODO: Remove this after implementing text field migration for finding.uid
if len(finding.uid) > 300:
logger.warning(
f"Skipping finding with UID exceeding 300 characters. "
f"Length: {len(finding.uid)}, "
f"Check: {finding.check_id}, "
f"Resource: {finding.resource_name}, "
f"UID: {finding.uid}"
)
continue
finding_uid = finding.uid
last_status, last_first_seen_at = last_status_cache.get(
finding_uid, (None, None)
)
status = FindingStatus[finding.status]
delta = _create_finding_delta(last_status, status)
if not last_first_seen_at:
last_first_seen_at = datetime.now(tz=timezone.utc)
# Determine if finding should be muted and why
# Priority: mutelist processor (highest) > manual mute rules
is_muted = False
muted_reason = None
if finding.muted:
is_muted = True
muted_reason = "Muted by mutelist"
elif finding_uid in mute_rules_cache:
is_muted = True
muted_reason = mute_rules_cache[finding_uid]
if status == FindingStatus.FAIL and not is_muted:
resource_failed_findings_cache[resource_uid] += 1
check_metadata["compliance"] = finding.compliance
finding_instance = Finding(
tenant_id=tenant_id,
uid=finding_uid,
delta=delta,
check_metadata=check_metadata,
status=status,
status_extended=finding.status_extended,
severity=finding.severity,
impact=finding.severity,
raw_result=finding.raw,
check_id=finding.check_id,
scan=scan_instance,
first_seen_at=last_first_seen_at,
muted=is_muted,
muted_at=datetime.now(tz=timezone.utc) if is_muted else None,
muted_reason=muted_reason,
compliance=finding.compliance,
categories=check_metadata.get("categories", []) or [],
resource_groups=check_metadata.get("resourcegroup") or None,
# Denormalized resource arrays populated directly on insert
# (was previously a separate bulk_update; saves a CASE WHEN
# over thousands of rows per micro-batch).
resource_regions=[resource_instance.region]
if resource_instance.region
else [],
resource_services=[resource_instance.service]
if resource_instance.service
else [],
resource_types=[resource_instance.type]
if resource_instance.type
else [],
)
findings_to_create.append(finding_instance)
resource_denormalized_data.append(
(finding_instance, resource_instance)
)
scan_resource_cache.add(
(
str(resource_instance.id),
resource_instance.service,
resource_instance.region,
resource_instance.type,
)
)
aggregate_category_counts(
categories=check_metadata.get("categories", []) or [],
severity=finding.severity.value,
status=status.value,
delta=delta.value if delta else None,
muted=is_muted,
cache=scan_categories_cache,
)
aggregate_resource_group_counts(
resource_group=check_metadata.get("resourcegroup") or None,
severity=finding.severity.value,
status=status.value,
delta=delta.value if delta else None,
muted=is_muted,
resource_uid=resource_instance.uid if resource_instance else "",
cache=scan_resource_groups_cache,
group_resources_cache=group_resources_cache,
)
# 4) Bulk create ResourceTagMappings
# Replaces the original per-resource `upsert_or_delete_tags`
# (which did one `update_or_create` + SELECT FOR UPDATE per mapping).
if tag_mappings_to_create:
# Pre-SELECT existing pairs: `bulk_create(ignore_conflicts=True)`
# does not populate `pk`, so we cannot tell new vs existing from
# the result; we need that to bump `updated_at` only on resources
# that actually gain a mapping.
candidate_resource_ids = {
m.resource_id for m in tag_mappings_to_create
}
candidate_tag_ids = {m.tag_id for m in tag_mappings_to_create}
existing_pairs = set(
ResourceTagMapping.objects.filter(
if resource_uid not in resource_cache:
check_metadata = finding.get_metadata()
group = check_metadata.get("resourcegroup") or None
resource_instance, _ = Resource.objects.get_or_create(
tenant_id=tenant_id,
resource_id__in=candidate_resource_ids,
tag_id__in=candidate_tag_ids,
).values_list("resource_id", "tag_id")
)
resource_uid_by_id = {
str(r.id): uid for uid, r in resource_cache.items()
}
for m in tag_mappings_to_create:
if (m.resource_id, m.tag_id) not in existing_pairs:
uid = resource_uid_by_id.get(str(m.resource_id))
if uid is not None:
resources_with_new_tag_mappings.add(uid)
ResourceTagMapping.objects.bulk_create(
tag_mappings_to_create,
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "resource_id", "tag_id"],
)
# 5) Bulk create Findings
if findings_to_create:
Finding.objects.bulk_create(
findings_to_create, batch_size=SCAN_DB_BATCH_SIZE
)
# 6) Bulk create ResourceFindingMapping rows
mappings_to_create = [
ResourceFindingMapping(
tenant_id=tenant_id,
resource=resource_instance,
finding=finding_instance,
)
for finding_instance, resource_instance in resource_denormalized_data
]
if mappings_to_create:
created_mappings = ResourceFindingMapping.objects.bulk_create(
mappings_to_create,
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "resource_id", "finding_id"],
)
inserted = sum(1 for m in created_mappings if m.pk)
if inserted != len(mappings_to_create):
logger.error(
f"scan {scan_instance.id}: expected "
f"{len(mappings_to_create)} ResourceFindingMapping rows, "
f"inserted {inserted}. Rolling back micro-batch."
provider=provider_instance,
uid=resource_uid,
defaults={
"region": finding.region,
"service": finding.service_name,
"type": finding.resource_type,
"name": finding.resource_name,
"groups": [group] if group else None,
},
)
resource_cache[resource_uid] = resource_instance
resource_failed_findings_cache[resource_uid] = 0
else:
resource_instance = resource_cache[resource_uid]
break
except (OperationalError, IntegrityError) as db_err:
if attempt < CELERY_DEADLOCK_ATTEMPTS - 1:
logger.warning(
f"{'Deadlock error' if isinstance(db_err, OperationalError) else 'Integrity error'} "
f"detected when processing resource {resource_uid} on scan {scan_instance.id}. Retrying..."
)
time.sleep(0.1 * (2**attempt))
continue
else:
raise db_err
# 7) Bulk update Resources
# Union of:
# - resources whose fields changed (dirty_resources)
# - resources that got new tag mappings (need updated_at bump,
# preserving the original `self.save(update_fields=["updated_at"])`
# behavior of `upsert_or_delete_tags`)
all_resource_uids_to_touch = (
set(dirty_resources.keys()) | resources_with_new_tag_mappings
# Track resource field changes (defer save)
updated = False
check_metadata = finding.get_metadata()
group = check_metadata.get("resourcegroup") or None
if finding.region and resource_instance.region != finding.region:
resource_instance.region = finding.region
updated = True
if resource_instance.service != finding.service_name:
resource_instance.service = finding.service_name
updated = True
if resource_instance.type != finding.resource_type:
resource_instance.type = finding.resource_type
updated = True
if resource_instance.metadata != finding.resource_metadata:
resource_instance.metadata = json.dumps(
finding.resource_metadata, cls=CustomEncoder
)
updated = True
if resource_instance.details != finding.resource_details:
resource_instance.details = finding.resource_details
updated = True
if resource_instance.partition != finding.partition:
resource_instance.partition = finding.partition
updated = True
if group and (
not resource_instance.groups or group not in resource_instance.groups
):
resource_instance.groups = (resource_instance.groups or []) + [group]
updated = True
if updated:
dirty_resources[resource_uid] = resource_instance
# Process tags
tags = []
with rls_transaction(tenant_id):
for key, value in finding.resource_tags.items():
tag_key = (key, value)
if tag_key not in tag_cache:
tag_instance, _ = ResourceTag.objects.get_or_create(
tenant_id=tenant_id, key=key, value=value
)
tag_cache[tag_key] = tag_instance
else:
tag_instance = tag_cache[tag_key]
tags.append(tag_instance)
resource_instance.upsert_or_delete_tags(tags=tags)
unique_resources.add((resource_instance.uid, resource_instance.region))
# Prepare finding data
finding_uid = finding.uid
# TEMPORARY WORKAROUND: Skip findings with UID > 300 chars
# TODO: Remove this after implementing text field migration for finding.uid
if len(finding_uid) > 300:
skipped_findings_count += 1
logger.warning(
f"Skipping finding with UID exceeding 300 characters. "
f"Length: {len(finding_uid)}, "
f"Check: {finding.check_id}, "
f"Resource: {finding.resource_name}, "
f"UID: {finding_uid}"
)
continue
last_status, last_first_seen_at = last_status_cache.get(
finding_uid, (None, None)
)
status = FindingStatus[finding.status]
delta = _create_finding_delta(last_status, status)
if not last_first_seen_at:
last_first_seen_at = datetime.now(tz=timezone.utc)
# Determine if finding should be muted and why
# Priority: mutelist processor (highest) > manual mute rules
is_muted = False
muted_reason = None
# Check mutelist processor first (highest priority)
if finding.muted:
is_muted = True
muted_reason = "Muted by mutelist"
# If not muted by mutelist, check manual mute rules
elif finding_uid in mute_rules_cache:
is_muted = True
muted_reason = mute_rules_cache[finding_uid]
# Increment failed_findings_count cache if needed
if status == FindingStatus.FAIL and not is_muted:
resource_failed_findings_cache[resource_uid] += 1
# Create finding object (don't save yet)
check_metadata = finding.get_metadata()
check_metadata["compliance"] = finding.compliance
finding_instance = Finding(
tenant_id=tenant_id,
uid=finding_uid,
delta=delta,
check_metadata=check_metadata,
status=status,
status_extended=finding.status_extended,
severity=finding.severity,
impact=finding.severity,
raw_result=finding.raw,
check_id=finding.check_id,
scan=scan_instance,
first_seen_at=last_first_seen_at,
muted=is_muted,
muted_at=datetime.now(tz=timezone.utc) if is_muted else None,
muted_reason=muted_reason,
compliance=finding.compliance,
categories=check_metadata.get("categories", []) or [],
resource_groups=check_metadata.get("resourcegroup") or None,
)
findings_to_create.append(finding_instance)
resource_denormalized_data.append((finding_instance, resource_instance))
# Track for scan summary
scan_resource_cache.add(
(
str(resource_instance.id),
resource_instance.service,
resource_instance.region,
resource_instance.type,
)
)
# Track categories with counts for ScanCategorySummary by (category, severity)
aggregate_category_counts(
categories=check_metadata.get("categories", []) or [],
severity=finding.severity.value,
status=status.value,
delta=delta.value if delta else None,
muted=is_muted,
cache=scan_categories_cache,
)
# Track resource groups with counts for ScanGroupSummary
aggregate_resource_group_counts(
resource_group=check_metadata.get("resourcegroup") or None,
severity=finding.severity.value,
status=status.value,
delta=delta.value if delta else None,
muted=is_muted,
resource_uid=resource_instance.uid if resource_instance else "",
cache=scan_resource_groups_cache,
group_resources_cache=group_resources_cache,
)
# Bulk operations within single transaction
with rls_transaction(tenant_id):
# Bulk create findings
if findings_to_create:
Finding.objects.bulk_create(
findings_to_create, batch_size=SCAN_DB_BATCH_SIZE
)
# Bulk create resource-finding mappings
for finding_instance, resource_instance in resource_denormalized_data:
mappings_to_create.append(
ResourceFindingMapping(
tenant_id=tenant_id,
resource=resource_instance,
finding=finding_instance,
)
if all_resource_uids_to_touch:
now_utc = datetime.now(tz=timezone.utc)
resources_to_bulk_update = []
for uid in all_resource_uids_to_touch:
# Use the instance from dirty_resources if present (has mutated
# fields), otherwise the cached one (for updated_at bump only).
r = dirty_resources.get(uid) or resource_cache.get(uid)
if r is None:
continue
# Manually bump updated_at since bulk_update bypasses auto_now.
r.updated_at = now_utc
resources_to_bulk_update.append(r)
if resources_to_bulk_update:
Resource.objects.bulk_update(
resources_to_bulk_update,
[
"metadata",
"details",
"partition",
"region",
"service",
"type",
"groups",
"updated_at",
],
batch_size=1000,
)
# Successful execution: leave deadlock retry loop.
break
except (OperationalError, IntegrityError) as db_err:
if attempt < CELERY_DEADLOCK_ATTEMPTS - 1:
logger.warning(
f"{'Deadlock error' if isinstance(db_err, OperationalError) else 'Integrity error'} "
f"on micro-batch for scan {scan_instance.id}. Retrying (attempt {attempt + 1})..."
)
if mappings_to_create:
created_mappings = ResourceFindingMapping.objects.bulk_create(
mappings_to_create,
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "resource_id", "finding_id"],
)
inserted = sum(1 for m in created_mappings if m.pk)
if inserted != len(mappings_to_create):
logger.error(
f"scan {scan_instance.id}: expected "
f"{len(mappings_to_create)} ResourceFindingMapping rows, "
f"inserted {inserted}. Rolling back micro-batch."
)
time.sleep(0.1 * (2**attempt))
# Clear accumulators that we appended to inside the failed transaction
# so the retry produces consistent results.
findings_to_create.clear()
resource_denormalized_data.clear()
tag_mappings_to_create.clear()
dirty_resources.clear()
resources_with_new_tag_mappings.clear()
continue
raise
# Update finding denormalized arrays
findings_to_update = []
for finding_instance, resource_instance in resource_denormalized_data:
if not finding_instance.resource_regions:
finding_instance.resource_regions = []
if not finding_instance.resource_services:
finding_instance.resource_services = []
if not finding_instance.resource_types:
finding_instance.resource_types = []
if resource_instance.region not in finding_instance.resource_regions:
finding_instance.resource_regions.append(resource_instance.region)
if resource_instance.service not in finding_instance.resource_services:
finding_instance.resource_services.append(resource_instance.service)
if resource_instance.type not in finding_instance.resource_types:
finding_instance.resource_types.append(resource_instance.type)
findings_to_update.append(finding_instance)
if findings_to_update:
Finding.objects.bulk_update(
findings_to_update,
["resource_regions", "resource_services", "resource_types"],
batch_size=SCAN_DB_BATCH_SIZE,
)
# Bulk update dirty resources
if dirty_resources:
update_objects_in_batches(
tenant_id=tenant_id,
model=Resource,
objects=list(dirty_resources.values()),
fields=[
"metadata",
"details",
"partition",
"region",
"service",
"type",
"groups",
],
batch_size=1000,
)
# Log skipped findings summary
if skipped_findings_count > 0:
@@ -1021,7 +873,7 @@ def perform_prowler_scan(
scan_instance = Scan.objects.get(pk=scan_id)
scan_instance.state = StateChoices.EXECUTING
scan_instance.started_at = datetime.now(tz=timezone.utc)
scan_instance.save(update_fields=["state", "started_at", "updated_at"])
scan_instance.save()
# Find the mutelist processor if it exists
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
@@ -1066,13 +918,7 @@ def perform_prowler_scan(
provider_instance.connection_last_checked_at = datetime.now(
tz=timezone.utc
)
provider_instance.save(
update_fields=[
"connected",
"connection_last_checked_at",
"updated_at",
]
)
provider_instance.save()
# If the provider is not connected, raise an exception outside the transaction.
# If raised within the transaction, the transaction will be rolled back and the provider will not be marked
@@ -1087,13 +933,6 @@ def perform_prowler_scan(
last_status_cache = {}
resource_failed_findings_cache = defaultdict(int)
# Throttle scan_instance progress writes to avoid hammering the writer:
# only persist when progress moves by at least `PROGRESS_THROTTLE_DELTA`
# OR `PROGRESS_THROTTLE_SECONDS` have elapsed. The final progress (1.0)
# always persists in the `finally` block below.
last_persisted_progress = -1.0
last_persisted_progress_at = 0.0
for progress, findings in prowler_scan.scan():
# Process findings in micro-batches
findings_list = list(findings)
@@ -1120,20 +959,10 @@ def perform_prowler_scan(
group_resources_cache=group_resources_cache,
)
# Throttled progress save (the final save in the `finally` block
# below always runs regardless of throttle).
now = time.time()
progress_delta = progress - last_persisted_progress
elapsed = now - last_persisted_progress_at
if (
progress_delta >= PROGRESS_THROTTLE_DELTA
or elapsed >= PROGRESS_THROTTLE_SECONDS
):
with rls_transaction(tenant_id):
scan_instance.progress = progress
scan_instance.save(update_fields=["progress", "updated_at"])
last_persisted_progress = progress
last_persisted_progress_at = now
# Update scan progress
with rls_transaction(tenant_id):
scan_instance.progress = progress
scan_instance.save()
scan_instance.state = StateChoices.COMPLETED
@@ -1147,16 +976,13 @@ def perform_prowler_scan(
resources_to_update.append(resource_instance)
if resources_to_update:
# Single rls_transaction wrapping the bulk_update (previously
# `update_objects_in_batches` opened one rls_transaction per
# chunk; for tenants with many resources this collapsed N
# BEGINs/COMMITs into 1).
with rls_transaction(tenant_id):
Resource.objects.bulk_update(
resources_to_update,
["failed_findings_count"],
batch_size=SCAN_DB_BATCH_SIZE,
)
update_objects_in_batches(
tenant_id=tenant_id,
model=Resource,
objects=resources_to_update,
fields=["failed_findings_count"],
batch_size=1000,
)
except Exception as e:
logger.error(f"Error performing scan {scan_id}: {e}")
@@ -1168,16 +994,7 @@ def perform_prowler_scan(
scan_instance.duration = time.time() - start_time
scan_instance.completed_at = datetime.now(tz=timezone.utc)
scan_instance.unique_resource_count = len(unique_resources)
scan_instance.save(
update_fields=[
"state",
"duration",
"completed_at",
"unique_resource_count",
"progress",
"updated_at",
]
)
scan_instance.save()
if exception is not None:
raise exception
Generated
+5 -4
View File
@@ -4410,8 +4410,8 @@ wheels = [
[[package]]
name = "prowler"
version = "5.27.0"
source = { git = "https://github.com/prowler-cloud/prowler.git?rev=master#0abbb7fc590eaf7de6ed354dd5a217bca261d2b0" }
version = "5.28.0"
source = { git = "https://github.com/prowler-cloud/prowler.git?rev=v5.28#3a096b17504fe8f3f743fdc44148d35b9723df92" }
dependencies = [
{ name = "alibabacloud-actiontrail20200706" },
{ name = "alibabacloud-credentials" },
@@ -4484,6 +4484,7 @@ dependencies = [
{ name = "pygithub" },
{ name = "python-dateutil" },
{ name = "pytz" },
{ name = "scaleway" },
{ name = "schema" },
{ name = "shodan" },
{ name = "slack-sdk" },
@@ -4494,7 +4495,7 @@ dependencies = [
[[package]]
name = "prowler-api"
version = "1.30.0"
version = "1.29.2"
source = { virtual = "." }
dependencies = [
{ name = "cartography" },
@@ -4590,7 +4591,7 @@ requires-dist = [
{ name = "matplotlib", specifier = "==3.10.8" },
{ name = "neo4j", specifier = "==6.1.0" },
{ name = "openai", specifier = "==1.109.1" },
{ name = "prowler", git = "https://github.com/prowler-cloud/prowler.git?rev=master" },
{ name = "prowler", git = "https://github.com/prowler-cloud/prowler.git?rev=v5.28" },
{ name = "psycopg2-binary", specifier = "==2.9.9" },
{ name = "pytest-celery", extras = ["redis"], specifier = "==1.3.0" },
{ name = "reportlab", specifier = "==4.4.10" },
@@ -118,8 +118,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.28.0"
PROWLER_API_VERSION="5.28.0"
PROWLER_UI_VERSION="5.27.0"
PROWLER_API_VERSION="5.27.0"
```
<Note>
@@ -166,7 +166,6 @@ The following list includes all the Okta checks with configurable variables that
| Check Name | Value | Type |
|---------------------------------------------------------------|------------------------------------|---------|
| `application_admin_console_session_idle_timeout_15min` | `okta_admin_console_idle_timeout_max_minutes` | Integer |
| `signon_global_session_idle_timeout_15min` | `okta_max_session_idle_minutes` | Integer |
## Config YAML File Structure
@@ -30,49 +30,25 @@ If a different authentication method is needed (SSWS API token, OAuth with user
### Required OAuth Scopes
The bundled checks require the following read-only scopes:
The bundled signon checks require the following read-only scopes:
- `okta.policies.read`
- `okta.brands.read`
- `okta.apps.read`
Additional scopes will be needed as more services and checks are added. These are the current ones needed:
| Scope | Used by |
|---|---|
| `okta.policies.read` | Sign-on, password, and authentication policies |
| `okta.policies.read` | Sign-on / password / authentication policies |
| `okta.brands.read` | Sign-in page customizations (DOD Notice and Consent Banner check) |
| `okta.apps.read` | First-party app settings (Okta Admin Console session), integrated app inventory, and the Authentication Policies bound to Okta applications |
### Required Admin Role
The service application must be assigned **one** of the following Okta admin roles:
The service application must be assigned the built-in **Read-Only Administrator** role.
- **Read-Only Administrator** — covers every `signon` check and runs `application_authentication_policy_network_zone_enforced` against the apps it can see. **Visibility caveat:** under Read-Only Administrator the `/api/v1/apps` endpoint returns only the apps the service application is itself assigned to — typically just the service app's own row (for example, `Prowler Scanner`). The check still produces a finding for that app, but the rest of the org's app inventory is invisible at this role level.
- **Super Administrator** — required additionally to evaluate five application-service checks that target Okta's first-party apps (Okta Admin Console, Okta Dashboard). With Super Administrator, `application_authentication_policy_network_zone_enforced` also evaluates the full org-wide app inventory instead of the service-app-only slice.
Okta's Management API enforces a two-layer authorization model: an OAuth **scope** decides which API endpoints the token can call, and an **admin role** decides whether the call returns data. With only a scope granted, the token mint succeeds but every read returns `403 Forbidden`. The Read-Only Administrator role is the minimum that lets the granted `okta.*.read` scopes actually return configuration data to Prowler's checks — without it, the credential probe at provider startup fails and the scan never gets to evaluate any check.
Okta's Management API enforces a two-layer authorization model: an OAuth **scope** decides which API endpoints the token can call, and an **admin role** decides whether the call returns data. With only a scope granted, the token mint succeeds but every read returns `403 Forbidden`. Read-Only Administrator is the minimum role that lets the granted `okta.*.read` scopes return configuration data to Prowler's checks; without it, the credential probe at provider startup fails and the scan never gets to evaluate any check.
#### When Super Administrator is required
Four checks need to resolve the Authentication Policy bound to Okta's first-party apps (Okta Admin Console, Okta Dashboard) and depend on `/api/v1/apps` returning those system apps — which Okta restricts to Super Administrator:
| Check | STIG |
|---|---|
| `application_admin_console_mfa_required` | V-273193 |
| `application_admin_console_phishing_resistant_authentication` | V-273191 |
| `application_dashboard_mfa_required` | V-273194 |
| `application_dashboard_phishing_resistant_authentication` | V-273190 |
Okta filters the first-party apps (`saasure`, `okta_enduser`) out of `/api/v1/apps` for every role below Super Administrator, so `okta.apps.read` alone is not enough. The `okta.apps.manageFirstPartyApps` permission exists only in the paid Okta Identity Governance role `ACCESS_REQUESTS_ADMIN` and cannot be added to custom roles ([Okta Permissions Catalog](https://developer.okta.com/docs/api/openapi/okta-management/guides/permissions)).
A fifth check — `application_admin_console_session_idle_timeout_15min` (STIG V-273187) — also requires Super Administrator: it calls `GET /api/v1/first-party-app-settings/admin-console`, which returns `403 E0000006` for every role below Super Administrator.
When the service app runs with Read-Only Administrator, the five checks listed in this section return **MANUAL** instead of PASS/FAIL — the rest of the scan keeps running.
<Note>
Read-Only Administrator stays the recommended default for the least-privilege framing that aligns with DISA STIG. Assign Super Administrator on a separate run when full coverage of the first-party app checks is needed.
</Note>
Read-Only Administrator is intentionally the narrowest role that satisfies this requirement and aligns with the least-privilege guidance in DISA STIG.
## Step-by-Step Setup
@@ -122,21 +98,19 @@ Okta displays the private key **only once**. If you close the modal without copy
### 5. Grant the required OAuth scopes
On the app, open the **Okta API Scopes** tab and click **Grant** on every scope Prowler needs. The bundled checks require `okta.policies.read`, `okta.brands.read`, and `okta.apps.read`.
On the app, open the **Okta API Scopes** tab and click **Grant** on every scope Prowler needs. The bundled signon checks require `okta.policies.read` and `okta.brands.read`.
![Okta — grant OAuth scopes](/user-guide/providers/okta/images/grant-permissions.png)
### 6. Assign an admin role
### 6. Assign the Read-Only Administrator role
On the app, open the **Admin roles** tab and click **Edit assignments → Add assignment**:
- **Role:** Read-Only Administrator (default) — covers every `signon` check and runs the per-app network-zone check against the apps the service app can see (typically only the service app's own row).
- **Role:** Read-Only Administrator
- **Resources:** All resources
Save the changes.
To additionally evaluate the first-party application checks (Okta Admin Console / Okta Dashboard idle timeout, MFA, and phishing-resistant authentication) and to widen the per-app network-zone check to the full org-wide app inventory, assign **Super Administrator** instead. Without Super Administrator, the five first-party checks return MANUAL and the network-zone check is limited to the service app's own visibility — the rest of the scan still runs. See [Required Admin Role](#required-admin-role) for the full breakdown.
![Okta — grant Read-Only role](/user-guide/providers/okta/images/grant-roles.png)
### 7. [Optional] Verify DPoP setting
@@ -158,8 +132,8 @@ export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem"
# or
export OKTA_PRIVATE_KEY="$(cat /secure/path/to/prowler-okta.pem)"
# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read"
export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read"
# Optional — defaults to "okta.policies.read,okta.brands.read"
export OKTA_SCOPES="okta.policies.read,okta.brands.read"
uv run python prowler-cli.py okta
```
@@ -200,12 +174,8 @@ Prowler validates credentials at startup by listing one sign-on policy. This err
Raised when the credential probe succeeds at the OAuth layer but the request is rejected because the service app lacks the required scope or admin role:
- **`invalid_scope`** — one of the requested scopes (`okta.policies.read`, `okta.brands.read`, or `okta.apps.read`) is not granted on the service app. Grant the missing scope from **Okta API Scopes**.
- **`Forbidden` / `not authorized`** — no admin role is assigned to the service app. Assign **Read-Only Administrator** (or **Super Administrator** for the first-party application checks) from **Admin roles**.
### Application-service checks return MANUAL on first-party apps
When the service app runs with Read-Only Administrator, the five application-service checks targeting the Okta Admin Console and Okta Dashboard return MANUAL. This is by design — Okta restricts the underlying endpoints (`/api/v1/first-party-app-settings/{appName}` and `/api/v1/apps` for first-party app `name` values `saasure` / `okta_enduser`) to **Super Administrator**. Assign the Super Administrator role to the service app to evaluate those checks. See [Required Admin Role](#required-admin-role) for the full list.
- **`invalid_scope`** — one of the requested scopes (`okta.policies.read` or `okta.brands.read`) is not granted on the service app. Grant the missing scope from **Okta API Scopes**.
- **`Forbidden` / `not authorized`** — the **Read-Only Administrator** role is not assigned to the service app. Assign it from **Admin roles**.
### `invalid_dpop_proof`
@@ -12,7 +12,7 @@ Set up authentication for Okta with the [Okta Authentication](/user-guide/provid
- An Okta organization. The UI examples below use **Identity Engine** terminology such as **Global Session Policy**; Classic Engine exposes the equivalent sign-on policy concepts under older names.
- A **Super Administrator** account on that organization for the one-time service-app setup.
- An **API Services** app integration in the Okta Admin Console with the `okta.policies.read`, `okta.brands.read`, and `okta.apps.read` scopes granted and an admin role assigned. **Read-Only Administrator** covers every `signon` check and runs the per-app network-zone check against the apps the service app can see (under Read-Only Administrator that is typically only the service app's own row — the rest of the org's app inventory stays invisible). **Super Administrator** is required additionally to evaluate the five first-party application checks (Okta Admin Console / Okta Dashboard idle timeout, MFA, phishing-resistant authentication) and to widen the network-zone check to the full app inventory — see [Okta Authentication](/user-guide/providers/okta/authentication#required-admin-role) for the full breakdown.
- An **API Services** app integration in the Okta Admin Console with the `okta.policies.read` and `okta.brands.read` scopes granted and the **Read-Only Administrator** role assigned.
- Python 3.10+ and Prowler 5.27.0 or later installed locally.
<CardGroup cols={2}>
@@ -85,8 +85,8 @@ Follow the [Okta Authentication](/user-guide/providers/okta/authentication) guid
export OKTA_ORG_DOMAIN="acme.okta.com"
export OKTA_CLIENT_ID="0oa1234567890abcdef"
export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem"
# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read"
export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read"
# Optional — defaults to "okta.policies.read,okta.brands.read"
export OKTA_SCOPES="okta.policies.read,okta.brands.read"
```
The private key file may contain either a PEM-encoded RSA key or a JWK JSON document.
@@ -128,9 +128,6 @@ okta:
# okta.signon_global_session_idle_timeout_15min
# Defaults to 15 minutes per DISA STIG V-273186.
okta_max_session_idle_minutes: 15
# okta.application_admin_console_session_idle_timeout_15min
# Defaults to 15 minutes per DISA STIG V-273187.
okta_admin_console_idle_timeout_max_minutes: 15
```
To use a custom configuration:
@@ -143,10 +140,9 @@ prowler okta --config-file /path/to/config.yaml
Prowler for Okta includes security checks across the following services:
| Service | Description |
| --------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| **Sign-On** | Global session policy controls (idle timeout, lifetime, rule priority and ordering) |
| **Application** | Okta Admin Console sign-on settings plus Authentication Policy controls for Okta applications (session idle, MFA, phishing resistance, network zones) |
| Service | Description |
| ----------- | ----------------------------------------------------------------------------------- |
| **Sign-On** | Global session policy controls (idle timeout, lifetime, rule priority and ordering) |
## Troubleshooting
@@ -158,11 +154,10 @@ This is stricter than simply finding the same timeout value somewhere else in th
### Default Scopes
Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover every bundled check across the Sign-On and Application services:
Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover the bundled signon checks:
- `okta.policies.read`
- `okta.brands.read`
- `okta.apps.read`
The service app must have these scopes granted in the **Okta API Scopes** tab. When the granted set is narrower than the requested set, the token request fails with an `invalid_scope` error and the scan stops at provider initialization.
-15
View File
@@ -2,21 +2,6 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.29.0] (Prowler UNRELEASED)
### 🚀 Added
- `application` service for Okta provider with `application_admin_console_session_idle_timeout_15min`, `application_admin_console_mfa_required`, `application_admin_console_phishing_resistant_authentication`, `application_dashboard_mfa_required`, `application_dashboard_phishing_resistant_authentication`, and `application_authentication_policy_network_zone_enforced` checks [(#11358)](https://github.com/prowler-cloud/prowler/pull/11358)
- AWS AI Security Framework compliance for AWS provider [(#11353)](https://github.com/prowler-cloud/prowler/pull/11353)
- `storage_account_public_network_access_disabled` check for Azure provider and remapped the Azure CIS "Public Network Access is Disabled" requirements to it [(#11334)](https://github.com/prowler-cloud/prowler/pull/11334)
### 🐞 Fixed
- ENS RD 311/2022 (AWS) compliance mapping: `vpc_different_regions` was uncorrectly mapped under the `mp.com.4` family (Network segregation). That check is now mapped to a new `op.cont.2.aws.vpc.1` requirement under the Continuity of Service control [(#11372)](https://github.com/prowler-cloud/prowler/pull/11372)
- Compliance CSV row count now matches the UI per requirement by sourcing rows from the framework JSON's `requirement.Checks` instead of the stale `finding.compliance` snapshot [(#11370)](https://github.com/prowler-cloud/prowler/pull/11370)
---
## [5.28.1] (Prowler 5.28.1)
### 🐞 Fixed
File diff suppressed because it is too large Load Diff
+4 -26
View File
@@ -2539,7 +2539,8 @@
}
],
"Checks": [
"vpc_subnet_separate_private_public"
"vpc_subnet_separate_private_public",
"vpc_different_regions"
]
},
{
@@ -2592,8 +2593,8 @@
}
],
"Checks": [
"vpc_different_regions",
"vpc_subnet_different_az"
"vpc_subnet_different_az",
"vpc_different_regions"
]
},
{
@@ -4261,29 +4262,6 @@
],
"Checks": []
},
{
"Id": "op.cont.2.aws.vpc.1",
"Description": "Plan de continuidad",
"Attributes": [
{
"IdGrupoControl": "op.cont.2",
"Marco": "operacional",
"Categoria": "continuidad del servicio",
"DescripcionControl": "Distribución de las VPCs entre múltiples regiones y zonas de disponibilidad de AWS para garantizar la continuidad del servicio ante fallos regionales o zonales.",
"Nivel": "alto",
"Tipo": "requisito",
"Dimensiones": [
"disponibilidad"
],
"ModoEjecucion": "automático",
"Dependencias": []
}
],
"Checks": [
"vpc_different_regions",
"vpc_subnet_different_az"
]
},
{
"Id": "op.cont.3.aws.drs.1",
"Description": "Pruebas periódicas",
+1 -1
View File
@@ -1383,7 +1383,7 @@
"Id": "3.7",
"Description": "Ensure that 'Public Network Access' is `Disabled' for storage accounts",
"Checks": [
"storage_account_public_network_access_disabled"
"storage_blob_public_access_level_is_disabled"
],
"Attributes": [
{
+1 -1
View File
@@ -1651,7 +1651,7 @@
"Id": "4.6",
"Description": "Ensure that 'Public Network Access' is 'Disabled' for storage accounts",
"Checks": [
"storage_account_public_network_access_disabled"
"storage_blob_public_access_level_is_disabled"
],
"Attributes": [
{
+1 -1
View File
@@ -3021,7 +3021,7 @@
"Id": "10.3.2.2",
"Description": "Ensure that 'Public Network Access' is 'Disabled' for storage accounts",
"Checks": [
"storage_account_public_network_access_disabled"
"storage_blob_public_access_level_is_disabled"
],
"Attributes": [
{
+1 -1
View File
@@ -3182,7 +3182,7 @@
"Id": "9.3.2.2",
"Description": "Ensure that 'Public Network Access' is 'Disabled' for storage accounts",
"Checks": [
"storage_account_public_network_access_disabled"
"storage_blob_public_access_level_is_disabled"
],
"Attributes": [
{
@@ -459,7 +459,7 @@
"Id": "2.2.6",
"Description": "Ensure that 'Public Network Access' is 'Disabled' for storage accounts",
"Checks": [
"storage_account_public_network_access_disabled"
"storage_blob_public_access_level_is_disabled"
],
"Attributes": [
{
+1 -1
View File
@@ -48,7 +48,7 @@ class _MutableTimestamp:
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
prowler_version = "5.29.0"
prowler_version = "5.28.2"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
-6
View File
@@ -669,9 +669,3 @@ okta:
# 15 per DISA STIG V-273186 (OKTA-APP-000020); raise it only with an
# explicit risk acceptance.
okta_max_session_idle_minutes: 15
# Okta Applications
# okta.application_admin_console_session_idle_timeout_15min
# Maximum acceptable Okta Admin Console app idle timeout, in minutes.
# Defaults to 15 per DISA STIG V-273187 (OKTA-APP-000025); raise it only
# with an explicit risk acceptance.
okta_admin_console_idle_timeout_max_minutes: 15
@@ -37,9 +37,9 @@ class ASDEssentialEightAWS(ComplianceOutput):
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = ASDEssentialEightAWSModel(
Provider=finding.provider,
@@ -37,9 +37,10 @@ class AWSWellArchitected(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWSWellArchitectedModel(
Provider=finding.provider,
+3 -2
View File
@@ -35,9 +35,10 @@ class AWSC5(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWSC5Model(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AzureC5(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AzureC5Model(
Provider=finding.provider,
+3 -2
View File
@@ -35,9 +35,10 @@ class GCPC5(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GCPC5Model(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class CCC_AWS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = CCC_AWSModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class CCC_Azure(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = CCC_AzureModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class CCC_GCP(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = CCC_GCPModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AlibabaCloudCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AlibabaCloudCISModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AWSCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWSCISModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AzureCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AzureCISModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class GCPCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GCPCISModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class GithubCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GithubCISModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class GoogleWorkspaceCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GoogleWorkspaceCISModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class KubernetesCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = KubernetesCISModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class M365CIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = M365CISModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class OracleCloudCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = OracleCloudCISModel(
Provider=finding.provider,
@@ -37,9 +37,10 @@ class GoogleWorkspaceCISASCuBA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GoogleWorkspaceCISASCuBAModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AlibabaCloudCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AlibabaCloudCSAModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AWSCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWSCSAModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AzureCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AzureCSAModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class GCPCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GCPCSAModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class OracleCloudCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = OracleCloudCSAModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AWSENS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWSENSModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AzureENS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AzureENSModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class GCPENS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GCPENSModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class GenericCompliance(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GenericComplianceModel(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AWSISO27001(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWSISO27001Model(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AzureISO27001(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AzureISO27001Model(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class GCPISO27001(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = GCPISO27001Model(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class KubernetesISO27001(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = KubernetesISO27001Model(
Provider=finding.provider,
@@ -35,9 +35,9 @@ class M365ISO27001(ComplianceOutput):
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = M365ISO27001Model(
Provider=finding.provider,
@@ -35,9 +35,9 @@ class NHNISO27001(ComplianceOutput):
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = NHNISO27001Model(
Provider=finding.provider,
@@ -35,9 +35,10 @@ class AWSKISAISMSP(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = AWSKISAISMSPModel(
Provider=finding.provider,
@@ -36,9 +36,10 @@ class AWSMitreAttack(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
compliance_row = AWSMitreAttackModel(
Provider=finding.provider,
Description=compliance.Description,
@@ -36,9 +36,10 @@ class AzureMitreAttack(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
compliance_row = AzureMitreAttackModel(
Provider=finding.provider,
Description=compliance.Description,
@@ -36,9 +36,10 @@ class GCPMitreAttack(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
compliance_row = GCPMitreAttackModel(
Provider=finding.provider,
Description=compliance.Description,
@@ -37,9 +37,10 @@ class ProwlerThreatScoreAlibaba(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreAlibabaModel(
Provider=finding.provider,
@@ -37,9 +37,10 @@ class ProwlerThreatScoreAWS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreAWSModel(
Provider=finding.provider,
@@ -37,9 +37,10 @@ class ProwlerThreatScoreAzure(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreAzureModel(
Provider=finding.provider,
@@ -37,9 +37,10 @@ class ProwlerThreatScoreGCP(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreGCPModel(
Provider=finding.provider,
@@ -37,9 +37,10 @@ class ProwlerThreatScoreKubernetes(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreKubernetesModel(
Provider=finding.provider,
@@ -37,9 +37,10 @@ class ProwlerThreatScoreM365(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
if requirement.Id in finding_requirements:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreM365Model(
Provider=finding.provider,
@@ -482,7 +482,6 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -507,9 +506,7 @@
"cn-north-1",
"cn-northwest-1"
],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
@@ -1234,21 +1231,6 @@
"aws-us-gov": []
}
},
"aws-devops-agent": {
"regions": {
"aws": [
"ap-northeast-1",
"ap-southeast-2",
"eu-central-1",
"eu-west-1",
"us-east-1",
"us-west-2"
],
"aws-cn": [],
"aws-eusc": [],
"aws-us-gov": []
}
},
"awshealthdashboard": {
"regions": {
"aws": [
@@ -1608,7 +1590,6 @@
"ap-southeast-2",
"ca-central-1",
"eu-central-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"us-east-1",
@@ -2212,7 +2193,6 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -2275,8 +2255,6 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -2466,9 +2444,6 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"eu-central-1",
"eu-central-2",
@@ -2614,9 +2589,7 @@
"cn-north-1",
"cn-northwest-1"
],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
@@ -2823,7 +2796,6 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -2834,7 +2806,6 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -3796,7 +3767,6 @@
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
@@ -3859,9 +3829,7 @@
"us-west-2"
],
"aws-cn": [],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
@@ -3922,22 +3890,17 @@
"dsql": {
"regions": {
"aws": [
"ap-east-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-4",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-north-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -4718,19 +4681,15 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -4744,7 +4703,6 @@
"il-central-1",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -5302,7 +5260,6 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -5353,7 +5310,6 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -5404,7 +5360,6 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -5455,7 +5410,6 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -5506,7 +5460,6 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -6081,7 +6034,6 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -6129,7 +6081,6 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -7730,8 +7681,6 @@
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"eu-central-1",
"eu-north-1",
@@ -8116,7 +8065,6 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -8127,10 +8075,8 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
@@ -8142,7 +8088,6 @@
"il-central-1",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -8334,31 +8279,22 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
"eu-south-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"il-central-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -8379,7 +8315,6 @@
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
@@ -8639,7 +8574,6 @@
"il-central-1",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -8772,19 +8706,13 @@
"aws": [
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-2",
"ap-southeast-4",
"ca-central-1",
"eu-central-1",
"eu-central-2",
"eu-south-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -9106,7 +9034,6 @@
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -9209,14 +9136,11 @@
"regions": {
"aws": [
"ap-northeast-1",
"ap-northeast-3",
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"eu-central-1",
"eu-north-1",
"eu-south-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"eu-west-3",
@@ -9453,7 +9377,6 @@
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"eu-central-1",
"eu-central-2",
@@ -9472,9 +9395,7 @@
"aws-cn": [
"cn-northwest-1"
],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-west-1"
]
@@ -9944,12 +9865,10 @@
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
@@ -10093,10 +10012,7 @@
],
"aws-cn": [],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
]
"aws-us-gov": []
}
},
"resource-groups": {
@@ -10777,10 +10693,7 @@
"us-west-1",
"us-west-2"
],
"aws-cn": [
"cn-north-1",
"cn-northwest-1"
],
"aws-cn": [],
"aws-eusc": [],
"aws-us-gov": []
}
@@ -11402,9 +11315,7 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-6",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
@@ -11736,6 +11647,26 @@
]
}
},
"simspaceweaver": {
"regions": {
"aws": [
"ap-southeast-1",
"ap-southeast-2",
"eu-central-1",
"eu-north-1",
"eu-west-1",
"us-east-1",
"us-east-2",
"us-west-2"
],
"aws-cn": [],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
]
}
},
"sms": {
"regions": {
"aws": [
@@ -13132,7 +13063,6 @@
"eu-west-3",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -13484,7 +13414,6 @@
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ca-central-1",
"eu-central-1",
"eu-west-1",
@@ -13493,7 +13422,6 @@
"il-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
],
"aws-cn": [
@@ -1,37 +0,0 @@
{
"Provider": "azure",
"CheckID": "storage_account_public_network_access_disabled",
"CheckTitle": "Storage account has 'Public Network Access' disabled",
"CheckType": [],
"ServiceName": "storage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "microsoft.storage/storageaccounts",
"ResourceGroup": "storage",
"Description": "**Azure Storage accounts** with **public network access** disabled cannot be reached from public networks. Setting `publicNetworkAccess` to `Disabled` overrides the public access settings of individual containers and forces access through private endpoints or trusted services. This is independent from the 'Allow Blob Anonymous Access' setting.",
"Risk": "Leaving **public network access** enabled exposes the storage account endpoints to the **public Internet**, widening the attack surface and undermining **defense in depth**.\n\nThis increases the risk of **unauthorized access**, **data exfiltration**, and reconnaissance against the account, especially when combined with weak network rules or overly permissive access policies.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/azure/storage/common/storage-network-security?tabs=azure-portal#change-the-default-network-access-rule",
"https://learn.microsoft.com/en-us/azure/storage/common/storage-network-security-set-default-access"
],
"Remediation": {
"Code": {
"CLI": "az storage account update --name <storage-account> --resource-group <resource-group> --public-network-access Disabled",
"NativeIaC": "```bicep\n// Storage account with public network access disabled\nresource sa 'Microsoft.Storage/storageAccounts@2023-01-01' = {\n name: '<example_resource_name>'\n location: resourceGroup().location\n kind: 'StorageV2'\n sku: { name: 'Standard_LRS' }\n properties: {\n publicNetworkAccess: 'Disabled' // Critical: disables public network access to the account\n }\n}\n```",
"Other": "1. In the Azure portal, go to Storage accounts and select the target account\n2. Under Security + networking, click Networking\n3. Set Public network access to Disabled\n4. Click Save",
"Terraform": "```hcl\nresource \"azurerm_storage_account\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n resource_group_name = \"<example_resource_name>\"\n location = \"<example_location>\"\n account_tier = \"Standard\"\n account_replication_type = \"LRS\"\n public_network_access_enabled = false # Critical: disables public network access\n}\n```"
},
"Recommendation": {
"Text": "Disable **public network access** on the storage account and reach it through **private endpoints** or trusted Azure services only. Combine this with **least privilege** RBAC, short-lived `SAS` tokens, and network restrictions. Validate client connectivity before disabling public access in production.",
"Url": "https://hub.prowler.com/check/storage_account_public_network_access_disabled"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check evaluates the storage account publicNetworkAccess property. It is independent from the 'Allow Blob Anonymous Access' setting evaluated by storage_blob_public_access_level_is_disabled."
}
@@ -1,38 +0,0 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.storage.storage_client import storage_client
class storage_account_public_network_access_disabled(Check):
"""
Ensure that 'Public Network Access' is 'Disabled' for storage accounts.
This check evaluates the storage account's publicNetworkAccess property, which controls
whether the account is reachable from public networks. It is independent from the
'Allow Blob Anonymous Access' setting (covered by
storage_blob_public_access_level_is_disabled).
- PASS: The storage account has public network access disabled.
- FAIL: The storage account has public network access enabled (or unset, which Azure treats as enabled).
"""
def execute(self) -> list[Check_Report_Azure]:
findings = []
for subscription, storage_accounts in storage_client.storage_accounts.items():
subscription_name = storage_client.subscriptions.get(
subscription, subscription
)
for storage_account in storage_accounts:
report = Check_Report_Azure(
metadata=self.metadata(), resource=storage_account
)
report.subscription = subscription
if storage_account.public_network_access == "Disabled":
report.status = "PASS"
report.status_extended = f"Storage account {storage_account.name} from subscription {subscription_name} ({subscription}) has public network access disabled."
else:
report.status = "FAIL"
report.status_extended = f"Storage account {storage_account.name} from subscription {subscription_name} ({subscription}) has public network access enabled."
findings.append(report)
return findings
@@ -1,7 +1,7 @@
{
"Provider": "azure",
"CheckID": "storage_blob_public_access_level_is_disabled",
"CheckTitle": "Storage account has 'Allow Blob Anonymous Access' disabled",
"CheckTitle": "Storage account has 'Allow blob public access' disabled",
"CheckType": [],
"ServiceName": "storage",
"SubServiceName": "",
@@ -9,7 +9,7 @@
"Severity": "high",
"ResourceType": "microsoft.storage/storageaccounts",
"ResourceGroup": "storage",
"Description": "**Azure Storage accounts** with **blob anonymous (public) access** disabled prevent containers or blobs from being set to a public access level. Setting `allowBlobPublicAccess` to `false` enforces no anonymous reads across the account. This is independent from the account's 'Public Network Access' setting, which is evaluated by storage_account_public_network_access_disabled.",
"Description": "**Azure Storage accounts** with **blob public access** disabled prevent containers or blobs from being set to a public access level. Setting `allow blob public access` to `false` enforces no anonymous reads across the account.",
"Risk": "Allowing public access permits unauthenticated users to read blob data or enumerate container contents when any container is made public, compromising confidentiality.\n\nExposed objects can be scraped at scale, enabling data exfiltration and intelligence gathering without audit attribution.",
"RelatedUrl": "",
"AdditionalURLs": [
@@ -33,5 +33,5 @@
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check evaluates the 'Allow Blob Anonymous Access' (allowBlobPublicAccess) setting. The account's 'Public Network Access' (publicNetworkAccess) setting is evaluated by storage_account_public_network_access_disabled."
"Notes": ""
}
@@ -42,9 +42,6 @@ class Storage(AzureService):
enable_https_traffic_only=storage_account.enable_https_traffic_only,
infrastructure_encryption=storage_account.encryption.require_infrastructure_encryption,
allow_blob_public_access=storage_account.allow_blob_public_access,
public_network_access=getattr(
storage_account, "public_network_access", None
),
network_rule_set=NetworkRuleSet(
bypass=getattr(
storage_account.network_rule_set,
@@ -304,7 +301,6 @@ class Account(BaseModel):
enable_https_traffic_only: bool
infrastructure_encryption: Optional[bool] = None
allow_blob_public_access: bool
public_network_access: Optional[str] = None
network_rule_set: NetworkRuleSet
encryption_type: str
minimum_tls_version: str
@@ -35,7 +35,7 @@ def init_parser(self):
nargs="+",
help=(
"OAuth scopes to request, space-separated "
"(e.g. okta.policies.read okta.brands.read okta.apps.read). "
"(e.g. okta.policies.read okta.brands.read okta.users.read). "
"Defaults to the read scopes required by the bundled checks."
),
default=None,
+1 -1
View File
@@ -32,7 +32,7 @@ from prowler.providers.okta.exceptions.exceptions import (
from prowler.providers.okta.lib.mutelist.mutelist import OktaMutelist
from prowler.providers.okta.models import OktaIdentityInfo, OktaSession
DEFAULT_SCOPES = ["okta.policies.read", "okta.brands.read", "okta.apps.read"]
DEFAULT_SCOPES = ["okta.policies.read", "okta.brands.read"]
# Accept only Okta-managed domains. Custom (vanity) domains are rejected on
# purpose — they're a recurring source of typos and silent misconfig and
# Prowler's audience overwhelmingly uses Okta-managed hosts. The TLDs below
@@ -1,37 +0,0 @@
{
"Provider": "okta",
"CheckID": "application_admin_console_mfa_required",
"CheckTitle": "Okta Admin Console authentication policy enforces multifactor authentication",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The **Authentication Policy** bound to the **Okta Admin Console** app must require MFA. On its top active rule, *User must authenticate with* must be set to `Password / IdP + Another factor` or `Any 2 factor types` (`factorMode=2FA` in the API).",
"Risk": "Single-factor access to the Okta control plane is the highest-impact identity risk in the tenant.\n\n- **Credential compromise** is enough to take over every administrator account\n- **Lateral movement** into every downstream SaaS that trusts Okta SSO\n- **Privileged configuration changes** with no second-factor barrier",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/policies/about-app-sign-on-policies.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Authentication Policies**.\n3. Open the **Okta Admin Console** policy.\n4. Edit the top active rule.\n5. Set *User must authenticate with* to `Password / IdP + Another factor` or `Any 2 factor types`.\n6. Save the rule.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require MFA on the top active rule of the Okta Admin Console authentication policy. Set *User must authenticate with* to `Password / IdP + Another factor` or `Any 2 factor types`.",
"Url": "https://hub.prowler.com/check/application_admin_console_mfa_required"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273193 / OKTA-APP-000560."
}
@@ -1,89 +0,0 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
ADMIN_CONSOLE_APP_NAME,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
app_label,
app_not_found_finding,
missing_app_scope_finding,
policy_missing_finding,
rule_label,
top_active_rule,
)
ADMIN_CONSOLE_LABEL_HINT = "Okta Admin Console"
class application_admin_console_mfa_required(Check):
"""STIG V-273193 / OKTA-APP-000560.
The Authentication Policy bound to the Okta Admin Console app must
require multifactor authentication on its top rule: `User must
authenticate with` set to `Password / IdP + Another factor` or
`Any 2 factor types`.
The underlying SDK exposes this as `AssuranceMethod.factor_mode`
with values `1FA` / `2FA`.
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("built_in_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_app_scope_finding(
self.metadata(),
org_domain,
missing_scope,
ADMIN_CONSOLE_LABEL_HINT,
)
)
return findings
app = application_client.built_in_apps.get(ADMIN_CONSOLE_APP_NAME)
if app is None:
findings.append(
app_not_found_finding(
self.metadata(), org_domain, ADMIN_CONSOLE_LABEL_HINT
)
)
return findings
if app.access_policy_id is None or app.access_policy is None:
findings.append(policy_missing_finding(self.metadata(), org_domain, app))
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=app, org_domain=org_domain
)
rule = top_active_rule(app)
if rule is None:
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no active rules on its Authentication "
"Policy. The top rule must set `User must authenticate with` to "
"`Password / IdP + Another factor` or `Any 2 factor types`."
)
elif rule.factor_mode == "2FA":
report.status = "PASS"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} enforces "
"multifactor authentication (`factorMode=2FA`)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} does not "
f"enforce multifactor authentication "
f"(`factorMode={rule.factor_mode or 'unset'}`). "
"Set `User must authenticate with` to `Password / IdP + Another "
"factor` or `Any 2 factor types`."
)
findings.append(report)
return findings
@@ -1,37 +0,0 @@
{
"Provider": "okta",
"CheckID": "application_admin_console_phishing_resistant_authentication",
"CheckTitle": "Okta Admin Console authentication policy enforces phishing-resistant factors",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The **Authentication Policy** bound to the **Okta Admin Console** app must restrict possession factors to phishing-resistant authenticators (FIDO2/WebAuthn, PIV/CAC, Okta FastPass with biometrics). On the top active rule, *Possession factor constraints are: Phishing resistant* must be checked (`possession.phishingResistant=REQUIRED`).",
"Risk": "Phishable possession factors (SMS, voice, standard push, OTP delivered via reverse-proxy AiTM) leave the most privileged surface of the IdP exposed.\n\n- **Credential phishing** against administrators succeeds despite MFA\n- **Adversary-in-the-Middle attacks** capture session tokens through fake login pages\n- **Account takeover** of the tenant control plane",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/authenticators/phishing-resistant-auth.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Authentication Policies**.\n3. Open the **Okta Admin Console** policy.\n4. Edit the top active rule.\n5. Under *Possession factor constraints are*, check **Phishing resistant**.\n6. Save the rule.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require phishing-resistant possession factors on the top active rule of the Okta Admin Console authentication policy.",
"Url": "https://hub.prowler.com/check/application_admin_console_phishing_resistant_authentication"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273191 / OKTA-APP-000190."
}
@@ -1,88 +0,0 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
ADMIN_CONSOLE_APP_NAME,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
app_label,
app_not_found_finding,
missing_app_scope_finding,
policy_missing_finding,
rule_label,
top_active_rule,
)
ADMIN_CONSOLE_LABEL_HINT = "Okta Admin Console"
class application_admin_console_phishing_resistant_authentication(Check):
"""STIG V-273191 / OKTA-APP-000190.
The Authentication Policy bound to the Okta Admin Console app must
restrict possession factors to phishing-resistant authenticators.
The underlying SDK exposes `phishingResistant` on each
`PossessionConstraint`; at least one constraint object on the top
rule must set `phishingResistant=REQUIRED` (constraints are OR-ed
by Okta semantics).
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("built_in_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_app_scope_finding(
self.metadata(),
org_domain,
missing_scope,
ADMIN_CONSOLE_LABEL_HINT,
)
)
return findings
app = application_client.built_in_apps.get(ADMIN_CONSOLE_APP_NAME)
if app is None:
findings.append(
app_not_found_finding(
self.metadata(), org_domain, ADMIN_CONSOLE_LABEL_HINT
)
)
return findings
if app.access_policy_id is None or app.access_policy is None:
findings.append(policy_missing_finding(self.metadata(), org_domain, app))
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=app, org_domain=org_domain
)
rule = top_active_rule(app)
if rule is None:
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no active rules on its Authentication "
"Policy. The top rule must mark "
"`Possession factor constraints are: Phishing resistant`."
)
elif rule.possession_phishing_resistant_required:
report.status = "PASS"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} enforces "
"phishing-resistant possession factors "
"(`possession.phishingResistant=REQUIRED`)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} does not "
"enforce phishing-resistant possession factors. Enable "
"`Possession factor constraints are: Phishing resistant` "
"on the rule."
)
findings.append(report)
return findings
@@ -1,37 +0,0 @@
{
"Provider": "okta",
"CheckID": "application_admin_console_session_idle_timeout_15min",
"CheckTitle": "Okta Admin Console app session idle timeout is 15 minutes or less",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The integrated **Okta Admin Console** app must close idle privileged sessions. *Maximum app session idle time* on the **Sign On** tab must be `15` minutes or less.\n\nThreshold override: `okta_admin_console_idle_timeout_max_minutes`.",
"Risk": "An unattended administrator workstation leaves the Okta control plane open for session hijacking.\n\n- **Privileged session takeover** by anyone with physical or remote access to the workstation\n- **Tenant-wide configuration changes** under the absent administrator's identity\n- **Bypassed reauthentication** for the most sensitive surface of the IdP",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/guides/configure-signon-policy/main/",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/OktaApplicationSettings/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Applications** > **Applications** > **Okta Admin Console**.\n3. Open the **Sign On** tab.\n4. Under **Okta Admin Console session**, set *Maximum app session idle time* to `15` minutes or less.\n5. Save the changes.",
"Terraform": ""
},
"Recommendation": {
"Text": "Set the *Maximum app session idle time* of the Okta Admin Console first-party app to `15` minutes or less so privileged administrator sessions terminate on inactivity.",
"Url": "https://hub.prowler.com/check/application_admin_console_session_idle_timeout_15min"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273187 / OKTA-APP-000025."
}
@@ -1,89 +0,0 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
AdminConsoleAppSettings,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
missing_admin_console_settings_scope_finding,
)
DEFAULT_THRESHOLD_MINUTES = 15
class application_admin_console_session_idle_timeout_15min(Check):
"""STIG V-273187 / OKTA-APP-000025.
The Okta Admin Console first-party app must set its
`Maximum app session idle time` to 15 minutes (or less) so privileged
administrator sessions terminate on inactivity. Threshold override:
`okta_admin_console_idle_timeout_max_minutes` in the audit config.
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
audit_config = application_client.audit_config or {}
threshold = audit_config.get(
"okta_admin_console_idle_timeout_max_minutes",
DEFAULT_THRESHOLD_MINUTES,
)
org_domain = application_client.provider.identity.org_domain
missing_scope = application_client.missing_scope.get(
"admin_console_app_settings"
)
if missing_scope:
findings.append(
missing_admin_console_settings_scope_finding(
self.metadata(), org_domain, missing_scope
)
)
return findings
settings = application_client.admin_console_app_settings
if settings is None:
placeholder = AdminConsoleAppSettings()
report = CheckReportOkta(
metadata=self.metadata(), resource=placeholder, org_domain=org_domain
)
report.status = "MANUAL"
report.status_extended = (
"Could not retrieve the Okta Admin Console first-party app "
"settings. Okta restricts `GET /api/v1/first-party-app-settings/"
"admin-console` to the Super Administrator role; every other "
"role — including Read-Only Administrator — receives "
"`403 E0000006`. Assign Super Administrator to the service "
f"app to evaluate this check. The `Maximum app session idle "
f"time` must be set to {threshold} minutes or less."
)
findings.append(report)
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=settings, org_domain=org_domain
)
idle = settings.session_idle_timeout_minutes
if idle is None:
report.status = "FAIL"
report.status_extended = (
"The Okta Admin Console first-party app does not define a "
"`Maximum app session idle time`. This value must be "
f"{threshold} minutes or less."
)
elif idle <= threshold:
report.status = "PASS"
report.status_extended = (
"The Okta Admin Console first-party app sets the maximum "
f"app session idle time to {idle} minutes, meeting the "
f"configured threshold of {threshold} minutes."
)
else:
report.status = "FAIL"
report.status_extended = (
"The Okta Admin Console first-party app sets the maximum "
f"app session idle time to {idle} minutes, exceeding the "
f"configured threshold of {threshold} minutes."
)
findings.append(report)
return findings
@@ -1,38 +0,0 @@
{
"Provider": "okta",
"CheckID": "application_authentication_policy_network_zone_enforced",
"CheckTitle": "Okta application authentication policies enforce Network Zones",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "Every active Okta application must be bound to an **Authentication Policy** that uses **Network Zones**. Each active non-default rule must map *User's IP* to `In zone` or `Not in zone`, and the active built-in *Catch-all Rule* must set *Access is* to `Denied`.",
"Risk": "Applications without network-aware authentication rules can be reached from unauthorized locations and bypass location-based access controls.\n\n- **Unauthorized access paths** from unmanaged or blocked networks\n- **Inconsistent information-flow enforcement** across SSO applications\n- **Residual access** when the fallback rule still allows traffic after policy misses",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/policies/about-app-sign-on-policies.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Application/",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Networks** and define the allow-list and deny-list zones required by policy.\n3. For each active application, open **Applications** > **Applications** > *Application* > **Sign On**.\n4. In **User Authentication**, bind the appropriate **Authentication Policy** and open **View Policy Details**.\n5. For each active non-default rule, set *User's IP* to `In zone` or `Not in zone` and select the correct **Network Zone**.\n6. Edit the built-in **Catch-all Rule** and set *Access is* to `Denied`.\n7. Save the policy.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require every active application Authentication Policy to use Network Zones on each active non-default rule and to deny access on the Catch-all Rule.",
"Url": "https://hub.prowler.com/check/application_authentication_policy_network_zone_enforced"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-279693 / OKTA-APP-003244."
}
@@ -1,151 +0,0 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
AuthenticationPolicyRule,
OktaBuiltInApp,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
active_apps,
app_label,
missing_integrated_apps_scope_finding,
no_active_apps_finding,
rule_has_network_zone,
rule_label,
)
class application_authentication_policy_network_zone_enforced(Check):
"""STIG V-279693 / OKTA-APP-003244.
Every active Okta application must be bound to an Authentication
Policy that uses Network Zones. Each active non-default rule must map
`User's IP` to an allow/deny zone, and the active Catch-all Rule
must deny access.
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("integrated_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_integrated_apps_scope_finding(
self.metadata(),
org_domain,
missing_scope,
)
)
return findings
apps = active_apps(application_client.integrated_apps)
if not apps:
findings.append(no_active_apps_finding(self.metadata(), org_domain))
return findings
for app in apps:
report = CheckReportOkta(
metadata=self.metadata(),
resource=app,
org_domain=org_domain,
resource_name=app.label or app.name,
resource_id=app.id,
)
status, status_extended = _evaluate_app(app)
report.status = status
report.status_extended = status_extended
findings.append(report)
return findings
def _active_rules(app: OktaBuiltInApp) -> list[AuthenticationPolicyRule]:
if app.access_policy is None:
return []
return sorted(
[
rule
for rule in app.access_policy.rules
if not rule.status or rule.status.upper() == "ACTIVE"
],
key=lambda rule: (
rule.priority if rule.priority is not None else float("inf"),
rule.name,
),
)
def _evaluate_app(app: OktaBuiltInApp) -> tuple[str, str]:
label = app_label(app)
if app.access_policy_id is None or app.access_policy is None:
return (
"FAIL",
f"{label} has no Authentication Policy bound to it. "
"Bind an Access Policy in Security > Authentication Policies.",
)
active_rules = _active_rules(app)
if not active_rules:
return (
"FAIL",
f"{label} has no active rules on its Authentication Policy. "
"Every active non-default rule must enforce a Network Zone "
"condition, and the Catch-all Rule must set `Access is: Denied`.",
)
nondefault_rules = [
rule
for rule in active_rules
if not rule.is_default and rule.name != "Catch-all Rule"
]
if not nondefault_rules:
return (
"FAIL",
f"{label} has no active non-default rules on its Authentication "
"Policy. Define at least one non-default rule that maps `User's "
"IP` to a Network Zone, and use the Catch-all Rule only as the "
"final deny path.",
)
missing_zone_rules = [
rule.name for rule in nondefault_rules if not rule_has_network_zone(rule)
]
if missing_zone_rules:
quoted_rules = ", ".join(f"'{rule_name}'" for rule_name in missing_zone_rules)
return (
"FAIL",
f"{label} has active non-default rule(s) without Network Zones: "
f"{quoted_rules}. Configure `User's IP` to `In zone` or `Not in zone` "
"for every active non-default rule.",
)
catch_all_rule = next(
(
rule
for rule in active_rules
if rule.is_default or rule.name == "Catch-all Rule"
),
None,
)
if catch_all_rule is None:
return (
"FAIL",
f"{label} has no active Catch-all Rule. The Catch-all Rule must "
"deny access after the zoned non-default rules.",
)
if catch_all_rule.access != "DENY":
return (
"FAIL",
f"Active {rule_label(catch_all_rule)} on {label} does not set "
f"`Access is` to `DENY` (`access={catch_all_rule.access or 'unset'}`). "
"Set the Catch-all Rule to deny access.",
)
return (
"PASS",
f"{label} applies Network Zones on every active non-default rule and "
f"its active {rule_label(catch_all_rule)} denies access.",
)
@@ -1,4 +0,0 @@
from prowler.providers.common.provider import Provider
from prowler.providers.okta.services.application.application_service import Application
application_client = Application(Provider.get_global_provider())
@@ -1,37 +0,0 @@
{
"Provider": "okta",
"CheckID": "application_dashboard_mfa_required",
"CheckTitle": "Okta Dashboard authentication policy enforces multifactor authentication",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The **Authentication Policy** bound to the **Okta Dashboard** app must require MFA for end users. On its top active rule, *User must authenticate with* must be set to `Password / IdP + Another factor` or `Any 2 factor types` (`factorMode=2FA` in the API).",
"Risk": "Single-factor access to the Okta Dashboard lets an attacker pivot from one compromised password into every downstream SSO app.\n\n- **Credential stuffing** and password reuse attacks succeed in one step\n- **Lateral movement** into every SaaS the user has access to via Okta SSO\n- **Weakened identity assurance** for every user signing in to the end-user portal",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/policies/about-app-sign-on-policies.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Authentication Policies**.\n3. Open the **Okta Dashboard** policy.\n4. Edit the top active rule.\n5. Set *User must authenticate with* to `Password / IdP + Another factor` or `Any 2 factor types`.\n6. Save the rule.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require MFA on the top active rule of the Okta Dashboard authentication policy. Set *User must authenticate with* to `Password / IdP + Another factor` or `Any 2 factor types`.",
"Url": "https://hub.prowler.com/check/application_dashboard_mfa_required"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273194 / OKTA-APP-000570."
}
@@ -1,85 +0,0 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
DASHBOARD_APP_NAME,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
app_label,
app_not_found_finding,
missing_app_scope_finding,
policy_missing_finding,
rule_label,
top_active_rule,
)
DASHBOARD_LABEL_HINT = "Okta Dashboard"
class application_dashboard_mfa_required(Check):
"""STIG V-273194 / OKTA-APP-000570.
The Authentication Policy bound to the Okta Dashboard app must
require multifactor authentication on its top rule for
non-privileged users: `User must authenticate with` set to
`Password / IdP + Another factor` or `Any 2 factor types`
(`AssuranceMethod.factor_mode == "2FA"`).
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("built_in_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_app_scope_finding(
self.metadata(),
org_domain,
missing_scope,
DASHBOARD_LABEL_HINT,
)
)
return findings
app = application_client.built_in_apps.get(DASHBOARD_APP_NAME)
if app is None:
findings.append(
app_not_found_finding(self.metadata(), org_domain, DASHBOARD_LABEL_HINT)
)
return findings
if app.access_policy_id is None or app.access_policy is None:
findings.append(policy_missing_finding(self.metadata(), org_domain, app))
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=app, org_domain=org_domain
)
rule = top_active_rule(app)
if rule is None:
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no active rules on its Authentication "
"Policy. The top rule must set `User must authenticate with` to "
"`Password / IdP + Another factor` or `Any 2 factor types`."
)
elif rule.factor_mode == "2FA":
report.status = "PASS"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} enforces "
"multifactor authentication (`factorMode=2FA`)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} does not "
f"enforce multifactor authentication "
f"(`factorMode={rule.factor_mode or 'unset'}`). "
"Set `User must authenticate with` to `Password / IdP + Another "
"factor` or `Any 2 factor types`."
)
findings.append(report)
return findings
@@ -1,37 +0,0 @@
{
"Provider": "okta",
"CheckID": "application_dashboard_phishing_resistant_authentication",
"CheckTitle": "Okta Dashboard authentication policy enforces phishing-resistant factors",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The **Authentication Policy** bound to the **Okta Dashboard** app must restrict possession factors to phishing-resistant authenticators. On the top active rule, *Possession factor constraints are: Phishing resistant* must be checked (`possession.phishingResistant=REQUIRED`).",
"Risk": "Phishable possession factors leave end-user SSO sessions exposed to credential phishing and AiTM proxies.\n\n- **Credential phishing** against end users succeeds despite MFA\n- **Session token theft** through reverse-proxy AiTM attacks (Evilginx-class tooling)\n- **Compromise of every downstream SSO app** the user has access to",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/authenticators/phishing-resistant-auth.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Authentication Policies**.\n3. Open the **Okta Dashboard** policy.\n4. Edit the top active rule.\n5. Under *Possession factor constraints are*, check **Phishing resistant**.\n6. Save the rule.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require phishing-resistant possession factors on the top active rule of the Okta Dashboard authentication policy.",
"Url": "https://hub.prowler.com/check/application_dashboard_phishing_resistant_authentication"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273190 / OKTA-APP-000180."
}
@@ -1,84 +0,0 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
DASHBOARD_APP_NAME,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
app_label,
app_not_found_finding,
missing_app_scope_finding,
policy_missing_finding,
rule_label,
top_active_rule,
)
DASHBOARD_LABEL_HINT = "Okta Dashboard"
class application_dashboard_phishing_resistant_authentication(Check):
"""STIG V-273190 / OKTA-APP-000180.
The Authentication Policy bound to the Okta Dashboard app must
restrict possession factors to phishing-resistant authenticators on
its top active rule
(`possession.phishingResistant=REQUIRED`).
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("built_in_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_app_scope_finding(
self.metadata(),
org_domain,
missing_scope,
DASHBOARD_LABEL_HINT,
)
)
return findings
app = application_client.built_in_apps.get(DASHBOARD_APP_NAME)
if app is None:
findings.append(
app_not_found_finding(self.metadata(), org_domain, DASHBOARD_LABEL_HINT)
)
return findings
if app.access_policy_id is None or app.access_policy is None:
findings.append(policy_missing_finding(self.metadata(), org_domain, app))
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=app, org_domain=org_domain
)
rule = top_active_rule(app)
if rule is None:
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no active rules on its Authentication "
"Policy. The top rule must mark "
"`Possession factor constraints are: Phishing resistant`."
)
elif rule.possession_phishing_resistant_required:
report.status = "PASS"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} enforces "
"phishing-resistant possession factors "
"(`possession.phishingResistant=REQUIRED`)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} does not "
"enforce phishing-resistant possession factors. Enable "
"`Possession factor constraints are: Phishing resistant` "
"on the rule."
)
findings.append(report)
return findings
@@ -1,592 +0,0 @@
import json
from typing import Optional
from urllib.parse import parse_qs, urlparse
from pydantic import BaseModel, ValidationError
from prowler.lib.logger import logger
from prowler.providers.okta.lib.service.service import OktaService
# These three keys are Okta-platform constants, not tenant-configurable:
#
# - `saasure` / `okta_enduser` are the `name` fields of the OIN catalog
# templates for the Okta Admin Console and Okta Dashboard built-in apps.
# The Okta SDK's `OINApplication.name` is documented as "the key name for
# the OIN app definition" — tied to the platform-level template, not
# editable by customers. The user-facing field is `label`, which we read
# only for display purposes in finding text.
# - `admin-console` is the Okta-defined URL key for
# `/api/v1/first-party-app-settings/{appName}`; per the SDK's own
# `get_first_party_app_settings` docstring it is the only value Okta
# currently supports on that endpoint.
#
# If Okta introduces a new first-party app or renames one of these at the
# platform level, both the constants and the check coverage need updating
# together.
ADMIN_CONSOLE_APP_NAME = "saasure"
DASHBOARD_APP_NAME = "okta_enduser"
ADMIN_CONSOLE_FIRST_PARTY_APP_KEY = "admin-console"
def _next_after_cursor(resp) -> Optional[str]:
"""Extract the `after` cursor from a `Link: ...; rel="next"` header.
Returns None when there is no next page. Header format follows RFC 5988
and Okta's pagination guide. Mirrors the helper in `signon_service` —
duplicated rather than shared until a third Okta service appears.
"""
if resp is None:
return None
headers = getattr(resp, "headers", None) or {}
link = headers.get("link") or headers.get("Link") or ""
if not link:
return None
for part in link.split(","):
if 'rel="next"' not in part:
continue
url_segment = part.split(";", 1)[0].strip().lstrip("<").rstrip(">")
cursor = parse_qs(urlparse(url_segment).query).get("after", [None])[0]
if cursor:
return cursor
return None
REQUIRED_SCOPES: dict[str, str] = {
"admin_console_app_settings": "okta.apps.read",
"built_in_apps": "okta.apps.read",
"integrated_apps": "okta.apps.read",
"access_policies": "okta.policies.read",
}
class Application(OktaService):
"""Fetches Okta first-party apps and their bound Authentication Policies.
Populates:
- `self.admin_console_app_settings` first-party Admin Console session
knobs (`sessionIdleTimeoutMinutes`, `sessionMaxLifetimeMinutes`).
- `self.built_in_apps` keyed by canonical `name` (`saasure`,
`okta_enduser`). Each entry carries the resolved Authentication
Policy (Access Policy) and its rules.
- `self.integrated_apps` lazily populated and keyed by application id.
Used by the per-application network-zone STIG to evaluate every
active app returned by `/api/v1/apps`.
Required OAuth scopes (`REQUIRED_SCOPES`) are compared against the
access token's granted scopes (`provider.identity.granted_scopes`).
When a scope is known to be missing, the corresponding fetch is
skipped and recorded in `self.missing_scope` so each check can emit
an explicit MANUAL finding instead of a misleading
"no resources returned". Empty granted_scopes means "unknown" the
service attempts the fetch and lets the SDK fail loudly.
"""
def __init__(self, provider):
super().__init__(__class__.__name__, provider)
granted = set(getattr(provider.identity, "granted_scopes", None) or [])
self.missing_scope: dict[str, Optional[str]] = {
resource: (scope if granted and scope not in granted else None)
for resource, scope in REQUIRED_SCOPES.items()
}
self.admin_console_app_settings: Optional[AdminConsoleAppSettings] = (
None
if self.missing_scope["admin_console_app_settings"]
else self._get_admin_console_app_settings()
)
# Apps and policies share the same SDK round-trips, so fetch them
# together. When either scope is missing we still attempt the
# other, but `built_in_apps` is only populated when both are
# available — checks then look at `missing_scope` to report which
# one is at fault.
if self.missing_scope["built_in_apps"] or self.missing_scope["access_policies"]:
self.built_in_apps: dict[str, OktaBuiltInApp] = {}
else:
self.built_in_apps = self._list_built_in_apps_with_policies()
self._integrated_apps: Optional[dict[str, OktaBuiltInApp]] = None
@property
def integrated_apps(self) -> dict[str, "OktaBuiltInApp"]:
"""List every Okta-integrated app with its Authentication Policy.
This is fetched lazily because only the V-279693 check needs the
full app inventory; the bundled Admin Console / Dashboard checks
only need the two built-in apps.
"""
if self._integrated_apps is None:
if (
self.missing_scope["integrated_apps"]
or self.missing_scope["access_policies"]
):
self._integrated_apps = {}
else:
self._integrated_apps = self._list_integrated_apps_with_policies()
return self._integrated_apps
def _get_admin_console_app_settings(self) -> Optional["AdminConsoleAppSettings"]:
logger.info("Application - Fetching first-party Admin Console settings...")
try:
return self._run(self._fetch_admin_console_app_settings())
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return None
async def _fetch_admin_console_app_settings(
self,
) -> Optional["AdminConsoleAppSettings"]:
result = await self.client.get_first_party_app_settings(
ADMIN_CONSOLE_FIRST_PARTY_APP_KEY
)
err = result[-1]
if err is not None:
# 404 means the org is on Classic engine or the endpoint isn't
# available — fall through to None and checks emit MANUAL.
logger.error(f"Error fetching first-party Admin Console settings: {err}")
return None
data = result[0]
if data is None:
return None
return AdminConsoleAppSettings(
session_idle_timeout_minutes=getattr(
data, "session_idle_timeout_minutes", None
),
session_max_lifetime_minutes=getattr(
data, "session_max_lifetime_minutes", None
),
)
def _list_built_in_apps_with_policies(self) -> dict:
logger.info("Application - Listing Okta built-in apps and policies...")
try:
return self._run(self._fetch_built_in_apps_and_policies())
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return {}
def _list_integrated_apps_with_policies(self) -> dict:
logger.info("Application - Listing integrated Okta apps and policies...")
try:
return self._run(self._fetch_integrated_apps_and_policies())
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return {}
async def _fetch_built_in_apps_and_policies(self) -> dict:
# Per-app try/except: one app's SDK failure (e.g. ValidationError
# while deserializing its policy rules) must not erase findings
# for the other.
result: dict[str, OktaBuiltInApp] = {}
for app_name in (ADMIN_CONSOLE_APP_NAME, DASHBOARD_APP_NAME):
try:
built_in_app = await self._fetch_built_in_app(app_name)
except Exception as error:
logger.error(
f"Error fetching built-in app {app_name}: "
f"{error.__class__.__name__}: {error}"
)
continue
if built_in_app is None:
continue
if built_in_app.access_policy_id:
try:
built_in_app.access_policy = await self._fetch_access_policy(
built_in_app.access_policy_id
)
except Exception as error:
logger.error(
f"Error fetching access policy "
f"{built_in_app.access_policy_id} for {app_name}: "
f"{error.__class__.__name__}: {error}"
)
built_in_app.access_policy = None
result[app_name] = built_in_app
return result
async def _fetch_integrated_apps_and_policies(self) -> dict:
all_apps, err = await self._paginate(
lambda after: self.client.list_applications(after=after)
)
if err is not None:
logger.error(f"Error listing integrated apps: {err}")
return {}
# Per-app try/except: a single app's policy fetch failure must
# not drop the whole inventory.
result: dict[str, OktaBuiltInApp] = {}
for app in all_apps:
try:
app_model = _to_application_model(app)
except Exception as error:
logger.error(
f"Error projecting Okta app onto pydantic model "
f"(id={getattr(app, 'id', '?')}): "
f"{error.__class__.__name__}: {error}"
)
continue
if app_model.access_policy_id:
try:
app_model.access_policy = await self._fetch_access_policy(
app_model.access_policy_id
)
except Exception as error:
logger.error(
f"Error fetching access policy "
f"{app_model.access_policy_id} for app "
f"{app_model.name} ({app_model.id}): "
f"{error.__class__.__name__}: {error}"
)
app_model.access_policy = None
result[app_model.id] = app_model
return result
async def _fetch_built_in_app(self, app_name: str) -> Optional["OktaBuiltInApp"]:
# Filter by `name eq` so we don't paginate every app in the org
# for a single match. The two OIN-built-in apps are uniquely
# identified by their internal `name`.
apps, err = await self._paginate(
lambda after: self.client.list_applications(
filter=f'name eq "{app_name}"', after=after
)
)
if err is not None:
logger.error(f"Error listing app with name={app_name}: {err}")
return None
if not apps:
return None
return _to_application_model(apps[0])
async def _fetch_access_policy(
self, policy_id: str
) -> Optional["AuthenticationPolicy"]:
# Okta's `list_policy_rules` does not accept an `after` cursor in
# the SDK signature, so we call once with a generous limit. Auth
# policies almost always have <10 rules; a warning is logged if
# the limit is hit.
rule_fetch_limit = 100
try:
result = await self.client.list_policy_rules(
policy_id, limit=str(rule_fetch_limit)
)
except ValidationError as ve:
# Upstream Okta SDK ↔ Management API enum drift: the SDK's
# strict pydantic validators (e.g. KnowledgeConstraint.types
# uppercase-only) reject values the API returns lowercase
# (e.g. ["password"]). Fall back to a raw-JSON fetch so the
# STIG evaluation isn't blocked by an upstream SDK bug.
logger.warning(
f"Okta SDK raised ValidationError parsing rules for policy "
f"{policy_id} ({ve.error_count()} error(s)) — falling back "
"to raw-JSON parse. This is an okta-sdk-python deserialization "
"bug; the workaround should be removed once upstream fixes it."
)
return await self._fetch_access_policy_raw(policy_id, rule_fetch_limit)
err = result[-1]
if err is not None:
logger.error(f"Error listing rules for access policy {policy_id}: {err}")
return AuthenticationPolicy(
id=policy_id,
name="",
status="",
is_default=False,
rules=[],
)
all_rules = list(result[0] or [])
if len(all_rules) >= rule_fetch_limit:
logger.warning(
f"Access policy {policy_id} returned {len(all_rules)} rules — "
f"the per-policy fetch limit ({rule_fetch_limit}) was hit; any "
"rules beyond this limit are not evaluated by Prowler. Review "
"the policy in the Okta Admin Console."
)
rules_out = [_rule_to_model(rule) for rule in all_rules]
return AuthenticationPolicy(
id=policy_id,
name="",
status="",
is_default=False,
rules=rules_out,
)
async def _fetch_access_policy_raw(
self, policy_id: str, rule_fetch_limit: int
) -> Optional["AuthenticationPolicy"]:
"""Raw-JSON fallback for `list_policy_rules`.
Bypasses the Okta SDK's typed deserialization by calling the
request executor directly without a response type. The response
body is then `json.loads`-ed and projected onto our own pydantic
snapshot, which only validates the fields the STIG checks
actually read. This keeps the checks evaluable on tenants where
the Management API returns values the SDK validators reject.
"""
request, error = await self.client._request_executor.create_request(
method="GET",
url=f"/api/v1/policies/{policy_id}/rules?limit={rule_fetch_limit}",
body=None,
headers={"Accept": "application/json"},
)
if error is not None:
logger.error(
f"Raw rules fetch (create_request) failed for {policy_id}: {error}"
)
return AuthenticationPolicy(
id=policy_id, name="", status="", is_default=False, rules=[]
)
_response, response_body, error = await self.client._request_executor.execute(
request
)
if error is not None:
logger.error(f"Raw rules fetch (execute) failed for {policy_id}: {error}")
return AuthenticationPolicy(
id=policy_id, name="", status="", is_default=False, rules=[]
)
if isinstance(response_body, (bytes, bytearray)):
try:
response_body = response_body.decode("utf-8")
except UnicodeDecodeError as decode_err:
logger.error(
f"Could not decode rules response for {policy_id}: {decode_err}"
)
return AuthenticationPolicy(
id=policy_id, name="", status="", is_default=False, rules=[]
)
try:
rules_data = json.loads(response_body) if response_body else []
except json.JSONDecodeError as decode_err:
logger.error(f"Could not parse rules JSON for {policy_id}: {decode_err}")
return AuthenticationPolicy(
id=policy_id, name="", status="", is_default=False, rules=[]
)
if not isinstance(rules_data, list):
logger.error(
f"Unexpected raw rules payload shape for {policy_id}: "
f"got {type(rules_data).__name__}, expected list"
)
return AuthenticationPolicy(
id=policy_id, name="", status="", is_default=False, rules=[]
)
if len(rules_data) >= rule_fetch_limit:
logger.warning(
f"Access policy {policy_id} returned {len(rules_data)} rules "
f"via raw-JSON fallback — the per-policy fetch limit "
f"({rule_fetch_limit}) was hit; any rules beyond this limit "
"are not evaluated by Prowler."
)
rules_out = [_raw_rule_to_model(rule) for rule in rules_data]
return AuthenticationPolicy(
id=policy_id, name="", status="", is_default=False, rules=rules_out
)
@staticmethod
async def _paginate(fetch):
"""Drain all pages of an SDK list call.
`fetch` is a callable taking the `after` cursor (or None) and
returning the SDK's `(items, resp, err)` tuple. Follows the
`Link: rel="next"` header until exhausted. Mirrors the helper in
`signon_service`.
"""
all_items = []
result = await fetch(None)
err = result[-1]
if err is not None:
return [], err
items = result[0]
resp = result[1] if len(result) >= 3 else None
all_items.extend(items or [])
while True:
cursor = _next_after_cursor(resp)
if not cursor:
break
result = await fetch(cursor)
err = result[-1]
if err is not None:
return all_items, err
items = result[0]
resp = result[1] if len(result) >= 3 else None
all_items.extend(items or [])
return all_items, None
def _policy_id_from_href(href: Optional[str]) -> Optional[str]:
"""Extract the trailing policy id from `.../policies/{id}` URLs."""
if not href:
return None
path = urlparse(href).path or href
segment = path.rstrip("/").rsplit("/", 1)[-1]
return segment or None
def _rule_to_model(rule) -> "AuthenticationPolicyRule":
"""Project an SDK `AccessPolicyRule` onto our pydantic snapshot.
Pulls out the two STIG-relevant fields from the deeply nested
`actions.appSignOn.verificationMethod` tree: the assurance `factor_mode`
and whether any possession constraint requires phishing resistance.
"""
actions = getattr(rule, "actions", None)
app_sign_on = getattr(actions, "app_sign_on", None) if actions else None
verification_method = (
getattr(app_sign_on, "verification_method", None) if app_sign_on else None
)
factor_mode = _stringify_enum(getattr(verification_method, "factor_mode", None))
verification_type = _stringify_enum(getattr(verification_method, "type", None))
constraints = list(getattr(verification_method, "constraints", None) or [])
phishing_resistant_required = False
for constraint in constraints:
possession = getattr(constraint, "possession", None)
if possession is None:
continue
if (
_stringify_enum(getattr(possession, "phishing_resistant", None))
== "REQUIRED"
):
phishing_resistant_required = True
break
access_action = getattr(app_sign_on, "access", None) if app_sign_on else None
conditions = getattr(rule, "conditions", None)
network = getattr(conditions, "network", None) if conditions else None
return AuthenticationPolicyRule(
id=getattr(rule, "id", "") or "",
name=getattr(rule, "name", "") or "",
priority=getattr(rule, "priority", None),
status=getattr(rule, "status", "") or "",
is_default=bool(getattr(rule, "system", False)),
factor_mode=factor_mode,
possession_phishing_resistant_required=phishing_resistant_required,
constraints_count=len(constraints),
verification_method_type=verification_type,
access=_stringify_enum(access_action),
network_connection=_stringify_enum(getattr(network, "connection", None)),
network_zones_include=list(getattr(network, "include", None) or []),
network_zones_exclude=list(getattr(network, "exclude", None) or []),
)
def _stringify_enum(value) -> Optional[str]:
"""Return the string form of an enum-or-string value, or None."""
if value is None:
return None
return getattr(value, "value", None) or str(value)
def _raw_rule_to_model(rule_dict: dict) -> "AuthenticationPolicyRule":
"""Project a raw `/api/v1/policies/{id}/rules` JSON rule onto our model.
Mirrors `_rule_to_model` but reads camelCase JSON keys (`appSignOn`,
`verificationMethod`, `phishingResistant`) instead of the SDK's
snake_case attribute names. Used by the raw-JSON fallback that
activates when the Okta SDK's strict enum validators reject values
the Management API returns.
"""
actions = rule_dict.get("actions") or {}
app_sign_on = actions.get("appSignOn") or {}
verification_method = app_sign_on.get("verificationMethod") or {}
factor_mode = verification_method.get("factorMode")
verification_type = verification_method.get("type")
constraints = verification_method.get("constraints") or []
phishing_resistant_required = False
for constraint in constraints:
possession = (constraint or {}).get("possession") or {}
if possession.get("phishingResistant") == "REQUIRED":
phishing_resistant_required = True
break
access_action = app_sign_on.get("access")
conditions = rule_dict.get("conditions") or {}
network = conditions.get("network") or {}
return AuthenticationPolicyRule(
id=rule_dict.get("id") or "",
name=rule_dict.get("name") or "",
priority=rule_dict.get("priority"),
status=rule_dict.get("status") or "",
is_default=bool(rule_dict.get("system", False)),
factor_mode=factor_mode,
possession_phishing_resistant_required=phishing_resistant_required,
constraints_count=len(constraints),
verification_method_type=verification_type,
access=access_action,
network_connection=network.get("connection"),
network_zones_include=list(network.get("include") or []),
network_zones_exclude=list(network.get("exclude") or []),
)
class AdminConsoleAppSettings(BaseModel):
"""First-party Okta Admin Console session settings.
`id` and `name` are set to fixed sentinels so this can be passed as
the `resource` to `CheckReportOkta`, which reads those attributes.
"""
id: str = "okta-admin-console-app-settings"
name: str = "Okta Admin Console (first-party app settings)"
session_idle_timeout_minutes: Optional[int] = None
session_max_lifetime_minutes: Optional[int] = None
class AuthenticationPolicyRule(BaseModel):
id: str
name: str
priority: Optional[int] = None
status: str = ""
is_default: bool = False
factor_mode: Optional[str] = None
possession_phishing_resistant_required: bool = False
constraints_count: int = 0
verification_method_type: Optional[str] = None
access: Optional[str] = None
network_connection: Optional[str] = None
network_zones_include: list[str] = []
network_zones_exclude: list[str] = []
class AuthenticationPolicy(BaseModel):
id: str
name: str = ""
status: str = ""
is_default: bool = False
rules: list[AuthenticationPolicyRule] = []
class OktaBuiltInApp(BaseModel):
# `id` matches the Okta-generated `0oa…` app identifier; `name` is the
# canonical internal name (`saasure`, `okta_enduser`). Both are read
# directly by `CheckReportOkta(resource=…)`.
id: str
name: str
label: str = ""
status: str = ""
access_policy_id: Optional[str] = None
access_policy: Optional[AuthenticationPolicy] = None
def _application_access_policy_id(app) -> Optional[str]:
links = getattr(app, "links", None)
access_policy_link = getattr(links, "access_policy", None) if links else None
return _policy_id_from_href(
getattr(access_policy_link, "href", None) if access_policy_link else None
)
def _to_application_model(app) -> OktaBuiltInApp:
return OktaBuiltInApp(
id=getattr(app, "id", "") or "",
name=getattr(app, "name", "") or "",
label=getattr(app, "label", "") or "",
status=getattr(app, "status", "") or "",
access_policy_id=_application_access_policy_id(app),
)
@@ -1,212 +0,0 @@
"""Shared helpers for the OKTA application STIG checks."""
from typing import Optional
from prowler.lib.check.models import CheckReportOkta
from prowler.providers.okta.services.application.application_service import (
AdminConsoleAppSettings,
AuthenticationPolicyRule,
OktaBuiltInApp,
)
def active_apps(apps: dict[str, OktaBuiltInApp]) -> list[OktaBuiltInApp]:
"""Return active apps sorted by label/name, id as tiebreaker."""
return sorted(
[
app
for app in apps.values()
if not app.status or app.status.upper() == "ACTIVE"
],
key=lambda app: (app.label or app.name, app.id),
)
def top_active_rule(
app: OktaBuiltInApp,
) -> Optional[AuthenticationPolicyRule]:
"""Return the topmost active rule on the app's Authentication Policy.
Mirrors the STIG fix text *"Click the 'Actions' button next to the
top rule and select 'Edit'"* — by returning the active rule with the
lowest `priority` value (= highest precedence). Downstream checks
separately reject the rule when it is the built-in Catch-all Rule.
The priority value itself is intentionally not pinned to a specific
integer. Okta indexes Access Policy rule priorities inconsistently
across tenants and policy types (some responses report `0` for the
top rule, others `1`); the STIG only requires that the topmost rule
satisfy the predicate, not that it carry any specific priority literal.
"""
if app.access_policy is None:
return None
active_rules = sorted(
[
rule
for rule in app.access_policy.rules
if not rule.status or rule.status.upper() == "ACTIVE"
],
key=lambda rule: (
rule.priority if rule.priority is not None else float("inf"),
rule.name,
),
)
if not active_rules:
return None
return active_rules[0]
def app_label(app: OktaBuiltInApp) -> str:
"""Format a human-readable label for an Okta application."""
label = app.label or app.name
return f"Okta app '{label}' (app={app.name}, id={app.id})"
def rule_label(rule: AuthenticationPolicyRule) -> str:
"""Format whether a rule is the built-in catch-all or a custom rule."""
if rule.is_default or rule.name == "Catch-all Rule":
return f"built-in Catch-all Rule '{rule.name}'"
return f"non-default rule '{rule.name}'"
def rule_has_network_zone(rule: AuthenticationPolicyRule) -> bool:
"""Return True when the rule maps User's IP to at least one Network Zone."""
return bool(rule.network_zones_include or rule.network_zones_exclude)
_SCOPE_ADVICE = (
"Grant it on the service app's Okta API Scopes tab in the Okta Admin "
"Console, then re-run the check."
)
def missing_app_scope_finding(
metadata, org_domain: str, scope: str, app_label_hint: str
) -> CheckReportOkta:
"""Build the MANUAL finding when an app/policy scope is not granted."""
placeholder = OktaBuiltInApp(
id="okta-built-in-app-scope-missing",
name="(scope not granted)",
label=app_label_hint,
status="MISSING",
)
report = CheckReportOkta(
metadata=metadata, resource=placeholder, org_domain=org_domain
)
report.status = "MANUAL"
report.status_extended = (
f"Could not evaluate the authentication policy for {app_label_hint}: "
f"the Okta service app is missing the required `{scope}` API scope. "
f"{_SCOPE_ADVICE}"
)
return report
def missing_integrated_apps_scope_finding(
metadata, org_domain: str, scope: str
) -> CheckReportOkta:
"""Build the MANUAL finding when the integrated-app inventory scope is not granted."""
placeholder = OktaBuiltInApp(
id="okta-integrated-apps-scope-missing",
name="(scope not granted)",
label="Okta integrated applications",
status="MISSING",
)
report = CheckReportOkta(
metadata=metadata,
resource=placeholder,
org_domain=org_domain,
resource_name=placeholder.label,
resource_id=placeholder.id,
)
report.status = "MANUAL"
report.status_extended = (
"Could not retrieve Okta integrated applications and their "
f"authentication policies: the Okta service app is missing the "
f"required `{scope}` API scope. {_SCOPE_ADVICE}"
)
return report
def missing_admin_console_settings_scope_finding(
metadata, org_domain: str, scope: str
) -> CheckReportOkta:
"""Build the MANUAL finding for the Admin Console idle timeout check when scope is missing."""
placeholder = AdminConsoleAppSettings()
report = CheckReportOkta(
metadata=metadata, resource=placeholder, org_domain=org_domain
)
report.status = "MANUAL"
report.status_extended = (
"Could not retrieve the Okta Admin Console first-party app settings: "
f"the Okta service app is missing the required `{scope}` API scope. "
f"{_SCOPE_ADVICE}"
)
return report
def app_not_found_finding(
metadata, org_domain: str, app_label_hint: str
) -> CheckReportOkta:
"""Build the MANUAL finding emitted when a built-in OIN app isn't returned.
Okta filters the first-party apps (`saasure`, `okta_enduser`) out of
`/api/v1/apps` for every admin role below Super Administrator, so the
check has no way to resolve the app's bound Authentication Policy.
"""
placeholder = OktaBuiltInApp(
id="okta-built-in-app-missing",
name="(app not found)",
label=app_label_hint,
status="MISSING",
)
report = CheckReportOkta(
metadata=metadata, resource=placeholder, org_domain=org_domain
)
report.status = "MANUAL"
report.status_extended = (
f"The {app_label_hint} first-party app was not returned by the Okta "
"API. Okta restricts the visibility of first-party apps "
"(`saasure`, `okta_enduser`) to the Super Administrator role; "
"every other role — including Read-Only Administrator — receives "
"an empty result. Assign Super Administrator to the service app "
"to evaluate this check."
)
return report
def no_active_apps_finding(metadata, org_domain: str) -> CheckReportOkta:
"""Build the MANUAL finding emitted when no active apps are returned."""
placeholder = OktaBuiltInApp(
id="okta-apps-missing",
name="(no active apps)",
label="Okta applications",
status="MISSING",
)
report = CheckReportOkta(
metadata=metadata,
resource=placeholder,
org_domain=org_domain,
resource_name=placeholder.label,
resource_id=placeholder.id,
)
report.status = "MANUAL"
report.status_extended = (
"No active Okta applications were returned by the API. Verify the "
"tenant exposes applications to the Read-Only Administrator role and "
"review the application inventory manually."
)
return report
def policy_missing_finding(
metadata, org_domain: str, app: OktaBuiltInApp
) -> CheckReportOkta:
"""Build the FAIL finding when the built-in app has no bound Access Policy."""
report = CheckReportOkta(metadata=metadata, resource=app, org_domain=org_domain)
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no Authentication Policy bound to it. "
"Bind an Access Policy in Security > Authentication Policies."
)
return report
+1 -1
View File
@@ -120,7 +120,7 @@ maintainers = [{name = "Prowler Engineering", email = "engineering@prowler.com"}
name = "prowler"
readme = "README.md"
requires-python = ">=3.10,<3.13"
version = "5.29.0"
version = "5.28.2"
[project.scripts]
prowler = "prowler.__main__:prowler"
+1 -24
View File
@@ -79,10 +79,8 @@ Recharts/library? → CHART_COLORS constant + var()
### Scope Rule (ABSOLUTE)
- Used by unrelated features → `lib/` or `types/` or `hooks/` (components go in `components/{domain}/`)
- Used by 2+ files in the same feature/domain → keep it inside that feature/domain, e.g. `_lib/`, `_hooks/`, or local `types.ts`
- Used 2+ places → `lib/` or `types/` or `hooks/` (components go in `components/{domain}/`)
- Used 1 place → keep local in feature directory
- **Feature/domain ownership beats raw usage count**: sharing between sibling files does not automatically make a helper global
- **This determines ALL folder structure decisions**
## Project Structure
@@ -244,27 +242,6 @@ import { severityLabel } from "@/types/findings";
If a helper doesn't exist and will be used in 2+ places, add it to `ui/lib/` or `ui/types/` and reuse it. Keep local only if used in exactly one place.
## Error State Replacement Checklist (REQUIRED)
When replacing UI error-state logic, remove legacy state fields and branches that the new contract can no longer produce.
Before finishing the change, check:
- [ ] Are there flags, union members, buttons, or handlers that are now unreachable?
- [ ] Does the UI still render an action for an error case the new API contract cannot emit?
- [ ] Does retry appear only for errors where repeating the same action can help?
- [ ] Are error mappers matching specific status/code/detail or field pointers rather than broad payload-level pointers?
- [ ] Are negative tests present for over-broad mapper/type-guard matches?
```typescript
// ❌ BAD: old branch kept after new mapper never returns needsSignOut
if (state.needsSignOut) return <SignInWithDifferentAccount />;
// ✅ GOOD: remove unreachable state and render only reachable actions
{state.canRetry && <Button onClick={retry}>Retry</Button>}
<Button asChild><Link href="/sign-in">Go to Sign In</Link></Button>
```
## Derived State Rule (REQUIRED)
Avoid `useState` + `useEffect` patterns that mirror props or searchParams — they create sync bugs and unnecessary re-renders. Derive values directly from the source of truth.
-32
View File
@@ -102,38 +102,6 @@ function isUser(value: unknown): value is User {
}
```
### Type Guard Tests (REQUIRED)
When adding or changing a type guard or mapper predicate, test both accepted and rejected shapes. A positive test alone often hides over-broad matching.
```typescript
const ERROR_POINTER = {
INVITATION_TOKEN: "/data/attributes/invitation_token",
} as const;
function isInvitationTokenError(error: ApiError): boolean {
return error.source?.pointer === ERROR_POINTER.INVITATION_TOKEN;
}
it("should identify invitation token errors", () => {
expect(
isInvitationTokenError({
detail: "Invalid invitation code.",
source: { pointer: ERROR_POINTER.INVITATION_TOKEN },
}),
).toBe(true);
});
it("should not identify payload-level errors as invitation token errors", () => {
expect(
isInvitationTokenError({
detail: "Invalid request data.",
source: { pointer: "/data" },
}),
).toBe(false);
});
```
## Coupled Optional Props (REQUIRED)
Do not model semantically coupled props as independent optionals — this allows invalid half-states that compile but break at runtime. Use discriminated unions with `never` to make invalid combinations impossible.
-3
View File
@@ -493,6 +493,3 @@ okta:
# Okta Sign-On Policies
# okta.signon_global_session_idle_timeout_15min
okta_max_session_idle_minutes: 15
# Okta Applications
# okta.application_admin_console_session_idle_timeout_15min
okta_admin_console_idle_timeout_max_minutes: 15
+7 -20
View File
@@ -128,7 +128,7 @@ CIS_2_0_GCP = Compliance(
Description="This CIS Benchmark is the product of a community consensus process and consists of secure configuration guidelines developed for Google Cloud Computing Platform",
Requirements=[
Compliance_Requirement(
Checks=["apikeys_key_exits", "service_test_check_id"],
Checks=["apikeys_key_exits"],
Id="2.13",
Description="Ensure That Microsoft Defender for Databases Is Set To 'On'",
Attributes=[
@@ -177,7 +177,7 @@ CIS_1_8_KUBERNETES = Compliance(
Description="This CIS Kubernetes Benchmark provides prescriptive guidance for establishing a secure configuration posture for Kubernetes v1.27.",
Requirements=[
Compliance_Requirement(
Checks=["apiserver_always_pull_images_plugin", "service_test_check_id"],
Checks=["apiserver_always_pull_images_plugin"],
Id="1.1.3",
Description="Ensure that the controller manager pod specification file permissions are set to 600 or more restrictive",
Attributes=[
@@ -259,7 +259,6 @@ CIS_4_0_M365 = Compliance(
Compliance_Requirement(
Checks=[
"mfa_delete_enabled",
"service_test_check_id",
],
Id="2.1.3",
Description="Ensure MFA Delete is enabled on S3 buckets",
@@ -337,7 +336,6 @@ MITRE_ATTACK_AWS = Compliance(
"inspector2_active_findings_exist",
"awslambda_function_not_publicly_accessible",
"ec2_instance_public_ip",
"service_test_check_id",
],
),
Mitre_Requirement(
@@ -414,7 +412,6 @@ MITRE_ATTACK_AZURE = Compliance(
"defender_ensure_notify_emails_to_owners",
"defender_ensure_system_updates_are_applied",
"defender_ensure_wdatp_is_enabled",
"service_test_check_id",
],
),
Mitre_Requirement(
@@ -470,7 +467,6 @@ MITRE_ATTACK_GCP = Compliance(
"compute_instance_public_ip",
"compute_public_address_shodan",
"kms_key_not_publicly_accessible",
"service_test_check_id",
],
),
Mitre_Requirement(
@@ -518,7 +514,7 @@ ENS_RD2022_AWS = Compliance(
Dependencias=[],
)
],
Checks=["cloudtrail_log_file_validation_enabled", "service_test_check_id"],
Checks=["cloudtrail_log_file_validation_enabled"],
),
Compliance_Requirement(
Id="op.exp.8.aws.ct.4",
@@ -566,7 +562,7 @@ ENS_RD2022_AZURE = Compliance(
Dependencias=[],
)
],
Checks=["cloudtrail_log_file_validation_enabled", "service_test_check_id"],
Checks=["cloudtrail_log_file_validation_enabled"],
),
Compliance_Requirement(
Id="op.exp.8.azure.ct.4",
@@ -613,7 +609,7 @@ ENS_RD2022_GCP = Compliance(
Dependencias=[],
)
],
Checks=["cloudtrail_log_file_validation_enabled", "service_test_check_id"],
Checks=["cloudtrail_log_file_validation_enabled"],
),
Compliance_Requirement(
Id="op.exp.8.gcp.ct.4",
@@ -670,10 +666,7 @@ AWS_WELL_ARCHITECTED = Compliance(
ImplementationGuidanceUrl="https://docs.aws.amazon.com/wellarchitected/latest/security-pillar/sec_securely_operate_multi_accounts.html#implementation-guidance.",
)
],
Checks=[
"organizations_account_part_of_organizations",
"service_test_check_id",
],
Checks=["organizations_account_part_of_organizations"],
),
Compliance_Requirement(
Id="SEC01-BP02",
@@ -716,7 +709,7 @@ ISO27001_2013_AWS = Compliance(
Check_Summary="Setup Encryption at rest for RDS instances",
)
],
Checks=["rds_instance_storage_encrypted", "service_test_check_id"],
Checks=["rds_instance_storage_encrypted"],
),
Compliance_Requirement(
Id="A.10.2",
@@ -766,7 +759,6 @@ NIST_800_53_REVISION_4_AWS = Compliance(
"rds_instance_integration_cloudwatch_logs",
"redshift_cluster_audit_logging",
"securityhub_enabled",
"service_test_check_id",
],
),
Compliance_Requirement(
@@ -823,7 +815,6 @@ KISA_ISMSP_AWS = Compliance(
Checks=[
"cloudwatch_log_metric_filter_authentication_failures",
"cognito_user_pool_mfa_enabled",
"service_test_check_id",
],
),
Compliance_Requirement(
@@ -881,7 +872,6 @@ PROWLER_THREATSCORE_AWS = Compliance(
],
Checks=[
"iam_root_mfa_enabled",
"service_test_check_id",
],
),
Compliance_Requirement(
@@ -926,7 +916,6 @@ PROWLER_THREATSCORE_AZURE = Compliance(
],
Checks=[
"iam_root_mfa_enabled",
"service_test_check_id",
],
),
Compliance_Requirement(
@@ -971,7 +960,6 @@ PROWLER_THREATSCORE_GCP = Compliance(
],
Checks=[
"iam_root_mfa_enabled",
"service_test_check_id",
],
),
Compliance_Requirement(
@@ -1016,7 +1004,6 @@ PROWLER_THREATSCORE_M365 = Compliance(
],
Checks=[
"iam_root_mfa_enabled",
"service_test_check_id",
],
),
Compliance_Requirement(
@@ -5,11 +5,6 @@ from unittest import mock
from freezegun import freeze_time
from mock import patch
from prowler.lib.check.compliance_models import (
Compliance,
Compliance_Requirement,
Generic_Compliance_Requirement_Attribute,
)
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.compliance.generic.models import GenericComplianceModel
from tests.lib.outputs.compliance.fixtures import NIST_800_53_REVISION_4_AWS
@@ -134,67 +129,3 @@ class TestAWSGenericCompliance:
expected_csv = f"PROVIDER;DESCRIPTION;ACCOUNTID;REGION;ASSESSMENTDATE;REQUIREMENTS_ID;REQUIREMENTS_DESCRIPTION;REQUIREMENTS_ATTRIBUTES_SECTION;REQUIREMENTS_ATTRIBUTES_SUBSECTION;REQUIREMENTS_ATTRIBUTES_SUBGROUP;REQUIREMENTS_ATTRIBUTES_SERVICE;REQUIREMENTS_ATTRIBUTES_TYPE;STATUS;STATUSEXTENDED;RESOURCEID;CHECKID;MUTED;RESOURCENAME;FRAMEWORK;NAME;REQUIREMENTS_ATTRIBUTES_COMMENT\r\naws;NIST 800-53 is a regulatory standard that defines the minimum baseline of security controls for all U.S. federal information systems except those related to national security. The controls defined in this standard are customizable and address a diverse set of security and privacy requirements.;123456789012;eu-west-1;{datetime.now()};ac_2_4;Account Management;Access Control (AC);Account Management (AC-2);;aws;;PASS;;;service_test_check_id;False;;NIST-800-53-Revision-4;National Institute of Standards and Technology (NIST) 800-53 Revision 4;\r\naws;NIST 800-53 is a regulatory standard that defines the minimum baseline of security controls for all U.S. federal information systems except those related to national security. The controls defined in this standard are customizable and address a diverse set of security and privacy requirements.;;;{datetime.now()};ac_2_5;Account Management;Access Control (AC);Account Management (AC-2);;aws;;MANUAL;Manual check;manual_check;manual;False;Manual check;NIST-800-53-Revision-4;National Institute of Standards and Technology (NIST) 800-53 Revision 4;\r\n"
assert content == expected_csv
def test_csv_row_count_matches_framework_checks_not_stored_compliance(self):
"""Regression test for PROWLER-1763.
Ensures CSV emission is driven by the framework JSON's Requirements[].Checks
(the same source the UI uses) and not by the per-finding `finding.compliance`
snapshot stored at scan time. If `finding.check_id` is in a requirement's
Checks list, the row must be emitted regardless of what was stored in
`finding.compliance`. Conversely, a stale `finding.compliance` entry pointing
to a requirement whose Checks list no longer contains the finding's check_id
must not produce a row.
"""
framework_name = "Test-Framework-1763"
compliance = Compliance(
Framework=framework_name,
Name=framework_name,
Provider="AWS",
Version="",
Description="Regression fixture for PROWLER-1763",
Requirements=[
Compliance_Requirement(
Id="req_in_framework",
Description="Requirement currently in framework",
Attributes=[
Generic_Compliance_Requirement_Attribute(
Section="Section A", Service="aws"
)
],
Checks=["service_check_in_framework"],
),
Compliance_Requirement(
Id="req_no_longer_in_framework",
Description="Requirement whose Checks list no longer includes the finding's check_id",
Attributes=[
Generic_Compliance_Requirement_Attribute(
Section="Section B", Service="aws"
)
],
Checks=["service_different_check"],
),
],
)
# Snapshot drift case: finding.compliance maps to a requirement whose
# current Checks list no longer includes the finding's check_id, AND
# the finding belongs to a requirement that is NOT in the snapshot.
findings = [
generate_finding_output(
check_id="service_check_in_framework",
compliance={framework_name: ["req_no_longer_in_framework"]},
)
]
output = GenericCompliance(findings, compliance)
rows = [
row
for row in output.data
if row.Status != "MANUAL" and row.ResourceName != "Manual check"
]
assert (
len(rows) == 1
), f"Expected 1 row driven by framework JSON, got {len(rows)}"
assert rows[0].Requirements_Id == "req_in_framework"
assert rows[0].CheckId == "service_check_in_framework"

Some files were not shown because too many files have changed in this diff Show More