Compare commits

...

23 Commits

Author SHA1 Message Date
Alan Buscaglia 22f17aa394 docs(skills): strengthen UI review rules
- Clarify feature-local helper placement
- Add UI error-state replacement checklist
- Require negative tests for mapper predicates
2026-05-27 16:41:18 +02:00
Juan Pablo 3252f9cf19 fix(compliance/ens): remap resilience VPC checks out of mp.com.4 (#11372)
Co-authored-by: Juan Pablo Mora <juanpablo.mora@logalty.com>
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
2026-05-27 13:10:58 +02:00
Hugo Pereira Brito f1cdf3df15 feat(ui): improve dark mode contrast for editorial readability (#11073) 2026-05-27 12:49:50 +02:00
Pedro Martín 03ddb8a708 fix(ui): show compliance data when opening compliance sidebar (#11374) 2026-05-27 11:18:32 +02:00
Daniel Barranquero 2678c6bc9f feat(okta): add application service with 6 new checks (#11358) 2026-05-27 11:16:18 +02:00
Pedro Martín 48c071297f fix(sdk): align compliance CSV row emission with framework JSON (#11370) 2026-05-27 11:06:23 +02:00
Prowler Bot 7e9a16d022 feat(aws): Update regions for AWS services (#11349)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-27 10:36:28 +02:00
Pedro Martín 84b388f649 fix(ui): honor page size select in compliance req findings (#11365)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-26 15:35:33 +02:00
Rubén De la Torre Vico 671d0c746c fix(mcp_server): preserve authorization header in HTTP mode (#11366)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-26 15:25:46 +02:00
Pepe Fagoaga 0e4b117161 chore: SDK changelog v5.28.1 (#11363) 2026-05-26 12:15:19 +02:00
Alan Buscaglia a70bc3c1c7 fix(ui): avoid report preflight timeouts (#11350)
Co-authored-by: Adrián Jesús Peña Rodríguez <adrianjpr@gmail.com>
2026-05-26 11:47:34 +02:00
Pedro Martín 723d161c63 fix(az-m365): asyncio.run() in Azure/M365 Celery worker event (#11360) 2026-05-26 11:26:39 +02:00
Aline Almeida d560020592 fix(gcp): match enable-oslogin metadata case-insensitively (#11341)
Co-authored-by: Hugo Pereira Brito <101209179+HugoPBrito@users.noreply.github.com>
2026-05-26 10:35:26 +02:00
Pedro Martín 00451f8239 feat(compliance): add AWS AI Security Framework for AWS (#11353) 2026-05-26 10:20:39 +02:00
Adrián Peña 329dfdf8e6 perf(api): reduce DB load in scan hot loop by 13x (#11249)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
2026-05-25 19:09:28 +02:00
Hugo Pereira Brito 4c59af93eb fix(azure): require all SMB channel encryption algorithms to be secure (storage_smb_channel_encryption_with_secure_algorithm) (#11327)
Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
2026-05-25 18:28:21 +02:00
Hugo Pereira Brito 6ca8e726f7 feat(azure): add storage_account_public_network_access_disabled and fix CIS storage mapping (#11334) 2026-05-25 18:17:41 +02:00
Pepe Fagoaga 546eb2d85a chore: changelog v5.28.1 (#11347) 2026-05-25 10:18:42 +02:00
Alan Buscaglia ec3efc94f5 chore(ui): add changelog for scan report fix (#11338) 2026-05-22 15:09:44 +02:00
Alan Buscaglia 6cffd0d17f fix(ui): stream scan report downloads (#11330) 2026-05-22 14:05:00 +02:00
Josema Camacho 528d32601b perf(api): speed up finding-groups endpoint for finding-level filters (#11326) 2026-05-22 13:59:05 +02:00
Prowler Bot 56b3044aae chore(release): Bump versions to v5.29.0 (#11332)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2026-05-22 13:34:30 +02:00
Alejandro Bailo 3a096b1750 refactor(ui): improve resource detail and tab UX (#11325) 2026-05-22 12:03:03 +02:00
146 changed files with 7080 additions and 739 deletions
+1 -1
View File
@@ -145,7 +145,7 @@ SENTRY_RELEASE=local
NEXT_PUBLIC_SENTRY_ENVIRONMENT=${SENTRY_ENVIRONMENT}
#### Prowler release version ####
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.28.0
NEXT_PUBLIC_PROWLER_RELEASE_VERSION=v5.29.0
# Social login credentials
SOCIAL_GOOGLE_OAUTH_CALLBACK_URL="${AUTH_URL}/api/auth/callback/google"
+16 -1
View File
@@ -2,6 +2,22 @@
All notable changes to the **Prowler API** are documented in this file.
## [1.30.0] (Prowler UNRELEASED)
### 🔄 Changed
- Scan finding ingestion: bulk-resolve `Resource`/`ResourceTag` rows, replace per-mapping `SELECT FOR UPDATE` with deferred `ResourceTagMapping.bulk_create(ignore_conflicts=True)`, wrap each micro-batch in a single `rls_transaction`, and raise `SCAN_DB_BATCH_SIZE` to 1000 [(#11249)](https://github.com/prowler-cloud/prowler/pull/11249)
---
## [1.29.1] (Prowler v5.28.1)
### 🐞 Fixed
- `finding-groups` slow response with finding-level filters such as `region`; check title and description are now read from the daily summaries, which drops sorting by `check_title` [(#11326)](https://github.com/prowler-cloud/prowler/pull/11326)
---
## [1.29.0] (Prowler v5.28.0)
### 🚀 Added
@@ -29,7 +45,6 @@ All notable changes to the **Prowler API** are documented in this file.
- `perform_scan_task` and `perform_scheduled_scan_task` now short-circuit with a warning and `return None` when the target provider no longer exists, instead of letting `handle_provider_deletion` raise `ProviderDeletedException`. `perform_scheduled_scan_task` also removes any orphan `PeriodicTask` it finds so beat stops re-firing scans for deleted providers. Prevents queued messages for deleted providers from being recorded as `FAILURE` [(#11185)](https://github.com/prowler-cloud/prowler/pull/11185)
- Attack Paths: `BEDROCK-001` and `BEDROCK-002` now target roles trusting `bedrock-agentcore.amazonaws.com` instead of `bedrock.amazonaws.com`, eliminating false positives against regular Bedrock service roles (Agents, Knowledge Bases, model invocation) [(#11141)](https://github.com/prowler-cloud/prowler/pull/11141)
---
## [1.27.1] (Prowler v5.26.1)
+1 -1
View File
@@ -68,7 +68,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.29.0"
version = "1.30.0"
[tool.uv]
# Transitive pins matching master to avoid silent drift; bump deliberately.
+1 -1
View File
@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.29.0
version: 1.30.0
description: |-
Prowler API specification.
+20 -6
View File
@@ -15921,6 +15921,12 @@ class TestFindingGroupViewSet:
assert attrs["fail_count"] == 0
assert attrs["resources_total"] == 1
assert attrs["resources_fail"] == 0
# check_title / check_description are resolved post-pagination from the
# summary table, not from the finding's check_metadata.
assert attrs["check_title"] == "Ensure EC2 instances do not have public IPs"
assert (
attrs["check_description"] == "EC2 instances should use private IPs only."
)
def test_finding_groups_status_pass_when_no_fail(
self, authenticated_client, finding_groups_fixture
@@ -17162,6 +17168,12 @@ class TestFindingGroupViewSet:
assert attrs["fail_count"] == 0
assert attrs["resources_total"] == 1
assert attrs["resources_fail"] == 0
# check_title / check_description are resolved post-pagination from the
# summary table, not from the finding's check_metadata.
assert attrs["check_title"] == "Ensure EC2 instances do not have public IPs"
assert (
attrs["check_description"] == "EC2 instances should use private IPs only."
)
def test_finding_groups_latest_status_in_filter(
self, authenticated_client, finding_groups_fixture
@@ -17419,18 +17431,20 @@ class TestFindingGroupViewSet:
check_ids = [item["id"] for item in data]
assert check_ids == sorted(check_ids)
def test_finding_groups_latest_sort_by_check_title(
def test_finding_groups_latest_sort_by_check_title_not_supported(
self, authenticated_client, finding_groups_fixture
):
"""Test /latest supports sorting by check_title."""
"""check_title is not a sortable field for finding groups.
Titles live in the TOASTed check_metadata blob and are resolved after
pagination from the summary table, so they cannot drive DB-level
ordering. Requesting that sort is rejected.
"""
response = authenticated_client.get(
reverse("finding-group-latest"),
{"sort": "check_title"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
check_titles = [item["attributes"]["check_title"] for item in data]
assert check_titles == sorted(check_titles)
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.parametrize(
"endpoint_name", ["finding-group-list", "finding-group-latest"]
+39 -11
View File
@@ -7369,6 +7369,15 @@ class FindingGroupViewSet(BaseRLSViewSet):
output_field=IntegerField(),
)
# `check_title` / `check_description` are intentionally NOT resolved
# here. They live in the large JSONB `check_metadata` blob (TOASTed),
# so reading them per finding row is very expensive, and pulling them
# in via a correlated subquery makes Django add the subquery to GROUP
# BY, which re-evaluates it once per input row. They are identical for
# every finding of a `check_id`, so `_post_process_aggregation` fills
# them from the summary table's plain columns in a single batched
# lookup scoped to the paginated page.
# `pass_count`, `fail_count` and `manual_count` only count non-muted
# findings. Muted findings are tracked separately via the
# `*_muted_count` fields.
@@ -7439,15 +7448,6 @@ class FindingGroupViewSet(BaseRLSViewSet):
agg_failing_since=Min(
"first_seen_at", filter=Q(status="FAIL", muted=False)
),
check_title=Coalesce(
Max(KeyTextTransform("checktitle", "check_metadata")),
Max(KeyTextTransform("CheckTitle", "check_metadata")),
Max(KeyTextTransform("Checktitle", "check_metadata")),
),
check_description=Coalesce(
Max(KeyTextTransform("description", "check_metadata")),
Max(KeyTextTransform("Description", "check_metadata")),
),
)
.annotate(
# Group is muted only if it has zero non-muted findings.
@@ -7503,9 +7503,38 @@ class FindingGroupViewSet(BaseRLSViewSet):
- Computes aggregated status (FAIL > PASS > MANUAL); the orthogonal
``muted`` boolean is already on the row from the SQL aggregation
- Converts provider string to list
- Fills check_title / check_description for the findings path
"""
rows = list(aggregated_data)
# The findings-aggregation path omits check_title / check_description
# (they sit in TOASTed JSONB; see _aggregate_findings). Fill them from
# the summary table's plain columns in one query scoped to this page.
# The summary-aggregation path already carries them, so skip it there.
if rows and "check_title" not in rows[0]:
check_ids = [row["check_id"] for row in rows]
role = get_role(self.request.user, self.request.tenant_id)
summaries = FindingGroupDailySummary.objects.filter(
tenant_id=self.request.tenant_id,
check_id__in=check_ids,
)
# Scope to the user's providers, mirroring get_queryset(), so titles
# are read only from providers the user can see.
if not role.unlimited_visibility:
summaries = summaries.filter(provider__in=get_providers(role))
metadata_by_check = {
item["check_id"]: item
for item in summaries.order_by("check_id", "-inserted_at")
.distinct("check_id")
.values("check_id", "check_title", "check_description")
}
for row in rows:
metadata = metadata_by_check.get(row["check_id"], {})
row["check_title"] = metadata.get("check_title")
row["check_description"] = metadata.get("check_description")
results = []
for row in aggregated_data:
for row in rows:
# Convert severity order back to string
severity_order = row.get("severity_order", 1)
row["severity"] = SEVERITY_ORDER_REVERSE.get(
@@ -7551,7 +7580,6 @@ class FindingGroupViewSet(BaseRLSViewSet):
_FINDING_GROUP_SORT_MAP = {
"check_id": "check_id",
"check_title": "check_title",
"severity": "severity_order",
"status": "status_order",
"muted": "muted",
+455 -272
View File
@@ -42,7 +42,6 @@ from api.db_utils import (
SET_CONFIG_QUERY,
psycopg_connection,
rls_transaction,
update_objects_in_batches,
)
from api.exceptions import ProviderConnectionError
from api.models import (
@@ -59,6 +58,7 @@ from api.models import (
ResourceFindingMapping,
ResourceScanSummary,
ResourceTag,
ResourceTagMapping,
Scan,
ScanCategorySummary,
ScanGroupSummary,
@@ -97,8 +97,16 @@ COMPLIANCE_REQUIREMENT_COPY_COLUMNS = (
)
# Controls how many findings we process per micro-batch before flushing to DB writes
FINDINGS_MICRO_BATCH_SIZE = env.int("DJANGO_FINDINGS_MICRO_BATCH_SIZE", default=3000)
# Controls how many rows each ORM bulk_create/bulk_update call sends to Postgres
SCAN_DB_BATCH_SIZE = env.int("DJANGO_SCAN_DB_BATCH_SIZE", default=500)
# Controls how many rows each ORM bulk_create/bulk_update call sends to Postgres.
SCAN_DB_BATCH_SIZE = env.int("DJANGO_SCAN_DB_BATCH_SIZE", default=1000)
# Throttle scan progress persistence: minimum progress delta (fraction 0-1)
# between two persisted progress updates.
PROGRESS_THROTTLE_DELTA = env.float("DJANGO_SCAN_PROGRESS_THROTTLE_DELTA", default=0.01)
# Throttle scan progress persistence: maximum seconds without persisting progress
# regardless of delta (so slow checks still show progress in the UI).
PROGRESS_THROTTLE_SECONDS = env.float(
"DJANGO_SCAN_PROGRESS_THROTTLE_SECONDS", default=10.0
)
ATTACK_SURFACE_PROVIDER_COMPATIBILITY = {
"internet-exposed": None, # Compatible with all providers
@@ -528,16 +536,26 @@ def _process_finding_micro_batch(
"""
# Accumulate objects for bulk operations
findings_to_create = []
mappings_to_create = []
dirty_resources = {}
resources_with_new_tag_mappings: set[str] = set()
resource_denormalized_data = [] # (finding_instance, resource_instance) pairs
tag_mappings_to_create: list[ResourceTagMapping] = []
skipped_findings_count = 0 # Track findings skipped due to UID length
# Prefetch last statuses for all findings in this batch
# TEMPORARY WORKAROUND: Filter out UIDs > 300 chars to avoid query errors
finding_uids = [
f.uid for f in findings_batch if f is not None and len(f.uid) <= 300
]
# Separate findings into those persistable (uid <= 300) and over-limit.
# Resources/tags ARE still resolved for over-limit findings to preserve the
# original behavior (resources are persisted even when their finding is dropped).
non_null_findings = [f for f in findings_batch if f is not None]
persistable_findings = [f for f in non_null_findings if len(f.uid) <= 300]
skipped_findings_count = len(non_null_findings) - len(persistable_findings)
none_count = len(findings_batch) - len(non_null_findings)
if none_count:
logger.error(
f"{none_count} None finding(s) detected on scan {scan_instance.id}."
)
# Prefetch last statuses for all persistable findings in this batch (read replica)
finding_uids = [f.uid for f in persistable_findings]
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
last_statuses = {
item["uid"]: (item["status"], item["first_seen_at"])
@@ -548,281 +566,411 @@ def _process_finding_micro_batch(
.order_by("uid", "-inserted_at")
.distinct("uid")
}
# Update cache
for uid, data in last_statuses.items():
if uid not in last_status_cache:
last_status_cache[uid] = data
# Process each finding in the batch
for finding in findings_batch:
if finding is None:
logger.error(f"None finding detected on scan {scan_instance.id}.")
continue
# All DB writes for this micro-batch run inside ONE rls_transaction,
# with deadlock-retry at micro-batch granularity instead of per-finding.
for attempt in range(CELERY_DEADLOCK_ATTEMPTS):
try:
with rls_transaction(tenant_id):
# 1) Pre-resolve Resources in bulk
# Collect all uids referenced by this batch that are not in cache yet.
# NOTE: we intentionally include empty-string uids here. The SDK
# explicitly emits findings with `resource_uid=""` for some flows
# (IaC scans, some Azure/GCP/K8s checks). The original
# `get_or_create` behavior was to create/share a Resource with
# uid="" for these findings rather than dropping them. Preserve
# that behavior; do NOT filter by truthiness.
batch_resource_uids: set[str] = set()
for f in non_null_findings:
if f.resource_uid not in resource_cache:
batch_resource_uids.add(f.resource_uid)
# Process resource with deadlock retry
for attempt in range(CELERY_DEADLOCK_ATTEMPTS):
try:
with rls_transaction(tenant_id):
resource_uid = finding.resource_uid
if resource_uid not in resource_cache:
check_metadata = finding.get_metadata()
group = check_metadata.get("resourcegroup") or None
resource_instance, _ = Resource.objects.get_or_create(
if batch_resource_uids:
existing_resources = {
r.uid: r
for r in Resource.objects.filter(
tenant_id=tenant_id,
provider=provider_instance,
uid=resource_uid,
defaults={
"region": finding.region,
"service": finding.service_name,
"type": finding.resource_type,
"name": finding.resource_name,
"groups": [group] if group else None,
},
provider_id=provider_instance.id,
uid__in=batch_resource_uids,
)
resource_cache[resource_uid] = resource_instance
resource_failed_findings_cache[resource_uid] = 0
else:
resource_instance = resource_cache[resource_uid]
break
except (OperationalError, IntegrityError) as db_err:
if attempt < CELERY_DEADLOCK_ATTEMPTS - 1:
logger.warning(
f"{'Deadlock error' if isinstance(db_err, OperationalError) else 'Integrity error'} "
f"detected when processing resource {resource_uid} on scan {scan_instance.id}. Retrying..."
}
missing_uids = batch_resource_uids - existing_resources.keys()
if missing_uids:
# Build defaults from the first finding referencing each uid.
first_finding_per_uid: dict[str, ProwlerFinding] = {}
for f in non_null_findings:
if f.resource_uid in missing_uids:
first_finding_per_uid.setdefault(f.resource_uid, f)
resources_to_create = []
for uid in missing_uids:
f = first_finding_per_uid[uid]
check_metadata = f.get_metadata()
group = check_metadata.get("resourcegroup") or None
resources_to_create.append(
Resource(
tenant_id=tenant_id,
provider=provider_instance,
uid=uid,
region=f.region,
service=f.service_name,
type=f.resource_type,
name=f.resource_name,
groups=[group] if group else None,
)
)
Resource.objects.bulk_create(
resources_to_create,
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "provider_id", "uid"],
)
# Re-fetch to obtain instances we just created AND any
# created concurrently by another scan against the same provider.
existing_resources.update(
{
r.uid: r
for r in Resource.objects.filter(
tenant_id=tenant_id,
provider_id=provider_instance.id,
uid__in=missing_uids,
)
}
)
for uid, r in existing_resources.items():
resource_cache[uid] = r
resource_failed_findings_cache.setdefault(uid, 0)
# 2) Pre-resolve ResourceTags in bulk
batch_tag_kv: set[tuple[str, str]] = set()
for f in non_null_findings:
for k, v in f.resource_tags.items():
if (k, v) not in tag_cache:
batch_tag_kv.add((k, v))
if batch_tag_kv:
keys_to_query = {k for k, _ in batch_tag_kv}
existing_tags = {
(t.key, t.value): t
for t in ResourceTag.objects.filter(
tenant_id=tenant_id, key__in=keys_to_query
)
if (t.key, t.value) in batch_tag_kv
}
missing_kv = batch_tag_kv - existing_tags.keys()
if missing_kv:
ResourceTag.objects.bulk_create(
[
ResourceTag(tenant_id=tenant_id, key=k, value=v)
for k, v in missing_kv
],
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "key", "value"],
)
existing_tags.update(
{
(t.key, t.value): t
for t in ResourceTag.objects.filter(
tenant_id=tenant_id,
key__in={k for k, _ in missing_kv},
)
if (t.key, t.value) in missing_kv
}
)
tag_cache.update(existing_tags)
# 3) Per-finding in-memory processing
for finding in non_null_findings:
resource_uid = finding.resource_uid
resource_instance = resource_cache.get(resource_uid)
if resource_instance is None:
# Should be unreachable after the pre-resolve step. Defensive log.
logger.error(
f"Resource {resource_uid} missing from cache after pre-resolve "
f"on scan {scan_instance.id}; skipping finding."
)
continue
# Detect resource field changes (defer save until end-of-batch bulk_update).
check_metadata = finding.get_metadata()
group = check_metadata.get("resourcegroup") or None
updated = False
if finding.region and resource_instance.region != finding.region:
resource_instance.region = finding.region
updated = True
if resource_instance.service != finding.service_name:
resource_instance.service = finding.service_name
updated = True
if resource_instance.type != finding.resource_type:
resource_instance.type = finding.resource_type
updated = True
if resource_instance.metadata != finding.resource_metadata:
resource_instance.metadata = json.dumps(
finding.resource_metadata, cls=CustomEncoder
)
updated = True
if resource_instance.details != finding.resource_details:
resource_instance.details = finding.resource_details
updated = True
if resource_instance.partition != finding.partition:
resource_instance.partition = finding.partition
updated = True
if group and (
not resource_instance.groups
or group not in resource_instance.groups
):
resource_instance.groups = (resource_instance.groups or []) + [
group
]
updated = True
if updated:
dirty_resources[resource_uid] = resource_instance
# Accumulate ResourceTagMapping rows; bulk_create at end of block.
for k, v in finding.resource_tags.items():
tag_instance = tag_cache.get((k, v))
if tag_instance is None:
# Should not happen after pre-resolve; skip defensively.
continue
tag_mappings_to_create.append(
ResourceTagMapping(
tenant_id=tenant_id,
resource=resource_instance,
tag=tag_instance,
)
)
unique_resources.add(
(resource_instance.uid, resource_instance.region)
)
time.sleep(0.1 * (2**attempt))
continue
else:
raise db_err
# Track resource field changes (defer save)
updated = False
check_metadata = finding.get_metadata()
group = check_metadata.get("resourcegroup") or None
if finding.region and resource_instance.region != finding.region:
resource_instance.region = finding.region
updated = True
if resource_instance.service != finding.service_name:
resource_instance.service = finding.service_name
updated = True
if resource_instance.type != finding.resource_type:
resource_instance.type = finding.resource_type
updated = True
if resource_instance.metadata != finding.resource_metadata:
resource_instance.metadata = json.dumps(
finding.resource_metadata, cls=CustomEncoder
)
updated = True
if resource_instance.details != finding.resource_details:
resource_instance.details = finding.resource_details
updated = True
if resource_instance.partition != finding.partition:
resource_instance.partition = finding.partition
updated = True
if group and (
not resource_instance.groups or group not in resource_instance.groups
):
resource_instance.groups = (resource_instance.groups or []) + [group]
updated = True
# TEMPORARY WORKAROUND: Skip findings with UID > 300 chars
# TODO: Remove this after implementing text field migration for finding.uid
if len(finding.uid) > 300:
logger.warning(
f"Skipping finding with UID exceeding 300 characters. "
f"Length: {len(finding.uid)}, "
f"Check: {finding.check_id}, "
f"Resource: {finding.resource_name}, "
f"UID: {finding.uid}"
)
continue
if updated:
dirty_resources[resource_uid] = resource_instance
# Process tags
tags = []
with rls_transaction(tenant_id):
for key, value in finding.resource_tags.items():
tag_key = (key, value)
if tag_key not in tag_cache:
tag_instance, _ = ResourceTag.objects.get_or_create(
tenant_id=tenant_id, key=key, value=value
finding_uid = finding.uid
last_status, last_first_seen_at = last_status_cache.get(
finding_uid, (None, None)
)
tag_cache[tag_key] = tag_instance
else:
tag_instance = tag_cache[tag_key]
tags.append(tag_instance)
resource_instance.upsert_or_delete_tags(tags=tags)
unique_resources.add((resource_instance.uid, resource_instance.region))
status = FindingStatus[finding.status]
delta = _create_finding_delta(last_status, status)
# Prepare finding data
finding_uid = finding.uid
if not last_first_seen_at:
last_first_seen_at = datetime.now(tz=timezone.utc)
# TEMPORARY WORKAROUND: Skip findings with UID > 300 chars
# TODO: Remove this after implementing text field migration for finding.uid
if len(finding_uid) > 300:
skipped_findings_count += 1
logger.warning(
f"Skipping finding with UID exceeding 300 characters. "
f"Length: {len(finding_uid)}, "
f"Check: {finding.check_id}, "
f"Resource: {finding.resource_name}, "
f"UID: {finding_uid}"
)
continue
# Determine if finding should be muted and why
# Priority: mutelist processor (highest) > manual mute rules
is_muted = False
muted_reason = None
if finding.muted:
is_muted = True
muted_reason = "Muted by mutelist"
elif finding_uid in mute_rules_cache:
is_muted = True
muted_reason = mute_rules_cache[finding_uid]
last_status, last_first_seen_at = last_status_cache.get(
finding_uid, (None, None)
)
if status == FindingStatus.FAIL and not is_muted:
resource_failed_findings_cache[resource_uid] += 1
status = FindingStatus[finding.status]
delta = _create_finding_delta(last_status, status)
check_metadata["compliance"] = finding.compliance
finding_instance = Finding(
tenant_id=tenant_id,
uid=finding_uid,
delta=delta,
check_metadata=check_metadata,
status=status,
status_extended=finding.status_extended,
severity=finding.severity,
impact=finding.severity,
raw_result=finding.raw,
check_id=finding.check_id,
scan=scan_instance,
first_seen_at=last_first_seen_at,
muted=is_muted,
muted_at=datetime.now(tz=timezone.utc) if is_muted else None,
muted_reason=muted_reason,
compliance=finding.compliance,
categories=check_metadata.get("categories", []) or [],
resource_groups=check_metadata.get("resourcegroup") or None,
# Denormalized resource arrays populated directly on insert
# (was previously a separate bulk_update; saves a CASE WHEN
# over thousands of rows per micro-batch).
resource_regions=[resource_instance.region]
if resource_instance.region
else [],
resource_services=[resource_instance.service]
if resource_instance.service
else [],
resource_types=[resource_instance.type]
if resource_instance.type
else [],
)
findings_to_create.append(finding_instance)
resource_denormalized_data.append(
(finding_instance, resource_instance)
)
if not last_first_seen_at:
last_first_seen_at = datetime.now(tz=timezone.utc)
scan_resource_cache.add(
(
str(resource_instance.id),
resource_instance.service,
resource_instance.region,
resource_instance.type,
)
)
# Determine if finding should be muted and why
# Priority: mutelist processor (highest) > manual mute rules
is_muted = False
muted_reason = None
aggregate_category_counts(
categories=check_metadata.get("categories", []) or [],
severity=finding.severity.value,
status=status.value,
delta=delta.value if delta else None,
muted=is_muted,
cache=scan_categories_cache,
)
# Check mutelist processor first (highest priority)
if finding.muted:
is_muted = True
muted_reason = "Muted by mutelist"
# If not muted by mutelist, check manual mute rules
elif finding_uid in mute_rules_cache:
is_muted = True
muted_reason = mute_rules_cache[finding_uid]
aggregate_resource_group_counts(
resource_group=check_metadata.get("resourcegroup") or None,
severity=finding.severity.value,
status=status.value,
delta=delta.value if delta else None,
muted=is_muted,
resource_uid=resource_instance.uid if resource_instance else "",
cache=scan_resource_groups_cache,
group_resources_cache=group_resources_cache,
)
# Increment failed_findings_count cache if needed
if status == FindingStatus.FAIL and not is_muted:
resource_failed_findings_cache[resource_uid] += 1
# 4) Bulk create ResourceTagMappings
# Replaces the original per-resource `upsert_or_delete_tags`
# (which did one `update_or_create` + SELECT FOR UPDATE per mapping).
if tag_mappings_to_create:
# Pre-SELECT existing pairs: `bulk_create(ignore_conflicts=True)`
# does not populate `pk`, so we cannot tell new vs existing from
# the result; we need that to bump `updated_at` only on resources
# that actually gain a mapping.
candidate_resource_ids = {
m.resource_id for m in tag_mappings_to_create
}
candidate_tag_ids = {m.tag_id for m in tag_mappings_to_create}
existing_pairs = set(
ResourceTagMapping.objects.filter(
tenant_id=tenant_id,
resource_id__in=candidate_resource_ids,
tag_id__in=candidate_tag_ids,
).values_list("resource_id", "tag_id")
)
resource_uid_by_id = {
str(r.id): uid for uid, r in resource_cache.items()
}
for m in tag_mappings_to_create:
if (m.resource_id, m.tag_id) not in existing_pairs:
uid = resource_uid_by_id.get(str(m.resource_id))
if uid is not None:
resources_with_new_tag_mappings.add(uid)
# Create finding object (don't save yet)
check_metadata = finding.get_metadata()
check_metadata["compliance"] = finding.compliance
finding_instance = Finding(
tenant_id=tenant_id,
uid=finding_uid,
delta=delta,
check_metadata=check_metadata,
status=status,
status_extended=finding.status_extended,
severity=finding.severity,
impact=finding.severity,
raw_result=finding.raw,
check_id=finding.check_id,
scan=scan_instance,
first_seen_at=last_first_seen_at,
muted=is_muted,
muted_at=datetime.now(tz=timezone.utc) if is_muted else None,
muted_reason=muted_reason,
compliance=finding.compliance,
categories=check_metadata.get("categories", []) or [],
resource_groups=check_metadata.get("resourcegroup") or None,
)
findings_to_create.append(finding_instance)
resource_denormalized_data.append((finding_instance, resource_instance))
ResourceTagMapping.objects.bulk_create(
tag_mappings_to_create,
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "resource_id", "tag_id"],
)
# Track for scan summary
scan_resource_cache.add(
(
str(resource_instance.id),
resource_instance.service,
resource_instance.region,
resource_instance.type,
)
)
# 5) Bulk create Findings
if findings_to_create:
Finding.objects.bulk_create(
findings_to_create, batch_size=SCAN_DB_BATCH_SIZE
)
# Track categories with counts for ScanCategorySummary by (category, severity)
aggregate_category_counts(
categories=check_metadata.get("categories", []) or [],
severity=finding.severity.value,
status=status.value,
delta=delta.value if delta else None,
muted=is_muted,
cache=scan_categories_cache,
)
# 6) Bulk create ResourceFindingMapping rows
mappings_to_create = [
ResourceFindingMapping(
tenant_id=tenant_id,
resource=resource_instance,
finding=finding_instance,
)
for finding_instance, resource_instance in resource_denormalized_data
]
if mappings_to_create:
created_mappings = ResourceFindingMapping.objects.bulk_create(
mappings_to_create,
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "resource_id", "finding_id"],
)
inserted = sum(1 for m in created_mappings if m.pk)
if inserted != len(mappings_to_create):
logger.error(
f"scan {scan_instance.id}: expected "
f"{len(mappings_to_create)} ResourceFindingMapping rows, "
f"inserted {inserted}. Rolling back micro-batch."
)
# Track resource groups with counts for ScanGroupSummary
aggregate_resource_group_counts(
resource_group=check_metadata.get("resourcegroup") or None,
severity=finding.severity.value,
status=status.value,
delta=delta.value if delta else None,
muted=is_muted,
resource_uid=resource_instance.uid if resource_instance else "",
cache=scan_resource_groups_cache,
group_resources_cache=group_resources_cache,
)
# Bulk operations within single transaction
with rls_transaction(tenant_id):
# Bulk create findings
if findings_to_create:
Finding.objects.bulk_create(
findings_to_create, batch_size=SCAN_DB_BATCH_SIZE
)
# Bulk create resource-finding mappings
for finding_instance, resource_instance in resource_denormalized_data:
mappings_to_create.append(
ResourceFindingMapping(
tenant_id=tenant_id,
resource=resource_instance,
finding=finding_instance,
# 7) Bulk update Resources
# Union of:
# - resources whose fields changed (dirty_resources)
# - resources that got new tag mappings (need updated_at bump,
# preserving the original `self.save(update_fields=["updated_at"])`
# behavior of `upsert_or_delete_tags`)
all_resource_uids_to_touch = (
set(dirty_resources.keys()) | resources_with_new_tag_mappings
)
)
if mappings_to_create:
created_mappings = ResourceFindingMapping.objects.bulk_create(
mappings_to_create,
batch_size=SCAN_DB_BATCH_SIZE,
ignore_conflicts=True,
unique_fields=["tenant_id", "resource_id", "finding_id"],
)
inserted = sum(1 for m in created_mappings if m.pk)
if inserted != len(mappings_to_create):
logger.error(
f"scan {scan_instance.id}: expected "
f"{len(mappings_to_create)} ResourceFindingMapping rows, "
f"inserted {inserted}. Rolling back micro-batch."
if all_resource_uids_to_touch:
now_utc = datetime.now(tz=timezone.utc)
resources_to_bulk_update = []
for uid in all_resource_uids_to_touch:
# Use the instance from dirty_resources if present (has mutated
# fields), otherwise the cached one (for updated_at bump only).
r = dirty_resources.get(uid) or resource_cache.get(uid)
if r is None:
continue
# Manually bump updated_at since bulk_update bypasses auto_now.
r.updated_at = now_utc
resources_to_bulk_update.append(r)
if resources_to_bulk_update:
Resource.objects.bulk_update(
resources_to_bulk_update,
[
"metadata",
"details",
"partition",
"region",
"service",
"type",
"groups",
"updated_at",
],
batch_size=1000,
)
# Successful execution: leave deadlock retry loop.
break
except (OperationalError, IntegrityError) as db_err:
if attempt < CELERY_DEADLOCK_ATTEMPTS - 1:
logger.warning(
f"{'Deadlock error' if isinstance(db_err, OperationalError) else 'Integrity error'} "
f"on micro-batch for scan {scan_instance.id}. Retrying (attempt {attempt + 1})..."
)
# Update finding denormalized arrays
findings_to_update = []
for finding_instance, resource_instance in resource_denormalized_data:
if not finding_instance.resource_regions:
finding_instance.resource_regions = []
if not finding_instance.resource_services:
finding_instance.resource_services = []
if not finding_instance.resource_types:
finding_instance.resource_types = []
if resource_instance.region not in finding_instance.resource_regions:
finding_instance.resource_regions.append(resource_instance.region)
if resource_instance.service not in finding_instance.resource_services:
finding_instance.resource_services.append(resource_instance.service)
if resource_instance.type not in finding_instance.resource_types:
finding_instance.resource_types.append(resource_instance.type)
findings_to_update.append(finding_instance)
if findings_to_update:
Finding.objects.bulk_update(
findings_to_update,
["resource_regions", "resource_services", "resource_types"],
batch_size=SCAN_DB_BATCH_SIZE,
)
# Bulk update dirty resources
if dirty_resources:
update_objects_in_batches(
tenant_id=tenant_id,
model=Resource,
objects=list(dirty_resources.values()),
fields=[
"metadata",
"details",
"partition",
"region",
"service",
"type",
"groups",
],
batch_size=1000,
)
time.sleep(0.1 * (2**attempt))
# Clear accumulators that we appended to inside the failed transaction
# so the retry produces consistent results.
findings_to_create.clear()
resource_denormalized_data.clear()
tag_mappings_to_create.clear()
dirty_resources.clear()
resources_with_new_tag_mappings.clear()
continue
raise
# Log skipped findings summary
if skipped_findings_count > 0:
@@ -873,7 +1021,7 @@ def perform_prowler_scan(
scan_instance = Scan.objects.get(pk=scan_id)
scan_instance.state = StateChoices.EXECUTING
scan_instance.started_at = datetime.now(tz=timezone.utc)
scan_instance.save()
scan_instance.save(update_fields=["state", "started_at", "updated_at"])
# Find the mutelist processor if it exists
with rls_transaction(tenant_id, using=READ_REPLICA_ALIAS):
@@ -918,7 +1066,13 @@ def perform_prowler_scan(
provider_instance.connection_last_checked_at = datetime.now(
tz=timezone.utc
)
provider_instance.save()
provider_instance.save(
update_fields=[
"connected",
"connection_last_checked_at",
"updated_at",
]
)
# If the provider is not connected, raise an exception outside the transaction.
# If raised within the transaction, the transaction will be rolled back and the provider will not be marked
@@ -933,6 +1087,13 @@ def perform_prowler_scan(
last_status_cache = {}
resource_failed_findings_cache = defaultdict(int)
# Throttle scan_instance progress writes to avoid hammering the writer:
# only persist when progress moves by at least `PROGRESS_THROTTLE_DELTA`
# OR `PROGRESS_THROTTLE_SECONDS` have elapsed. The final progress (1.0)
# always persists in the `finally` block below.
last_persisted_progress = -1.0
last_persisted_progress_at = 0.0
for progress, findings in prowler_scan.scan():
# Process findings in micro-batches
findings_list = list(findings)
@@ -959,10 +1120,20 @@ def perform_prowler_scan(
group_resources_cache=group_resources_cache,
)
# Update scan progress
with rls_transaction(tenant_id):
scan_instance.progress = progress
scan_instance.save()
# Throttled progress save (the final save in the `finally` block
# below always runs regardless of throttle).
now = time.time()
progress_delta = progress - last_persisted_progress
elapsed = now - last_persisted_progress_at
if (
progress_delta >= PROGRESS_THROTTLE_DELTA
or elapsed >= PROGRESS_THROTTLE_SECONDS
):
with rls_transaction(tenant_id):
scan_instance.progress = progress
scan_instance.save(update_fields=["progress", "updated_at"])
last_persisted_progress = progress
last_persisted_progress_at = now
scan_instance.state = StateChoices.COMPLETED
@@ -976,13 +1147,16 @@ def perform_prowler_scan(
resources_to_update.append(resource_instance)
if resources_to_update:
update_objects_in_batches(
tenant_id=tenant_id,
model=Resource,
objects=resources_to_update,
fields=["failed_findings_count"],
batch_size=1000,
)
# Single rls_transaction wrapping the bulk_update (previously
# `update_objects_in_batches` opened one rls_transaction per
# chunk; for tenants with many resources this collapsed N
# BEGINs/COMMITs into 1).
with rls_transaction(tenant_id):
Resource.objects.bulk_update(
resources_to_update,
["failed_findings_count"],
batch_size=SCAN_DB_BATCH_SIZE,
)
except Exception as e:
logger.error(f"Error performing scan {scan_id}: {e}")
@@ -994,7 +1168,16 @@ def perform_prowler_scan(
scan_instance.duration = time.time() - start_time
scan_instance.completed_at = datetime.now(tz=timezone.utc)
scan_instance.unique_resource_count = len(unique_resources)
scan_instance.save()
scan_instance.save(
update_fields=[
"state",
"duration",
"completed_at",
"unique_resource_count",
"progress",
"updated_at",
]
)
if exception is not None:
raise exception
Generated
+1 -1
View File
@@ -4494,7 +4494,7 @@ dependencies = [
[[package]]
name = "prowler-api"
version = "1.29.0"
version = "1.30.0"
source = { virtual = "." }
dependencies = [
{ name = "cartography" },
@@ -118,8 +118,8 @@ To update the environment file:
Edit the `.env` file and change version values:
```env
PROWLER_UI_VERSION="5.27.0"
PROWLER_API_VERSION="5.27.0"
PROWLER_UI_VERSION="5.28.0"
PROWLER_API_VERSION="5.28.0"
```
<Note>
@@ -91,6 +91,7 @@ The following list includes all the Azure checks with configurable variables tha
| `sqlserver_recommended_minimal_tls_version` | `recommended_minimal_tls_versions` | List of Strings |
| `vm_sufficient_daily_backup_retention_period` | `vm_backup_min_daily_retention_days` | Integer |
| `vm_desired_sku_size` | `desired_vm_sku_sizes` | List of Strings |
| `storage_smb_channel_encryption_with_secure_algorithm` | `recommended_smb_channel_encryption_algorithms` | List of Strings |
| `defender_attack_path_notifications_properly_configured` | `defender_attack_path_minimal_risk_level` | String |
| `apim_threat_detection_llm_jacking` | `apim_threat_detection_llm_jacking_threshold` | Float |
| `apim_threat_detection_llm_jacking` | `apim_threat_detection_llm_jacking_minutes` | Integer |
@@ -165,6 +166,7 @@ The following list includes all the Okta checks with configurable variables that
| Check Name | Value | Type |
|---------------------------------------------------------------|------------------------------------|---------|
| `application_admin_console_session_idle_timeout_15min` | `okta_admin_console_idle_timeout_max_minutes` | Integer |
| `signon_global_session_idle_timeout_15min` | `okta_max_session_idle_minutes` | Integer |
## Config YAML File Structure
@@ -534,6 +536,18 @@ azure:
"1.3"
]
# Azure Storage
# azure.storage_smb_channel_encryption_with_secure_algorithm
# List of SMB channel encryption algorithms allowed on file shares. A storage
# account passes only if every enabled algorithm is in this list. Defaults to
# the value required by CIS (AES-256-GCM only, excluding weaker AES-128 ciphers).
recommended_smb_channel_encryption_algorithms:
[
"AES-256-GCM",
# "AES-128-CCM",
# "AES-128-GCM",
]
# Azure Virtual Machines
# azure.vm_desired_sku_size
# List of desired VM SKU sizes that are allowed in the organization
@@ -30,25 +30,49 @@ If a different authentication method is needed (SSWS API token, OAuth with user
### Required OAuth Scopes
The bundled signon checks require the following read-only scopes:
The bundled checks require the following read-only scopes:
- `okta.policies.read`
- `okta.brands.read`
- `okta.apps.read`
Additional scopes will be needed as more services and checks are added. These are the current ones needed:
| Scope | Used by |
|---|---|
| `okta.policies.read` | Sign-on / password / authentication policies |
| `okta.policies.read` | Sign-on, password, and authentication policies |
| `okta.brands.read` | Sign-in page customizations (DOD Notice and Consent Banner check) |
| `okta.apps.read` | First-party app settings (Okta Admin Console session), integrated app inventory, and the Authentication Policies bound to Okta applications |
### Required Admin Role
The service application must be assigned the built-in **Read-Only Administrator** role.
The service application must be assigned **one** of the following Okta admin roles:
Okta's Management API enforces a two-layer authorization model: an OAuth **scope** decides which API endpoints the token can call, and an **admin role** decides whether the call returns data. With only a scope granted, the token mint succeeds but every read returns `403 Forbidden`. The Read-Only Administrator role is the minimum that lets the granted `okta.*.read` scopes actually return configuration data to Prowler's checks — without it, the credential probe at provider startup fails and the scan never gets to evaluate any check.
- **Read-Only Administrator** — covers every `signon` check and runs `application_authentication_policy_network_zone_enforced` against the apps it can see. **Visibility caveat:** under Read-Only Administrator the `/api/v1/apps` endpoint returns only the apps the service application is itself assigned to — typically just the service app's own row (for example, `Prowler Scanner`). The check still produces a finding for that app, but the rest of the org's app inventory is invisible at this role level.
- **Super Administrator** — required additionally to evaluate five application-service checks that target Okta's first-party apps (Okta Admin Console, Okta Dashboard). With Super Administrator, `application_authentication_policy_network_zone_enforced` also evaluates the full org-wide app inventory instead of the service-app-only slice.
Read-Only Administrator is intentionally the narrowest role that satisfies this requirement and aligns with the least-privilege guidance in DISA STIG.
Okta's Management API enforces a two-layer authorization model: an OAuth **scope** decides which API endpoints the token can call, and an **admin role** decides whether the call returns data. With only a scope granted, the token mint succeeds but every read returns `403 Forbidden`. Read-Only Administrator is the minimum role that lets the granted `okta.*.read` scopes return configuration data to Prowler's checks; without it, the credential probe at provider startup fails and the scan never gets to evaluate any check.
#### When Super Administrator is required
Four checks need to resolve the Authentication Policy bound to Okta's first-party apps (Okta Admin Console, Okta Dashboard) and depend on `/api/v1/apps` returning those system apps — which Okta restricts to Super Administrator:
| Check | STIG |
|---|---|
| `application_admin_console_mfa_required` | V-273193 |
| `application_admin_console_phishing_resistant_authentication` | V-273191 |
| `application_dashboard_mfa_required` | V-273194 |
| `application_dashboard_phishing_resistant_authentication` | V-273190 |
Okta filters the first-party apps (`saasure`, `okta_enduser`) out of `/api/v1/apps` for every role below Super Administrator, so `okta.apps.read` alone is not enough. The `okta.apps.manageFirstPartyApps` permission exists only in the paid Okta Identity Governance role `ACCESS_REQUESTS_ADMIN` and cannot be added to custom roles ([Okta Permissions Catalog](https://developer.okta.com/docs/api/openapi/okta-management/guides/permissions)).
A fifth check — `application_admin_console_session_idle_timeout_15min` (STIG V-273187) — also requires Super Administrator: it calls `GET /api/v1/first-party-app-settings/admin-console`, which returns `403 E0000006` for every role below Super Administrator.
When the service app runs with Read-Only Administrator, the five checks listed in this section return **MANUAL** instead of PASS/FAIL — the rest of the scan keeps running.
<Note>
Read-Only Administrator stays the recommended default for the least-privilege framing that aligns with DISA STIG. Assign Super Administrator on a separate run when full coverage of the first-party app checks is needed.
</Note>
## Step-by-Step Setup
@@ -98,19 +122,21 @@ Okta displays the private key **only once**. If you close the modal without copy
### 5. Grant the required OAuth scopes
On the app, open the **Okta API Scopes** tab and click **Grant** on every scope Prowler needs. The bundled signon checks require `okta.policies.read` and `okta.brands.read`.
On the app, open the **Okta API Scopes** tab and click **Grant** on every scope Prowler needs. The bundled checks require `okta.policies.read`, `okta.brands.read`, and `okta.apps.read`.
![Okta — grant OAuth scopes](/user-guide/providers/okta/images/grant-permissions.png)
### 6. Assign the Read-Only Administrator role
### 6. Assign an admin role
On the app, open the **Admin roles** tab and click **Edit assignments → Add assignment**:
- **Role:** Read-Only Administrator
- **Role:** Read-Only Administrator (default) — covers every `signon` check and runs the per-app network-zone check against the apps the service app can see (typically only the service app's own row).
- **Resources:** All resources
Save the changes.
To additionally evaluate the first-party application checks (Okta Admin Console / Okta Dashboard idle timeout, MFA, and phishing-resistant authentication) and to widen the per-app network-zone check to the full org-wide app inventory, assign **Super Administrator** instead. Without Super Administrator, the five first-party checks return MANUAL and the network-zone check is limited to the service app's own visibility — the rest of the scan still runs. See [Required Admin Role](#required-admin-role) for the full breakdown.
![Okta — grant Read-Only role](/user-guide/providers/okta/images/grant-roles.png)
### 7. [Optional] Verify DPoP setting
@@ -132,8 +158,8 @@ export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem"
# or
export OKTA_PRIVATE_KEY="$(cat /secure/path/to/prowler-okta.pem)"
# Optional — defaults to "okta.policies.read,okta.brands.read"
export OKTA_SCOPES="okta.policies.read,okta.brands.read"
# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read"
export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read"
uv run python prowler-cli.py okta
```
@@ -174,8 +200,12 @@ Prowler validates credentials at startup by listing one sign-on policy. This err
Raised when the credential probe succeeds at the OAuth layer but the request is rejected because the service app lacks the required scope or admin role:
- **`invalid_scope`** — one of the requested scopes (`okta.policies.read` or `okta.brands.read`) is not granted on the service app. Grant the missing scope from **Okta API Scopes**.
- **`Forbidden` / `not authorized`** — the **Read-Only Administrator** role is not assigned to the service app. Assign it from **Admin roles**.
- **`invalid_scope`** — one of the requested scopes (`okta.policies.read`, `okta.brands.read`, or `okta.apps.read`) is not granted on the service app. Grant the missing scope from **Okta API Scopes**.
- **`Forbidden` / `not authorized`** — no admin role is assigned to the service app. Assign **Read-Only Administrator** (or **Super Administrator** for the first-party application checks) from **Admin roles**.
### Application-service checks return MANUAL on first-party apps
When the service app runs with Read-Only Administrator, the five application-service checks targeting the Okta Admin Console and Okta Dashboard return MANUAL. This is by design — Okta restricts the underlying endpoints (`/api/v1/first-party-app-settings/{appName}` and `/api/v1/apps` for first-party app `name` values `saasure` / `okta_enduser`) to **Super Administrator**. Assign the Super Administrator role to the service app to evaluate those checks. See [Required Admin Role](#required-admin-role) for the full list.
### `invalid_dpop_proof`
@@ -12,7 +12,7 @@ Set up authentication for Okta with the [Okta Authentication](/user-guide/provid
- An Okta organization. The UI examples below use **Identity Engine** terminology such as **Global Session Policy**; Classic Engine exposes the equivalent sign-on policy concepts under older names.
- A **Super Administrator** account on that organization for the one-time service-app setup.
- An **API Services** app integration in the Okta Admin Console with the `okta.policies.read` and `okta.brands.read` scopes granted and the **Read-Only Administrator** role assigned.
- An **API Services** app integration in the Okta Admin Console with the `okta.policies.read`, `okta.brands.read`, and `okta.apps.read` scopes granted and an admin role assigned. **Read-Only Administrator** covers every `signon` check and runs the per-app network-zone check against the apps the service app can see (under Read-Only Administrator that is typically only the service app's own row — the rest of the org's app inventory stays invisible). **Super Administrator** is required additionally to evaluate the five first-party application checks (Okta Admin Console / Okta Dashboard idle timeout, MFA, phishing-resistant authentication) and to widen the network-zone check to the full app inventory — see [Okta Authentication](/user-guide/providers/okta/authentication#required-admin-role) for the full breakdown.
- Python 3.10+ and Prowler 5.27.0 or later installed locally.
<CardGroup cols={2}>
@@ -85,8 +85,8 @@ Follow the [Okta Authentication](/user-guide/providers/okta/authentication) guid
export OKTA_ORG_DOMAIN="acme.okta.com"
export OKTA_CLIENT_ID="0oa1234567890abcdef"
export OKTA_PRIVATE_KEY_FILE="/secure/path/to/prowler-okta.pem"
# Optional — defaults to "okta.policies.read,okta.brands.read"
export OKTA_SCOPES="okta.policies.read,okta.brands.read"
# Optional — defaults to "okta.policies.read,okta.brands.read,okta.apps.read"
export OKTA_SCOPES="okta.policies.read,okta.brands.read,okta.apps.read"
```
The private key file may contain either a PEM-encoded RSA key or a JWK JSON document.
@@ -128,6 +128,9 @@ okta:
# okta.signon_global_session_idle_timeout_15min
# Defaults to 15 minutes per DISA STIG V-273186.
okta_max_session_idle_minutes: 15
# okta.application_admin_console_session_idle_timeout_15min
# Defaults to 15 minutes per DISA STIG V-273187.
okta_admin_console_idle_timeout_max_minutes: 15
```
To use a custom configuration:
@@ -140,9 +143,10 @@ prowler okta --config-file /path/to/config.yaml
Prowler for Okta includes security checks across the following services:
| Service | Description |
| ----------- | ----------------------------------------------------------------------------------- |
| **Sign-On** | Global session policy controls (idle timeout, lifetime, rule priority and ordering) |
| Service | Description |
| --------------- | -------------------------------------------------------------------------------------------------------------------------------------- |
| **Sign-On** | Global session policy controls (idle timeout, lifetime, rule priority and ordering) |
| **Application** | Okta Admin Console sign-on settings plus Authentication Policy controls for Okta applications (session idle, MFA, phishing resistance, network zones) |
## Troubleshooting
@@ -154,10 +158,11 @@ This is stricter than simply finding the same timeout value somewhere else in th
### Default Scopes
Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover the bundled signon checks:
Prowler requests a fixed set of OAuth scopes on every token exchange. The defaults cover every bundled check across the Sign-On and Application services:
- `okta.policies.read`
- `okta.brands.read`
- `okta.apps.read`
The service app must have these scopes granted in the **Okta API Scopes** tab. When the granted set is narrower than the requested set, the token request fails with an `invalid_scope` error and the scan stops at provider initialization.
+10
View File
@@ -2,6 +2,14 @@
All notable changes to the **Prowler MCP Server** are documented in this file.
## [0.7.2] (Prowler v5.28.1)
### 🐞 Fixed
- Preserve authorization header in HTTP mode [(#11366)](https://github.com/prowler-cloud/prowler/pull/11366)
---
## [0.7.1] (Prowler v5.28.0)
### 🔐 Security
@@ -44,6 +52,8 @@ All notable changes to the **Prowler MCP Server** are documented in this file.
- Attack Path tool to get Neo4j DB schema [(#10321)](https://github.com/prowler-cloud/prowler/pull/10321)
---
## [0.4.0] (Prowler v5.19.0)
### 🚀 Added
@@ -5,6 +5,7 @@ from datetime import datetime
from typing import Dict, Optional
from fastmcp.server.dependencies import get_http_headers
from prowler_mcp_server import __version__
from prowler_mcp_server.lib.logger import logger
@@ -68,7 +69,7 @@ class ProwlerAppAuth:
async def authenticate(self) -> str:
"""Authenticate and return token (API key for STDIO, API key or JWT for HTTP)."""
if self.mode == "http":
headers = get_http_headers()
headers = get_http_headers(include={"authorization"})
authorization_header = headers.get("authorization", None)
if not authorization_header:
+25
View File
@@ -2,6 +2,31 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.29.0] (Prowler UNRELEASED)
### 🚀 Added
- `application` service for Okta provider with `application_admin_console_session_idle_timeout_15min`, `application_admin_console_mfa_required`, `application_admin_console_phishing_resistant_authentication`, `application_dashboard_mfa_required`, `application_dashboard_phishing_resistant_authentication`, and `application_authentication_policy_network_zone_enforced` checks [(#11358)](https://github.com/prowler-cloud/prowler/pull/11358)
- AWS AI Security Framework compliance for AWS provider [(#11353)](https://github.com/prowler-cloud/prowler/pull/11353)
- `storage_account_public_network_access_disabled` check for Azure provider and remapped the Azure CIS "Public Network Access is Disabled" requirements to it [(#11334)](https://github.com/prowler-cloud/prowler/pull/11334)
### 🐞 Fixed
- ENS RD 311/2022 (AWS) compliance mapping: `vpc_different_regions` was uncorrectly mapped under the `mp.com.4` family (Network segregation). That check is now mapped to a new `op.cont.2.aws.vpc.1` requirement under the Continuity of Service control [(#11372)](https://github.com/prowler-cloud/prowler/pull/11372)
- Compliance CSV row count now matches the UI per requirement by sourcing rows from the framework JSON's `requirement.Checks` instead of the stale `finding.compliance` snapshot [(#11370)](https://github.com/prowler-cloud/prowler/pull/11370)
---
## [5.28.1] (Prowler 5.28.1)
### 🐞 Fixed
- `compute_project_os_login_enabled` and `compute_project_os_login_2fa_enabled` checks for GCP provider no longer false-FAIL on projects where the `enable-oslogin` / `enable-oslogin-2fa` metadata is not set explicitly but is inherited automatically from the `constraints/compute.requireOsLogin` org policy. The policy controller writes the inherited value in lowercase (`"true"`), but the service-layer parser compared it to the uppercase string literal `"TRUE"`. Comparison is now case-insensitive [(#11341)](https://github.com/prowler-cloud/prowler/pull/11341)
- `storage_smb_channel_encryption_with_secure_algorithm` check for Azure provider no longer passes when a storage account allows a weak SMB channel encryption algorithm (e.g. `AES-128-CCM`/`AES-128-GCM`) alongside `AES-256-GCM`; it now requires every enabled algorithm to be in the recommended list, configurable via `azure.recommended_smb_channel_encryption_algorithms` (defaults to `AES-256-GCM` only, as required by CIS) [(#11327)](https://github.com/prowler-cloud/prowler/pull/11327)
- Azure and M365 providers crashing with `RuntimeError: There is no current event loop` on Python 3.12 when called from threads without an active event loop (e.g. Celery workers) [(#11360)](https://github.com/prowler-cloud/prowler/pull/11360)
---
## [5.28.0] (Prowler v5.28.0)
### 🚀 Added
File diff suppressed because it is too large Load Diff
+26 -4
View File
@@ -2539,8 +2539,7 @@
}
],
"Checks": [
"vpc_subnet_separate_private_public",
"vpc_different_regions"
"vpc_subnet_separate_private_public"
]
},
{
@@ -2593,8 +2592,8 @@
}
],
"Checks": [
"vpc_subnet_different_az",
"vpc_different_regions"
"vpc_different_regions",
"vpc_subnet_different_az"
]
},
{
@@ -4262,6 +4261,29 @@
],
"Checks": []
},
{
"Id": "op.cont.2.aws.vpc.1",
"Description": "Plan de continuidad",
"Attributes": [
{
"IdGrupoControl": "op.cont.2",
"Marco": "operacional",
"Categoria": "continuidad del servicio",
"DescripcionControl": "Distribución de las VPCs entre múltiples regiones y zonas de disponibilidad de AWS para garantizar la continuidad del servicio ante fallos regionales o zonales.",
"Nivel": "alto",
"Tipo": "requisito",
"Dimensiones": [
"disponibilidad"
],
"ModoEjecucion": "automático",
"Dependencias": []
}
],
"Checks": [
"vpc_different_regions",
"vpc_subnet_different_az"
]
},
{
"Id": "op.cont.3.aws.drs.1",
"Description": "Pruebas periódicas",
+1 -1
View File
@@ -1383,7 +1383,7 @@
"Id": "3.7",
"Description": "Ensure that 'Public Network Access' is `Disabled' for storage accounts",
"Checks": [
"storage_blob_public_access_level_is_disabled"
"storage_account_public_network_access_disabled"
],
"Attributes": [
{
+1 -1
View File
@@ -1651,7 +1651,7 @@
"Id": "4.6",
"Description": "Ensure that 'Public Network Access' is 'Disabled' for storage accounts",
"Checks": [
"storage_blob_public_access_level_is_disabled"
"storage_account_public_network_access_disabled"
],
"Attributes": [
{
+1 -1
View File
@@ -3021,7 +3021,7 @@
"Id": "10.3.2.2",
"Description": "Ensure that 'Public Network Access' is 'Disabled' for storage accounts",
"Checks": [
"storage_blob_public_access_level_is_disabled"
"storage_account_public_network_access_disabled"
],
"Attributes": [
{
+1 -1
View File
@@ -3182,7 +3182,7 @@
"Id": "9.3.2.2",
"Description": "Ensure that 'Public Network Access' is 'Disabled' for storage accounts",
"Checks": [
"storage_blob_public_access_level_is_disabled"
"storage_account_public_network_access_disabled"
],
"Attributes": [
{
@@ -459,7 +459,7 @@
"Id": "2.2.6",
"Description": "Ensure that 'Public Network Access' is 'Disabled' for storage accounts",
"Checks": [
"storage_blob_public_access_level_is_disabled"
"storage_account_public_network_access_disabled"
],
"Attributes": [
{
+1 -1
View File
@@ -48,7 +48,7 @@ class _MutableTimestamp:
timestamp = _MutableTimestamp(datetime.today())
timestamp_utc = _MutableTimestamp(datetime.now(timezone.utc))
prowler_version = "5.28.0"
prowler_version = "5.29.0"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://raw.githubusercontent.com/prowler-cloud/prowler/dc7d2d5aeb92fdf12e8604f42ef6472cd3e8e889/docs/img/prowler-logo-black.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
+18
View File
@@ -467,6 +467,18 @@ azure:
"1.3",
]
# Azure Storage
# azure.storage_smb_channel_encryption_with_secure_algorithm
# List of SMB channel encryption algorithms allowed on file shares. A storage
# account passes only if every enabled algorithm is in this list. Defaults to
# the value required by CIS (AES-256-GCM only, excluding weaker AES-128 ciphers).
recommended_smb_channel_encryption_algorithms:
[
"AES-256-GCM",
# "AES-128-CCM",
# "AES-128-GCM",
]
# Azure Virtual Machines
# azure.vm_desired_sku_size
# List of desired VM SKU sizes that are allowed in the organization
@@ -657,3 +669,9 @@ okta:
# 15 per DISA STIG V-273186 (OKTA-APP-000020); raise it only with an
# explicit risk acceptance.
okta_max_session_idle_minutes: 15
# Okta Applications
# okta.application_admin_console_session_idle_timeout_15min
# Maximum acceptable Okta Admin Console app idle timeout, in minutes.
# Defaults to 15 per DISA STIG V-273187 (OKTA-APP-000025); raise it only
# with an explicit risk acceptance.
okta_admin_console_idle_timeout_max_minutes: 15
@@ -37,9 +37,9 @@ class ASDEssentialEightAWS(ComplianceOutput):
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = ASDEssentialEightAWSModel(
Provider=finding.provider,
@@ -37,10 +37,9 @@ class AWSWellArchitected(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AWSWellArchitectedModel(
Provider=finding.provider,
+2 -3
View File
@@ -35,10 +35,9 @@ class AWSC5(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AWSC5Model(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AzureC5(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AzureC5Model(
Provider=finding.provider,
+2 -3
View File
@@ -35,10 +35,9 @@ class GCPC5(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GCPC5Model(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class CCC_AWS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = CCC_AWSModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class CCC_Azure(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = CCC_AzureModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class CCC_GCP(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = CCC_GCPModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AlibabaCloudCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AlibabaCloudCISModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AWSCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AWSCISModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AzureCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AzureCISModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class GCPCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GCPCISModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class GithubCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GithubCISModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class GoogleWorkspaceCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GoogleWorkspaceCISModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class KubernetesCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = KubernetesCISModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class M365CIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = M365CISModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class OracleCloudCIS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = OracleCloudCISModel(
Provider=finding.provider,
@@ -37,10 +37,9 @@ class GoogleWorkspaceCISASCuBA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GoogleWorkspaceCISASCuBAModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AlibabaCloudCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AlibabaCloudCSAModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AWSCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AWSCSAModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AzureCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AzureCSAModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class GCPCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GCPCSAModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class OracleCloudCSA(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = OracleCloudCSAModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AWSENS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AWSENSModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AzureENS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AzureENSModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class GCPENS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GCPENSModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class GenericCompliance(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GenericComplianceModel(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AWSISO27001(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AWSISO27001Model(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AzureISO27001(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AzureISO27001Model(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class GCPISO27001(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = GCPISO27001Model(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class KubernetesISO27001(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = KubernetesISO27001Model(
Provider=finding.provider,
@@ -35,9 +35,9 @@ class M365ISO27001(ComplianceOutput):
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = M365ISO27001Model(
Provider=finding.provider,
@@ -35,9 +35,9 @@ class NHNISO27001(ComplianceOutput):
- None
"""
for finding in findings:
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = NHNISO27001Model(
Provider=finding.provider,
@@ -35,10 +35,9 @@ class AWSKISAISMSP(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = AWSKISAISMSPModel(
Provider=finding.provider,
@@ -36,10 +36,9 @@ class AWSMitreAttack(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
compliance_row = AWSMitreAttackModel(
Provider=finding.provider,
Description=compliance.Description,
@@ -36,10 +36,9 @@ class AzureMitreAttack(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
compliance_row = AzureMitreAttackModel(
Provider=finding.provider,
Description=compliance.Description,
@@ -36,10 +36,9 @@ class GCPMitreAttack(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
compliance_row = GCPMitreAttackModel(
Provider=finding.provider,
Description=compliance.Description,
@@ -37,10 +37,9 @@ class ProwlerThreatScoreAlibaba(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreAlibabaModel(
Provider=finding.provider,
@@ -37,10 +37,9 @@ class ProwlerThreatScoreAWS(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreAWSModel(
Provider=finding.provider,
@@ -37,10 +37,9 @@ class ProwlerThreatScoreAzure(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreAzureModel(
Provider=finding.provider,
@@ -37,10 +37,9 @@ class ProwlerThreatScoreGCP(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreGCPModel(
Provider=finding.provider,
@@ -37,10 +37,9 @@ class ProwlerThreatScoreKubernetes(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreKubernetesModel(
Provider=finding.provider,
@@ -37,10 +37,9 @@ class ProwlerThreatScoreM365(ComplianceOutput):
- None
"""
for finding in findings:
# Get the compliance requirements for the finding
finding_requirements = finding.compliance.get(compliance_name, [])
for requirement in compliance.Requirements:
if requirement.Id in finding_requirements:
# Source of truth: framework JSON, not finding.compliance snapshot (avoids CSV/UI count drift).
if finding.check_id in requirement.Checks:
for attribute in requirement.Attributes:
compliance_row = ProwlerThreatScoreM365Model(
Provider=finding.provider,
@@ -482,6 +482,7 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -506,7 +507,9 @@
"cn-north-1",
"cn-northwest-1"
],
"aws-eusc": [],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
@@ -1231,6 +1234,21 @@
"aws-us-gov": []
}
},
"aws-devops-agent": {
"regions": {
"aws": [
"ap-northeast-1",
"ap-southeast-2",
"eu-central-1",
"eu-west-1",
"us-east-1",
"us-west-2"
],
"aws-cn": [],
"aws-eusc": [],
"aws-us-gov": []
}
},
"awshealthdashboard": {
"regions": {
"aws": [
@@ -1590,6 +1608,7 @@
"ap-southeast-2",
"ca-central-1",
"eu-central-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"us-east-1",
@@ -2193,6 +2212,7 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -2255,6 +2275,8 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -2444,6 +2466,9 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"eu-central-1",
"eu-central-2",
@@ -2589,7 +2614,9 @@
"cn-north-1",
"cn-northwest-1"
],
"aws-eusc": [],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
@@ -2796,6 +2823,7 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -2806,6 +2834,7 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -3767,6 +3796,7 @@
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
@@ -3829,7 +3859,9 @@
"us-west-2"
],
"aws-cn": [],
"aws-eusc": [],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
@@ -3890,17 +3922,22 @@
"dsql": {
"regions": {
"aws": [
"ap-east-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-4",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-north-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -4681,15 +4718,19 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
@@ -4703,6 +4744,7 @@
"il-central-1",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -5260,6 +5302,7 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -5310,6 +5353,7 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -5360,6 +5404,7 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -5410,6 +5455,7 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -5460,6 +5506,7 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
@@ -6034,6 +6081,7 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -6081,6 +6129,7 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -7681,6 +7730,8 @@
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"eu-central-1",
"eu-north-1",
@@ -8065,6 +8116,7 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
@@ -8075,8 +8127,10 @@
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
@@ -8088,6 +8142,7 @@
"il-central-1",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -8279,22 +8334,31 @@
"aws": [
"af-south-1",
"ap-east-1",
"ap-east-2",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
"eu-south-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"il-central-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -8315,6 +8379,7 @@
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
@@ -8574,6 +8639,7 @@
"il-central-1",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -8706,13 +8772,19 @@
"aws": [
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-south-1",
"ap-south-2",
"ap-southeast-2",
"ap-southeast-4",
"ca-central-1",
"eu-central-1",
"eu-central-2",
"eu-south-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -9034,6 +9106,7 @@
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
@@ -9136,11 +9209,14 @@
"regions": {
"aws": [
"ap-northeast-1",
"ap-northeast-3",
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"eu-central-1",
"eu-north-1",
"eu-south-1",
"eu-south-2",
"eu-west-1",
"eu-west-2",
"eu-west-3",
@@ -9377,6 +9453,7 @@
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ap-southeast-7",
"ca-central-1",
"eu-central-1",
"eu-central-2",
@@ -9395,7 +9472,9 @@
"aws-cn": [
"cn-northwest-1"
],
"aws-eusc": [],
"aws-eusc": [
"eusc-de-east-1"
],
"aws-us-gov": [
"us-gov-west-1"
]
@@ -9865,10 +9944,12 @@
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-5",
"ap-southeast-6",
"ap-southeast-7",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
@@ -10012,7 +10093,10 @@
],
"aws-cn": [],
"aws-eusc": [],
"aws-us-gov": []
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
]
}
},
"resource-groups": {
@@ -10693,7 +10777,10 @@
"us-west-1",
"us-west-2"
],
"aws-cn": [],
"aws-cn": [
"cn-north-1",
"cn-northwest-1"
],
"aws-eusc": [],
"aws-us-gov": []
}
@@ -11315,7 +11402,9 @@
"ap-southeast-2",
"ap-southeast-3",
"ap-southeast-4",
"ap-southeast-6",
"ca-central-1",
"ca-west-1",
"eu-central-1",
"eu-central-2",
"eu-north-1",
@@ -11647,26 +11736,6 @@
]
}
},
"simspaceweaver": {
"regions": {
"aws": [
"ap-southeast-1",
"ap-southeast-2",
"eu-central-1",
"eu-north-1",
"eu-west-1",
"us-east-1",
"us-east-2",
"us-west-2"
],
"aws-cn": [],
"aws-eusc": [],
"aws-us-gov": [
"us-gov-east-1",
"us-gov-west-1"
]
}
},
"sms": {
"regions": {
"aws": [
@@ -13063,6 +13132,7 @@
"eu-west-3",
"me-central-1",
"me-south-1",
"mx-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
@@ -13414,6 +13484,7 @@
"ap-south-1",
"ap-southeast-1",
"ap-southeast-2",
"ap-southeast-5",
"ca-central-1",
"eu-central-1",
"eu-west-1",
@@ -13422,6 +13493,7 @@
"il-central-1",
"sa-east-1",
"us-east-1",
"us-east-2",
"us-west-2"
],
"aws-cn": [
+1 -1
View File
@@ -949,7 +949,7 @@ class AzureProvider(Provider):
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
asyncio.get_event_loop().run_until_complete(get_azure_identity())
asyncio.run(get_azure_identity())
# Managed identities only can be assigned resource, resource group and subscription scope permissions
elif managed_identity_auth:
@@ -0,0 +1,37 @@
{
"Provider": "azure",
"CheckID": "storage_account_public_network_access_disabled",
"CheckTitle": "Storage account has 'Public Network Access' disabled",
"CheckType": [],
"ServiceName": "storage",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "microsoft.storage/storageaccounts",
"ResourceGroup": "storage",
"Description": "**Azure Storage accounts** with **public network access** disabled cannot be reached from public networks. Setting `publicNetworkAccess` to `Disabled` overrides the public access settings of individual containers and forces access through private endpoints or trusted services. This is independent from the 'Allow Blob Anonymous Access' setting.",
"Risk": "Leaving **public network access** enabled exposes the storage account endpoints to the **public Internet**, widening the attack surface and undermining **defense in depth**.\n\nThis increases the risk of **unauthorized access**, **data exfiltration**, and reconnaissance against the account, especially when combined with weak network rules or overly permissive access policies.",
"RelatedUrl": "",
"AdditionalURLs": [
"https://learn.microsoft.com/en-us/azure/storage/common/storage-network-security?tabs=azure-portal#change-the-default-network-access-rule",
"https://learn.microsoft.com/en-us/azure/storage/common/storage-network-security-set-default-access"
],
"Remediation": {
"Code": {
"CLI": "az storage account update --name <storage-account> --resource-group <resource-group> --public-network-access Disabled",
"NativeIaC": "```bicep\n// Storage account with public network access disabled\nresource sa 'Microsoft.Storage/storageAccounts@2023-01-01' = {\n name: '<example_resource_name>'\n location: resourceGroup().location\n kind: 'StorageV2'\n sku: { name: 'Standard_LRS' }\n properties: {\n publicNetworkAccess: 'Disabled' // Critical: disables public network access to the account\n }\n}\n```",
"Other": "1. In the Azure portal, go to Storage accounts and select the target account\n2. Under Security + networking, click Networking\n3. Set Public network access to Disabled\n4. Click Save",
"Terraform": "```hcl\nresource \"azurerm_storage_account\" \"<example_resource_name>\" {\n name = \"<example_resource_name>\"\n resource_group_name = \"<example_resource_name>\"\n location = \"<example_location>\"\n account_tier = \"Standard\"\n account_replication_type = \"LRS\"\n public_network_access_enabled = false # Critical: disables public network access\n}\n```"
},
"Recommendation": {
"Text": "Disable **public network access** on the storage account and reach it through **private endpoints** or trusted Azure services only. Combine this with **least privilege** RBAC, short-lived `SAS` tokens, and network restrictions. Validate client connectivity before disabling public access in production.",
"Url": "https://hub.prowler.com/check/storage_account_public_network_access_disabled"
}
},
"Categories": [
"internet-exposed"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check evaluates the storage account publicNetworkAccess property. It is independent from the 'Allow Blob Anonymous Access' setting evaluated by storage_blob_public_access_level_is_disabled."
}
@@ -0,0 +1,38 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.storage.storage_client import storage_client
class storage_account_public_network_access_disabled(Check):
"""
Ensure that 'Public Network Access' is 'Disabled' for storage accounts.
This check evaluates the storage account's publicNetworkAccess property, which controls
whether the account is reachable from public networks. It is independent from the
'Allow Blob Anonymous Access' setting (covered by
storage_blob_public_access_level_is_disabled).
- PASS: The storage account has public network access disabled.
- FAIL: The storage account has public network access enabled (or unset, which Azure treats as enabled).
"""
def execute(self) -> list[Check_Report_Azure]:
findings = []
for subscription, storage_accounts in storage_client.storage_accounts.items():
subscription_name = storage_client.subscriptions.get(
subscription, subscription
)
for storage_account in storage_accounts:
report = Check_Report_Azure(
metadata=self.metadata(), resource=storage_account
)
report.subscription = subscription
if storage_account.public_network_access == "Disabled":
report.status = "PASS"
report.status_extended = f"Storage account {storage_account.name} from subscription {subscription_name} ({subscription}) has public network access disabled."
else:
report.status = "FAIL"
report.status_extended = f"Storage account {storage_account.name} from subscription {subscription_name} ({subscription}) has public network access enabled."
findings.append(report)
return findings
@@ -1,7 +1,7 @@
{
"Provider": "azure",
"CheckID": "storage_blob_public_access_level_is_disabled",
"CheckTitle": "Storage account has 'Allow blob public access' disabled",
"CheckTitle": "Storage account has 'Allow Blob Anonymous Access' disabled",
"CheckType": [],
"ServiceName": "storage",
"SubServiceName": "",
@@ -9,7 +9,7 @@
"Severity": "high",
"ResourceType": "microsoft.storage/storageaccounts",
"ResourceGroup": "storage",
"Description": "**Azure Storage accounts** with **blob public access** disabled prevent containers or blobs from being set to a public access level. Setting `allow blob public access` to `false` enforces no anonymous reads across the account.",
"Description": "**Azure Storage accounts** with **blob anonymous (public) access** disabled prevent containers or blobs from being set to a public access level. Setting `allowBlobPublicAccess` to `false` enforces no anonymous reads across the account. This is independent from the account's 'Public Network Access' setting, which is evaluated by storage_account_public_network_access_disabled.",
"Risk": "Allowing public access permits unauthenticated users to read blob data or enumerate container contents when any container is made public, compromising confidentiality.\n\nExposed objects can be scraped at scale, enabling data exfiltration and intelligence gathering without audit attribution.",
"RelatedUrl": "",
"AdditionalURLs": [
@@ -33,5 +33,5 @@
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
"Notes": "This check evaluates the 'Allow Blob Anonymous Access' (allowBlobPublicAccess) setting. The account's 'Public Network Access' (publicNetworkAccess) setting is evaluated by storage_account_public_network_access_disabled."
}
@@ -42,6 +42,9 @@ class Storage(AzureService):
enable_https_traffic_only=storage_account.enable_https_traffic_only,
infrastructure_encryption=storage_account.encryption.require_infrastructure_encryption,
allow_blob_public_access=storage_account.allow_blob_public_access,
public_network_access=getattr(
storage_account, "public_network_access", None
),
network_rule_set=NetworkRuleSet(
bypass=getattr(
storage_account.network_rule_set,
@@ -301,6 +304,7 @@ class Account(BaseModel):
enable_https_traffic_only: bool
infrastructure_encryption: Optional[bool] = None
allow_blob_public_access: bool
public_network_access: Optional[str] = None
network_rule_set: NetworkRuleSet
encryption_type: str
minimum_tls_version: str
@@ -34,5 +34,5 @@
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "This check passes if SMB channel encryption is set to a secure algorithm."
"Notes": "This check passes only if every SMB channel encryption algorithm allowed on the file shares is in the recommended list, which is configurable via azure.recommended_smb_channel_encryption_algorithms and defaults to AES-256-GCM only, as required by CIS."
}
@@ -1,32 +1,38 @@
from prowler.lib.check.models import Check, Check_Report_Azure
from prowler.providers.azure.services.storage.storage_client import storage_client
SECURE_ENCRYPTION_ALGORITHMS = ["AES-256-GCM"]
DEFAULT_SECURE_ENCRYPTION_ALGORITHMS = ["AES-256-GCM"]
class storage_smb_channel_encryption_with_secure_algorithm(Check):
"""
Ensure SMB channel encryption for file shares is set to the recommended algorithm (AES-256-GCM or higher).
Ensure SMB channel encryption for file shares only allows secure algorithms (AES-256-GCM or higher by default).
The list of allowed algorithms is configurable via
azure.recommended_smb_channel_encryption_algorithms in the Prowler configuration file.
This check evaluates whether SMB file shares are configured to use only the recommended SMB channel encryption algorithms.
- PASS: Storage account has the recommended SMB channel encryption (AES-256-GCM or higher) enabled for file shares.
- FAIL: Storage account does not have the recommended SMB channel encryption enabled for file shares or uses an unsupported algorithm.
- PASS: Storage account only allows secure SMB channel encryption algorithms for file shares.
- FAIL: Storage account does not have SMB channel encryption enabled, or it allows at least one algorithm that is not in the recommended list.
"""
def execute(self) -> list[Check_Report_Azure]:
findings = []
secure_encryption_algorithms = storage_client.audit_config.get(
"recommended_smb_channel_encryption_algorithms",
DEFAULT_SECURE_ENCRYPTION_ALGORITHMS,
)
for subscription, storage_accounts in storage_client.storage_accounts.items():
subscription_name = storage_client.subscriptions.get(
subscription, subscription
)
for account in storage_accounts:
if account.file_service_properties:
channel_encryption = (
account.file_service_properties.smb_protocol_settings.channel_encryption
)
pretty_current_algorithms = (
", ".join(
account.file_service_properties.smb_protocol_settings.channel_encryption
)
if account.file_service_properties.smb_protocol_settings.channel_encryption
else "none"
", ".join(channel_encryption) if channel_encryption else "none"
)
report = Check_Report_Azure(
metadata=self.metadata(),
@@ -35,20 +41,18 @@ class storage_smb_channel_encryption_with_secure_algorithm(Check):
report.subscription = subscription
report.resource_name = account.name
if (
not account.file_service_properties.smb_protocol_settings.channel_encryption
):
if not channel_encryption:
report.status = "FAIL"
report.status_extended = f"Storage account {account.name} from subscription {subscription_name} ({subscription}) does not have SMB channel encryption enabled for file shares."
elif any(
algorithm in SECURE_ENCRYPTION_ALGORITHMS
for algorithm in account.file_service_properties.smb_protocol_settings.channel_encryption
elif all(
algorithm in secure_encryption_algorithms
for algorithm in channel_encryption
):
report.status = "PASS"
report.status_extended = f"Storage account {account.name} from subscription {subscription_name} ({subscription}) has a secure algorithm for SMB channel encryption ({', '.join(SECURE_ENCRYPTION_ALGORITHMS)}) enabled for file shares since it supports {pretty_current_algorithms}."
report.status_extended = f"Storage account {account.name} from subscription {subscription_name} ({subscription}) only allows secure algorithms for SMB channel encryption on file shares since it supports {pretty_current_algorithms}."
else:
report.status = "FAIL"
report.status_extended = f"Storage account {account.name} from subscription {subscription_name} ({subscription}) does not have SMB channel encryption with a secure algorithm for file shares since it supports {pretty_current_algorithms}."
report.status_extended = f"Storage account {account.name} from subscription {subscription_name} ({subscription}) allows insecure algorithms for SMB channel encryption on file shares since it supports {pretty_current_algorithms} and only {', '.join(secure_encryption_algorithms)} is recommended."
findings.append(report)
return findings
@@ -87,9 +87,15 @@ class Compute(GCPService):
.execute(num_retries=DEFAULT_RETRY_ATTEMPTS)
)
for item in response["commonInstanceMetadata"].get("items", []):
if item["key"] == "enable-oslogin" and item["value"] == "TRUE":
if (
item["key"] == "enable-oslogin"
and item["value"].lower() == "true"
):
enable_oslogin = True
if item["key"] == "enable-oslogin-2fa" and item["value"] == "TRUE":
if (
item["key"] == "enable-oslogin-2fa"
and item["value"].lower() == "true"
):
enable_oslogin_2fa = True
self.compute_projects.append(
Project(
+3 -7
View File
@@ -1073,7 +1073,7 @@ class M365Provider(Provider):
organization_info = await client.organization.get()
identity.tenant_id = organization_info.value[0].id
asyncio.get_event_loop().run_until_complete(get_m365_identity(identity))
asyncio.run(get_m365_identity(identity))
return identity
@staticmethod
@@ -1261,9 +1261,7 @@ class M365Provider(Provider):
result = await client.domains.get()
return result.value
result = asyncio.get_event_loop().run_until_complete(
verify_certificate()
)
result = asyncio.run(verify_certificate())
if not result:
raise M365NotValidCertificateContentError(
file=os.path.basename(__file__),
@@ -1284,9 +1282,7 @@ class M365Provider(Provider):
result = await client.domains.get()
return result.value
result = asyncio.get_event_loop().run_until_complete(
verify_certificate()
)
result = asyncio.run(verify_certificate())
if not result:
raise M365NotValidCertificatePathError(
file=os.path.basename(__file__),
@@ -35,7 +35,7 @@ def init_parser(self):
nargs="+",
help=(
"OAuth scopes to request, space-separated "
"(e.g. okta.policies.read okta.brands.read okta.users.read). "
"(e.g. okta.policies.read okta.brands.read okta.apps.read). "
"Defaults to the read scopes required by the bundled checks."
),
default=None,
+1 -1
View File
@@ -32,7 +32,7 @@ from prowler.providers.okta.exceptions.exceptions import (
from prowler.providers.okta.lib.mutelist.mutelist import OktaMutelist
from prowler.providers.okta.models import OktaIdentityInfo, OktaSession
DEFAULT_SCOPES = ["okta.policies.read", "okta.brands.read"]
DEFAULT_SCOPES = ["okta.policies.read", "okta.brands.read", "okta.apps.read"]
# Accept only Okta-managed domains. Custom (vanity) domains are rejected on
# purpose — they're a recurring source of typos and silent misconfig and
# Prowler's audience overwhelmingly uses Okta-managed hosts. The TLDs below
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "application_admin_console_mfa_required",
"CheckTitle": "Okta Admin Console authentication policy enforces multifactor authentication",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The **Authentication Policy** bound to the **Okta Admin Console** app must require MFA. On its top active rule, *User must authenticate with* must be set to `Password / IdP + Another factor` or `Any 2 factor types` (`factorMode=2FA` in the API).",
"Risk": "Single-factor access to the Okta control plane is the highest-impact identity risk in the tenant.\n\n- **Credential compromise** is enough to take over every administrator account\n- **Lateral movement** into every downstream SaaS that trusts Okta SSO\n- **Privileged configuration changes** with no second-factor barrier",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/policies/about-app-sign-on-policies.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Authentication Policies**.\n3. Open the **Okta Admin Console** policy.\n4. Edit the top active rule.\n5. Set *User must authenticate with* to `Password / IdP + Another factor` or `Any 2 factor types`.\n6. Save the rule.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require MFA on the top active rule of the Okta Admin Console authentication policy. Set *User must authenticate with* to `Password / IdP + Another factor` or `Any 2 factor types`.",
"Url": "https://hub.prowler.com/check/application_admin_console_mfa_required"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273193 / OKTA-APP-000560."
}
@@ -0,0 +1,89 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
ADMIN_CONSOLE_APP_NAME,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
app_label,
app_not_found_finding,
missing_app_scope_finding,
policy_missing_finding,
rule_label,
top_active_rule,
)
ADMIN_CONSOLE_LABEL_HINT = "Okta Admin Console"
class application_admin_console_mfa_required(Check):
"""STIG V-273193 / OKTA-APP-000560.
The Authentication Policy bound to the Okta Admin Console app must
require multifactor authentication on its top rule: `User must
authenticate with` set to `Password / IdP + Another factor` or
`Any 2 factor types`.
The underlying SDK exposes this as `AssuranceMethod.factor_mode`
with values `1FA` / `2FA`.
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("built_in_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_app_scope_finding(
self.metadata(),
org_domain,
missing_scope,
ADMIN_CONSOLE_LABEL_HINT,
)
)
return findings
app = application_client.built_in_apps.get(ADMIN_CONSOLE_APP_NAME)
if app is None:
findings.append(
app_not_found_finding(
self.metadata(), org_domain, ADMIN_CONSOLE_LABEL_HINT
)
)
return findings
if app.access_policy_id is None or app.access_policy is None:
findings.append(policy_missing_finding(self.metadata(), org_domain, app))
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=app, org_domain=org_domain
)
rule = top_active_rule(app)
if rule is None:
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no active rules on its Authentication "
"Policy. The top rule must set `User must authenticate with` to "
"`Password / IdP + Another factor` or `Any 2 factor types`."
)
elif rule.factor_mode == "2FA":
report.status = "PASS"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} enforces "
"multifactor authentication (`factorMode=2FA`)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} does not "
f"enforce multifactor authentication "
f"(`factorMode={rule.factor_mode or 'unset'}`). "
"Set `User must authenticate with` to `Password / IdP + Another "
"factor` or `Any 2 factor types`."
)
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "application_admin_console_phishing_resistant_authentication",
"CheckTitle": "Okta Admin Console authentication policy enforces phishing-resistant factors",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The **Authentication Policy** bound to the **Okta Admin Console** app must restrict possession factors to phishing-resistant authenticators (FIDO2/WebAuthn, PIV/CAC, Okta FastPass with biometrics). On the top active rule, *Possession factor constraints are: Phishing resistant* must be checked (`possession.phishingResistant=REQUIRED`).",
"Risk": "Phishable possession factors (SMS, voice, standard push, OTP delivered via reverse-proxy AiTM) leave the most privileged surface of the IdP exposed.\n\n- **Credential phishing** against administrators succeeds despite MFA\n- **Adversary-in-the-Middle attacks** capture session tokens through fake login pages\n- **Account takeover** of the tenant control plane",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/authenticators/phishing-resistant-auth.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Authentication Policies**.\n3. Open the **Okta Admin Console** policy.\n4. Edit the top active rule.\n5. Under *Possession factor constraints are*, check **Phishing resistant**.\n6. Save the rule.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require phishing-resistant possession factors on the top active rule of the Okta Admin Console authentication policy.",
"Url": "https://hub.prowler.com/check/application_admin_console_phishing_resistant_authentication"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273191 / OKTA-APP-000190."
}
@@ -0,0 +1,88 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
ADMIN_CONSOLE_APP_NAME,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
app_label,
app_not_found_finding,
missing_app_scope_finding,
policy_missing_finding,
rule_label,
top_active_rule,
)
ADMIN_CONSOLE_LABEL_HINT = "Okta Admin Console"
class application_admin_console_phishing_resistant_authentication(Check):
"""STIG V-273191 / OKTA-APP-000190.
The Authentication Policy bound to the Okta Admin Console app must
restrict possession factors to phishing-resistant authenticators.
The underlying SDK exposes `phishingResistant` on each
`PossessionConstraint`; at least one constraint object on the top
rule must set `phishingResistant=REQUIRED` (constraints are OR-ed
by Okta semantics).
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("built_in_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_app_scope_finding(
self.metadata(),
org_domain,
missing_scope,
ADMIN_CONSOLE_LABEL_HINT,
)
)
return findings
app = application_client.built_in_apps.get(ADMIN_CONSOLE_APP_NAME)
if app is None:
findings.append(
app_not_found_finding(
self.metadata(), org_domain, ADMIN_CONSOLE_LABEL_HINT
)
)
return findings
if app.access_policy_id is None or app.access_policy is None:
findings.append(policy_missing_finding(self.metadata(), org_domain, app))
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=app, org_domain=org_domain
)
rule = top_active_rule(app)
if rule is None:
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no active rules on its Authentication "
"Policy. The top rule must mark "
"`Possession factor constraints are: Phishing resistant`."
)
elif rule.possession_phishing_resistant_required:
report.status = "PASS"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} enforces "
"phishing-resistant possession factors "
"(`possession.phishingResistant=REQUIRED`)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} does not "
"enforce phishing-resistant possession factors. Enable "
"`Possession factor constraints are: Phishing resistant` "
"on the rule."
)
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "application_admin_console_session_idle_timeout_15min",
"CheckTitle": "Okta Admin Console app session idle timeout is 15 minutes or less",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The integrated **Okta Admin Console** app must close idle privileged sessions. *Maximum app session idle time* on the **Sign On** tab must be `15` minutes or less.\n\nThreshold override: `okta_admin_console_idle_timeout_max_minutes`.",
"Risk": "An unattended administrator workstation leaves the Okta control plane open for session hijacking.\n\n- **Privileged session takeover** by anyone with physical or remote access to the workstation\n- **Tenant-wide configuration changes** under the absent administrator's identity\n- **Bypassed reauthentication** for the most sensitive surface of the IdP",
"RelatedUrl": "",
"AdditionalURLs": [
"https://developer.okta.com/docs/guides/configure-signon-policy/main/",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/OktaApplicationSettings/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Applications** > **Applications** > **Okta Admin Console**.\n3. Open the **Sign On** tab.\n4. Under **Okta Admin Console session**, set *Maximum app session idle time* to `15` minutes or less.\n5. Save the changes.",
"Terraform": ""
},
"Recommendation": {
"Text": "Set the *Maximum app session idle time* of the Okta Admin Console first-party app to `15` minutes or less so privileged administrator sessions terminate on inactivity.",
"Url": "https://hub.prowler.com/check/application_admin_console_session_idle_timeout_15min"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273187 / OKTA-APP-000025."
}
@@ -0,0 +1,89 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
AdminConsoleAppSettings,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
missing_admin_console_settings_scope_finding,
)
DEFAULT_THRESHOLD_MINUTES = 15
class application_admin_console_session_idle_timeout_15min(Check):
"""STIG V-273187 / OKTA-APP-000025.
The Okta Admin Console first-party app must set its
`Maximum app session idle time` to 15 minutes (or less) so privileged
administrator sessions terminate on inactivity. Threshold override:
`okta_admin_console_idle_timeout_max_minutes` in the audit config.
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
audit_config = application_client.audit_config or {}
threshold = audit_config.get(
"okta_admin_console_idle_timeout_max_minutes",
DEFAULT_THRESHOLD_MINUTES,
)
org_domain = application_client.provider.identity.org_domain
missing_scope = application_client.missing_scope.get(
"admin_console_app_settings"
)
if missing_scope:
findings.append(
missing_admin_console_settings_scope_finding(
self.metadata(), org_domain, missing_scope
)
)
return findings
settings = application_client.admin_console_app_settings
if settings is None:
placeholder = AdminConsoleAppSettings()
report = CheckReportOkta(
metadata=self.metadata(), resource=placeholder, org_domain=org_domain
)
report.status = "MANUAL"
report.status_extended = (
"Could not retrieve the Okta Admin Console first-party app "
"settings. Okta restricts `GET /api/v1/first-party-app-settings/"
"admin-console` to the Super Administrator role; every other "
"role — including Read-Only Administrator — receives "
"`403 E0000006`. Assign Super Administrator to the service "
f"app to evaluate this check. The `Maximum app session idle "
f"time` must be set to {threshold} minutes or less."
)
findings.append(report)
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=settings, org_domain=org_domain
)
idle = settings.session_idle_timeout_minutes
if idle is None:
report.status = "FAIL"
report.status_extended = (
"The Okta Admin Console first-party app does not define a "
"`Maximum app session idle time`. This value must be "
f"{threshold} minutes or less."
)
elif idle <= threshold:
report.status = "PASS"
report.status_extended = (
"The Okta Admin Console first-party app sets the maximum "
f"app session idle time to {idle} minutes, meeting the "
f"configured threshold of {threshold} minutes."
)
else:
report.status = "FAIL"
report.status_extended = (
"The Okta Admin Console first-party app sets the maximum "
f"app session idle time to {idle} minutes, exceeding the "
f"configured threshold of {threshold} minutes."
)
findings.append(report)
return findings
@@ -0,0 +1,38 @@
{
"Provider": "okta",
"CheckID": "application_authentication_policy_network_zone_enforced",
"CheckTitle": "Okta application authentication policies enforce Network Zones",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "Every active Okta application must be bound to an **Authentication Policy** that uses **Network Zones**. Each active non-default rule must map *User's IP* to `In zone` or `Not in zone`, and the active built-in *Catch-all Rule* must set *Access is* to `Denied`.",
"Risk": "Applications without network-aware authentication rules can be reached from unauthorized locations and bypass location-based access controls.\n\n- **Unauthorized access paths** from unmanaged or blocked networks\n- **Inconsistent information-flow enforcement** across SSO applications\n- **Residual access** when the fallback rule still allows traffic after policy misses",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/policies/about-app-sign-on-policies.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Application/",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Networks** and define the allow-list and deny-list zones required by policy.\n3. For each active application, open **Applications** > **Applications** > *Application* > **Sign On**.\n4. In **User Authentication**, bind the appropriate **Authentication Policy** and open **View Policy Details**.\n5. For each active non-default rule, set *User's IP* to `In zone` or `Not in zone` and select the correct **Network Zone**.\n6. Edit the built-in **Catch-all Rule** and set *Access is* to `Denied`.\n7. Save the policy.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require every active application Authentication Policy to use Network Zones on each active non-default rule and to deny access on the Catch-all Rule.",
"Url": "https://hub.prowler.com/check/application_authentication_policy_network_zone_enforced"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-279693 / OKTA-APP-003244."
}
@@ -0,0 +1,151 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
AuthenticationPolicyRule,
OktaBuiltInApp,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
active_apps,
app_label,
missing_integrated_apps_scope_finding,
no_active_apps_finding,
rule_has_network_zone,
rule_label,
)
class application_authentication_policy_network_zone_enforced(Check):
"""STIG V-279693 / OKTA-APP-003244.
Every active Okta application must be bound to an Authentication
Policy that uses Network Zones. Each active non-default rule must map
`User's IP` to an allow/deny zone, and the active Catch-all Rule
must deny access.
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("integrated_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_integrated_apps_scope_finding(
self.metadata(),
org_domain,
missing_scope,
)
)
return findings
apps = active_apps(application_client.integrated_apps)
if not apps:
findings.append(no_active_apps_finding(self.metadata(), org_domain))
return findings
for app in apps:
report = CheckReportOkta(
metadata=self.metadata(),
resource=app,
org_domain=org_domain,
resource_name=app.label or app.name,
resource_id=app.id,
)
status, status_extended = _evaluate_app(app)
report.status = status
report.status_extended = status_extended
findings.append(report)
return findings
def _active_rules(app: OktaBuiltInApp) -> list[AuthenticationPolicyRule]:
if app.access_policy is None:
return []
return sorted(
[
rule
for rule in app.access_policy.rules
if not rule.status or rule.status.upper() == "ACTIVE"
],
key=lambda rule: (
rule.priority if rule.priority is not None else float("inf"),
rule.name,
),
)
def _evaluate_app(app: OktaBuiltInApp) -> tuple[str, str]:
label = app_label(app)
if app.access_policy_id is None or app.access_policy is None:
return (
"FAIL",
f"{label} has no Authentication Policy bound to it. "
"Bind an Access Policy in Security > Authentication Policies.",
)
active_rules = _active_rules(app)
if not active_rules:
return (
"FAIL",
f"{label} has no active rules on its Authentication Policy. "
"Every active non-default rule must enforce a Network Zone "
"condition, and the Catch-all Rule must set `Access is: Denied`.",
)
nondefault_rules = [
rule
for rule in active_rules
if not rule.is_default and rule.name != "Catch-all Rule"
]
if not nondefault_rules:
return (
"FAIL",
f"{label} has no active non-default rules on its Authentication "
"Policy. Define at least one non-default rule that maps `User's "
"IP` to a Network Zone, and use the Catch-all Rule only as the "
"final deny path.",
)
missing_zone_rules = [
rule.name for rule in nondefault_rules if not rule_has_network_zone(rule)
]
if missing_zone_rules:
quoted_rules = ", ".join(f"'{rule_name}'" for rule_name in missing_zone_rules)
return (
"FAIL",
f"{label} has active non-default rule(s) without Network Zones: "
f"{quoted_rules}. Configure `User's IP` to `In zone` or `Not in zone` "
"for every active non-default rule.",
)
catch_all_rule = next(
(
rule
for rule in active_rules
if rule.is_default or rule.name == "Catch-all Rule"
),
None,
)
if catch_all_rule is None:
return (
"FAIL",
f"{label} has no active Catch-all Rule. The Catch-all Rule must "
"deny access after the zoned non-default rules.",
)
if catch_all_rule.access != "DENY":
return (
"FAIL",
f"Active {rule_label(catch_all_rule)} on {label} does not set "
f"`Access is` to `DENY` (`access={catch_all_rule.access or 'unset'}`). "
"Set the Catch-all Rule to deny access.",
)
return (
"PASS",
f"{label} applies Network Zones on every active non-default rule and "
f"its active {rule_label(catch_all_rule)} denies access.",
)
@@ -0,0 +1,4 @@
from prowler.providers.common.provider import Provider
from prowler.providers.okta.services.application.application_service import Application
application_client = Application(Provider.get_global_provider())
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "application_dashboard_mfa_required",
"CheckTitle": "Okta Dashboard authentication policy enforces multifactor authentication",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "high",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The **Authentication Policy** bound to the **Okta Dashboard** app must require MFA for end users. On its top active rule, *User must authenticate with* must be set to `Password / IdP + Another factor` or `Any 2 factor types` (`factorMode=2FA` in the API).",
"Risk": "Single-factor access to the Okta Dashboard lets an attacker pivot from one compromised password into every downstream SSO app.\n\n- **Credential stuffing** and password reuse attacks succeed in one step\n- **Lateral movement** into every SaaS the user has access to via Okta SSO\n- **Weakened identity assurance** for every user signing in to the end-user portal",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/policies/about-app-sign-on-policies.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Authentication Policies**.\n3. Open the **Okta Dashboard** policy.\n4. Edit the top active rule.\n5. Set *User must authenticate with* to `Password / IdP + Another factor` or `Any 2 factor types`.\n6. Save the rule.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require MFA on the top active rule of the Okta Dashboard authentication policy. Set *User must authenticate with* to `Password / IdP + Another factor` or `Any 2 factor types`.",
"Url": "https://hub.prowler.com/check/application_dashboard_mfa_required"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273194 / OKTA-APP-000570."
}
@@ -0,0 +1,85 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
DASHBOARD_APP_NAME,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
app_label,
app_not_found_finding,
missing_app_scope_finding,
policy_missing_finding,
rule_label,
top_active_rule,
)
DASHBOARD_LABEL_HINT = "Okta Dashboard"
class application_dashboard_mfa_required(Check):
"""STIG V-273194 / OKTA-APP-000570.
The Authentication Policy bound to the Okta Dashboard app must
require multifactor authentication on its top rule for
non-privileged users: `User must authenticate with` set to
`Password / IdP + Another factor` or `Any 2 factor types`
(`AssuranceMethod.factor_mode == "2FA"`).
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("built_in_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_app_scope_finding(
self.metadata(),
org_domain,
missing_scope,
DASHBOARD_LABEL_HINT,
)
)
return findings
app = application_client.built_in_apps.get(DASHBOARD_APP_NAME)
if app is None:
findings.append(
app_not_found_finding(self.metadata(), org_domain, DASHBOARD_LABEL_HINT)
)
return findings
if app.access_policy_id is None or app.access_policy is None:
findings.append(policy_missing_finding(self.metadata(), org_domain, app))
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=app, org_domain=org_domain
)
rule = top_active_rule(app)
if rule is None:
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no active rules on its Authentication "
"Policy. The top rule must set `User must authenticate with` to "
"`Password / IdP + Another factor` or `Any 2 factor types`."
)
elif rule.factor_mode == "2FA":
report.status = "PASS"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} enforces "
"multifactor authentication (`factorMode=2FA`)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} does not "
f"enforce multifactor authentication "
f"(`factorMode={rule.factor_mode or 'unset'}`). "
"Set `User must authenticate with` to `Password / IdP + Another "
"factor` or `Any 2 factor types`."
)
findings.append(report)
return findings
@@ -0,0 +1,37 @@
{
"Provider": "okta",
"CheckID": "application_dashboard_phishing_resistant_authentication",
"CheckTitle": "Okta Dashboard authentication policy enforces phishing-resistant factors",
"CheckType": [],
"ServiceName": "application",
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "medium",
"ResourceType": "NotDefined",
"ResourceGroup": "governance",
"Description": "The **Authentication Policy** bound to the **Okta Dashboard** app must restrict possession factors to phishing-resistant authenticators. On the top active rule, *Possession factor constraints are: Phishing resistant* must be checked (`possession.phishingResistant=REQUIRED`).",
"Risk": "Phishable possession factors leave end-user SSO sessions exposed to credential phishing and AiTM proxies.\n\n- **Credential phishing** against end users succeeds despite MFA\n- **Session token theft** through reverse-proxy AiTM attacks (Evilginx-class tooling)\n- **Compromise of every downstream SSO app** the user has access to",
"RelatedUrl": "",
"AdditionalURLs": [
"https://help.okta.com/oie/en-us/content/topics/identity-engine/authenticators/phishing-resistant-auth.htm",
"https://developer.okta.com/docs/api/openapi/okta-management/management/tag/Policy/"
],
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "1. Sign in to the **Okta Admin Console** as a *Super Admin*.\n2. Navigate to **Security** > **Authentication Policies**.\n3. Open the **Okta Dashboard** policy.\n4. Edit the top active rule.\n5. Under *Possession factor constraints are*, check **Phishing resistant**.\n6. Save the rule.",
"Terraform": ""
},
"Recommendation": {
"Text": "Require phishing-resistant possession factors on the top active rule of the Okta Dashboard authentication policy.",
"Url": "https://hub.prowler.com/check/application_dashboard_phishing_resistant_authentication"
}
},
"Categories": [
"identity-access"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": "Aligns with DISA STIG V-273190 / OKTA-APP-000180."
}
@@ -0,0 +1,84 @@
from prowler.lib.check.models import Check, CheckReportOkta
from prowler.providers.okta.services.application.application_client import (
application_client,
)
from prowler.providers.okta.services.application.application_service import (
DASHBOARD_APP_NAME,
)
from prowler.providers.okta.services.application.lib.application_helpers import (
app_label,
app_not_found_finding,
missing_app_scope_finding,
policy_missing_finding,
rule_label,
top_active_rule,
)
DASHBOARD_LABEL_HINT = "Okta Dashboard"
class application_dashboard_phishing_resistant_authentication(Check):
"""STIG V-273190 / OKTA-APP-000180.
The Authentication Policy bound to the Okta Dashboard app must
restrict possession factors to phishing-resistant authenticators on
its top active rule
(`possession.phishingResistant=REQUIRED`).
"""
def execute(self) -> list[CheckReportOkta]:
findings: list[CheckReportOkta] = []
org_domain = application_client.provider.identity.org_domain
for scope_key in ("built_in_apps", "access_policies"):
missing_scope = application_client.missing_scope.get(scope_key)
if missing_scope:
findings.append(
missing_app_scope_finding(
self.metadata(),
org_domain,
missing_scope,
DASHBOARD_LABEL_HINT,
)
)
return findings
app = application_client.built_in_apps.get(DASHBOARD_APP_NAME)
if app is None:
findings.append(
app_not_found_finding(self.metadata(), org_domain, DASHBOARD_LABEL_HINT)
)
return findings
if app.access_policy_id is None or app.access_policy is None:
findings.append(policy_missing_finding(self.metadata(), org_domain, app))
return findings
report = CheckReportOkta(
metadata=self.metadata(), resource=app, org_domain=org_domain
)
rule = top_active_rule(app)
if rule is None:
report.status = "FAIL"
report.status_extended = (
f"{app_label(app)} has no active rules on its Authentication "
"Policy. The top rule must mark "
"`Possession factor constraints are: Phishing resistant`."
)
elif rule.possession_phishing_resistant_required:
report.status = "PASS"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} enforces "
"phishing-resistant possession factors "
"(`possession.phishingResistant=REQUIRED`)."
)
else:
report.status = "FAIL"
report.status_extended = (
f"Top active {rule_label(rule)} on {app_label(app)} does not "
"enforce phishing-resistant possession factors. Enable "
"`Possession factor constraints are: Phishing resistant` "
"on the rule."
)
findings.append(report)
return findings

Some files were not shown because too many files have changed in this diff Show More