Compare commits

...

13 Commits

Author SHA1 Message Date
Chandrapal Badshah ebd1ff0ea4 fix(lighthouse): update prompt and tool schema for checks tool (#8265)
Co-authored-by: Chandrapal Badshah <12944530+Chan9390@users.noreply.github.com>
(cherry picked from commit 44d70f8467)
2025-07-28 09:42:24 +00:00
Prowler Bot 231fcf98d0 fix(aws): sns_topics_not_publicly_accessible false positive with aws:SourceArn conditions (#8359)
Co-authored-by: Andoni Alonso <14891798+andoniaf@users.noreply.github.com>
Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
2025-07-24 22:30:18 +08:00
Prowler Bot c7093013f9 fix(wazuh): patch command injection vulnerability in prowler-wrapper.py (#8355)
Co-authored-by: Cole Murray <colemurray.cs@gmail.com>
Co-authored-by: Test User <test@example.com>
Co-authored-by: MrCloudSec <hello@mistercloudsec.com>
2025-07-23 16:15:30 +02:00
Prowler Bot 6e96cb0874 fix(azure/storage): handle when Azure API set values to None (#8349)
Co-authored-by: Rubén De la Torre Vico <ruben@prowler.com>
Co-authored-by: Sergio Garcia <hello@mistercloudsec.com>
2025-07-23 17:40:39 +08:00
Prowler Bot 296fa0f984 chore(release): Bump version to v5.9.3 (#8344)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2025-07-22 12:09:55 +02:00
Víctor Fernández Poyatos 9a46fca8dd chore: Update poetry locks for v5.9.2 (#8343) 2025-07-22 11:12:28 +02:00
César Arroba 66e5a03f9f chore(prowler): modify changelog for 5.9.2 release 2025-07-22 10:36:18 +02:00
Prowler Bot ccd561f0f1 feat(resources): Optimize findings prefetching during resource views (#8339)
Co-authored-by: Víctor Fernández Poyatos <victor@prowler.com>
2025-07-21 17:02:31 +02:00
Prowler Bot 9e1b78e64f fix(defender): avoid duplicated findings in check defender_domain_dkim_enabled (#8335)
Co-authored-by: Daniel Barranquero <74871504+danibarranqueroo@users.noreply.github.com>
2025-07-21 13:14:46 +02:00
Prowler Bot 536f90ced3 chore(release): Bump version to v5.9.2 (#8328)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2025-07-18 15:27:03 +02:00
Prowler Bot 5453c02fd4 fix(tasks): calculate failed findings for resources during scan (#8324)
Co-authored-by: Víctor Fernández Poyatos <victor@prowler.com>
2025-07-18 13:29:31 +02:00
Prowler Bot 230e11be8a chore(release): Bump version to v5.9.1 (#8315)
Co-authored-by: prowler-bot <179230569+prowler-bot@users.noreply.github.com>
2025-07-17 19:38:46 +02:00
César Arroba 20625954a3 chore(api): change prowler version 2025-07-17 17:53:45 +02:00
28 changed files with 4339 additions and 1667 deletions
+15 -1
View File
@@ -2,6 +2,20 @@
All notable changes to the **Prowler API** are documented in this file.
## [v1.10.2] (Prowler v5.9.2)
### Changed
- Optimized queries for resources views [(#8336)](https://github.com/prowler-cloud/prowler/pull/8336)
---
## [v1.10.1] (Prowler v5.9.1)
### Fixed
- Calculate failed findings during scans to prevent heavy database queries [(#8322)](https://github.com/prowler-cloud/prowler/pull/8322)
---
## [v1.10.0] (Prowler v5.9.0)
### Added
@@ -12,7 +26,7 @@ All notable changes to the **Prowler API** are documented in this file.
- `/processors` endpoints to post-process findings. Currently, only the Mutelist processor is supported to allow to mute findings.
- Optimized the underlying queries for resources endpoints [(#8112)](https://github.com/prowler-cloud/prowler/pull/8112)
- Optimized include parameters for resources view [(#8229)](https://github.com/prowler-cloud/prowler/pull/8229)
- Optimized overview background tasks [(#8300)](https://github.com/prowler-cloud/prowler/pull/8300)
- Optimized overview background tasks [(#8300)](https://github.com/prowler-cloud/prowler/pull/8300)
### Fixed
- Search filter for findings and resources [(#8112)](https://github.com/prowler-cloud/prowler/pull/8112)
+2336 -1038
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -24,7 +24,7 @@ dependencies = [
"drf-spectacular-jsonapi==0.5.1",
"gunicorn==23.0.0",
"lxml==5.3.2",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@master",
"prowler @ git+https://github.com/prowler-cloud/prowler.git@v5.9",
"psycopg2-binary==2.9.9",
"pytest-celery[redis] (>=1.0.1,<2.0.0)",
"sentry-sdk[django] (>=2.20.0,<3.0.0)",
@@ -38,7 +38,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.10.0"
version = "1.10.2"
[project.scripts]
celery = "src.backend.config.settings.celery"
@@ -0,0 +1,30 @@
from functools import partial

from django.db import migrations

from api.db_utils import create_index_on_partitions, drop_index_on_partitions

# Shared identifiers so the forward and reverse operations cannot drift apart.
PARENT_TABLE = "resource_finding_mappings"
INDEX_NAME = "rfm_tenant_resource_idx"


class Migration(migrations.Migration):
    """Create a (tenant_id, resource_id) BTREE index on every partition of
    resource_finding_mappings to speed up resource -> findings lookups."""

    # Per-partition DDL is issued by the helper functions; it must not run
    # inside a single wrapping transaction.
    atomic = False

    dependencies = [
        ("api", "0039_resource_resources_failed_findings_idx"),
    ]

    operations = [
        migrations.RunPython(
            partial(
                create_index_on_partitions,
                parent_table=PARENT_TABLE,
                index_name=INDEX_NAME,
                columns="tenant_id, resource_id",
                method="BTREE",
            ),
            reverse_code=partial(
                drop_index_on_partitions,
                parent_table=PARENT_TABLE,
                index_name=INDEX_NAME,
            ),
        ),
    ]
@@ -0,0 +1,17 @@
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("api", "0040_rfm_tenant_resource_index_partitions"),
]
operations = [
migrations.AddIndex(
model_name="resourcefindingmapping",
index=models.Index(
fields=["tenant_id", "resource_id"],
name="rfm_tenant_resource_idx",
),
),
]
@@ -0,0 +1,23 @@
from django.contrib.postgres.operations import AddIndexConcurrently
from django.db import migrations, models
class Migration(migrations.Migration):
atomic = False
dependencies = [
("api", "0041_rfm_tenant_resource_parent_partitions"),
("django_celery_beat", "0019_alter_periodictasks_options"),
]
operations = [
AddIndexConcurrently(
model_name="scan",
index=models.Index(
condition=models.Q(("state", "completed")),
fields=["tenant_id", "provider_id", "-inserted_at"],
include=("id",),
name="scans_prov_ins_desc_idx",
),
),
]
+11
View File
@@ -476,6 +476,13 @@ class Scan(RowLevelSecurityProtectedModel):
condition=Q(state=StateChoices.COMPLETED),
name="scans_prov_state_ins_desc_idx",
),
# TODO This might replace `scans_prov_state_ins_desc_idx` completely. Review usage
models.Index(
fields=["tenant_id", "provider_id", "-inserted_at"],
condition=Q(state=StateChoices.COMPLETED),
include=["id"],
name="scans_prov_ins_desc_idx",
),
]
class JSONAPIMeta:
@@ -860,6 +867,10 @@ class ResourceFindingMapping(PostgresPartitionedModel, RowLevelSecurityProtected
fields=["tenant_id", "finding_id"],
name="rfm_tenant_finding_idx",
),
models.Index(
fields=["tenant_id", "resource_id"],
name="rfm_tenant_resource_idx",
),
]
constraints = [
models.UniqueConstraint(
+1 -1
View File
@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.10.0
version: 1.10.2
description: |-
Prowler API specification.
+2
View File
@@ -5188,6 +5188,8 @@ class TestComplianceOverviewViewSet:
assert "description" in attributes
assert "status" in attributes
# TODO: This test may fail randomly because requirements are not ordered
@pytest.mark.xfail
def test_compliance_overview_requirements_manual(
self, authenticated_client, compliance_requirements_overviews_fixture
):
+35 -8
View File
@@ -22,7 +22,7 @@ from django.conf import settings as django_settings
from django.contrib.postgres.aggregates import ArrayAgg
from django.contrib.postgres.search import SearchQuery
from django.db import transaction
from django.db.models import Count, F, Prefetch, Q, Sum
from django.db.models import Count, F, Prefetch, Q, Subquery, Sum
from django.db.models.functions import Coalesce
from django.http import HttpResponse
from django.shortcuts import redirect
@@ -292,7 +292,7 @@ class SchemaView(SpectacularAPIView):
def get(self, request, *args, **kwargs):
spectacular_settings.TITLE = "Prowler API"
spectacular_settings.VERSION = "1.10.0"
spectacular_settings.VERSION = "1.10.2"
spectacular_settings.DESCRIPTION = (
"Prowler API specification.\n\nThis file is auto-generated."
)
@@ -1994,6 +1994,21 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
)
)
def _should_prefetch_findings(self) -> bool:
fields_param = self.request.query_params.get("fields[resources]", "")
include_param = self.request.query_params.get("include", "")
return (
fields_param == ""
or "findings" in fields_param.split(",")
or "findings" in include_param.split(",")
)
def _get_findings_prefetch(self):
    """
    Build the prefetch list used to load findings for the current tenant.

    Defers the heavy ``scan`` and ``resources`` columns since they are not
    needed when findings are rendered inline with a resource.
    """
    queryset = Finding.all_objects.defer("scan", "resources")
    queryset = queryset.filter(tenant_id=self.request.tenant_id)
    return [Prefetch("findings", queryset=queryset)]
def get_serializer_class(self):
if self.action in ["metadata", "metadata_latest"]:
return ResourceMetadataSerializer
@@ -2017,7 +2032,11 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
filtered_queryset,
manager=Resource.all_objects,
select_related=["provider"],
prefetch_related=["findings"],
prefetch_related=(
self._get_findings_prefetch()
if self._should_prefetch_findings()
else []
),
)
def retrieve(self, request, *args, **kwargs):
@@ -2042,14 +2061,18 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
tenant_id = request.tenant_id
filtered_queryset = self.filter_queryset(self.get_queryset())
latest_scan_ids = (
Scan.all_objects.filter(tenant_id=tenant_id, state=StateChoices.COMPLETED)
latest_scans = (
Scan.all_objects.filter(
tenant_id=tenant_id,
state=StateChoices.COMPLETED,
)
.order_by("provider_id", "-inserted_at")
.distinct("provider_id")
.values_list("id", flat=True)
.values("provider_id")
)
filtered_queryset = filtered_queryset.filter(
tenant_id=tenant_id, provider__scan__in=latest_scan_ids
provider_id__in=Subquery(latest_scans)
)
return self.paginate_by_pk(
@@ -2057,7 +2080,11 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
filtered_queryset,
manager=Resource.all_objects,
select_related=["provider"],
prefetch_related=["findings"],
prefetch_related=(
self._get_findings_prefetch()
if self._should_prefetch_findings()
else []
),
)
@action(detail=False, methods=["get"], url_name="metadata")
+38 -48
View File
@@ -1,11 +1,12 @@
import json
import time
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timezone
from celery.utils.log import get_task_logger
from config.settings.celery import CELERY_DEADLOCK_ATTEMPTS
from django.db import IntegrityError, OperationalError, connection
from django.db import IntegrityError, OperationalError
from django.db.models import Case, Count, IntegerField, Prefetch, Sum, When
from tasks.utils import CustomEncoder
@@ -13,7 +14,11 @@ from api.compliance import (
PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE,
generate_scan_compliance,
)
from api.db_utils import create_objects_in_batches, rls_transaction
from api.db_utils import (
create_objects_in_batches,
rls_transaction,
update_objects_in_batches,
)
from api.exceptions import ProviderConnectionError
from api.models import (
ComplianceRequirementOverview,
@@ -103,7 +108,10 @@ def _store_resources(
def perform_prowler_scan(
tenant_id: str, scan_id: str, provider_id: str, checks_to_execute: list[str] = None
tenant_id: str,
scan_id: str,
provider_id: str,
checks_to_execute: list[str] | None = None,
):
"""
Perform a scan using Prowler and store the findings and resources in the database.
@@ -175,6 +183,7 @@ def perform_prowler_scan(
resource_cache = {}
tag_cache = {}
last_status_cache = {}
resource_failed_findings_cache = defaultdict(int)
for progress, findings in prowler_scan.scan():
for finding in findings:
@@ -200,6 +209,9 @@ def perform_prowler_scan(
},
)
resource_cache[resource_uid] = resource_instance
# Initialize all processed resources in the cache
resource_failed_findings_cache[resource_uid] = 0
else:
resource_instance = resource_cache[resource_uid]
@@ -313,6 +325,11 @@ def perform_prowler_scan(
)
finding_instance.add_resources([resource_instance])
# Increment failed_findings_count cache if the finding status is FAIL and not muted
if status == FindingStatus.FAIL and not finding.muted:
resource_uid = finding.resource_uid
resource_failed_findings_cache[resource_uid] += 1
# Update scan resource summaries
scan_resource_cache.add(
(
@@ -330,6 +347,24 @@ def perform_prowler_scan(
scan_instance.state = StateChoices.COMPLETED
# Update failed_findings_count for all resources in batches if scan completed successfully
if resource_failed_findings_cache:
resources_to_update = []
for resource_uid, failed_count in resource_failed_findings_cache.items():
if resource_uid in resource_cache:
resource_instance = resource_cache[resource_uid]
resource_instance.failed_findings_count = failed_count
resources_to_update.append(resource_instance)
if resources_to_update:
update_objects_in_batches(
tenant_id=tenant_id,
model=Resource,
objects=resources_to_update,
fields=["failed_findings_count"],
batch_size=1000,
)
except Exception as e:
logger.error(f"Error performing scan {scan_id}: {e}")
exception = e
@@ -376,7 +411,6 @@ def perform_prowler_scan(
def aggregate_findings(tenant_id: str, scan_id: str):
"""
Aggregates findings for a given scan and stores the results in the ScanSummary table.
Also updates the failed_findings_count for each resource based on the latest findings.
This function retrieves all findings associated with a given `scan_id` and calculates various
metrics such as counts of failed, passed, and muted findings, as well as their deltas (new,
@@ -405,8 +439,6 @@ def aggregate_findings(tenant_id: str, scan_id: str):
- muted_new: Muted findings with a delta of 'new'.
- muted_changed: Muted findings with a delta of 'changed'.
"""
_update_resource_failed_findings_count(tenant_id, scan_id)
with rls_transaction(tenant_id):
findings = Finding.objects.filter(tenant_id=tenant_id, scan_id=scan_id)
@@ -531,48 +563,6 @@ def aggregate_findings(tenant_id: str, scan_id: str):
ScanSummary.objects.bulk_create(scan_aggregations, batch_size=3000)
def _update_resource_failed_findings_count(tenant_id: str, scan_id: str):
    """
    Update the failed_findings_count field for resources based on the latest findings.
    This function calculates the number of failed findings for each resource by:
    1. Getting the latest finding for each finding.uid
    2. Counting failed findings per resource
    3. Updating the failed_findings_count field for each resource
    Args:
        tenant_id (str): The ID of the tenant to which the scan belongs.
        scan_id (str): The ID of the scan for which to update resource counts.
    """
    with rls_transaction(tenant_id):
        # The scan is only used to resolve its provider: every resource of
        # that provider (not just those touched by this scan) is updated.
        scan = Scan.objects.get(pk=scan_id)
        provider_id = str(scan.provider_id)
    # Raw SQL: DISTINCT ON (f.uid) with ORDER BY f.uid, f.inserted_at DESC
    # keeps only the most recent finding per uid, so repeated findings for
    # the same check/resource are counted once. Only unmuted FAIL findings
    # are counted; resources with no such findings fall back to 0 via
    # COALESCE. Parameters are bound (%s), not string-formatted.
    with connection.cursor() as cursor:
        cursor.execute(
            """
            UPDATE resources AS r
            SET failed_findings_count = COALESCE((
                SELECT COUNT(*) FROM (
                    SELECT DISTINCT ON (f.uid) f.uid
                    FROM findings AS f
                    JOIN resource_finding_mappings AS rfm
                        ON rfm.finding_id = f.id
                    WHERE f.tenant_id = %s
                        AND f.status = %s
                        AND f.muted = FALSE
                        AND rfm.resource_id = r.id
                    ORDER BY f.uid, f.inserted_at DESC
                ) AS latest_uids
            ), 0)
            WHERE r.tenant_id = %s
                AND r.provider_id = %s
            """,
            [tenant_id, FindingStatus.FAIL, tenant_id, provider_id],
        )
def create_compliance_requirements(tenant_id: str, scan_id: str):
"""
Create detailed compliance requirement overview records for a scan.
+358 -75
View File
@@ -7,22 +7,14 @@ import pytest
from tasks.jobs.scan import (
_create_finding_delta,
_store_resources,
_update_resource_failed_findings_count,
create_compliance_requirements,
perform_prowler_scan,
)
from tasks.utils import CustomEncoder
from api.exceptions import ProviderConnectionError
from api.models import (
Finding,
Provider,
Resource,
Scan,
Severity,
StateChoices,
StatusChoices,
)
from api.models import Finding, Provider, Resource, Scan, StateChoices, StatusChoices
from prowler.lib.check.models import Severity
@pytest.mark.django_db
@@ -182,6 +174,9 @@ class TestPerformScan:
assert tag_keys == set(finding.resource_tags.keys())
assert tag_values == set(finding.resource_tags.values())
# Assert that failed_findings_count is 0 (finding is PASS and muted)
assert scan_resource.failed_findings_count == 0
@patch("tasks.jobs.scan.ProwlerScan")
@patch(
"tasks.jobs.scan.initialize_prowler_provider",
@@ -386,6 +381,359 @@ class TestPerformScan:
assert resource == resource_instance
assert resource_uid_tuple == (resource_instance.uid, resource_instance.region)
def test_perform_prowler_scan_with_failed_findings(
    self,
    tenants_fixture,
    scans_fixture,
    providers_fixture,
) -> None:
    """Test that failed findings increment the failed_findings_count"""
    # Patch out RLS, the prowler provider/scan machinery, and the compliance
    # templates so only the counting logic under test touches the database.
    with (
        patch("api.db_utils.rls_transaction"),
        patch(
            "tasks.jobs.scan.initialize_prowler_provider"
        ) as mock_initialize_prowler_provider,
        patch("tasks.jobs.scan.ProwlerScan") as mock_prowler_scan_class,
        patch(
            "tasks.jobs.scan.PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE",
            new_callable=dict,
        ),
        patch("api.compliance.PROWLER_CHECKS", new_callable=dict),
    ):
        # Ensure the database is empty
        assert Finding.objects.count() == 0
        assert Resource.objects.count() == 0
        tenant = tenants_fixture[0]
        scan = scans_fixture[0]
        provider = providers_fixture[0]
        # Ensure the provider type is 'aws'
        provider.provider = Provider.ProviderChoices.AWS
        provider.save()
        tenant_id = str(tenant.id)
        scan_id = str(scan.id)
        provider_id = str(provider.id)
        # Mock a FAIL finding that is not muted
        fail_finding = MagicMock()
        fail_finding.uid = "fail_finding_uid"
        fail_finding.status = StatusChoices.FAIL
        fail_finding.status_extended = "test fail status"
        fail_finding.severity = Severity.high
        fail_finding.check_id = "fail_check"
        fail_finding.get_metadata.return_value = {"key": "value"}
        fail_finding.resource_uid = "resource_uid_fail"
        fail_finding.resource_name = "fail_resource"
        fail_finding.region = "us-east-1"
        fail_finding.service_name = "ec2"
        fail_finding.resource_type = "instance"
        fail_finding.resource_tags = {"env": "test"}
        fail_finding.muted = False
        fail_finding.raw = {}
        fail_finding.resource_metadata = {"test": "metadata"}
        fail_finding.resource_details = {"details": "test"}
        fail_finding.partition = "aws"
        fail_finding.compliance = {"compliance1": "FAIL"}
        # Mock the ProwlerScan instance
        mock_prowler_scan_instance = MagicMock()
        mock_prowler_scan_instance.scan.return_value = [(100, [fail_finding])]
        mock_prowler_scan_class.return_value = mock_prowler_scan_instance
        # Mock prowler_provider
        mock_prowler_provider_instance = MagicMock()
        mock_prowler_provider_instance.get_regions.return_value = ["us-east-1"]
        mock_initialize_prowler_provider.return_value = (
            mock_prowler_provider_instance
        )
        # Call the function under test
        perform_prowler_scan(tenant_id, scan_id, provider_id, [])
        # Refresh instances from the database
        scan.refresh_from_db()
        scan_resource = Resource.objects.get(provider=provider)
        # Assert that failed_findings_count is 1 (one FAIL finding not muted)
        assert scan_resource.failed_findings_count == 1
def test_perform_prowler_scan_multiple_findings_same_resource(
    self,
    tenants_fixture,
    scans_fixture,
    providers_fixture,
) -> None:
    """Test that multiple FAIL findings on the same resource increment the counter correctly"""
    with (
        patch("api.db_utils.rls_transaction"),
        patch(
            "tasks.jobs.scan.initialize_prowler_provider"
        ) as mock_initialize_prowler_provider,
        patch("tasks.jobs.scan.ProwlerScan") as mock_prowler_scan_class,
        patch(
            "tasks.jobs.scan.PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE",
            new_callable=dict,
        ),
        patch("api.compliance.PROWLER_CHECKS", new_callable=dict),
    ):
        tenant = tenants_fixture[0]
        scan = scans_fixture[0]
        provider = providers_fixture[0]
        provider.provider = Provider.ProviderChoices.AWS
        provider.save()
        tenant_id = str(tenant.id)
        scan_id = str(scan.id)
        provider_id = str(provider.id)
        # Create multiple findings for the same resource
        # Two FAIL findings (not muted) and one PASS finding
        resource_uid = "shared_resource_uid"
        fail_finding_1 = MagicMock()
        fail_finding_1.uid = "fail_finding_1"
        fail_finding_1.status = StatusChoices.FAIL
        fail_finding_1.status_extended = "fail 1"
        fail_finding_1.severity = Severity.high
        fail_finding_1.check_id = "fail_check_1"
        fail_finding_1.get_metadata.return_value = {"key": "value1"}
        fail_finding_1.resource_uid = resource_uid
        fail_finding_1.resource_name = "shared_resource"
        fail_finding_1.region = "us-east-1"
        fail_finding_1.service_name = "ec2"
        fail_finding_1.resource_type = "instance"
        fail_finding_1.resource_tags = {}
        fail_finding_1.muted = False
        fail_finding_1.raw = {}
        fail_finding_1.resource_metadata = {}
        fail_finding_1.resource_details = {}
        fail_finding_1.partition = "aws"
        fail_finding_1.compliance = {}
        fail_finding_2 = MagicMock()
        fail_finding_2.uid = "fail_finding_2"
        fail_finding_2.status = StatusChoices.FAIL
        fail_finding_2.status_extended = "fail 2"
        fail_finding_2.severity = Severity.medium
        fail_finding_2.check_id = "fail_check_2"
        fail_finding_2.get_metadata.return_value = {"key": "value2"}
        fail_finding_2.resource_uid = resource_uid
        fail_finding_2.resource_name = "shared_resource"
        fail_finding_2.region = "us-east-1"
        fail_finding_2.service_name = "ec2"
        fail_finding_2.resource_type = "instance"
        fail_finding_2.resource_tags = {}
        fail_finding_2.muted = False
        fail_finding_2.raw = {}
        fail_finding_2.resource_metadata = {}
        fail_finding_2.resource_details = {}
        fail_finding_2.partition = "aws"
        fail_finding_2.compliance = {}
        # PASS finding on the same resource: must NOT affect the counter.
        pass_finding = MagicMock()
        pass_finding.uid = "pass_finding"
        pass_finding.status = StatusChoices.PASS
        pass_finding.status_extended = "pass"
        pass_finding.severity = Severity.low
        pass_finding.check_id = "pass_check"
        pass_finding.get_metadata.return_value = {"key": "value3"}
        pass_finding.resource_uid = resource_uid
        pass_finding.resource_name = "shared_resource"
        pass_finding.region = "us-east-1"
        pass_finding.service_name = "ec2"
        pass_finding.resource_type = "instance"
        pass_finding.resource_tags = {}
        pass_finding.muted = False
        pass_finding.raw = {}
        pass_finding.resource_metadata = {}
        pass_finding.resource_details = {}
        pass_finding.partition = "aws"
        pass_finding.compliance = {}
        # Mock the ProwlerScan instance
        mock_prowler_scan_instance = MagicMock()
        mock_prowler_scan_instance.scan.return_value = [
            (100, [fail_finding_1, fail_finding_2, pass_finding])
        ]
        mock_prowler_scan_class.return_value = mock_prowler_scan_instance
        # Mock prowler_provider
        mock_prowler_provider_instance = MagicMock()
        mock_prowler_provider_instance.get_regions.return_value = ["us-east-1"]
        mock_initialize_prowler_provider.return_value = (
            mock_prowler_provider_instance
        )
        # Call the function under test
        perform_prowler_scan(tenant_id, scan_id, provider_id, [])
        # Refresh instances from the database
        scan_resource = Resource.objects.get(provider=provider, uid=resource_uid)
        # Assert that failed_findings_count is 2 (two FAIL findings, one PASS)
        assert scan_resource.failed_findings_count == 2
def test_perform_prowler_scan_with_muted_findings(
    self,
    tenants_fixture,
    scans_fixture,
    providers_fixture,
) -> None:
    """Test that muted FAIL findings do not increment the failed_findings_count"""
    with (
        patch("api.db_utils.rls_transaction"),
        patch(
            "tasks.jobs.scan.initialize_prowler_provider"
        ) as mock_initialize_prowler_provider,
        patch("tasks.jobs.scan.ProwlerScan") as mock_prowler_scan_class,
        patch(
            "tasks.jobs.scan.PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE",
            new_callable=dict,
        ),
        patch("api.compliance.PROWLER_CHECKS", new_callable=dict),
    ):
        tenant = tenants_fixture[0]
        scan = scans_fixture[0]
        provider = providers_fixture[0]
        provider.provider = Provider.ProviderChoices.AWS
        provider.save()
        tenant_id = str(tenant.id)
        scan_id = str(scan.id)
        provider_id = str(provider.id)
        # Mock a FAIL finding that is muted — muted findings are excluded
        # from the failed_findings_count even though their status is FAIL.
        muted_fail_finding = MagicMock()
        muted_fail_finding.uid = "muted_fail_finding"
        muted_fail_finding.status = StatusChoices.FAIL
        muted_fail_finding.status_extended = "muted fail"
        muted_fail_finding.severity = Severity.high
        muted_fail_finding.check_id = "muted_fail_check"
        muted_fail_finding.get_metadata.return_value = {"key": "value"}
        muted_fail_finding.resource_uid = "muted_resource_uid"
        muted_fail_finding.resource_name = "muted_resource"
        muted_fail_finding.region = "us-east-1"
        muted_fail_finding.service_name = "ec2"
        muted_fail_finding.resource_type = "instance"
        muted_fail_finding.resource_tags = {}
        muted_fail_finding.muted = True
        muted_fail_finding.raw = {}
        muted_fail_finding.resource_metadata = {}
        muted_fail_finding.resource_details = {}
        muted_fail_finding.partition = "aws"
        muted_fail_finding.compliance = {}
        # Mock the ProwlerScan instance
        mock_prowler_scan_instance = MagicMock()
        mock_prowler_scan_instance.scan.return_value = [(100, [muted_fail_finding])]
        mock_prowler_scan_class.return_value = mock_prowler_scan_instance
        # Mock prowler_provider
        mock_prowler_provider_instance = MagicMock()
        mock_prowler_provider_instance.get_regions.return_value = ["us-east-1"]
        mock_initialize_prowler_provider.return_value = (
            mock_prowler_provider_instance
        )
        # Call the function under test
        perform_prowler_scan(tenant_id, scan_id, provider_id, [])
        # Refresh instances from the database
        scan_resource = Resource.objects.get(provider=provider)
        # Assert that failed_findings_count is 0 (FAIL finding is muted)
        assert scan_resource.failed_findings_count == 0
def test_perform_prowler_scan_reset_failed_findings_count(
    self,
    tenants_fixture,
    providers_fixture,
    resources_fixture,
) -> None:
    """Test that failed_findings_count is reset to 0 at the beginning of each scan"""
    # Use existing resource from fixture and set initial failed_findings_count
    tenant = tenants_fixture[0]
    provider = providers_fixture[0]
    resource = resources_fixture[0]
    # Set a non-zero failed_findings_count initially
    resource.failed_findings_count = 5
    resource.save()
    # Create a new scan
    scan = Scan.objects.create(
        name="Reset Test Scan",
        provider=provider,
        trigger=Scan.TriggerChoices.MANUAL,
        state=StateChoices.AVAILABLE,
        tenant_id=tenant.id,
    )
    with (
        patch("api.db_utils.rls_transaction"),
        patch(
            "tasks.jobs.scan.initialize_prowler_provider"
        ) as mock_initialize_prowler_provider,
        patch("tasks.jobs.scan.ProwlerScan") as mock_prowler_scan_class,
        patch(
            "tasks.jobs.scan.PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE",
            new_callable=dict,
        ),
        patch("api.compliance.PROWLER_CHECKS", new_callable=dict),
    ):
        provider.provider = Provider.ProviderChoices.AWS
        provider.save()
        tenant_id = str(tenant.id)
        scan_id = str(scan.id)
        provider_id = str(provider.id)
        # Mock a PASS finding for the existing resource — a scan with no
        # FAIL findings should overwrite the stale count of 5 with 0.
        pass_finding = MagicMock()
        pass_finding.uid = "reset_test_finding"
        pass_finding.status = StatusChoices.PASS
        pass_finding.status_extended = "reset test pass"
        pass_finding.severity = Severity.low
        pass_finding.check_id = "reset_test_check"
        pass_finding.get_metadata.return_value = {"key": "value"}
        pass_finding.resource_uid = resource.uid
        pass_finding.resource_name = resource.name
        pass_finding.region = resource.region
        pass_finding.service_name = resource.service
        pass_finding.resource_type = resource.type
        pass_finding.resource_tags = {}
        pass_finding.muted = False
        pass_finding.raw = {}
        pass_finding.resource_metadata = {}
        pass_finding.resource_details = {}
        pass_finding.partition = "aws"
        pass_finding.compliance = {}
        # Mock the ProwlerScan instance
        mock_prowler_scan_instance = MagicMock()
        mock_prowler_scan_instance.scan.return_value = [(100, [pass_finding])]
        mock_prowler_scan_class.return_value = mock_prowler_scan_instance
        # Mock prowler_provider
        mock_prowler_provider_instance = MagicMock()
        mock_prowler_provider_instance.get_regions.return_value = [resource.region]
        mock_initialize_prowler_provider.return_value = (
            mock_prowler_provider_instance
        )
        # Call the function under test
        perform_prowler_scan(tenant_id, scan_id, provider_id, [])
        # Refresh resource from the database
        resource.refresh_from_db()
        # Assert that failed_findings_count was reset to 0 during the scan
        assert resource.failed_findings_count == 0
# TODO Add tests for aggregations
@@ -697,68 +1045,3 @@ class TestCreateComplianceRequirements:
assert "requirements_created" in result
assert result["requirements_created"] >= 0
@pytest.mark.django_db
class TestUpdateResourceFailedFindingsCount:
    """Tests for the SQL-based _update_resource_failed_findings_count helper."""

    def test_execute_sql_update(
        self, tenants_fixture, scans_fixture, providers_fixture, resources_fixture
    ) -> None:
        # Verifies that findings sharing a uid are deduplicated (DISTINCT ON)
        # before being counted against the resource.
        resource = resources_fixture[0]
        tenant_id = resource.tenant_id
        scan_id = resource.provider.scans.first().id
        # Common kwargs for all failing findings
        base_kwargs = {
            "tenant_id": tenant_id,
            "scan_id": scan_id,
            "delta": None,
            "status": StatusChoices.FAIL,
            "status_extended": "test status extended",
            "impact": Severity.critical,
            "impact_extended": "test impact extended",
            "severity": Severity.critical,
            "raw_result": {
                "status": StatusChoices.FAIL,
                "impact": Severity.critical,
                "severity": Severity.critical,
            },
            "tags": {"test": "dev-qa"},
            "check_id": "test_check_id",
            "check_metadata": {
                "CheckId": "test_check_id",
                "Description": "test description apple sauce",
                "servicename": "ec2",
            },
            "first_seen_at": "2024-01-02T00:00:00Z",
        }
        # UIDs to create (two with same UID, one unique)
        uids = ["test_finding_uid_1", "test_finding_uid_1", "test_finding_uid_2"]
        # Create findings and associate with the resource
        for uid in uids:
            finding = Finding.objects.create(uid=uid, **base_kwargs)
            finding.add_resources([resource])
        resource.refresh_from_db()
        assert resource.failed_findings_count == 0
        _update_resource_failed_findings_count(tenant_id=tenant_id, scan_id=scan_id)
        resource.refresh_from_db()
        # Only two since two findings share the same UID
        assert resource.failed_findings_count == 2

    @patch("tasks.jobs.scan.Scan.objects.get")
    def test_scan_not_found(
        self,
        mock_scan_get,
    ) -> None:
        # A missing scan should propagate Scan.DoesNotExist to the caller.
        mock_scan_get.side_effect = Scan.DoesNotExist
        with pytest.raises(Scan.DoesNotExist):
            _update_resource_failed_findings_count(
                "8614ca97-8370-4183-a7f7-e96a6c7d2c93",
                "4705bed5-8782-4e8b-bab6-55e8043edaa6",
            )
+5 -4
View File
@@ -23,6 +23,7 @@ import argparse
import json
import os
import re
import shlex
import signal
import socket
import subprocess
@@ -145,11 +146,11 @@ def _get_script_arguments():
def _run_prowler(prowler_args):
_debug("Running prowler with args: {0}".format(prowler_args), 1)
_prowler_command = "{prowler}/prowler {args}".format(
prowler=PATH_TO_PROWLER, args=prowler_args
_prowler_command = shlex.split(
"{prowler}/prowler {args}".format(prowler=PATH_TO_PROWLER, args=prowler_args)
)
_debug("Running command: {0}".format(_prowler_command), 2)
_process = subprocess.Popen(_prowler_command, stdout=subprocess.PIPE, shell=True)
_debug("Running command: {0}".format(" ".join(_prowler_command)), 2)
_process = subprocess.Popen(_prowler_command, stdout=subprocess.PIPE)
_output, _error = _process.communicate()
_debug("Raw prowler output: {0}".format(_output), 3)
_debug("Raw prowler error: {0}".format(_error), 3)
Generated
+416 -375
View File
File diff suppressed because it is too large Load Diff
+23
View File
@@ -2,6 +2,29 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [v5.10.0] (Prowler UNRELEASED)
### Added
- Add `bedrock_api_key_no_administrative_privileges` check for AWS provider [(#8321)](https://github.com/prowler-cloud/prowler/pull/8321)
- Support App Key Content in GitHub provider [(#8271)](https://github.com/prowler-cloud/prowler/pull/8271)
---
## [v5.9.3] (Prowler UNRELEASED)
### Fixed
- Add more validations to Azure Storage models when some values are None to avoid serialization issues [(#8325)](https://github.com/prowler-cloud/prowler/pull/8325)
- `sns_topics_not_publicly_accessible` false positive with `aws:SourceArn` conditions [(#8326)](https://github.com/prowler-cloud/prowler/issues/8326)
---
## [v5.9.2] (Prowler v5.9.2)
### Fixed
- Use the correct resource name in `defender_domain_dkim_enabled` check [(#8334)](https://github.com/prowler-cloud/prowler/pull/8334)
---
## [v5.9.0] (Prowler v5.9.0)
### Added
+1 -1
View File
@@ -12,7 +12,7 @@ from prowler.lib.logger import logger
timestamp = datetime.today()
timestamp_utc = datetime.now(timezone.utc).replace(tzinfo=timezone.utc)
prowler_version = "5.9.0"
prowler_version = "5.9.3"
html_logo_url = "https://github.com/prowler-cloud/prowler/"
square_logo_img = "https://prowler.com/wp-content/uploads/logo-html.png"
aws_logo = "https://user-images.githubusercontent.com/38561120/235953920-3e3fba08-0795-41dc-b480-9bea57db9f2e.png"
+191 -61
View File
@@ -223,6 +223,108 @@ def check_full_service_access(service: str, policy: dict) -> bool:
return all_target_service_actions.issubset(actions_allowed_on_all_resources)
def has_public_principal(statement: dict) -> bool:
    """
    Check if a policy statement has a public principal.

    A principal is considered public when it is (or contains) the global
    wildcard "*" or the any-account root ARN "arn:aws:iam::*:root", either
    directly or under the "AWS" / "CanonicalUser" keys of a principal dict.

    Args:
        statement (dict): IAM policy statement

    Returns:
        bool: True if the statement has a public principal, False otherwise
    """
    public_markers = ("*", "arn:aws:iam::*:root")
    principal = statement.get("Principal", "")
    # A string principal is matched by substring; a dict principal is matched
    # by key membership here (same semantics as the `in` operator provides).
    if any(marker in principal for marker in public_markers):
        return True
    if not isinstance(principal, dict):
        return False
    # "AWS" may hold a string (substring match) or a list (exact membership);
    # the `in` operator covers both forms. "CanonicalUser" is checked the
    # same way.
    for principal_key in ("AWS", "CanonicalUser"):
        entry = principal.get(principal_key, "")
        if any(marker in entry for marker in public_markers):
            return True
    return False
def has_restrictive_source_arn_condition(
    statement: dict, source_account: str = ""
) -> bool:
    """
    Check if a policy statement has a restrictive aws:SourceArn condition.

    A SourceArn condition is considered restrictive if:
    1. It doesn't contain overly permissive wildcards (like "*" or "arn:aws:s3:::*")
    2. When source_account is provided, the ARN either contains no account field
       (like S3 buckets) or contains the source_account

    Only the first ``aws:SourceArn`` condition key found is evaluated; its
    verdict decides the result for the whole statement.

    Args:
        statement (dict): IAM policy statement
        source_account (str): The account to check restrictions for (optional)

    Returns:
        bool: True if the statement has a restrictive aws:SourceArn condition,
        False otherwise
    """
    if "Condition" not in statement:
        return False
    for condition_operator in statement["Condition"]:
        for condition_key, condition_value in statement["Condition"][
            condition_operator
        ].items():
            if condition_key.lower() != "aws:sourcearn":
                continue
            # Normalize single value vs. list of values.
            arn_values = (
                condition_value
                if isinstance(condition_value, list)
                else [condition_value]
            )
            for arn_value in arn_values:
                # Robustness: malformed policies may carry non-string values
                # here; treat them as non-restrictive instead of raising
                # (previously `.count`/`.split` could crash on such values).
                if not isinstance(arn_value, str):
                    return False
                if (
                    arn_value == "*"  # Global wildcard
                    or arn_value.count("*")
                    >= 3  # Too many wildcards (e.g., arn:aws:*:*:*:*)
                    or arn_value.endswith(
                        ":::*"
                    )  # Service-wide wildcard (e.g., arn:aws:s3:::*)
                    or arn_value.endswith(
                        ":*"
                    )  # Resource wildcard (e.g., arn:aws:sns:us-east-1:123456789012:*)
                ):
                    return False
                if source_account:
                    # ARN format: arn:partition:service:region:account:resource
                    arn_parts = arn_value.split(":")
                    if len(arn_parts) > 4 and arn_parts[4] and arn_parts[4] != "*":
                        if arn_parts[4].isdigit():
                            # Numeric account field: require the audited
                            # account to appear somewhere in the ARN.
                            if source_account not in arn_value:
                                return False
                        else:
                            if arn_parts[4] != source_account:
                                return False
                    elif len(arn_parts) > 4 and arn_parts[4] == "*":
                        # Wildcard account field is never restrictive.
                        return False
                    # else: ARN doesn't contain account field (like S3 bucket), so it's restrictive
            return True
    return False
def is_condition_restricting_from_private_ip(condition_statement: dict) -> bool:
"""Check if the policy condition is coming from a private IP address.
@@ -303,61 +405,49 @@ def is_policy_public(
for statement in policy.get("Statement", []):
# Only check allow statements
if statement["Effect"] == "Allow":
has_public_access = has_public_principal(statement)
principal = statement.get("Principal", "")
if (
"*" in principal
or "arn:aws:iam::*:root" in principal
or (
isinstance(principal, dict)
and (
"*" in principal.get("AWS", "")
or "arn:aws:iam::*:root" in principal.get("AWS", "")
or (
isinstance(principal.get("AWS"), str)
and source_account
and not is_cross_account_allowed
and source_account not in principal.get("AWS", "")
)
or (
isinstance(principal.get("AWS"), list)
and (
"*" in principal["AWS"]
or "arn:aws:iam::*:root" in principal["AWS"]
or (
source_account
and not is_cross_account_allowed
and not any(
source_account in principal_aws
for principal_aws in principal["AWS"]
)
)
)
)
or "*" in principal.get("CanonicalUser", "")
or "arn:aws:iam::*:root"
in principal.get("CanonicalUser", "")
or check_cross_service_confused_deputy
and (
# Check if function can be invoked by other AWS services if check_cross_service_confused_deputy is True
(
".amazonaws.com" in principal.get("Service", "")
or ".amazon.com" in principal.get("Service", "")
or "*" in principal.get("Service", "")
)
and (
"secretsmanager.amazonaws.com"
not in principal.get(
"Service", ""
) # AWS ensures that resources called by SecretsManager are executed in the same AWS account
or "eks.amazonaws.com"
not in principal.get(
"Service", ""
) # AWS ensures that resources called by EKS are executed in the same AWS account
)
)
if not has_public_access and isinstance(principal, dict):
# Check for cross-account access when not allowed
if (
isinstance(principal.get("AWS"), str)
and source_account
and not is_cross_account_allowed
and source_account not in principal.get("AWS", "")
) or (
isinstance(principal.get("AWS"), list)
and source_account
and not is_cross_account_allowed
and not any(
source_account in principal_aws
for principal_aws in principal["AWS"]
)
)
) and (
):
has_public_access = True
# Check for cross-service confused deputy
if check_cross_service_confused_deputy and (
# Check if function can be invoked by other AWS services if check_cross_service_confused_deputy is True
(
".amazonaws.com" in principal.get("Service", "")
or ".amazon.com" in principal.get("Service", "")
or "*" in principal.get("Service", "")
)
and (
"secretsmanager.amazonaws.com"
not in principal.get(
"Service", ""
) # AWS ensures that resources called by SecretsManager are executed in the same AWS account
or "eks.amazonaws.com"
not in principal.get(
"Service", ""
) # AWS ensures that resources called by EKS are executed in the same AWS account
)
):
has_public_access = True
if has_public_access and (
not not_allowed_actions # If not_allowed_actions is empty, the function will not consider the actions in the policy
or (
statement.get(
@@ -498,9 +588,29 @@ def is_condition_block_restrictive(
"aws:sourcevpc" != value
and "aws:sourcevpce" != value
):
if source_account not in item:
is_condition_key_restrictive = False
break
if value == "aws:sourcearn":
# Use the specialized function to properly validate SourceArn restrictions
# Create a minimal statement to test with our function
test_statement = {
"Condition": {
condition_operator: {
value: condition_statement[
condition_operator
][value]
}
}
}
is_condition_key_restrictive = (
has_restrictive_source_arn_condition(
test_statement, source_account
)
)
if not is_condition_key_restrictive:
break
else:
if source_account not in item:
is_condition_key_restrictive = False
break
if is_condition_key_restrictive:
is_condition_valid = True
@@ -516,11 +626,31 @@ def is_condition_block_restrictive(
if is_cross_account_allowed:
is_condition_valid = True
else:
if (
source_account
in condition_statement[condition_operator][value]
):
is_condition_valid = True
if value == "aws:sourcearn":
# Use the specialized function to properly validate SourceArn restrictions
# Create a minimal statement to test with our function
test_statement = {
"Condition": {
condition_operator: {
value: condition_statement[
condition_operator
][value]
}
}
}
is_condition_valid = (
has_restrictive_source_arn_condition(
test_statement, source_account
)
)
else:
if (
source_account
in condition_statement[condition_operator][
value
]
):
is_condition_valid = True
return is_condition_valid
@@ -1,5 +1,7 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.iam.lib.policy import (
has_public_principal,
has_restrictive_source_arn_condition,
is_condition_block_restrictive,
is_condition_block_restrictive_organization,
is_condition_block_restrictive_sns_endpoint,
@@ -16,46 +18,26 @@ class sns_topics_not_publicly_accessible(Check):
report.status_extended = (
f"SNS topic {topic.name} is not publicly accessible."
)
if topic.policy:
for statement in topic.policy["Statement"]:
# Only check allow statements
if statement["Effect"] == "Allow":
if (
"*" in statement["Principal"]
or (
"AWS" in statement["Principal"]
and "*" in statement["Principal"]["AWS"]
if statement["Effect"] == "Allow" and has_public_principal(
statement
):
if has_restrictive_source_arn_condition(statement):
break
elif "Condition" in statement:
condition_account = is_condition_block_restrictive(
statement["Condition"], sns_client.audited_account
)
or (
"CanonicalUser" in statement["Principal"]
and "*" in statement["Principal"]["CanonicalUser"]
condition_org = is_condition_block_restrictive_organization(
statement["Condition"]
)
):
condition_account = False
condition_org = False
condition_endpoint = False
if (
"Condition" in statement
and is_condition_block_restrictive(
statement["Condition"],
sns_client.audited_account,
condition_endpoint = (
is_condition_block_restrictive_sns_endpoint(
statement["Condition"]
)
):
condition_account = True
if (
"Condition" in statement
and is_condition_block_restrictive_organization(
statement["Condition"],
)
):
condition_org = True
if (
"Condition" in statement
and is_condition_block_restrictive_sns_endpoint(
statement["Condition"],
)
):
condition_endpoint = True
)
if condition_account and condition_org:
report.status_extended = f"SNS topic {topic.name} is not public because its policy only allows access from the account {sns_client.audited_account} and an organization."
@@ -69,7 +51,11 @@ class sns_topics_not_publicly_accessible(Check):
report.status = "FAIL"
report.status_extended = f"SNS topic {topic.name} is public because its policy allows public access."
break
else:
# Public principal with no conditions = public
report.status = "FAIL"
report.status_extended = f"SNS topic {topic.name} is public because its policy allows public access."
break
findings.append(report)
return findings
@@ -70,17 +70,44 @@ class Storage(AzureService):
],
key_expiration_period_in_days=key_expiration_period_in_days,
location=storage_account.location,
default_to_entra_authorization=getattr(
storage_account,
"default_to_o_auth_authentication",
False,
default_to_entra_authorization=(
False
if getattr(
storage_account,
"default_to_o_auth_authentication",
False,
)
is None
else getattr(
storage_account,
"default_to_o_auth_authentication",
False,
)
),
replication_settings=replication_settings,
allow_cross_tenant_replication=getattr(
storage_account, "allow_cross_tenant_replication", True
allow_cross_tenant_replication=(
True
if getattr(
storage_account,
"allow_cross_tenant_replication",
True,
)
is None
else getattr(
storage_account,
"allow_cross_tenant_replication",
True,
)
),
allow_shared_key_access=getattr(
storage_account, "allow_shared_key_access", True
allow_shared_key_access=(
True
if getattr(
storage_account, "allow_shared_key_access", True
)
is None
else getattr(
storage_account, "allow_shared_key_access", True
)
),
)
)
@@ -26,7 +26,7 @@ class defender_domain_dkim_enabled(Check):
report = CheckReportM365(
metadata=self.metadata(),
resource=config,
resource_name="DKIM Configuration",
resource_name=config.id,
resource_id=config.id,
)
report.status = "FAIL"
+1 -1
View File
@@ -71,7 +71,7 @@ maintainers = [{name = "Prowler Engineering", email = "engineering@prowler.com"}
name = "prowler"
readme = "README.md"
requires-python = ">3.9.1,<3.13"
version = "5.9.0"
version = "5.9.3"
[project.scripts]
prowler = "prowler.__main__:prowler"
@@ -0,0 +1,256 @@
#!/usr/bin/env python
"""
Security test for prowler-wrapper.py command injection vulnerability
This test demonstrates the command injection vulnerability and validates the fix
"""
import os
import shutil
import sys
import tempfile
import unittest
from unittest.mock import MagicMock, patch
class TestProwlerWrapperSecurity(unittest.TestCase):
    """Test cases for command injection vulnerability in prowler-wrapper.py

    Each test feeds shell metacharacters through the profile argument and
    asserts that subprocess.Popen receives a list (shlex-split) and no
    shell=True, i.e. the metacharacters stay literal arguments.
    """

    def setUp(self):
        """Set up test environment"""
        # Create a temporary directory for testing
        self.test_dir = tempfile.mkdtemp()
        # Resolve contrib/wazuh/prowler-wrapper.py relative to this test file
        # (four directory levels up from here — depends on repo layout).
        self.prowler_wrapper_path = os.path.join(
            os.path.dirname(
                os.path.dirname(
                    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
                )
            ),
            "contrib",
            "wazuh",
            "prowler-wrapper.py",
        )

    def tearDown(self):
        """Clean up test environment"""
        shutil.rmtree(self.test_dir, ignore_errors=True)

    def _import_prowler_wrapper(self):
        """Helper to import prowler_wrapper with mocked WAZUH_PATH"""
        # NOTE(review): this sys.path insertion is never undone, so repeated
        # calls accumulate entries for the lifetime of the test process.
        sys.path.insert(0, os.path.dirname(self.prowler_wrapper_path))
        # Mock the WAZUH_PATH that's read at module level
        with patch("builtins.open", create=True) as mock_open:
            mock_open.return_value.readline.return_value = 'DIRECTORY="/opt/wazuh"'
            import importlib.util

            # Load the wrapper from its file path so module-level code runs
            # with the mocked open() above.
            spec = importlib.util.spec_from_file_location(
                "prowler_wrapper", self.prowler_wrapper_path
            )
            prowler_wrapper = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(prowler_wrapper)
            return prowler_wrapper._run_prowler

    def test_command_injection_semicolon(self):
        """Test command injection using semicolon"""
        # Create a test file that should not be created if injection is prevented
        test_file = os.path.join(self.test_dir, "pwned.txt")
        # Malicious profile that attempts to create a file
        malicious_profile = f"test; touch {test_file}"
        # Mock the subprocess.Popen to capture the command
        with patch("subprocess.Popen") as mock_popen:
            mock_process = MagicMock()
            mock_process.communicate.return_value = (b"test output", None)
            mock_popen.return_value = mock_process
            # Import and run the vulnerable function
            _run_prowler = self._import_prowler_wrapper()
            # Run with malicious input
            _run_prowler(f'-p "{malicious_profile}" -V')
            # Check that Popen was called
            self.assertTrue(mock_popen.called)
            # Get the actual command that was passed to Popen
            actual_command = mock_popen.call_args[0][0]
            # With the fix, the command should be a list (from shlex.split)
            # and should NOT have shell=True
            self.assertIsInstance(
                actual_command, list, "Command should be a list after shlex.split"
            )
            # Check that shell=True is not in the call
            call_kwargs = mock_popen.call_args[1]
            self.assertNotIn(
                "shell",
                call_kwargs,
                "shell parameter should not be present (defaults to False)",
            )

    def test_command_injection_ampersand(self):
        """Test command injection using ampersand"""
        # Create a test file that should not be created if injection is prevented
        test_file = os.path.join(self.test_dir, "pwned2.txt")
        # Malicious profile that attempts to create a file
        malicious_profile = f"test && touch {test_file}"
        with patch("subprocess.Popen") as mock_popen:
            mock_process = MagicMock()
            mock_process.communicate.return_value = (b"test output", None)
            mock_popen.return_value = mock_process
            # Import and run the function
            _run_prowler = self._import_prowler_wrapper()
            # Run with malicious input
            _run_prowler(f'-p "{malicious_profile}" -V')
            # Get the actual command
            actual_command = mock_popen.call_args[0][0]
            # Verify it's a list (safe execution)
            self.assertIsInstance(actual_command, list)
            # The malicious characters should be preserved as part of the argument
            # not interpreted as shell commands
            command_str = " ".join(actual_command)
            self.assertIn(
                "&&",
                command_str,
                "Shell metacharacters should be preserved as literals",
            )

    def test_command_injection_pipe(self):
        """Test command injection using pipe"""
        malicious_profile = 'test | echo "injected"'
        with patch("subprocess.Popen") as mock_popen:
            mock_process = MagicMock()
            mock_process.communicate.return_value = (b"test output", None)
            mock_popen.return_value = mock_process
            # Import and run the function
            _run_prowler = self._import_prowler_wrapper()
            # Run with malicious input
            _run_prowler(f'-p "{malicious_profile}" -V')
            # Get the actual command
            actual_command = mock_popen.call_args[0][0]
            # Verify safe execution
            self.assertIsInstance(actual_command, list)
            # Pipe should be preserved as literal
            command_str = " ".join(actual_command)
            self.assertIn("|", command_str)

    def test_command_injection_backticks(self):
        """Test command injection using backticks"""
        malicious_profile = "test `echo injected`"
        with patch("subprocess.Popen") as mock_popen:
            mock_process = MagicMock()
            mock_process.communicate.return_value = (b"test output", None)
            mock_popen.return_value = mock_process
            # Import and run the function
            _run_prowler = self._import_prowler_wrapper()
            # Run with malicious input
            _run_prowler(f'-p "{malicious_profile}" -V')
            # Get the actual command
            actual_command = mock_popen.call_args[0][0]
            # Verify safe execution
            self.assertIsInstance(actual_command, list)
            # Backticks should be preserved as literals
            command_str = " ".join(actual_command)
            self.assertIn("`", command_str)

    def test_command_injection_dollar_parentheses(self):
        """Test command injection using $() syntax"""
        malicious_profile = "test $(echo injected)"
        with patch("subprocess.Popen") as mock_popen:
            mock_process = MagicMock()
            mock_process.communicate.return_value = (b"test output", None)
            mock_popen.return_value = mock_process
            # Import and run the function
            _run_prowler = self._import_prowler_wrapper()
            # Run with malicious input
            _run_prowler(f'-p "{malicious_profile}" -V')
            # Get the actual command
            actual_command = mock_popen.call_args[0][0]
            # Verify safe execution
            self.assertIsInstance(actual_command, list)
            # $() should be preserved as literals
            command_str = " ".join(actual_command)
            self.assertIn("$(", command_str)

    def test_legitimate_profile_name(self):
        """Test that legitimate profile names still work correctly"""
        legitimate_profile = "production-aws-profile"
        with patch("subprocess.Popen") as mock_popen:
            mock_process = MagicMock()
            mock_process.communicate.return_value = (b"test output", None)
            mock_popen.return_value = mock_process
            # Import and run the function
            _run_prowler = self._import_prowler_wrapper()
            # Run with legitimate input
            result = _run_prowler(f"-p {legitimate_profile} -V")
            # Verify the function returns output
            self.assertEqual(result, b"test output")
            # Verify Popen was called correctly
            actual_command = mock_popen.call_args[0][0]
            self.assertIsInstance(actual_command, list)
            # Check the profile is passed correctly
            command_str = " ".join(actual_command)
            self.assertIn(legitimate_profile, command_str)

    def test_shlex_split_behavior(self):
        """Test that shlex properly handles quoted arguments"""
        profile_with_spaces = "my profile name"
        with patch("subprocess.Popen") as mock_popen:
            mock_process = MagicMock()
            mock_process.communicate.return_value = (b"test output", None)
            mock_popen.return_value = mock_process
            # Import and run the function
            _run_prowler = self._import_prowler_wrapper()
            # Run with profile containing spaces
            _run_prowler(f'-p "{profile_with_spaces}" -V')
            # Get the actual command
            actual_command = mock_popen.call_args[0][0]
            # Verify it's properly split
            self.assertIsInstance(actual_command, list)
            # The profile name should be preserved as a single argument
            # despite containing spaces
            self.assertIn("my profile name", actual_command)


if __name__ == "__main__":
    unittest.main()
@@ -404,7 +404,7 @@ class Test_ec2_securitygroup_allow_ingress_from_internet_to_all_ports:
new=EC2(aws_provider),
),
mock.patch(
"prowler.providers.aws.services.vpc.vpc_service.VPC",
"prowler.providers.aws.services.ec2.ec2_securitygroup_allow_ingress_from_internet_to_all_ports.ec2_securitygroup_allow_ingress_from_internet_to_all_ports.vpc_client",
new=VPC(aws_provider),
),
mock.patch(
@@ -6,6 +6,8 @@ from prowler.providers.aws.services.iam.lib.policy import (
check_full_service_access,
get_effective_actions,
has_codebuild_trusted_principal,
has_public_principal,
has_restrictive_source_arn_condition,
is_codebuild_using_allowed_github_org,
is_condition_block_restrictive,
is_condition_block_restrictive_organization,
@@ -2451,3 +2453,266 @@ def test_has_codebuild_trusted_principal_list():
],
}
assert has_codebuild_trusted_principal(trust_policy) is True
class Test_has_public_principal:
    """Tests for the has_public_principal function

    Covers string/dict principal shapes, "AWS" as string vs list,
    "CanonicalUser", and the negative cases (specific account, service
    principal, missing/empty principal).
    """

    def test_has_public_principal_wildcard_string(self):
        """Test public principal detection with wildcard string"""
        statement = {"Principal": "*"}
        assert has_public_principal(statement) is True

    def test_has_public_principal_root_arn_string(self):
        """Test public principal detection with root ARN string"""
        statement = {"Principal": "arn:aws:iam::*:root"}
        assert has_public_principal(statement) is True

    def test_has_public_principal_aws_dict_wildcard(self):
        """Test public principal detection with AWS dict containing wildcard"""
        statement = {"Principal": {"AWS": "*"}}
        assert has_public_principal(statement) is True

    def test_has_public_principal_aws_dict_root_arn(self):
        """Test public principal detection with AWS dict containing root ARN"""
        statement = {"Principal": {"AWS": "arn:aws:iam::*:root"}}
        assert has_public_principal(statement) is True

    def test_has_public_principal_aws_list_wildcard(self):
        """Test public principal detection with AWS list containing wildcard"""
        statement = {"Principal": {"AWS": ["arn:aws:iam::123456789012:user/test", "*"]}}
        assert has_public_principal(statement) is True

    def test_has_public_principal_aws_list_root_arn(self):
        """Test public principal detection with AWS list containing root ARN"""
        statement = {
            "Principal": {
                "AWS": ["arn:aws:iam::123456789012:user/test", "arn:aws:iam::*:root"]
            }
        }
        assert has_public_principal(statement) is True

    def test_has_public_principal_canonical_user_wildcard(self):
        """Test public principal detection with CanonicalUser wildcard"""
        statement = {"Principal": {"CanonicalUser": "*"}}
        assert has_public_principal(statement) is True

    def test_has_public_principal_canonical_user_root_arn(self):
        """Test public principal detection with CanonicalUser root ARN"""
        statement = {"Principal": {"CanonicalUser": "arn:aws:iam::*:root"}}
        assert has_public_principal(statement) is True

    def test_has_public_principal_no_principal(self):
        """Test with statement that has no Principal field"""
        statement = {"Effect": "Allow", "Action": "s3:GetObject"}
        assert has_public_principal(statement) is False

    def test_has_public_principal_empty_principal(self):
        """Test with empty principal"""
        statement = {"Principal": ""}
        assert has_public_principal(statement) is False

    def test_has_public_principal_specific_account(self):
        """Test with specific account principal (not public)"""
        statement = {"Principal": {"AWS": "arn:aws:iam::123456789012:root"}}
        assert has_public_principal(statement) is False

    def test_has_public_principal_service_principal(self):
        """Test with service principal (not public)"""
        statement = {"Principal": {"Service": "lambda.amazonaws.com"}}
        assert has_public_principal(statement) is False

    def test_has_public_principal_mixed_principals(self):
        """Test with mixed principals including public one"""
        statement = {
            "Principal": {
                "AWS": ["arn:aws:iam::123456789012:user/test"],
                "Service": "lambda.amazonaws.com",
                "CanonicalUser": "*",
            }
        }
        assert has_public_principal(statement) is True
class Test_has_restrictive_source_arn_condition:
    """Tests for the has_restrictive_source_arn_condition function

    Covers missing/irrelevant Condition blocks, restrictive ARNs (S3 bucket,
    Lambda function), the wildcard heuristics (global, service-wide,
    multi-wildcard, resource-level), list-valued conditions, and the optional
    source_account validation.
    """

    def test_no_condition_block(self):
        """Test statement without Condition block"""
        statement = {"Effect": "Allow", "Principal": "*", "Action": "s3:GetObject"}
        assert has_restrictive_source_arn_condition(statement) is False

    def test_no_source_arn_condition(self):
        """Test with condition block but no aws:SourceArn"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "s3:GetObject",
            "Condition": {"StringEquals": {"aws:SourceAccount": "123456789012"}},
        }
        assert has_restrictive_source_arn_condition(statement) is False

    def test_restrictive_source_arn_s3_bucket(self):
        """Test restrictive SourceArn condition with S3 bucket"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:s3:::my-bucket"}},
        }
        assert has_restrictive_source_arn_condition(statement) is True

    def test_restrictive_source_arn_lambda_function(self):
        """Test restrictive SourceArn condition with Lambda function"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {
                "ArnEquals": {
                    "aws:SourceArn": "arn:aws:lambda:us-east-1:123456789012:function:MyFunction"
                }
            },
        }
        assert has_restrictive_source_arn_condition(statement) is True

    def test_non_restrictive_global_wildcard(self):
        """Test non-restrictive SourceArn with global wildcard"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {"ArnLike": {"aws:SourceArn": "*"}},
        }
        assert has_restrictive_source_arn_condition(statement) is False

    def test_non_restrictive_service_wildcard(self):
        """Test non-restrictive SourceArn with service wildcard"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:s3:::*"}},
        }
        assert has_restrictive_source_arn_condition(statement) is False

    def test_non_restrictive_multi_wildcard(self):
        """Test non-restrictive SourceArn with multiple wildcards"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:*:*:*:*"}},
        }
        assert has_restrictive_source_arn_condition(statement) is False

    def test_non_restrictive_resource_wildcard(self):
        """Test non-restrictive SourceArn with resource wildcard"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {
                "ArnLike": {"aws:SourceArn": "arn:aws:lambda:us-east-1:123456789012:*"}
            },
        }
        assert has_restrictive_source_arn_condition(statement) is False

    def test_source_arn_list_with_valid_arn(self):
        """Test SourceArn condition with list containing valid ARN"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": ["arn:aws:s3:::bucket1", "arn:aws:s3:::bucket2"]
                }
            },
        }
        assert has_restrictive_source_arn_condition(statement) is True

    def test_source_arn_list_with_wildcard(self):
        """Test SourceArn condition with list containing wildcard"""
        # One permissive entry makes the whole condition non-restrictive.
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {"ArnLike": {"aws:SourceArn": ["arn:aws:s3:::bucket1", "*"]}},
        }
        assert has_restrictive_source_arn_condition(statement) is False

    def test_source_arn_with_account_validation_match(self):
        """Test SourceArn with account validation - matching account"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:lambda:us-east-1:123456789012:function:MyFunction"
                }
            },
        }
        assert has_restrictive_source_arn_condition(statement, "123456789012") is True

    def test_source_arn_with_account_validation_mismatch(self):
        """Test SourceArn with account validation - non-matching account"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:lambda:us-east-1:123456789012:function:MyFunction"
                }
            },
        }
        assert has_restrictive_source_arn_condition(statement, "987654321098") is False

    def test_source_arn_with_account_wildcard(self):
        """Test SourceArn with account wildcard"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:lambda:us-east-1:*:function:MyFunction"
                }
            },
        }
        assert has_restrictive_source_arn_condition(statement, "123456789012") is False

    def test_source_arn_s3_bucket_no_account_field(self):
        """Test SourceArn with S3 bucket (no account field) - should be restrictive"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:s3:::my-bucket"}},
        }
        assert has_restrictive_source_arn_condition(statement, "123456789012") is True

    def test_source_arn_case_insensitive(self):
        """Test SourceArn condition key is case insensitive"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {"ArnLike": {"AWS:SourceArn": "arn:aws:s3:::my-bucket"}},
        }
        assert has_restrictive_source_arn_condition(statement) is True

    def test_source_arn_mixed_operators(self):
        """Test SourceArn with multiple condition operators"""
        statement = {
            "Effect": "Allow",
            "Principal": "*",
            "Action": "sns:Publish",
            "Condition": {
                "ArnLike": {"aws:SourceArn": "arn:aws:s3:::my-bucket"},
                "StringEquals": {"aws:SourceAccount": "123456789012"},
            },
        }
        assert has_restrictive_source_arn_condition(statement) is True
@@ -2,9 +2,10 @@ from typing import Any, Dict
from unittest import mock
from uuid import uuid4
import pytest
from prowler.providers.aws.services.sns.sns_service import Topic
from tests.providers.aws.utils import AWS_ACCOUNT_NUMBER, AWS_REGION_EU_WEST_1
import pytest
kms_key_id = str(uuid4())
topic_name = "test-topic"
@@ -98,6 +99,73 @@ test_policy_restricted_principal_account_organization = {
]
}
test_policy_restricted_source_arn = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SNS:Publish",
"Resource": f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}",
"Condition": {
"ArnLike": {"aws:SourceArn": "arn:aws:s3:::test-bucket-name"}
},
}
],
}
test_policy_invalid_source_arn = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SNS:Publish",
"Resource": f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}",
"Condition": {"ArnLike": {"aws:SourceArn": "invalid-arn-format"}},
}
],
}
test_policy_unrestricted_source_arn_wildcard = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SNS:Publish",
"Resource": f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}",
"Condition": {"ArnLike": {"aws:SourceArn": "*"}},
}
],
}
test_policy_unrestricted_source_arn_service_wildcard = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SNS:Publish",
"Resource": f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}",
"Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:s3:::*"}},
}
],
}
test_policy_unrestricted_source_arn_multi_wildcard = {
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"AWS": "*"},
"Action": "SNS:Publish",
"Resource": f"arn:aws:sns:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:{topic_name}",
"Condition": {"ArnLike": {"aws:SourceArn": "arn:aws:*:*:*:*"}},
}
],
}
def generate_policy_restricted_on_sns_endpoint(endpoint: str) -> Dict[str, Any]:
return {
@@ -396,6 +464,78 @@ class Test_sns_topics_not_publicly_accessible:
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []
    def test_topic_public_with_source_arn_restriction(self):
        """A public principal guarded by a restrictive aws:SourceArn should PASS."""
        # NOTE(review): `mock.MagicMock` (no parentheses) assigns the class
        # itself, so the attributes below are set on the class — this mirrors
        # the pattern used by the sibling tests in this file; verify intent.
        sns_client = mock.MagicMock
        sns_client.audited_account = AWS_ACCOUNT_NUMBER
        sns_client.topics = []
        sns_client.topics.append(
            Topic(
                arn=topic_arn,
                name=topic_name,
                policy=test_policy_restricted_source_arn,
                region=AWS_REGION_EU_WEST_1,
            )
        )
        sns_client.provider = mock.MagicMock()
        sns_client.provider.organizations_metadata = mock.MagicMock()
        sns_client.provider.organizations_metadata.organization_id = org_id
        with mock.patch(
            "prowler.providers.aws.services.sns.sns_service.SNS",
            sns_client,
        ):
            # Import inside the patch so the check binds to the mocked client.
            from prowler.providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import (
                sns_topics_not_publicly_accessible,
            )

            check = sns_topics_not_publicly_accessible()
            result = check.execute()
            assert len(result) == 1
            assert result[0].status == "PASS"
            assert (
                result[0].status_extended
                == f"SNS topic {topic_name} is not publicly accessible."
            )
            assert result[0].resource_id == topic_name
            assert result[0].resource_arn == topic_arn
            assert result[0].region == AWS_REGION_EU_WEST_1
            assert result[0].resource_tags == []
    def test_topic_public_with_invalid_source_arn(self):
        """A non-wildcard (even malformed) SourceArn value counts as restrictive -> PASS."""
        sns_client = mock.MagicMock
        sns_client.audited_account = AWS_ACCOUNT_NUMBER
        sns_client.topics = []
        sns_client.topics.append(
            Topic(
                arn=topic_arn,
                name=topic_name,
                policy=test_policy_invalid_source_arn,
                region=AWS_REGION_EU_WEST_1,
            )
        )
        sns_client.provider = mock.MagicMock()
        sns_client.provider.organizations_metadata = mock.MagicMock()
        sns_client.provider.organizations_metadata.organization_id = org_id
        with mock.patch(
            "prowler.providers.aws.services.sns.sns_service.SNS",
            sns_client,
        ):
            # Import inside the patch so the check binds to the mocked client.
            from prowler.providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import (
                sns_topics_not_publicly_accessible,
            )

            check = sns_topics_not_publicly_accessible()
            result = check.execute()
            assert len(result) == 1
            assert result[0].status == "PASS"
            assert (
                result[0].status_extended
                == f"SNS topic {topic_name} is not publicly accessible."
            )
            assert result[0].resource_id == topic_name
            assert result[0].resource_arn == topic_arn
            assert result[0].region == AWS_REGION_EU_WEST_1
            assert result[0].resource_tags == []
@pytest.mark.parametrize(
"endpoint",
[
@@ -443,6 +583,114 @@ class Test_sns_topics_not_publicly_accessible:
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_tags == []
def test_topic_public_with_unrestricted_source_arn_wildcard(self):
    """A policy whose aws:SourceArn condition is a bare wildcard does not
    restrict the principal, so the topic must be flagged public (FAIL)."""
    # Bug fix: instantiate MagicMock instead of configuring the MagicMock
    # class — attributes set on the class leak across tests.
    sns_client = mock.MagicMock()
    sns_client.audited_account = AWS_ACCOUNT_NUMBER
    sns_client.topics = []
    sns_client.topics.append(
        Topic(
            arn=topic_arn,
            name=topic_name,
            policy=test_policy_unrestricted_source_arn_wildcard,
            region=AWS_REGION_EU_WEST_1,
        )
    )
    # Organization id consulted by the check via the provider metadata.
    sns_client.provider = mock.MagicMock()
    sns_client.provider.organizations_metadata = mock.MagicMock()
    sns_client.provider.organizations_metadata.organization_id = org_id
    with mock.patch(
        "prowler.providers.aws.services.sns.sns_service.SNS",
        sns_client,
    ):
        # Import inside the patch context so the check binds the mocked service.
        from prowler.providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import (
            sns_topics_not_publicly_accessible,
        )

        check = sns_topics_not_publicly_accessible()
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"SNS topic {topic_name} is public because its policy allows public access."
        )
        assert result[0].resource_id == topic_name
        assert result[0].resource_arn == topic_arn
        assert result[0].region == AWS_REGION_EU_WEST_1
        assert result[0].resource_tags == []
def test_topic_public_with_unrestricted_source_arn_service_wildcard(self):
    """A policy whose aws:SourceArn condition wildcards the whole service
    portion of the ARN is effectively unrestricted, so the topic FAILs."""
    # Bug fix: MagicMock() instance, not the MagicMock class — class-level
    # attribute assignment leaks state into sibling tests.
    sns_client = mock.MagicMock()
    sns_client.audited_account = AWS_ACCOUNT_NUMBER
    sns_client.topics = []
    sns_client.topics.append(
        Topic(
            arn=topic_arn,
            name=topic_name,
            policy=test_policy_unrestricted_source_arn_service_wildcard,
            region=AWS_REGION_EU_WEST_1,
        )
    )
    # Organization id consulted by the check via the provider metadata.
    sns_client.provider = mock.MagicMock()
    sns_client.provider.organizations_metadata = mock.MagicMock()
    sns_client.provider.organizations_metadata.organization_id = org_id
    with mock.patch(
        "prowler.providers.aws.services.sns.sns_service.SNS",
        sns_client,
    ):
        # Import inside the patch context so the check binds the mocked service.
        from prowler.providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import (
            sns_topics_not_publicly_accessible,
        )

        check = sns_topics_not_publicly_accessible()
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"SNS topic {topic_name} is public because its policy allows public access."
        )
        assert result[0].resource_id == topic_name
        assert result[0].resource_arn == topic_arn
        assert result[0].region == AWS_REGION_EU_WEST_1
        assert result[0].resource_tags == []
def test_topic_public_with_unrestricted_source_arn_multi_wildcard(self):
    """A policy whose aws:SourceArn condition contains multiple wildcards is
    too permissive to restrict access, so the topic must FAIL as public."""
    # Bug fix: construct a MagicMock instance; `mock.MagicMock` without the
    # call mutates the shared class object and bleeds into other tests.
    sns_client = mock.MagicMock()
    sns_client.audited_account = AWS_ACCOUNT_NUMBER
    sns_client.topics = []
    sns_client.topics.append(
        Topic(
            arn=topic_arn,
            name=topic_name,
            policy=test_policy_unrestricted_source_arn_multi_wildcard,
            region=AWS_REGION_EU_WEST_1,
        )
    )
    # Organization id consulted by the check via the provider metadata.
    sns_client.provider = mock.MagicMock()
    sns_client.provider.organizations_metadata = mock.MagicMock()
    sns_client.provider.organizations_metadata.organization_id = org_id
    with mock.patch(
        "prowler.providers.aws.services.sns.sns_service.SNS",
        sns_client,
    ):
        # Import inside the patch context so the check binds the mocked service.
        from prowler.providers.aws.services.sns.sns_topics_not_publicly_accessible.sns_topics_not_publicly_accessible import (
            sns_topics_not_publicly_accessible,
        )

        check = sns_topics_not_publicly_accessible()
        result = check.execute()
        assert len(result) == 1
        assert result[0].status == "FAIL"
        assert (
            result[0].status_extended
            == f"SNS topic {topic_name} is public because its policy allows public access."
        )
        assert result[0].resource_id == topic_name
        assert result[0].resource_arn == topic_arn
        assert result[0].region == AWS_REGION_EU_WEST_1
        assert result[0].resource_tags == []
@pytest.mark.parametrize(
"endpoint",
[
@@ -43,7 +43,7 @@ class Test_defender_domain_dkim_enabled:
== "DKIM is enabled for domain with ID domain1."
)
assert result[0].resource == defender_client.dkim_configurations[0].dict()
assert result[0].resource_name == "DKIM Configuration"
assert result[0].resource_name == "domain1"
assert result[0].resource_id == "domain1"
assert result[0].location == "global"
@@ -86,7 +86,7 @@ class Test_defender_domain_dkim_enabled:
== "DKIM is not enabled for domain with ID domain2."
)
assert result[0].resource == defender_client.dkim_configurations[0].dict()
assert result[0].resource_name == "DKIM Configuration"
assert result[0].resource_name == "domain2"
assert result[0].resource_id == "domain2"
assert result[0].location == "global"
+2 -2
View File
@@ -127,8 +127,8 @@ You operate in an agent loop, iterating through these steps:
- Fetches information related to:
- All findings data across providers. Supports filtering by severity, status, etc.
- Unique metadata values from findings
- Remediation for checks
- Check IDs supported by different provider types
- Available checks for a specific provider (aws, gcp, azure, kubernetes, etc)
- Details of a specific check, including its severity, risk, remediation, and the compliance frameworks associated with it
### roles_agent
+1 -1
View File
@@ -10,5 +10,5 @@ export const checkSchema = z.object({
});
// Schema for requesting the details of a single check.
// The diff hunk above (+1 -1) replaces the old `id` field with `checkId`;
// the rendered fragment showed both lines, which would require callers to
// supply a stale `id` value as well. Keep only the new field.
export const checkDetailsSchema = z.object({
  checkId: z.string(),
});