mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
feat(api): support timestamp precision in findings filters (#11754)
This commit is contained in:
@@ -4,6 +4,10 @@ All notable changes to the **Prowler API** are documented in this file.
|
||||
|
||||
## [1.33.0] (Prowler UNRELEASED)
|
||||
|
||||
### 🚀 Added
|
||||
|
||||
- Added timestamp precision support for `/api/v1/findings` `inserted_at` and `updated_at` filters [(#11754)](https://github.com/prowler-cloud/prowler/pull/11754)
|
||||
|
||||
### 🔄 Changed
|
||||
|
||||
- Attack Paths: AWS Neptune is now supported as a persistent sink database, selectable via `ATTACK_PATHS_SINK_DATABASE=neptune` (default `neo4j`), Cartography's (bumped to 0.138.1) per-scan ingest database stays on Neo4j [(#11524)](https://github.com/prowler-cloud/prowler/pull/11524)
|
||||
|
||||
+192
-45
@@ -67,6 +67,7 @@ from django_filters.rest_framework import (
|
||||
)
|
||||
from rest_framework_json_api.django_filters.backends import DjangoFilterBackend
|
||||
from rest_framework_json_api.serializers import ValidationError
|
||||
from uuid6 import UUID
|
||||
|
||||
|
||||
class CustomDjangoFilterBackend(DjangoFilterBackend):
|
||||
@@ -672,24 +673,7 @@ class LatestResourceFilter(ProviderRelationshipFilterSet):
|
||||
return queryset.filter(tags__text_search=value)
|
||||
|
||||
|
||||
class FindingFilter(CommonFindingFilters):
|
||||
scan = UUIDFilter(method="filter_scan_id")
|
||||
scan__in = UUIDInFilter(method="filter_scan_id_in")
|
||||
|
||||
inserted_at = DateFilter(method="filter_inserted_at", lookup_expr="date")
|
||||
inserted_at__date = DateFilter(method="filter_inserted_at", lookup_expr="date")
|
||||
inserted_at__gte = DateFilter(
|
||||
method="filter_inserted_at_gte",
|
||||
help_text=f"Maximum date range is {settings.FINDINGS_MAX_DAYS_IN_RANGE} days.",
|
||||
)
|
||||
inserted_at__lte = DateFilter(
|
||||
method="filter_inserted_at_lte",
|
||||
help_text=f"Maximum date range is {settings.FINDINGS_MAX_DAYS_IN_RANGE} days.",
|
||||
)
|
||||
|
||||
class Meta:
|
||||
model = Finding
|
||||
fields = {
|
||||
FINDING_BASE_FILTER_FIELDS = {
|
||||
"id": ["exact", "in"],
|
||||
"uid": ["exact", "in"],
|
||||
"scan": ["exact", "in"],
|
||||
@@ -698,9 +682,23 @@ class FindingFilter(CommonFindingFilters):
|
||||
"severity": ["exact", "in"],
|
||||
"impact": ["exact", "in"],
|
||||
"check_id": ["exact", "in", "icontains"],
|
||||
"inserted_at": ["date", "gte", "lte"],
|
||||
"updated_at": ["gte", "lte"],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
class BaseFindingFilter(CommonFindingFilters):
|
||||
DATE_FILTER_FIELDS = ()
|
||||
DATE_FILTER_NAMES = ()
|
||||
DATE_RANGE_HELP_TEXT = (
|
||||
f"Maximum date range is {settings.FINDINGS_MAX_DAYS_IN_RANGE} days."
|
||||
)
|
||||
DATE_FILTER_REQUIRED_DETAIL = "At least one date filter is required."
|
||||
|
||||
scan = UUIDFilter(method="filter_scan_id")
|
||||
scan__in = UUIDInFilter(method="filter_scan_id_in")
|
||||
|
||||
class Meta:
|
||||
model = Finding
|
||||
fields = FINDING_BASE_FILTER_FIELDS
|
||||
filter_overrides = {
|
||||
FindingDeltaEnumField: {
|
||||
"filter_class": CharFilter,
|
||||
@@ -723,17 +721,13 @@ class FindingFilter(CommonFindingFilters):
|
||||
return queryset.filter(resource_services__contains=[value])
|
||||
|
||||
def filter_queryset(self, queryset):
|
||||
if not (self.data.get("scan") or self.data.get("scan__in")) and not (
|
||||
self.data.get("inserted_at")
|
||||
or self.data.get("inserted_at__date")
|
||||
or self.data.get("inserted_at__gte")
|
||||
or self.data.get("inserted_at__lte")
|
||||
if not (self.data.get("scan") or self.data.get("scan__in")) and not any(
|
||||
self.data.get(filter_name) for filter_name in self.DATE_FILTER_NAMES
|
||||
):
|
||||
raise ValidationError(
|
||||
[
|
||||
{
|
||||
"detail": "At least one date filter is required: filter[inserted_at], filter[inserted_at.gte], "
|
||||
"or filter[inserted_at.lte].",
|
||||
"detail": self.DATE_FILTER_REQUIRED_DETAIL,
|
||||
"status": 400,
|
||||
"source": {"pointer": "/data/attributes/inserted_at"},
|
||||
"code": "required",
|
||||
@@ -742,31 +736,42 @@ class FindingFilter(CommonFindingFilters):
|
||||
)
|
||||
|
||||
cleaned = self.form.cleaned_data
|
||||
exact_date = cleaned.get("inserted_at") or cleaned.get("inserted_at__date")
|
||||
gte_date = cleaned.get("inserted_at__gte") or exact_date
|
||||
lte_date = cleaned.get("inserted_at__lte") or exact_date
|
||||
for field_name in self.DATE_FILTER_FIELDS:
|
||||
self.validate_datetime_filter_range(cleaned, field_name)
|
||||
|
||||
if gte_date is None:
|
||||
gte_date = datetime.now(UTC).date()
|
||||
if lte_date is None:
|
||||
lte_date = datetime.now(UTC).date()
|
||||
return super().filter_queryset(queryset)
|
||||
|
||||
if abs(lte_date - gte_date) > timedelta(
|
||||
def validate_datetime_filter_range(self, cleaned, field_name):
|
||||
exact_value = cleaned.get(field_name) or cleaned.get(f"{field_name}__date")
|
||||
gte_value = cleaned.get(f"{field_name}__gte") or exact_value
|
||||
lte_value = cleaned.get(f"{field_name}__lte") or exact_value
|
||||
|
||||
if not (exact_value or gte_value or lte_value):
|
||||
return
|
||||
|
||||
default_value = datetime.now(UTC).date()
|
||||
gte_value = gte_value or default_value
|
||||
lte_value = lte_value or default_value
|
||||
|
||||
gte_datetime = self.filter_value_to_datetime(gte_value, field_name)
|
||||
lte_datetime = self.filter_value_to_datetime(lte_value, field_name)
|
||||
|
||||
if abs(lte_datetime - gte_datetime) <= timedelta(
|
||||
days=settings.FINDINGS_MAX_DAYS_IN_RANGE
|
||||
):
|
||||
return
|
||||
|
||||
raise ValidationError(
|
||||
[
|
||||
{
|
||||
"detail": f"The date range cannot exceed {settings.FINDINGS_MAX_DAYS_IN_RANGE} days.",
|
||||
"status": 400,
|
||||
"source": {"pointer": "/data/attributes/inserted_at"},
|
||||
"source": {"pointer": f"/data/attributes/{field_name}"},
|
||||
"code": "invalid",
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
return super().filter_queryset(queryset)
|
||||
|
||||
# Convert filter values to UUIDv7 values for use with partitioning
|
||||
def filter_scan_id(self, queryset, name, value):
|
||||
try:
|
||||
@@ -824,27 +829,169 @@ class FindingFilter(CommonFindingFilters):
|
||||
datetime_value = self.maybe_date_to_datetime(value)
|
||||
start = uuid7_start(datetime_to_uuid7(datetime_value))
|
||||
end = uuid7_start(datetime_to_uuid7(datetime_value + timedelta(days=1)))
|
||||
|
||||
return queryset.filter(id__gte=start, id__lt=end)
|
||||
|
||||
def filter_inserted_at_gte(self, queryset, name, value):
|
||||
datetime_value = self.maybe_date_to_datetime(value)
|
||||
start = uuid7_start(datetime_to_uuid7(datetime_value))
|
||||
|
||||
return queryset.filter(id__gte=start)
|
||||
|
||||
def filter_inserted_at_lte(self, queryset, name, value):
|
||||
datetime_value = self.maybe_date_to_datetime(value)
|
||||
end = uuid7_start(datetime_to_uuid7(datetime_value + timedelta(days=1)))
|
||||
|
||||
return queryset.filter(id__lt=end)
|
||||
|
||||
@staticmethod
|
||||
def maybe_date_to_datetime(value):
|
||||
dt = value
|
||||
if isinstance(value, datetime):
|
||||
return value
|
||||
if isinstance(value, date):
|
||||
dt = datetime.combine(value, datetime.min.time(), tzinfo=UTC)
|
||||
return dt
|
||||
return datetime.combine(value, datetime.min.time(), tzinfo=UTC)
|
||||
if isinstance(value, str):
|
||||
return parse(value)
|
||||
return value
|
||||
|
||||
@classmethod
|
||||
def filter_value_to_datetime(cls, value, field_name):
|
||||
try:
|
||||
datetime_value = cls.maybe_date_to_datetime(value)
|
||||
except (TypeError, ValueError, OverflowError):
|
||||
raise ValidationError(
|
||||
[
|
||||
{
|
||||
"detail": "Enter a valid date or datetime.",
|
||||
"status": 400,
|
||||
"source": {"pointer": f"/data/attributes/{field_name}"},
|
||||
"code": "invalid",
|
||||
}
|
||||
]
|
||||
)
|
||||
|
||||
if datetime_value.tzinfo is None:
|
||||
return datetime_value.replace(tzinfo=UTC)
|
||||
return datetime_value.astimezone(UTC)
|
||||
|
||||
|
||||
class FindingFilter(BaseFindingFilter):
|
||||
DATE_FILTER_FIELDS = ("inserted_at", "updated_at")
|
||||
DATE_FILTER_NAMES = (
|
||||
"inserted_at",
|
||||
"inserted_at__date",
|
||||
"inserted_at__gte",
|
||||
"inserted_at__lte",
|
||||
"updated_at",
|
||||
"updated_at__date",
|
||||
"updated_at__gte",
|
||||
"updated_at__lte",
|
||||
)
|
||||
DATE_FILTER_REQUIRED_DETAIL = (
|
||||
"At least one date filter is required: filter[inserted_at], filter[updated_at], "
|
||||
"filter[inserted_at.gte], filter[updated_at.gte], filter[inserted_at.lte], "
|
||||
"or filter[updated_at.lte]."
|
||||
)
|
||||
|
||||
inserted_at = CharFilter(method="filter_inserted_at")
|
||||
inserted_at__date = DateFilter(method="filter_inserted_at", lookup_expr="date")
|
||||
inserted_at__gte = CharFilter(
|
||||
method="filter_inserted_at",
|
||||
help_text=BaseFindingFilter.DATE_RANGE_HELP_TEXT,
|
||||
)
|
||||
inserted_at__lte = CharFilter(
|
||||
method="filter_inserted_at",
|
||||
help_text=BaseFindingFilter.DATE_RANGE_HELP_TEXT,
|
||||
)
|
||||
updated_at = CharFilter(method="filter_updated_at")
|
||||
updated_at__date = DateFilter(method="filter_updated_at", lookup_expr="date")
|
||||
updated_at__gte = CharFilter(
|
||||
method="filter_updated_at",
|
||||
help_text=BaseFindingFilter.DATE_RANGE_HELP_TEXT,
|
||||
)
|
||||
updated_at__lte = CharFilter(
|
||||
method="filter_updated_at",
|
||||
help_text=BaseFindingFilter.DATE_RANGE_HELP_TEXT,
|
||||
)
|
||||
|
||||
class Meta(BaseFindingFilter.Meta):
|
||||
fields = FINDING_BASE_FILTER_FIELDS | {
|
||||
"inserted_at": ["date", "gte", "lte"],
|
||||
"updated_at": ["date", "gte", "lte"],
|
||||
}
|
||||
|
||||
def filter_inserted_at(self, queryset, name, value):
|
||||
start, end = self.filter_value_to_datetime_bounds(value, "inserted_at")
|
||||
|
||||
if name.endswith("__gte"):
|
||||
return queryset.filter(id__gte=self.datetime_to_uuid7_boundary(start))
|
||||
if name.endswith("__lte"):
|
||||
return queryset.filter(id__lt=self.datetime_to_uuid7_boundary(end))
|
||||
|
||||
return queryset.filter(
|
||||
id__gte=self.datetime_to_uuid7_boundary(start),
|
||||
id__lt=self.datetime_to_uuid7_boundary(end),
|
||||
)
|
||||
|
||||
def filter_updated_at(self, queryset, name, value):
|
||||
start, end = self.filter_value_to_datetime_bounds(value, "updated_at")
|
||||
|
||||
if name.endswith("__gte"):
|
||||
return queryset.filter(updated_at__gte=start)
|
||||
if name.endswith("__lte"):
|
||||
return queryset.filter(updated_at__lt=end)
|
||||
|
||||
return queryset.filter(updated_at__gte=start, updated_at__lt=end)
|
||||
|
||||
@classmethod
|
||||
def filter_value_to_datetime_bounds(cls, value, field_name):
|
||||
start = cls.filter_value_to_datetime(value, field_name)
|
||||
if cls.is_date_filter_value(value):
|
||||
return start, start + timedelta(days=1)
|
||||
return start, start + timedelta(milliseconds=1)
|
||||
|
||||
@staticmethod
|
||||
def datetime_to_uuid7_boundary(datetime_value):
|
||||
timestamp_ms = int(datetime_value.timestamp() * 1000) & 0xFFFFFFFFFFFF
|
||||
uuid_int = timestamp_ms << 80
|
||||
uuid_int |= 0x7 << 76
|
||||
uuid_int |= 0x2 << 62
|
||||
return UUID(int=uuid_int)
|
||||
|
||||
@staticmethod
|
||||
def is_date_filter_value(value):
|
||||
if isinstance(value, datetime):
|
||||
return False
|
||||
if isinstance(value, date):
|
||||
return True
|
||||
return isinstance(value, str) and len(value.strip()) == 10
|
||||
|
||||
|
||||
class FindingMetadataFilter(BaseFindingFilter):
|
||||
DATE_FILTER_FIELDS = ("inserted_at",)
|
||||
DATE_FILTER_NAMES = (
|
||||
"inserted_at",
|
||||
"inserted_at__date",
|
||||
"inserted_at__gte",
|
||||
"inserted_at__lte",
|
||||
)
|
||||
DATE_FILTER_REQUIRED_DETAIL = (
|
||||
"At least one date filter is required: filter[inserted_at], filter[inserted_at.gte], "
|
||||
"or filter[inserted_at.lte]."
|
||||
)
|
||||
|
||||
inserted_at = DateFilter(method="filter_inserted_at", lookup_expr="date")
|
||||
inserted_at__date = DateFilter(method="filter_inserted_at", lookup_expr="date")
|
||||
inserted_at__gte = DateFilter(
|
||||
method="filter_inserted_at_gte",
|
||||
help_text=BaseFindingFilter.DATE_RANGE_HELP_TEXT,
|
||||
)
|
||||
inserted_at__lte = DateFilter(
|
||||
method="filter_inserted_at_lte",
|
||||
help_text=BaseFindingFilter.DATE_RANGE_HELP_TEXT,
|
||||
)
|
||||
|
||||
class Meta(BaseFindingFilter.Meta):
|
||||
fields = FINDING_BASE_FILTER_FIELDS | {
|
||||
"inserted_at": ["date", "gte", "lte"],
|
||||
}
|
||||
|
||||
|
||||
class LatestFindingFilter(CommonFindingFilters):
|
||||
|
||||
+1593
-305
File diff suppressed because it is too large
Load Diff
@@ -57,6 +57,7 @@ from api.models import (
|
||||
UserRoleRelationship,
|
||||
)
|
||||
from api.rls import Tenant
|
||||
from api.uuid_utils import datetime_to_uuid7
|
||||
from api.v1.serializers import TokenSerializer
|
||||
from api.v1.views import ComplianceOverviewViewSet, TenantFinishACSView
|
||||
from botocore.exceptions import ClientError, NoCredentialsError
|
||||
@@ -7218,6 +7219,26 @@ class TestFindingViewSet:
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert response.json()["errors"][0]["code"] == "invalid"
|
||||
|
||||
def test_findings_updated_at_range_too_large_with_inserted_at_filter(
|
||||
self, authenticated_client
|
||||
):
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-list"),
|
||||
{
|
||||
"filter[inserted_at]": TODAY,
|
||||
"filter[updated_at.gte]": today_after_n_days(
|
||||
-(settings.FINDINGS_MAX_DAYS_IN_RANGE + 1)
|
||||
),
|
||||
"filter[updated_at.lte]": TODAY,
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
assert response.json()["errors"][0]["code"] == "invalid"
|
||||
assert response.json()["errors"][0]["source"]["pointer"] == (
|
||||
"/data/attributes/updated_at"
|
||||
)
|
||||
|
||||
def test_findings_list(self, authenticated_client, findings_fixture):
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-list"), {"filter[inserted_at]": TODAY}
|
||||
@@ -7229,6 +7250,170 @@ class TestFindingViewSet:
|
||||
== findings_fixture[0].status
|
||||
)
|
||||
|
||||
def test_findings_list_inserted_at_accepts_timestamp_precision_filters(
|
||||
self, authenticated_client, scans_fixture
|
||||
):
|
||||
scan, *_ = scans_fixture
|
||||
|
||||
def create_finding(uid, inserted_at):
|
||||
finding = Finding.objects.create(
|
||||
id=datetime_to_uuid7(inserted_at),
|
||||
tenant_id=scan.tenant_id,
|
||||
uid=uid,
|
||||
scan=scan,
|
||||
status=Status.FAIL,
|
||||
status_extended="timestamp precision status",
|
||||
impact=Severity.medium,
|
||||
severity=Severity.medium,
|
||||
check_id="timestamp_precision_check",
|
||||
check_metadata={
|
||||
"CheckId": "timestamp_precision_check",
|
||||
"Description": "timestamp precision check",
|
||||
"servicename": "ec2",
|
||||
},
|
||||
first_seen_at=inserted_at,
|
||||
)
|
||||
Finding.all_objects.filter(pk=finding.pk).update(
|
||||
inserted_at=inserted_at,
|
||||
updated_at=inserted_at,
|
||||
)
|
||||
finding.refresh_from_db()
|
||||
return finding
|
||||
|
||||
create_finding(
|
||||
"timestamp_precision_early",
|
||||
datetime(2026, 1, 15, 10, 30, 0, 100000, tzinfo=UTC),
|
||||
)
|
||||
late_finding = create_finding(
|
||||
"timestamp_precision_late",
|
||||
datetime(2026, 1, 15, 10, 30, 0, 200000, tzinfo=UTC),
|
||||
)
|
||||
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-list"),
|
||||
{
|
||||
"filter[inserted_at.gte]": "2026-01-15T10:30:00.150Z",
|
||||
"filter[inserted_at.lte]": "2026-01-15T10:30:00.250Z",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
returned_uids = {
|
||||
finding["attributes"]["uid"] for finding in response.json()["data"]
|
||||
}
|
||||
assert returned_uids == {late_finding.uid}
|
||||
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-list"),
|
||||
{"filter[inserted_at]": "2026-01-15T10:30:00.200Z"},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
returned_uids = {
|
||||
finding["attributes"]["uid"] for finding in response.json()["data"]
|
||||
}
|
||||
assert returned_uids == {late_finding.uid}
|
||||
|
||||
def test_findings_list_updated_at_accepts_timestamp_precision_filters(
|
||||
self, authenticated_client, findings_fixture
|
||||
):
|
||||
early_finding, late_finding, *_ = findings_fixture
|
||||
early_updated_at = datetime(2026, 1, 15, 10, 30, 0, 100000, tzinfo=UTC)
|
||||
late_updated_at = datetime(2026, 1, 15, 10, 30, 0, 200000, tzinfo=UTC)
|
||||
Finding.all_objects.filter(pk=early_finding.pk).update(
|
||||
updated_at=early_updated_at
|
||||
)
|
||||
Finding.all_objects.filter(pk=late_finding.pk).update(
|
||||
updated_at=late_updated_at
|
||||
)
|
||||
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-list"),
|
||||
{
|
||||
"filter[updated_at.gte]": "2026-01-15T10:30:00.150Z",
|
||||
"filter[updated_at.lte]": "2026-01-15T10:30:00.250Z",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
returned_uids = {
|
||||
finding["attributes"]["uid"] for finding in response.json()["data"]
|
||||
}
|
||||
assert returned_uids == {late_finding.uid}
|
||||
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-list"),
|
||||
{"filter[updated_at]": "2026-01-15T10:30:00.200Z"},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
returned_uids = {
|
||||
finding["attributes"]["uid"] for finding in response.json()["data"]
|
||||
}
|
||||
assert returned_uids == {late_finding.uid}
|
||||
|
||||
def test_findings_list_inserted_at_and_updated_at_filters_are_combined(
|
||||
self, authenticated_client, scans_fixture
|
||||
):
|
||||
scan, *_ = scans_fixture
|
||||
|
||||
def create_finding(uid, inserted_at, updated_at):
|
||||
finding = Finding.objects.create(
|
||||
id=datetime_to_uuid7(inserted_at),
|
||||
tenant_id=scan.tenant_id,
|
||||
uid=uid,
|
||||
scan=scan,
|
||||
status=Status.FAIL,
|
||||
status_extended="timestamp precision status",
|
||||
impact=Severity.medium,
|
||||
severity=Severity.medium,
|
||||
check_id="timestamp_precision_check",
|
||||
check_metadata={
|
||||
"CheckId": "timestamp_precision_check",
|
||||
"Description": "timestamp precision check",
|
||||
"servicename": "ec2",
|
||||
},
|
||||
first_seen_at=inserted_at,
|
||||
)
|
||||
Finding.all_objects.filter(pk=finding.pk).update(
|
||||
inserted_at=inserted_at,
|
||||
updated_at=updated_at,
|
||||
)
|
||||
finding.refresh_from_db()
|
||||
return finding
|
||||
|
||||
matching_finding = create_finding(
|
||||
"timestamp_precision_combined_match",
|
||||
datetime(2026, 1, 15, 10, 30, 0, 200000, tzinfo=UTC),
|
||||
datetime(2026, 1, 15, 11, 30, 0, 200000, tzinfo=UTC),
|
||||
)
|
||||
create_finding(
|
||||
"timestamp_precision_combined_inserted_only",
|
||||
datetime(2026, 1, 15, 10, 30, 0, 200000, tzinfo=UTC),
|
||||
datetime(2026, 1, 15, 12, 30, 0, 200000, tzinfo=UTC),
|
||||
)
|
||||
create_finding(
|
||||
"timestamp_precision_combined_updated_only",
|
||||
datetime(2026, 1, 15, 9, 30, 0, 200000, tzinfo=UTC),
|
||||
datetime(2026, 1, 15, 11, 30, 0, 200000, tzinfo=UTC),
|
||||
)
|
||||
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-list"),
|
||||
{
|
||||
"filter[inserted_at.gte]": "2026-01-15T10:30:00.150Z",
|
||||
"filter[inserted_at.lte]": "2026-01-15T10:30:00.250Z",
|
||||
"filter[updated_at.gte]": "2026-01-15T11:30:00.150Z",
|
||||
"filter[updated_at.lte]": "2026-01-15T11:30:00.250Z",
|
||||
},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_200_OK
|
||||
returned_uids = {
|
||||
finding["attributes"]["uid"] for finding in response.json()["data"]
|
||||
}
|
||||
assert returned_uids == {matching_finding.uid}
|
||||
|
||||
def test_findings_list_resource_tags_no_n_plus_one(
|
||||
self, authenticated_client, findings_fixture
|
||||
):
|
||||
@@ -7694,6 +7879,23 @@ class TestFindingViewSet:
|
||||
]
|
||||
}
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"filter_name",
|
||||
["inserted_at", "inserted_at.gte", "inserted_at.lte"],
|
||||
)
|
||||
def test_findings_metadata_rejects_timestamp_precision_filters(
|
||||
self, authenticated_client, filter_name
|
||||
):
|
||||
response = authenticated_client.get(
|
||||
reverse("finding-metadata"),
|
||||
{f"filter[{filter_name}]": "2048-01-01T10:30:00Z"},
|
||||
)
|
||||
|
||||
assert response.status_code == status.HTTP_400_BAD_REQUEST
|
||||
error = response.json()["errors"][0]
|
||||
assert error["detail"] == "Enter a valid date."
|
||||
assert error["code"] == "invalid"
|
||||
|
||||
def test_findings_metadata_backfill(
|
||||
self, authenticated_client, scans_fixture, findings_fixture
|
||||
):
|
||||
|
||||
@@ -50,6 +50,7 @@ from api.filters import (
|
||||
FindingGroupAggregatedComputedFilter,
|
||||
FindingGroupFilter,
|
||||
FindingGroupSummaryFilter,
|
||||
FindingMetadataFilter,
|
||||
IntegrationFilter,
|
||||
IntegrationJiraFindingsFilter,
|
||||
InvitationFilter,
|
||||
@@ -3833,6 +3834,8 @@ class FindingViewSet(PaginateByPkMixin, BaseRLSViewSet):
|
||||
def get_filterset_class(self):
|
||||
if self.action in ["latest", "metadata_latest"]:
|
||||
return LatestFindingFilter
|
||||
if self.action == "metadata":
|
||||
return FindingMetadataFilter
|
||||
return FindingFilter
|
||||
|
||||
def get_queryset(self):
|
||||
|
||||
Reference in New Issue
Block a user