Compare commits

...

2 Commits

Author SHA1 Message Date
Pepe Fagoaga 36fa5c0a53 chore: merge 2026-05-07 14:49:49 +02:00
Pepe Fagoaga 31ae587745 feat(events): return text/plain for LLMs 2026-05-07 10:56:59 +02:00
10 changed files with 776 additions and 115 deletions
+1
View File
@@ -9,6 +9,7 @@ All notable changes to the **Prowler API** are documented in this file.
- New `scan-reset-ephemeral-resources` post-scan task zeroes `failed_findings_count` for resources missing from the latest full-scope scan, keeping ephemeral resources from polluting the Resources page sort [(#10929)](https://github.com/prowler-cloud/prowler/pull/10929)
- ASD Essential Eight (AWS) compliance framework support [(#10982)](https://github.com/prowler-cloud/prowler/pull/10982)
- `GET /resources/{id}/events` now supports `Accept: text/plain` for LLM consumption [(#XXXXX)](https://github.com/prowler-cloud/prowler/pull/XXXXX)
### 🔐 Security
+141
View File
@@ -0,0 +1,141 @@
"""Helpers for serializing resource timeline events into LLM-friendly formats.
The text renderer is a 1:1 markdown projection of what the JSON endpoint
returns: same events, same order, same fields. We do not infer sessions or
relationships between events — grouping is left to the consumer.
"""
from datetime import datetime, timezone
from typing import Any, Iterable
# Truncation thresholds for payload values. Strings longer than this are
# clipped with an ellipsis; lists/dicts larger than this collapse to a count
# placeholder. The goal is to bound a single event's token cost without
# losing the API call's identity.
MAX_STRING_LEN = 200  # max rendered chars per string value, "..." marker included
MAX_LIST_INLINE = 5  # lists/tuples longer than this collapse to "[N items]"
MAX_DICT_INLINE = 8  # dicts larger than this collapse to "{N keys}"
def serialize_events_as_text(
    events: Iterable[dict[str, Any]],
    resource: Any,
    lookback_days: int,
    write_events_only: bool,
) -> str:
    """Render resource events as a flat markdown list of what the API returns.

    The report is a header block (resource identity, query options, event
    count) followed by one ``### N. <name> at <time>`` section per event in
    the order received. Always ends with exactly one trailing newline.
    """
    materialized = list(events)
    # Header block: resource identity plus the query options that produced
    # this report, so the text is self-describing for an LLM consumer.
    header = [
        "# Resource Events",
        f"- Resource: {getattr(resource, 'uid', '')}",
        f"- Region: {getattr(resource, 'region', '') or 'global'}",
        f"- Lookback: {lookback_days} days",
        f"- Write events only: {str(write_events_only).lower()}",
        f"- Events: {len(materialized)}",
        "",
    ]
    if not materialized:
        header.append("No events recorded in the lookback window.")
        return "\n".join(header) + "\n"
    body: list[str] = ["## Events", ""]
    for position, event in enumerate(materialized, start=1):
        body.extend(_format_event(position, event))
        body.append("")
    # rstrip drops the blank line after the last event; normalize to one "\n".
    return "\n".join(header + body).rstrip() + "\n"
def _format_event(index: int, event: dict[str, Any]) -> list[str]:
    """Render one event as a markdown section: heading plus `- Key: value` lines.

    Mandatory lines (source, status) always appear; identity/provenance,
    payload, error, and event-id lines are emitted only when present.
    """
    error_code = event.get("error_code")
    heading = (
        f"### {index}. {event.get('event_name') or 'Unknown'} "
        f"at {_format_time(_event_time(event))}"
    )
    out = [
        heading,
        f"- Source: {event.get('event_source') or 'unknown'}",
        f"- Status: {f'ERROR({error_code})' if error_code else 'ok'}",
    ]
    # Optional who/where fields: skip anything missing or falsy.
    for key, label in (
        ("actor", "Actor"),
        ("actor_type", "Actor type"),
        ("actor_uid", "Actor ARN"),
        ("source_ip_address", "Source IP"),
        ("user_agent", "User agent"),
    ):
        if event.get(key):
            out.append(f"- {label}: {event[key]}")
    # Payloads render inline (truncated); empty/non-dict payloads render nothing.
    for key, label in (("request_data", "Request"), ("response_data", "Response")):
        rendered = _format_payload(event.get(key))
        if rendered:
            out.append(f"- {label}: {rendered}")
    # Error message only shown alongside an error code, never on its own.
    if error_code and event.get("error_message"):
        out.append(f"- Error: {event['error_message']}")
    if event.get("event_id"):
        out.append(f"- Event ID: {event['event_id']}")
    return out
def _format_payload(payload: Any) -> str:
    """Render a request/response payload dict inline as `{k: v, ...}`.

    Non-dict and empty payloads yield "" so the caller can skip the line.
    """
    if not isinstance(payload, dict) or not payload:
        return ""
    parts = [f"{key}: {_summarize_value(val)}" for key, val in payload.items()]
    return "{" + ", ".join(parts) + "}"
def _summarize_value(value: Any) -> str:
    """Render one payload value with bounded size.

    Strings are quoted and clipped at MAX_STRING_LEN (marker included);
    lists/dicts over their inline thresholds collapse to count placeholders;
    scalars use JSON-ish spellings (true/false/null). Recurses into small
    containers. Unknown types fall back to str().
    """
    if isinstance(value, str):
        if len(value) > MAX_STRING_LEN:
            # Clip so the rendered value is exactly MAX_STRING_LEN chars
            # including the "..." marker.
            value = value[: MAX_STRING_LEN - 3] + "..."
        return f'"{value}"'
    # bool must be tested before int: True/False are int subclasses.
    if isinstance(value, bool):
        return "true" if value else "false"
    if value is None:
        return "null"
    if isinstance(value, (int, float)):
        return str(value)
    if isinstance(value, (list, tuple)):
        if len(value) > MAX_LIST_INLINE:
            return f"[{len(value)} items]"
        return "[" + ", ".join(_summarize_value(item) for item in value) + "]"
    if isinstance(value, dict):
        if len(value) > MAX_DICT_INLINE:
            return f"{{{len(value)} keys}}"
        inner = ", ".join(f"{k}: {_summarize_value(v)}" for k, v in value.items())
        return "{" + inner + "}"
    return str(value)
def _event_time(event: dict[str, Any]) -> datetime:
value = event.get("event_time")
if isinstance(value, datetime):
return value if value.tzinfo else value.replace(tzinfo=timezone.utc)
if isinstance(value, str):
try:
parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
return parsed if parsed.tzinfo else parsed.replace(tzinfo=timezone.utc)
except ValueError:
return datetime.min.replace(tzinfo=timezone.utc)
return datetime.min.replace(tzinfo=timezone.utc)
def _format_time(value: datetime) -> str:
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
return value.strftime("%Y-%m-%dT%H:%M:%SZ")
@@ -0,0 +1,366 @@
"""Unit tests for api.events.views_helpers.
These tests exercise the text-renderer in isolation: no Django, no DRF, no DB.
The behavior under test is the markdown shape, payload sanitization, and
truncation rules.
"""
from datetime import datetime, timezone
from types import SimpleNamespace
import pytest
from api.events import views_helpers
@pytest.fixture
def resource():
    """Minimal stand-in for the Resource model: only uid/region are read."""
    attrs = {
        "uid": "arn:aws:s3:::acme-prod-data",
        "region": "us-east-1",
    }
    return SimpleNamespace(**attrs)
def _event(**overrides):
base = {
"event_id": "evt-1",
"event_time": datetime(2026, 5, 4, 16, 55, 1, tzinfo=timezone.utc),
"event_name": "PutBucketPolicy",
"event_source": "s3.amazonaws.com",
"actor": "assumed-role/AdminRole/alice",
"actor_uid": "arn:aws:sts::123:assumed-role/AdminRole/alice",
"actor_type": "AssumedRole",
"source_ip_address": "1.2.3.4",
"user_agent": "aws-cli/2.15.30",
"request_data": None,
"response_data": None,
"error_code": None,
"error_message": None,
}
base.update(overrides)
return base
class TestSerializeEventsAsTextHeader:
    """Header block rendering: resource identity, query options, empty case."""

    def test_empty_events_renders_header_and_no_events_marker(self, resource):
        """Empty timeline yields the full header plus the no-events marker."""
        text = views_helpers.serialize_events_as_text(
            events=[],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert text.startswith("# Resource Events\n")
        assert "- Resource: arn:aws:s3:::acme-prod-data" in text
        assert "- Region: us-east-1" in text
        assert "- Lookback: 90 days" in text
        # Booleans render in JSON-ish lowercase.
        assert "- Write events only: true" in text
        assert "- Events: 0" in text
        assert "No events recorded in the lookback window." in text
        # No "## Events" section when empty
        assert "## Events" not in text

    def test_missing_region_renders_global(self, resource):
        """Falsy region (empty string) is rendered as the literal 'global'."""
        resource.region = ""
        text = views_helpers.serialize_events_as_text(
            events=[], resource=resource, lookback_days=7, write_events_only=False
        )
        assert "- Region: global" in text
        assert "- Write events only: false" in text
        assert "- Lookback: 7 days" in text

    def test_resource_without_uid_attribute_renders_blank(self):
        """A resource object missing uid/region must not crash the renderer."""
        text = views_helpers.serialize_events_as_text(
            events=[],
            resource=SimpleNamespace(),
            lookback_days=1,
            write_events_only=True,
        )
        # getattr defaults to "" for both fields, no crash.
        assert "- Resource: \n" in text
        assert "- Region: global" in text
class TestSerializeEventsAsTextBody:
    """Per-event sections: field presence, error rendering, and ordering."""

    def test_single_event_renders_all_present_fields(self, resource):
        """Every populated field of an event shows up as its own markdown line."""
        text = views_helpers.serialize_events_as_text(
            events=[_event()],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "## Events" in text
        assert "### 1. PutBucketPolicy at 2026-05-04T16:55:01Z" in text
        assert "- Source: s3.amazonaws.com" in text
        assert "- Status: ok" in text
        assert "- Actor: assumed-role/AdminRole/alice" in text
        assert "- Actor type: AssumedRole" in text
        assert "- Actor ARN: arn:aws:sts::123:assumed-role/AdminRole/alice" in text
        assert "- Source IP: 1.2.3.4" in text
        assert "- User agent: aws-cli/2.15.30" in text
        assert "- Event ID: evt-1" in text

    def test_optional_fields_are_omitted_when_absent(self, resource):
        """None-valued optional fields produce no line at all (not 'None')."""
        text = views_helpers.serialize_events_as_text(
            events=[
                _event(
                    actor_type=None,
                    actor_uid=None,
                    source_ip_address=None,
                    user_agent=None,
                )
            ],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "- Actor type:" not in text
        assert "- Actor ARN:" not in text
        assert "- Source IP:" not in text
        assert "- User agent:" not in text
        # Required field still rendered
        assert "- Actor: assumed-role/AdminRole/alice" in text

    def test_error_event_renders_error_code_and_message(self, resource):
        """An error_code flips Status to ERROR(code) and surfaces the message."""
        text = views_helpers.serialize_events_as_text(
            events=[
                _event(
                    error_code="AccessDenied",
                    error_message=(
                        "User: arn:aws:sts::123:assumed-role/AdminRole/alice "
                        "is not authorized to perform: s3:PutBucketAcl"
                    ),
                )
            ],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "- Status: ERROR(AccessDenied)" in text
        assert "- Error: User: arn:aws:sts::123:assumed-role/AdminRole/alice" in text

    def test_error_message_omitted_when_no_error_code(self, resource):
        """An error_message without an error_code is treated as noise."""
        text = views_helpers.serialize_events_as_text(
            events=[
                _event(
                    error_code=None,
                    error_message="orphaned message that should be ignored",
                )
            ],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "- Status: ok" in text
        assert "orphaned message" not in text

    def test_event_order_is_preserved_no_sorting(self, resource):
        # API returns CloudTrail events in its native order; the renderer
        # must NOT re-sort them.
        first = _event(
            event_id="newest",
            event_name="GetBucketPolicy",
            event_time=datetime(2026, 5, 4, 17, 2, 11, tzinfo=timezone.utc),
        )
        second = _event(
            event_id="middle",
            event_name="PutBucketAcl",
            event_time=datetime(2026, 5, 4, 16, 58, 33, tzinfo=timezone.utc),
        )
        third = _event(
            event_id="oldest",
            event_name="PutBucketPolicy",
            event_time=datetime(2026, 5, 4, 16, 55, 1, tzinfo=timezone.utc),
        )
        text = views_helpers.serialize_events_as_text(
            events=[first, second, third],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        # Section numbering follows input order, not chronological order.
        idx_first = text.index("### 1. GetBucketPolicy")
        idx_second = text.index("### 2. PutBucketAcl")
        idx_third = text.index("### 3. PutBucketPolicy")
        assert idx_first < idx_second < idx_third

    def test_event_count_in_header_matches_body(self, resource):
        """The header's event count and the number of sections must agree."""
        events = [_event(event_id=f"e{i}") for i in range(3)]
        text = views_helpers.serialize_events_as_text(
            events=events,
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "- Events: 3" in text
        assert text.count("### ") == 3
class TestPayloadFormatting:
    """request_data/response_data rendering: inline form and truncation rules."""

    def test_request_data_renders_inline(self, resource):
        """Small dicts render inline with quoted strings and lowercase bools."""
        text = views_helpers.serialize_events_as_text(
            events=[
                _event(
                    request_data={
                        "bucketName": "acme-prod-data",
                        "encrypted": True,
                    }
                )
            ],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert '- Request: {bucketName: "acme-prod-data", encrypted: true}' in text

    def test_request_data_empty_dict_omits_line(self, resource):
        """An empty payload dict is indistinguishable from no payload."""
        text = views_helpers.serialize_events_as_text(
            events=[_event(request_data={})],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "- Request:" not in text

    def test_response_data_renders_when_present(self, resource):
        """response_data follows the same inline rendering as request_data."""
        text = views_helpers.serialize_events_as_text(
            events=[
                _event(
                    response_data={"versionId": "abc123"},
                )
            ],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert '- Response: {versionId: "abc123"}' in text

    def test_long_strings_are_truncated(self, resource):
        """Strings over MAX_STRING_LEN are clipped with a '...' marker."""
        long_policy = "x" * 500
        text = views_helpers.serialize_events_as_text(
            events=[_event(request_data={"policy": long_policy})],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        # 200-char threshold, with "..." marker on truncation
        assert "..." in text
        # The full 500-char value must NOT be present
        assert long_policy not in text

    def test_large_list_summarized_as_count(self, resource):
        """Lists over MAX_LIST_INLINE collapse to an '[N items]' placeholder."""
        text = views_helpers.serialize_events_as_text(
            events=[_event(request_data={"tags": list(range(20))})],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "tags: [20 items]" in text

    def test_small_list_renders_inline(self, resource):
        """Lists within the threshold render element-by-element."""
        text = views_helpers.serialize_events_as_text(
            events=[_event(request_data={"ports": [80, 443]})],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "ports: [80, 443]" in text

    def test_large_dict_summarized_as_count(self, resource):
        """Nested dicts over MAX_DICT_INLINE collapse to '{N keys}'."""
        text = views_helpers.serialize_events_as_text(
            events=[
                _event(
                    request_data={
                        "config": {f"key{i}": i for i in range(15)},
                    }
                )
            ],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "config: {15 keys}" in text

    def test_bool_and_none_values_lowercased(self, resource):
        """Scalar values use JSON-ish spellings: true / false / null."""
        text = views_helpers.serialize_events_as_text(
            events=[
                _event(
                    request_data={
                        "publicAccess": True,
                        "encryption": False,
                        "kmsKey": None,
                    }
                )
            ],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "publicAccess: true" in text
        assert "encryption: false" in text
        assert "kmsKey: null" in text

    def test_request_data_non_dict_is_ignored(self, resource):
        """Non-dict payloads (defensive) render nothing rather than crashing."""
        text = views_helpers.serialize_events_as_text(
            events=[_event(request_data="not a dict")],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "- Request:" not in text
class TestTimeFormatting:
    """Timestamp normalization: naive datetimes, ISO strings, and garbage."""

    def test_event_time_as_naive_datetime_is_treated_as_utc(self, resource):
        # Defensive: providers occasionally hand back naive datetimes;
        # they must be normalized rather than crashing the renderer.
        text = views_helpers.serialize_events_as_text(
            events=[
                _event(event_time=datetime(2026, 5, 4, 16, 55, 1)),
            ],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "### 1. PutBucketPolicy at 2026-05-04T16:55:01Z" in text

    def test_event_time_as_iso_string_is_parsed(self, resource):
        """ISO-8601 strings with a trailing 'Z' parse to the same timestamp."""
        text = views_helpers.serialize_events_as_text(
            events=[_event(event_time="2026-05-04T16:55:01Z")],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        assert "### 1. PutBucketPolicy at 2026-05-04T16:55:01Z" in text

    def test_unparseable_event_time_does_not_crash(self, resource):
        """Garbage timestamps fall back to a sentinel instead of raising."""
        text = views_helpers.serialize_events_as_text(
            events=[_event(event_time="garbage")],
            resource=resource,
            lookback_days=90,
            write_events_only=True,
        )
        # Falls back to datetime.min — exact value isn't important, but
        # the renderer must not raise.
        assert "### 1. PutBucketPolicy at " in text
+231 -60
View File
@@ -1469,9 +1469,9 @@ class TestProviderViewSet:
included_data = response.json()["included"]
for expected_type in expected_resources:
assert any(
d.get("type") == expected_type for d in included_data
), f"Expected type '{expected_type}' not found in included data"
assert any(d.get("type") == expected_type for d in included_data), (
f"Expected type '{expected_type}' not found in included data"
)
def test_providers_retrieve(self, authenticated_client, providers_fixture):
provider1, *_ = providers_fixture
@@ -5468,13 +5468,13 @@ class TestAttackPathsScanViewSet:
content_type=API_JSON_CONTENT_TYPE,
)
if i < 10:
assert (
response.status_code == status.HTTP_200_OK
), f"Request {i + 1} should succeed with 200 OK, got {response.status_code}"
assert response.status_code == status.HTTP_200_OK, (
f"Request {i + 1} should succeed with 200 OK, got {response.status_code}"
)
else:
assert (
response.status_code == status.HTTP_429_TOO_MANY_REQUESTS
), f"Request {i + 1} should be throttled"
assert response.status_code == status.HTTP_429_TOO_MANY_REQUESTS, (
f"Request {i + 1} should be throttled"
)
# -- Timeout simulation -------------------------------------------------------
@@ -5677,9 +5677,9 @@ class TestResourceViewSet:
included_data = response.json()["included"]
for expected_type in expected_resources:
assert any(
d.get("type") == expected_type for d in included_data
), f"Expected type '{expected_type}' not found in included data"
assert any(d.get("type") == expected_type for d in included_data), (
f"Expected type '{expected_type}' not found in included data"
)
@pytest.mark.parametrize(
"filter_name, filter_value, expected_count",
@@ -6228,9 +6228,9 @@ class TestResourceViewSet:
(e for e in errors if e["source"]["parameter"] == expected_invalid_param),
None,
)
assert (
error is not None
), f"Expected error for parameter '{expected_invalid_param}'"
assert error is not None, (
f"Expected error for parameter '{expected_invalid_param}'"
)
assert error["code"] == "invalid"
assert error["status"] == "400" # Must be string per JSON:API spec
assert expected_invalid_param in error["detail"]
@@ -6762,16 +6762,187 @@ class TestResourceViewSet:
# Test with completely malformed token
client.credentials(HTTP_AUTHORIZATION="Bearer not.a.valid.jwt.token")
response = client.get(reverse("resource-events", kwargs={"pk": resource.id}))
assert (
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"Expected 401 for malformed token but got {response.status_code}"
assert response.status_code == status.HTTP_401_UNAUTHORIZED, (
f"Expected 401 for malformed token but got {response.status_code}"
)
# Test with empty bearer token
client.credentials(HTTP_AUTHORIZATION="Bearer ")
response = client.get(reverse("resource-events", kwargs={"pk": resource.id}))
assert (
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"Expected 401 for empty bearer token but got {response.status_code}"
assert response.status_code == status.HTTP_401_UNAUTHORIZED, (
f"Expected 401 for empty bearer token but got {response.status_code}"
)
@patch("api.v1.views.initialize_prowler_provider")
@patch("api.v1.views.CloudTrailTimeline")
def test_events_text_plain_renders_markdown(
self,
mock_cloudtrail_timeline,
mock_initialize_provider,
authenticated_client,
providers_fixture,
):
"""`Accept: text/plain` returns a markdown report instead of JSON:API."""
from api.models import Resource
aws_provider = providers_fixture[0]
resource = Resource.objects.create(
uid="arn:aws:s3:::acme-prod-data",
name="acme-prod-data",
type="bucket",
region="us-east-1",
service="s3",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
mock_session = Mock()
mock_provider = Mock()
mock_provider._session.current_session = mock_session
mock_initialize_provider.return_value = mock_provider
mock_timeline_instance = Mock()
mock_timeline_instance.get_resource_timeline.return_value = [
{
"event_id": "evt-1",
"event_time": "2026-05-04T16:55:01Z",
"event_name": "PutBucketPolicy",
"event_source": "s3.amazonaws.com",
"actor": "assumed-role/AdminRole/alice",
"actor_uid": "arn:aws:sts::123:assumed-role/AdminRole/alice",
"actor_type": "AssumedRole",
"source_ip_address": "1.2.3.4",
"user_agent": "aws-cli/2.15.30",
"request_data": {"bucketName": "acme-prod-data"},
},
{
"event_id": "evt-2",
"event_time": "2026-05-04T16:58:33Z",
"event_name": "PutBucketAcl",
"event_source": "s3.amazonaws.com",
"actor": "assumed-role/AdminRole/alice",
"error_code": "AccessDenied",
"error_message": "User not authorized",
},
]
mock_cloudtrail_timeline.return_value = mock_timeline_instance
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id}),
HTTP_ACCEPT="text/plain",
)
assert response.status_code == status.HTTP_200_OK
assert response["Content-Type"].startswith("text/plain")
body = response.content.decode("utf-8")
# Header
assert body.startswith("# Resource Events\n")
assert "- Resource: arn:aws:s3:::acme-prod-data" in body
assert "- Region: us-east-1" in body
assert "- Events: 2" in body
# Body
assert "### 1. PutBucketPolicy at 2026-05-04T16:55:01Z" in body
assert "- Status: ok" in body
assert "- Status: ERROR(AccessDenied)" in body
assert "- Error: User not authorized" in body
assert '- Request: {bucketName: "acme-prod-data"}' in body
@patch("api.v1.views.initialize_prowler_provider")
@patch("api.v1.views.CloudTrailTimeline")
def test_events_default_accept_still_returns_json(
self,
mock_cloudtrail_timeline,
mock_initialize_provider,
authenticated_client,
providers_fixture,
):
"""Adding text/plain renderer must not regress the default JSON:API path."""
from api.models import Resource
aws_provider = providers_fixture[0]
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-default-accept",
name="Default Accept Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
mock_session = Mock()
mock_provider = Mock()
mock_provider._session.current_session = mock_session
mock_initialize_provider.return_value = mock_provider
mock_timeline_instance = Mock()
mock_timeline_instance.get_resource_timeline.return_value = [
{
"event_id": "evt-json",
"event_time": "2026-05-04T16:55:01Z",
"event_name": "RunInstances",
"event_source": "ec2.amazonaws.com",
"actor": "user/alice",
}
]
mock_cloudtrail_timeline.return_value = mock_timeline_instance
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
assert response.status_code == status.HTTP_200_OK
assert "json" in response["Content-Type"]
payload = response.json()
assert payload["data"][0]["type"] == "resource-events"
assert payload["data"][0]["id"] == "evt-json"
assert payload["data"][0]["attributes"]["event_name"] == "RunInstances"
@patch("api.v1.views.initialize_prowler_provider")
@patch("api.v1.views.CloudTrailTimeline")
def test_events_text_plain_no_events_renders_empty_marker(
self,
mock_cloudtrail_timeline,
mock_initialize_provider,
authenticated_client,
providers_fixture,
):
"""Empty timeline still produces a valid text response, not a 500."""
from api.models import Resource
aws_provider = providers_fixture[0]
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-empty",
name="Empty Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
mock_session = Mock()
mock_provider = Mock()
mock_provider._session.current_session = mock_session
mock_initialize_provider.return_value = mock_provider
mock_timeline_instance = Mock()
mock_timeline_instance.get_resource_timeline.return_value = []
mock_cloudtrail_timeline.return_value = mock_timeline_instance
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id}),
HTTP_ACCEPT="text/plain",
)
assert response.status_code == status.HTTP_200_OK
assert response["Content-Type"].startswith("text/plain")
body = response.content.decode("utf-8")
assert "- Events: 0" in body
assert "No events recorded in the lookback window." in body
@pytest.mark.django_db
@@ -6832,9 +7003,9 @@ class TestFindingViewSet:
included_data = response.json()["included"]
for expected_type in expected_resources:
assert any(
d.get("type") == expected_type for d in included_data
), f"Expected type '{expected_type}' not found in included data"
assert any(d.get("type") == expected_type for d in included_data), (
f"Expected type '{expected_type}' not found in included data"
)
@pytest.mark.parametrize(
"filter_name, filter_value, expected_count",
@@ -7373,9 +7544,9 @@ class TestJWTFields:
reverse("token-obtain"), data, format="json"
)
assert (
response.status_code == status.HTTP_200_OK
), f"Unexpected status code: {response.status_code}"
assert response.status_code == status.HTTP_200_OK, (
f"Unexpected status code: {response.status_code}"
)
access_token = response.data["attributes"]["access"]
payload = jwt.decode(access_token, options={"verify_signature": False})
@@ -7389,23 +7560,23 @@ class TestJWTFields:
# Verify expected fields
for field in expected_fields:
assert field in payload, f"The field '{field}' is not in the JWT"
assert (
payload[field] == expected_fields[field]
), f"The value of '{field}' does not match"
assert payload[field] == expected_fields[field], (
f"The value of '{field}' does not match"
)
# Verify time fields are integers
for time_field in ["exp", "iat", "nbf"]:
assert time_field in payload, f"The field '{time_field}' is not in the JWT"
assert isinstance(
payload[time_field], int
), f"The field '{time_field}' is not an integer"
assert isinstance(payload[time_field], int), (
f"The field '{time_field}' is not an integer"
)
# Verify identification fields are non-empty strings
for id_field in ["jti", "sub", "tenant_id"]:
assert id_field in payload, f"The field '{id_field}' is not in the JWT"
assert (
isinstance(payload[id_field], str) and payload[id_field]
), f"The field '{id_field}' is not a valid string"
assert isinstance(payload[id_field], str) and payload[id_field], (
f"The field '{id_field}' is not a valid string"
)
@pytest.mark.django_db
@@ -11346,9 +11517,9 @@ class TestIntegrationViewSet:
included_data = response.json()["included"]
for expected_type in expected_resources:
assert any(
d.get("type") == expected_type for d in included_data
), f"Expected type '{expected_type}' not found in included data"
assert any(d.get("type") == expected_type for d in included_data), (
f"Expected type '{expected_type}' not found in included data"
)
@pytest.mark.parametrize(
"integration_type, configuration, credentials",
@@ -12785,9 +12956,9 @@ class TestLighthouseConfigViewSet:
)
# Check that API key is masked with asterisks only
masked_api_key = data["attributes"]["api_key"]
assert all(
c == "*" for c in masked_api_key
), "API key should contain only asterisks"
assert all(c == "*" for c in masked_api_key), (
"API key should contain only asterisks"
)
@pytest.mark.parametrize(
"field_name, invalid_value",
@@ -16532,9 +16703,9 @@ class TestFindingGroupViewSet:
assert len(data) == 2
for item in data:
resource = item["attributes"]["resource"]
assert (
resource["resource_group"] == "storage"
), "resource_group must be 'storage'"
assert resource["resource_group"] == "storage", (
"resource_group must be 'storage'"
)
def test_resources_name_icontains(
self, authenticated_client, finding_groups_fixture
@@ -16848,12 +17019,12 @@ class TestFindingGroupViewSet:
assert response_p1.status_code == status.HTTP_200_OK
p1_check_ids = {item["id"] for item in response_p1.json()["data"]}
# Provider1 has scan1 with 4 checks
assert (
len(p1_check_ids) == 4
), f"Provider1 should have 4 checks, got {len(p1_check_ids)}"
assert (
"cloudtrail_enabled" not in p1_check_ids
), "cloudtrail_enabled should NOT be in provider1"
assert len(p1_check_ids) == 4, (
f"Provider1 should have 4 checks, got {len(p1_check_ids)}"
)
assert "cloudtrail_enabled" not in p1_check_ids, (
"cloudtrail_enabled should NOT be in provider1"
)
# Get finding groups for provider2 only
response_p2 = authenticated_client.get(
@@ -16863,12 +17034,12 @@ class TestFindingGroupViewSet:
assert response_p2.status_code == status.HTTP_200_OK
p2_check_ids = {item["id"] for item in response_p2.json()["data"]}
# Provider2 has scan2 with 1 check
assert (
len(p2_check_ids) == 1
), f"Provider2 should have 1 check, got {len(p2_check_ids)}"
assert (
"cloudtrail_enabled" in p2_check_ids
), "cloudtrail_enabled should be in provider2"
assert len(p2_check_ids) == 1, (
f"Provider2 should have 1 check, got {len(p2_check_ids)}"
)
assert "cloudtrail_enabled" in p2_check_ids, (
"cloudtrail_enabled should be in provider2"
)
# Test provider_type filter actually filters data
def test_finding_groups_provider_type_filter_actually_filters(
@@ -16891,9 +17062,9 @@ class TestFindingGroupViewSet:
{"filter[inserted_at]": TODAY, "filter[provider_type]": "gcp"},
)
assert response_gcp.status_code == status.HTTP_200_OK
assert (
len(response_gcp.json()["data"]) == 0
), "GCP filter should return 0 results"
assert len(response_gcp.json()["data"]) == 0, (
"GCP filter should return 0 results"
)
def test_finding_groups_pagination(
self, authenticated_client, finding_groups_fixture
+14
View File
@@ -109,6 +109,7 @@ from tasks.tasks import (
from api.attack_paths import database as graph_database
from api.attack_paths import get_queries_for_provider, get_query_by_id
from api.attack_paths import views_helpers as attack_paths_views_helpers
from api.events import views_helpers as events_views_helpers
from api.base_views import BaseRLSViewSet, BaseTenantViewset, BaseUserViewset
from api.compliance import (
PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE,
@@ -3390,6 +3391,9 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
description=(
"Retrieve events showing modification history for a resource. "
"Returns who modified the resource and when. Currently only available for AWS resources.\n\n"
"**Content negotiation:** send `Accept: text/plain` to receive a "
"compact markdown report optimized for LLM consumption "
"instead of the default JSON:API document.\n\n"
"**Note:** Some events may not appear due to CloudTrail indexing limitations. "
"Not all AWS API calls record the resource identifier in a searchable format."
),
@@ -3437,6 +3441,7 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
methods=["get"],
url_name="events",
filter_backends=[], # Disable filters - we're calling external API, not filtering queryset
renderer_classes=[APIJSONRenderer, PlainTextRenderer],
)
def events(self, request, pk=None):
"""Get events for a resource."""
@@ -3569,6 +3574,15 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
resource_uid=resource.uid,
)
if isinstance(request.accepted_renderer, PlainTextRenderer):
text = events_views_helpers.serialize_events_as_text(
events,
resource=resource,
lookback_days=lookback_days,
write_events_only=not include_read_events,
)
return Response(text)
serializer = ResourceEventSerializer(events, many=True)
return Response(serializer.data)
+8
View File
@@ -4,6 +4,14 @@ All notable changes to the **Prowler MCP Server** are documented in this file.
## [0.7.0] (Prowler UNRELEASED)
### 🔄 Changed
- `prowler_app_get_resource_events` now requests the API's `text/plain` representation and returns a markdown report [(#XXXXX)](https://github.com/prowler-cloud/prowler/pull/XXXXX)
### 🗑️ Removed
- `ResourceEvent`/`ResourceEventsResponse` models are removed since the tool no longer parses JSON:API for API `resources/{id}/events` [(#XXXXX)](https://github.com/prowler-cloud/prowler/pull/XXXXX)
### 🔐 Security
- `cryptography` from 46.0.1 to 47.0.0 (transitive) for CVE-2026-39892 and CVE-2026-26007 / CVE-2026-34073 [(#10978)](https://github.com/prowler-cloud/prowler/pull/10978)
@@ -135,48 +135,3 @@ class ResourcesMetadataResponse(BaseModel):
regions=attributes.get("regions"),
types=attributes.get("types"),
)
class ResourceEvent(MinimalSerializerMixin, BaseModel):
"""A cloud API action performed on a resource.
Sourced from cloud provider audit logs (AWS CloudTrail, Azure Activity Logs,
GCP Audit Logs, etc.).
"""
id: str
event_time: str
event_name: str
event_source: str
actor: str
actor_uid: str | None = None
actor_type: str | None = None
source_ip_address: str | None = None
user_agent: str | None = None
request_data: dict | None = None
response_data: dict | None = None
error_code: str | None = None
error_message: str | None = None
@classmethod
def from_api_response(cls, data: dict) -> "ResourceEvent":
"""Transform JSON:API resource event response."""
return cls(id=data["id"], **data.get("attributes", {}))
class ResourceEventsResponse(BaseModel):
"""Response wrapper for resource events list."""
events: list[ResourceEvent]
total_events: int
@classmethod
def from_api_response(cls, response: dict) -> "ResourceEventsResponse":
"""Transform JSON:API response to events list."""
data = response.get("data", [])
events = [ResourceEvent.from_api_response(item) for item in data]
return cls(
events=events,
total_events=len(events),
)
@@ -8,7 +8,6 @@ from typing import Any
from prowler_mcp_server.prowler_app.models.resources import (
DetailedResource,
ResourceEventsResponse,
ResourcesListResponse,
ResourcesMetadataResponse,
)
@@ -371,12 +370,13 @@ class ResourcesTools(BaseTool):
IMPORTANT: Currently only available for AWS resources. Uses CloudTrail to retrieve
the modification history of a resource, showing who did what and when.
Each event includes:
- What happened: event_name (e.g., PutBucketPolicy), event_source (e.g., s3.amazonaws.com)
Returns a markdown report (via the API's `Accept: text/plain` representation)
with one section per event, each containing:
- What happened: event_name (e.g., PutBucketPolicy), event source
- Who did it: actor, actor_type, actor_uid
- From where: source_ip_address, user_agent
- What changed: request_data, response_data (full API payloads)
- Errors: error_code, error_message (if the action failed)
- What changed: request_data, response_data (the API call payloads)
- Errors: error code and message when the action failed
Use cases:
- Investigating security incidents (who modified this resource?)
@@ -396,9 +396,14 @@ class ResourcesTools(BaseTool):
clean_params = self.api_client.build_filter_params(params)
api_response = await self.api_client.get(
f"/resources/{resource_id}/events", params=clean_params
token = await self.api_client.auth_manager.get_valid_token()
headers = self.api_client.auth_manager.get_headers(token)
headers["Accept"] = "text/plain"
response = await self.api_client.client.get(
f"{self.api_client.auth_manager.base_url}/resources/{resource_id}/events",
headers=headers,
params=clean_params,
)
events_response = ResourceEventsResponse.from_api_response(api_response)
response.raise_for_status()
return events_response.model_dump()
return {"report": response.text}
+1 -1
View File
@@ -11,7 +11,7 @@ description = "MCP server for Prowler ecosystem"
name = "prowler-mcp"
readme = "README.md"
requires-python = ">=3.12"
version = "0.5.0"
version = "0.7.0"
[project.scripts]
prowler-mcp = "prowler_mcp_server.main:main"