fix(aws): fallback lookup events to resource name (#10828)

This commit is contained in:
Pepe Fagoaga
2026-04-21 18:31:50 +02:00
committed by GitHub
parent 61a62fd6e0
commit f2c5d2ec87
3 changed files with 204 additions and 11 deletions
+8
View File
@@ -2,6 +2,14 @@
All notable changes to the **Prowler SDK** are documented in this file.
## [5.24.3] (Prowler v5.24.3)
### 🐞 Fixed
- CloudTrail resource timeline uses resource name as fallback in `LookupEvents` [(#10828)](https://github.com/prowler-cloud/prowler/pull/10828)
---
## [5.24.1] (Prowler v5.24.1)
### 🚀 Added
@@ -135,25 +135,54 @@ class CloudTrailTimeline(TimelineService):
) -> List[Dict[str, Any]]:
"""Query CloudTrail for events related to a specific resource.
Uses MaxResults to limit the number of events returned, preparing
for API-level pagination. Currently returns up to max_results events
from the first page only.
CloudTrail's ResourceName attribute is populated per-service by AWS
and is not consistent: KMS and SNS store full ARNs, while S3, IAM,
EC2, Lambda, RDS and others store only the resource name or ID. We
first look up using the identifier as-is, and if no events come back
we retry with the last segment extracted from the ARN.
"""
client = self._get_client(region)
start_time = datetime.now(timezone.utc) - timedelta(days=self._lookback_days)
# Use direct API call with MaxResults instead of paginator
# This limits CloudTrail to return only max_results events
events = self._lookup_events_by_name(client, resource_identifier, start_time)
if not events and resource_identifier.startswith("arn:"):
short_name = self._extract_short_name(resource_identifier)
if short_name and short_name != resource_identifier:
logger.debug(
f"CloudTrail timeline: no events for '{resource_identifier}', "
f"retrying lookup with short name '{short_name}'"
)
events = self._lookup_events_by_name(client, short_name, start_time)
return events
def _lookup_events_by_name(
self, client, resource_name: str, start_time: datetime
) -> List[Dict[str, Any]]:
response = client.lookup_events(
LookupAttributes=[
{"AttributeKey": "ResourceName", "AttributeValue": resource_identifier}
{"AttributeKey": "ResourceName", "AttributeValue": resource_name}
],
StartTime=start_time,
MaxResults=self._max_results,
)
return response.get("Events", [])
@staticmethod
def _extract_short_name(identifier: str) -> str:
"""Return the last segment of an ARN or identifier.
ARNs take the form `arn:partition:service:region:account:resource-info`
where resource-info is one of `name`, `type/name`, or `type:name`.
Splitting on the final `/` and then the final `:` yields the value
CloudTrail stores for most services: S3 bucket name, IAM user/role
name, EC2 resource ID, Lambda function name, RDS DB identifier, etc.
"""
if not identifier:
return identifier
return identifier.rsplit("/", 1)[-1].rsplit(":", 1)[-1]
def _parse_event(self, raw_event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Parse a raw CloudTrail event into a TimelineEvent dictionary."""
try:
@@ -120,7 +120,7 @@ class TestCloudTrailTimeline:
assert result[0]["event_name"] == "RunInstances"
def test_get_resource_timeline_prefers_uid_over_id(self, mock_session):
"""When both resource_id and resource_uid are provided, UID should be used."""
"""When both resource_id and resource_uid are provided, UID is tried first."""
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": []}
mock_session.client.return_value = mock_client
@@ -132,9 +132,9 @@ class TestCloudTrailTimeline:
resource_uid="arn:aws:ec2:us-east-1:123:instance/i-1234",
)
# Verify UID was used in the lookup
call_args = mock_client.lookup_events.call_args
lookup_attrs = call_args.kwargs["LookupAttributes"]
# Verify UID was used on the first lookup call
first_call = mock_client.lookup_events.call_args_list[0]
lookup_attrs = first_call.kwargs["LookupAttributes"]
assert (
lookup_attrs[0]["AttributeValue"]
== "arn:aws:ec2:us-east-1:123:instance/i-1234"
@@ -606,3 +606,159 @@ class TestIsReadOnlyEvent:
"""Verify write events are not marked as read-only."""
timeline = CloudTrailTimeline(session=mock_session)
assert timeline._is_read_only_event(event_name) is False
class TestExtractShortName:
"""Tests for _extract_short_name static method."""
@pytest.mark.parametrize(
"identifier,expected",
[
("arn:aws:s3:::my-bucket", "my-bucket"),
("arn:aws:iam::123456789012:user/alice", "alice"),
("arn:aws:iam::123456789012:role/MyRole", "MyRole"),
(
"arn:aws:ec2:us-east-1:123456789012:instance/i-0abc1234",
"i-0abc1234",
),
(
"arn:aws:lambda:us-east-1:123456789012:function:my-func",
"my-func",
),
("arn:aws:rds:us-east-1:123456789012:db:mydb", "mydb"),
("arn:aws:dynamodb:us-east-1:123456789012:table/MyTable", "MyTable"),
(
"arn:aws:kms:us-east-1:123456789012:key/abcd-efgh",
"abcd-efgh",
),
("i-0abc1234", "i-0abc1234"),
("my-bucket", "my-bucket"),
("", ""),
],
)
def test_extract_short_name(self, identifier, expected):
assert CloudTrailTimeline._extract_short_name(identifier) == expected
class TestLookupEventsFallback:
"""Tests for the ARN-to-short-name fallback in _lookup_events."""
@pytest.fixture
def mock_session(self):
return MagicMock()
@pytest.fixture
def sample_event(self):
return {
"EventId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "CreateBucket",
"EventSource": "s3.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/admin",
"userName": "admin",
}
}
),
}
def test_no_fallback_when_arn_returns_events(self, mock_session, sample_event):
"""When the ARN lookup returns events, we do not retry with the short name."""
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": [sample_event]}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1",
resource_uid="arn:aws:kms:us-east-1:123456789012:key/abcd-efgh",
)
assert len(result) == 1
assert mock_client.lookup_events.call_count == 1
call = mock_client.lookup_events.call_args
assert (
call.kwargs["LookupAttributes"][0]["AttributeValue"]
== "arn:aws:kms:us-east-1:123456789012:key/abcd-efgh"
)
def test_fallback_to_short_name_when_arn_returns_empty(
self, mock_session, sample_event
):
"""When the ARN lookup returns nothing, we retry with the short name."""
mock_client = MagicMock()
mock_client.lookup_events.side_effect = [
{"Events": []},
{"Events": [sample_event]},
]
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1", resource_uid="arn:aws:s3:::my-bucket"
)
assert len(result) == 1
assert mock_client.lookup_events.call_count == 2
first_call, second_call = mock_client.lookup_events.call_args_list
assert (
first_call.kwargs["LookupAttributes"][0]["AttributeValue"]
== "arn:aws:s3:::my-bucket"
)
assert (
second_call.kwargs["LookupAttributes"][0]["AttributeValue"] == "my-bucket"
)
def test_no_fallback_when_identifier_has_no_short_name(self, mock_session):
"""A non-ARN identifier collapses to itself; no retry should fire."""
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": []}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1", resource_id="i-0abc1234"
)
assert result == []
assert mock_client.lookup_events.call_count == 1
def test_no_fallback_when_identifier_is_not_arn(self, mock_session):
"""A non-ARN identifier with / or : must not trigger the retry."""
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": []}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1", resource_id="some-prefix/weird:value"
)
assert result == []
assert mock_client.lookup_events.call_count == 1
def test_both_lookups_empty_returns_empty_list(self, mock_session):
"""If both the ARN and short-name lookups return empty, we return []."""
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": []}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1",
resource_uid="arn:aws:ec2:us-east-1:123456789012:instance/i-0abc1234",
)
assert result == []
assert mock_client.lookup_events.call_count == 2
first_call, second_call = mock_client.lookup_events.call_args_list
assert (
first_call.kwargs["LookupAttributes"][0]["AttributeValue"]
== "arn:aws:ec2:us-east-1:123456789012:instance/i-0abc1234"
)
assert (
second_call.kwargs["LookupAttributes"][0]["AttributeValue"] == "i-0abc1234"
)