feat(aws): CloudTrail timeline for findings (#9101)

Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
This commit is contained in:
Sergio Garcia
2026-01-27 13:00:46 +01:00
committed by GitHub
parent 255ce0e866
commit 9e7ecb39fa
17 changed files with 2370 additions and 28 deletions
+1
View File
@@ -8,6 +8,7 @@ All notable changes to the **Prowler API** are documented in this file.
- Attack Paths: Bedrock Code Interpreter and AttachRolePolicy privilege escalation queries [(#9885)](https://github.com/prowler-cloud/prowler/pull/9885)
- Added memory optimizations for large compliance report generation [(#9444)](https://github.com/prowler-cloud/prowler/pull/9444)
- `GET /api/v1/resources/{id}/events` endpoint to retrieve AWS resource modification history from CloudTrail [(#9101)](https://github.com/prowler-cloud/prowler/pull/9101)
### 🔄 Changed
+102
View File
@@ -107,3 +107,105 @@ class ConflictException(APIException):
error_detail["source"] = {"pointer": pointer}
super().__init__(detail=[error_detail])
# Upstream Provider Errors (for external API calls like CloudTrail)
# These indicate issues with the provider, not with the user's API authentication
class UpstreamAuthenticationError(APIException):
"""Provider credentials are invalid or expired (502 Bad Gateway).
Used when AWS/Azure/GCP credentials fail to authenticate with the upstream
provider. This is NOT the user's API authentication failing.
"""
status_code = status.HTTP_502_BAD_GATEWAY
default_detail = (
"Provider credentials are invalid or expired. Please reconnect the provider."
)
default_code = "upstream_auth_failed"
def __init__(self, detail=None):
super().__init__(
detail=[
{
"detail": detail or self.default_detail,
"status": str(self.status_code),
"code": self.default_code,
}
]
)
class UpstreamAccessDeniedError(APIException):
"""Provider credentials lack required permissions (502 Bad Gateway).
Used when credentials are valid but don't have the IAM permissions
needed for the requested operation (e.g., cloudtrail:LookupEvents).
This is 502 (not 403) because it's an upstream/gateway error - the USER
authenticated fine, but the PROVIDER's credentials are misconfigured.
"""
status_code = status.HTTP_502_BAD_GATEWAY
default_detail = (
"Access denied. The provider credentials do not have the required permissions."
)
default_code = "upstream_access_denied"
def __init__(self, detail=None):
super().__init__(
detail=[
{
"detail": detail or self.default_detail,
"status": str(self.status_code),
"code": self.default_code,
}
]
)
class UpstreamServiceUnavailableError(APIException):
"""Provider service is unavailable (503 Service Unavailable).
Used when the upstream provider API returns an error or is unreachable.
"""
status_code = status.HTTP_503_SERVICE_UNAVAILABLE
default_detail = "Unable to communicate with the provider. Please try again later."
default_code = "service_unavailable"
def __init__(self, detail=None):
super().__init__(
detail=[
{
"detail": detail or self.default_detail,
"status": str(self.status_code),
"code": self.default_code,
}
]
)
class UpstreamInternalError(APIException):
"""Unexpected error communicating with provider (500 Internal Server Error).
Used as a catch-all for unexpected errors during provider communication.
"""
status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
default_detail = (
"An unexpected error occurred while communicating with the provider."
)
default_code = "internal_error"
def __init__(self, detail=None):
super().__init__(
detail=[
{
"detail": detail or self.default_detail,
"status": str(self.status_code),
"code": self.default_code,
}
]
)
+124
View File
@@ -8502,6 +8502,64 @@ paths:
schema:
$ref: '#/components/schemas/ResourceResponse'
description: ''
/api/v1/resources/{id}/events:
get:
operationId: resources_events_list
description: |-
Retrieve events showing modification history for a resource. Returns who modified the resource and when. Currently only available for AWS resources.
**Note:** Some events may not appear due to CloudTrail indexing limitations. Not all AWS API calls record the resource identifier in a searchable format.
summary: Get events for a resource
parameters:
- in: path
name: id
schema:
type: string
format: uuid
description: A UUID string identifying this resource.
required: true
- in: query
name: include_read_events
schema:
type: boolean
description: 'Include read-only events (Describe*, Get*, List*, etc.). Default:
false. Set to true to include all events.'
- in: query
name: lookback_days
schema:
type: integer
description: 'Number of days to look back (default: 90, min: 1, max: 90).'
- name: page[number]
required: false
in: query
description: A page number within the paginated result set.
schema:
type: integer
- in: query
name: page[size]
schema:
type: integer
description: 'Maximum number of events to return (default: 50, min: 1, max:
50).'
tags:
- Resource
security:
- JWT or API Key: []
responses:
'200':
content:
application/vnd.api+json:
schema:
$ref: '#/components/schemas/PaginatedResourceEventList'
description: ''
'400':
description: Invalid provider or parameters
'500':
description: Unexpected error retrieving events
'502':
description: Provider credentials invalid, expired, or lack required permissions
'503':
description: Provider service unavailable
/api/v1/resources/latest:
get:
operationId: resources_latest_retrieve
@@ -15885,6 +15943,15 @@ components:
$ref: '#/components/schemas/ProviderSecret'
required:
- data
PaginatedResourceEventList:
type: object
properties:
data:
type: array
items:
$ref: '#/components/schemas/ResourceEvent'
required:
- data
PaginatedResourceGroupOverviewList:
type: object
properties:
@@ -19967,6 +20034,63 @@ components:
readOnly: true
required:
- provider
ResourceEvent:
type: object
required:
- type
- id
additionalProperties: false
properties:
type:
type: string
description: The [type](https://jsonapi.org/format/#document-resource-object-identification)
member is used to describe resource objects that share common attributes
and relationships.
enum:
- resource-events
id: {}
attributes:
type: object
properties:
id:
type: string
event_time:
type: string
format: date-time
event_name:
type: string
event_source:
type: string
actor:
type: string
actor_uid:
type: string
nullable: true
actor_type:
type: string
nullable: true
source_ip_address:
type: string
nullable: true
user_agent:
type: string
nullable: true
request_data:
nullable: true
response_data:
nullable: true
error_code:
type: string
nullable: true
error_message:
type: string
nullable: true
required:
- id
- event_time
- event_name
- event_source
- actor
ResourceGroupOverview:
type: object
required:
+655
View File
@@ -4356,6 +4356,661 @@ class TestResourceViewSet:
assert attributes["types"] == [latest_scan_resource.type]
assert "groups" in attributes
# Events endpoint tests
def test_events_non_aws_provider(self, authenticated_client, providers_fixture):
"""Test events endpoint rejects non-AWS providers."""
from api.models import Resource
azure_provider = providers_fixture[4] # Azure provider from fixture
resource = Resource.objects.create(
uid="test-resource-id",
name="Test Resource",
type="test-type",
region="us-east-1",
service="test-service",
provider=azure_provider,
tenant_id=azure_provider.tenant_id,
)
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
# Verify JSON:API error structure
error = response.json()["errors"][0]
assert error["code"] == "invalid_provider"
assert error["status"] == "400" # Must be string per JSON:API spec
assert error["source"]["pointer"] == "/data/attributes/provider"
assert "AWS" in error["detail"]
@pytest.mark.parametrize(
"lookback_days,expected_status,expected_code,expected_detail_contains",
[
("abc", status.HTTP_400_BAD_REQUEST, "invalid", "valid integer"),
("0", status.HTTP_400_BAD_REQUEST, "out_of_range", "between 1 and 90"),
("91", status.HTTP_400_BAD_REQUEST, "out_of_range", "between 1 and 90"),
("-5", status.HTTP_400_BAD_REQUEST, "out_of_range", "between 1 and 90"),
],
)
def test_events_invalid_lookback_days(
self,
authenticated_client,
providers_fixture,
lookback_days,
expected_status,
expected_code,
expected_detail_contains,
):
"""Test events endpoint validates lookback_days with JSON:API compliant errors."""
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-test",
name="Test Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id}),
{"lookback_days": lookback_days},
)
assert response.status_code == expected_status
# Verify JSON:API error structure
error = response.json()["errors"][0]
assert error["code"] == expected_code
assert error["status"] == "400" # Must be string per JSON:API spec
assert error["source"]["parameter"] == "lookback_days"
assert expected_detail_contains in error["detail"]
@pytest.mark.parametrize(
"page_size,expected_status,expected_code,expected_detail_contains",
[
("abc", status.HTTP_400_BAD_REQUEST, "invalid", "valid integer"),
("0", status.HTTP_400_BAD_REQUEST, "out_of_range", "between 1 and 50"),
("51", status.HTTP_400_BAD_REQUEST, "out_of_range", "between 1 and 50"),
("-1", status.HTTP_400_BAD_REQUEST, "out_of_range", "between 1 and 50"),
],
)
def test_events_invalid_page_size(
self,
authenticated_client,
providers_fixture,
page_size,
expected_status,
expected_code,
expected_detail_contains,
):
"""Test events endpoint validates page[size] with JSON:API compliant errors."""
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-pagesize-test",
name="Test Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id}),
{"page[size]": page_size},
)
assert response.status_code == expected_status
# Verify JSON:API error structure
error = response.json()["errors"][0]
assert error["code"] == expected_code
assert error["status"] == "400" # Must be string per JSON:API spec
assert error["source"]["parameter"] == "page[size]"
assert expected_detail_contains in error["detail"]
@pytest.mark.parametrize(
"invalid_params,expected_invalid_param",
[
({"filter[service]": "ec2"}, "filter[service]"),
({"filter[region]": "us-east-1"}, "filter[region]"),
({"sort": "-name"}, "sort"),
({"unknown_param": "value"}, "unknown_param"),
({"filter[servic]": "ec2"}, "filter[servic]"), # Typo in filter name
],
)
def test_events_invalid_query_parameter(
self,
authenticated_client,
providers_fixture,
invalid_params,
expected_invalid_param,
):
"""Test events endpoint rejects unknown query parameters with JSON:API compliant errors."""
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-test",
name="Test Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id}),
invalid_params,
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
# Verify JSON:API error structure
errors = response.json()["errors"]
assert len(errors) >= 1
# Find the error for our expected invalid param
error = next(
(e for e in errors if e["source"]["parameter"] == expected_invalid_param),
None,
)
assert (
error is not None
), f"Expected error for parameter '{expected_invalid_param}'"
assert error["code"] == "invalid"
assert error["status"] == "400" # Must be string per JSON:API spec
assert expected_invalid_param in error["detail"]
def test_events_multiple_invalid_query_parameters(
self,
authenticated_client,
providers_fixture,
):
"""Test events endpoint returns error for first unknown parameter."""
from api.models import Resource
aws_provider = providers_fixture[0]
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-test",
name="Test Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
# Send multiple invalid parameters - only first one triggers error
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id}),
{"filter[service]": "ec2", "sort": "-name", "unknown": "value"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
# Should have one error for the first invalid parameter encountered
errors = response.json()["errors"]
assert len(errors) == 1
assert errors[0]["code"] == "invalid"
assert errors[0]["status"] == "400"
assert errors[0]["source"]["parameter"] in {
"filter[service]",
"sort",
"unknown",
}
@patch("api.v1.views.initialize_prowler_provider")
@patch("api.v1.views.CloudTrailTimeline")
def test_events_success(
self,
mock_cloudtrail_timeline,
mock_initialize_provider,
authenticated_client,
providers_fixture,
):
"""Test successful events retrieval."""
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
# Create test resource
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-test123",
name="Test EC2 Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
# Mock provider session
mock_session = Mock()
mock_provider = Mock()
mock_provider._session.current_session = mock_session
mock_initialize_provider.return_value = mock_provider
# Mock CloudTrail timeline response - events need event_id for serializer
mock_timeline_instance = Mock()
mock_events = [
{
"event_id": "event-1-id",
"event_time": "2024-01-15T10:30:00Z",
"event_name": "RunInstances",
"event_source": "ec2.amazonaws.com",
"actor": "admin@example.com",
"actor_type": "IAMUser",
"source_ip_address": "203.0.113.1",
"user_agent": "aws-cli/2.0.0",
},
{
"event_id": "event-2-id",
"event_time": "2024-01-16T14:20:00Z",
"event_name": "StopInstances",
"event_source": "ec2.amazonaws.com",
"actor": "operator@example.com",
"actor_type": "IAMUser",
},
]
mock_timeline_instance.get_resource_timeline.return_value = mock_events
mock_cloudtrail_timeline.return_value = mock_timeline_instance
# Make request with lookback_days parameter
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id}),
{"lookback_days": "30"},
)
# Assertions - response is wrapped by JSON:API renderer
assert response.status_code == status.HTTP_200_OK
response_data = response.json()
events = response_data["data"]
assert len(events) == 2
# Verify JSON:API structure: type and id are present
assert events[0]["type"] == "resource-events"
assert events[0]["id"] == "event-1-id"
assert events[1]["type"] == "resource-events"
assert events[1]["id"] == "event-2-id"
# Verify attributes
assert events[0]["attributes"]["event_name"] == "RunInstances"
assert events[0]["attributes"]["actor"] == "admin@example.com"
assert events[1]["attributes"]["event_name"] == "StopInstances"
# Verify CloudTrail was called with correct parameters
mock_cloudtrail_timeline.assert_called_once_with(
session=mock_session,
lookback_days=30,
max_results=50, # Default page size
write_events_only=True, # Default: exclude read events
)
mock_timeline_instance.get_resource_timeline.assert_called_once_with(
region=resource.region,
resource_uid=resource.uid,
)
@patch("api.v1.views.initialize_prowler_provider")
@patch("api.v1.views.CloudTrailTimeline")
def test_events_default_lookback_days(
self,
mock_cloudtrail_timeline,
mock_initialize_provider,
authenticated_client,
providers_fixture,
):
"""Test events uses default lookback_days (90) when not provided."""
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:s3:::test-bucket",
name="Test Bucket",
type="bucket",
region="us-east-1",
service="s3",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
# Mock provider session
mock_session = Mock()
mock_provider = Mock()
mock_provider._session.current_session = mock_session
mock_initialize_provider.return_value = mock_provider
# Mock CloudTrail timeline response
mock_timeline_instance = Mock()
mock_timeline_instance.get_resource_timeline.return_value = []
mock_cloudtrail_timeline.return_value = mock_timeline_instance
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
assert response.status_code == status.HTTP_200_OK
# Verify default lookback_days (90) was used
mock_cloudtrail_timeline.assert_called_once_with(
session=mock_session,
lookback_days=90, # Default
max_results=50,
write_events_only=True,
)
@patch("api.v1.views.initialize_prowler_provider")
def test_events_no_credentials_error(
self, mock_initialize_provider, authenticated_client, providers_fixture
):
"""Test events handles missing credentials errors."""
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:rds:us-west-2:123456789012:db:test-db",
name="Test Database",
type="db-instance",
region="us-west-2",
service="rds",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
mock_initialize_provider.side_effect = NoCredentialsError()
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
# 502 because this is an upstream auth failure, not API auth failure
assert response.status_code == status.HTTP_502_BAD_GATEWAY
# Verify JSON:API error structure
error = response.json()["errors"][0]
assert error["code"] == "upstream_auth_failed"
assert error["status"] == "502" # Must be string per JSON:API spec
assert "detail" in error
@patch("api.v1.views.initialize_prowler_provider")
@patch("api.v1.views.CloudTrailTimeline")
def test_events_access_denied_error(
self,
mock_cloudtrail_timeline,
mock_initialize_provider,
authenticated_client,
providers_fixture,
):
"""Test events handles AccessDenied errors from AWS."""
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:lambda:eu-west-1:123456789012:function:test-func",
name="Test Function",
type="function",
region="eu-west-1",
service="lambda",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
# Mock provider
mock_session = Mock()
mock_provider = Mock()
mock_provider._session.current_session = mock_session
mock_initialize_provider.return_value = mock_provider
# Mock ClientError with AccessDenied
mock_timeline_instance = Mock()
mock_timeline_instance.get_resource_timeline.side_effect = ClientError(
{"Error": {"Code": "AccessDenied", "Message": "Access denied"}},
"LookupEvents",
)
mock_cloudtrail_timeline.return_value = mock_timeline_instance
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
# AccessDenied returns 502 (upstream error, not user's fault)
assert response.status_code == status.HTTP_502_BAD_GATEWAY
# Verify JSON:API error structure
error = response.json()["errors"][0]
assert error["code"] == "upstream_access_denied"
assert error["status"] == "502" # Must be string per JSON:API spec
assert "detail" in error
@patch("api.v1.views.initialize_prowler_provider")
@patch("api.v1.views.CloudTrailTimeline")
def test_events_service_unavailable_error(
self,
mock_cloudtrail_timeline,
mock_initialize_provider,
authenticated_client,
providers_fixture,
):
"""Test events handles generic AWS API errors as 503."""
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:lambda:eu-west-1:123456789012:function:test-func2",
name="Test Function 2",
type="function",
region="eu-west-1",
service="lambda",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
# Mock provider
mock_session = Mock()
mock_provider = Mock()
mock_provider._session.current_session = mock_session
mock_initialize_provider.return_value = mock_provider
# Mock ClientError with non-AccessDenied error
mock_timeline_instance = Mock()
mock_timeline_instance.get_resource_timeline.side_effect = ClientError(
{"Error": {"Code": "ServiceUnavailable", "Message": "Service unavailable"}},
"LookupEvents",
)
mock_cloudtrail_timeline.return_value = mock_timeline_instance
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
# Non-AccessDenied errors return 503
assert response.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
# Verify JSON:API error structure
error = response.json()["errors"][0]
assert error["code"] == "service_unavailable"
assert error["status"] == "503" # Must be string per JSON:API spec
assert "detail" in error
def test_events_unauthenticated_returns_401(self, providers_fixture):
"""Test events endpoint returns 401 when no credentials are provided.
This ensures the endpoint follows API conventions where missing authentication
returns 401 Unauthorized, not 404 Not Found.
"""
from rest_framework.test import APIClient
from api.models import Resource
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-unauth-test",
name="Test Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
# Use unauthenticated client (no JWT token)
unauthenticated_client = APIClient()
response = unauthenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
# Must return 401 Unauthorized, not 404 Not Found
assert response.status_code == status.HTTP_401_UNAUTHORIZED, (
f"Expected 401 Unauthorized but got {response.status_code}. "
"Unauthenticated requests should return 401, not 404."
)
def test_events_cross_tenant_returns_404(
self, authenticated_client, tenants_fixture
):
"""Test events endpoint returns 404 for resources in other tenants (RLS).
Users cannot access resources belonging to other tenants due to
Row-Level Security. The resource should appear to not exist.
"""
from api.models import Provider, Resource
# tenant3 (tenants_fixture[2]) has no membership for the test user
isolated_tenant = tenants_fixture[2]
# Create provider in the isolated tenant
other_tenant_provider = Provider.objects.create(
provider="aws",
uid="999999999999",
alias="other_tenant_aws",
tenant_id=isolated_tenant.id,
)
# Create resource in the OTHER tenant (not the authenticated user's tenant)
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:999999999999:instance/i-other-tenant",
name="Other Tenant Resource",
type="instance",
region="us-east-1",
service="ec2",
provider=other_tenant_provider,
tenant_id=isolated_tenant.id,
)
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
# RLS hides resources from other tenants - should appear as not found
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_events_expired_token_returns_401(self, providers_fixture, tenants_fixture):
"""Test events endpoint returns 401 when JWT token is expired.
Expired tokens should return 401 Unauthorized, not 404 Not Found.
This ensures authentication errors are properly distinguished from
resource not found errors.
"""
from rest_framework.test import APIClient
from api.models import Resource
aws_provider = providers_fixture[0]
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-expired-test",
name="Test Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
# Create an expired JWT token
tenant = tenants_fixture[0]
expired_payload = {
"token_type": "access",
"exp": datetime.now(timezone.utc)
- timedelta(hours=1), # Expired 1 hour ago
"iat": datetime.now(timezone.utc) - timedelta(hours=2),
"jti": str(uuid4()),
"user_id": str(uuid4()),
"tenant_id": str(tenant.id),
}
expired_token = jwt.encode(
expired_payload, settings.SECRET_KEY, algorithm="HS256"
)
client = APIClient()
client.credentials(HTTP_AUTHORIZATION=f"Bearer {expired_token}")
response = client.get(reverse("resource-events", kwargs={"pk": resource.id}))
# Must return 401 Unauthorized, not 404 Not Found
assert response.status_code == status.HTTP_401_UNAUTHORIZED, (
f"Expected 401 Unauthorized but got {response.status_code}. "
"Expired tokens should return 401, not 404."
)
def test_events_invalid_token_returns_401(self, providers_fixture):
"""Test events endpoint returns 401 when JWT token is completely invalid.
Malformed or invalid tokens should return 401 Unauthorized, not 404 Not Found.
"""
from rest_framework.test import APIClient
from api.models import Resource
aws_provider = providers_fixture[0]
resource = Resource.objects.create(
uid="arn:aws:ec2:us-east-1:123456789012:instance/i-invalid-test",
name="Test Instance",
type="instance",
region="us-east-1",
service="ec2",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
client = APIClient()
# Test with completely malformed token
client.credentials(HTTP_AUTHORIZATION="Bearer not.a.valid.jwt.token")
response = client.get(reverse("resource-events", kwargs={"pk": resource.id}))
assert (
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"Expected 401 for malformed token but got {response.status_code}"
# Test with empty bearer token
client.credentials(HTTP_AUTHORIZATION="Bearer ")
response = client.get(reverse("resource-events", kwargs={"pk": resource.id}))
assert (
response.status_code == status.HTTP_401_UNAUTHORIZED
), f"Expected 401 for empty bearer token but got {response.status_code}"
@pytest.mark.django_db
class TestFindingViewSet:
+28
View File
@@ -3975,3 +3975,31 @@ class ThreatScoreSnapshotSerializer(RLSSerializer):
if getattr(obj, "_aggregated", False):
return "n/a"
return str(obj.id)
# Resource Events Serializers
class ResourceEventSerializer(BaseSerializerV1):
"""Serializer for resource events (CloudTrail modification history).
NOTE: drf-spectacular auto-generates fields[resource-events] sparse fieldsets
parameter in the OpenAPI schema. This endpoint does not support sparse fieldsets.
"""
id = serializers.CharField(source="event_id")
event_time = serializers.DateTimeField()
event_name = serializers.CharField()
event_source = serializers.CharField()
actor = serializers.CharField()
actor_uid = serializers.CharField(allow_null=True, required=False)
actor_type = serializers.CharField(allow_null=True, required=False)
source_ip_address = serializers.CharField(allow_null=True, required=False)
user_agent = serializers.CharField(allow_null=True, required=False)
request_data = serializers.JSONField(allow_null=True, required=False)
response_data = serializers.JSONField(allow_null=True, required=False)
error_code = serializers.CharField(allow_null=True, required=False)
error_message = serializers.CharField(allow_null=True, required=False)
class Meta:
resource_name = "resource-events"
+261 -27
View File
@@ -3,7 +3,6 @@ import glob
import json
import logging
import os
from collections import defaultdict
from copy import deepcopy
from datetime import datetime, timedelta, timezone
@@ -11,7 +10,6 @@ from decimal import ROUND_HALF_UP, Decimal, InvalidOperation
from urllib.parse import urljoin
import sentry_sdk
from allauth.socialaccount.models import SocialAccount, SocialApp
from allauth.socialaccount.providers.github.views import GitHubOAuth2Adapter
from allauth.socialaccount.providers.google.views import GoogleOAuth2Adapter
@@ -75,13 +73,27 @@ from rest_framework.generics import GenericAPIView, get_object_or_404
from rest_framework.permissions import SAFE_METHODS
from rest_framework_json_api.views import RelationshipView, Response
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from api.attack_paths import (
database as graph_database,
get_queries_for_provider,
get_query_by_id,
views_helpers as attack_paths_views_helpers,
from tasks.beat import schedule_provider_scan
from tasks.jobs.attack_paths import db_utils as attack_paths_db_utils
from tasks.jobs.export import get_s3_client
from tasks.tasks import (
backfill_compliance_summaries_task,
backfill_scan_resource_summaries_task,
check_integration_connection_task,
check_lighthouse_connection_task,
check_lighthouse_provider_connection_task,
check_provider_connection_task,
delete_provider_task,
delete_tenant_task,
jira_integration_task,
mute_historical_findings_task,
perform_scan_task,
refresh_lighthouse_provider_models_task,
)
from api.attack_paths import database as graph_database
from api.attack_paths import get_queries_for_provider, get_query_by_id
from api.attack_paths import views_helpers as attack_paths_views_helpers
from api.base_views import BaseRLSViewSet, BaseTenantViewset, BaseUserViewset
from api.compliance import (
PROWLER_COMPLIANCE_OVERVIEW_TEMPLATE,
@@ -89,7 +101,13 @@ from api.compliance import (
)
from api.db_router import MainRouter
from api.db_utils import rls_transaction
from api.exceptions import TaskFailedException
from api.exceptions import (
TaskFailedException,
UpstreamAccessDeniedError,
UpstreamAuthenticationError,
UpstreamInternalError,
UpstreamServiceUnavailableError,
)
from api.filters import (
AttackPathsScanFilter,
AttackSurfaceOverviewFilter,
@@ -173,6 +191,7 @@ from api.rls import Tenant
from api.utils import (
CustomOAuth2Client,
get_findings_metadata_no_aggregations,
initialize_prowler_provider,
validate_invitation,
)
from api.uuid_utils import datetime_to_uuid7, uuid7_start
@@ -234,6 +253,7 @@ from api.v1.serializers import (
ProviderSecretUpdateSerializer,
ProviderSerializer,
ProviderUpdateSerializer,
ResourceEventSerializer,
ResourceGroupOverviewSerializer,
ResourceMetadataSerializer,
ResourceSerializer,
@@ -264,22 +284,9 @@ from api.v1.serializers import (
UserSerializer,
UserUpdateSerializer,
)
from tasks.beat import schedule_provider_scan
from tasks.jobs.attack_paths import db_utils as attack_paths_db_utils
from tasks.jobs.export import get_s3_client
from tasks.tasks import (
backfill_compliance_summaries_task,
backfill_scan_resource_summaries_task,
check_integration_connection_task,
check_lighthouse_connection_task,
check_lighthouse_provider_connection_task,
check_provider_connection_task,
delete_provider_task,
delete_tenant_task,
jira_integration_task,
mute_historical_findings_task,
perform_scan_task,
refresh_lighthouse_provider_models_task,
from prowler.providers.aws.exceptions.exceptions import AWSCredentialsError
from prowler.providers.aws.lib.cloudtrail_timeline.cloudtrail_timeline import (
CloudTrailTimeline,
)
logger = logging.getLogger(BackendLogger.API)
@@ -2504,6 +2511,20 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
http_method_names = ["get"]
filterset_class = ResourceFilter
ordering = ["-failed_findings_count", "-updated_at"]
# Events endpoint constants (currently AWS-only, limited to 90 days by CloudTrail Event History)
EVENTS_DEFAULT_LOOKBACK_DAYS = 90
EVENTS_MIN_LOOKBACK_DAYS = 1
EVENTS_MAX_LOOKBACK_DAYS = 90
# Page size controls how many events CloudTrail returns (prepares for API pagination)
EVENTS_DEFAULT_PAGE_SIZE = 50
EVENTS_MIN_PAGE_SIZE = 1
EVENTS_MAX_PAGE_SIZE = 50 # CloudTrail lookup_events max is 50
# Allowed query parameters for the events endpoint
EVENTS_ALLOWED_PARAMS = frozenset(
{"lookback_days", "page[size]", "include_read_events"}
)
ordering_fields = [
"provider_uid",
"uid",
@@ -2579,6 +2600,8 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
def get_serializer_class(self):
if self.action in ["metadata", "metadata_latest"]:
return ResourceMetadataSerializer
if self.action == "events":
return ResourceEventSerializer
return super().get_serializer_class()
def get_filterset_class(self):
@@ -2587,8 +2610,8 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
return ResourceFilter
def filter_queryset(self, queryset):
# Do not apply filters when retrieving specific resource
if self.action == "retrieve":
# Do not apply filters when retrieving specific resource or events
if self.action in ["retrieve", "events"]:
return queryset
return super().filter_queryset(queryset)
@@ -2828,6 +2851,217 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
serializer.is_valid(raise_exception=True)
return Response(serializer.data)
@extend_schema(
tags=["Resource"],
summary="Get events for a resource",
description=(
"Retrieve events showing modification history for a resource. "
"Returns who modified the resource and when. Currently only available for AWS resources.\n\n"
"**Note:** Some events may not appear due to CloudTrail indexing limitations. "
"Not all AWS API calls record the resource identifier in a searchable format."
),
parameters=[
OpenApiParameter(
name="lookback_days",
type=OpenApiTypes.INT,
location=OpenApiParameter.QUERY,
description="Number of days to look back (default: 90, min: 1, max: 90).",
required=False,
),
OpenApiParameter(
name="page[size]",
type=OpenApiTypes.INT,
location=OpenApiParameter.QUERY,
description="Maximum number of events to return (default: 50, min: 1, max: 50).",
required=False,
),
OpenApiParameter(
name="include_read_events",
type=OpenApiTypes.BOOL,
location=OpenApiParameter.QUERY,
description=(
"Include read-only events (Describe*, Get*, List*, etc.). "
"Default: false. Set to true to include all events."
),
required=False,
),
# NOTE: drf-spectacular auto-generates page[number] and fields[resource-events]
# parameters. This endpoint does not support pagination (results are limited by
# page[size] only) nor sparse fieldsets.
],
responses={
200: ResourceEventSerializer(many=True),
400: OpenApiResponse(description="Invalid provider or parameters"),
500: OpenApiResponse(description="Unexpected error retrieving events"),
502: OpenApiResponse(
description="Provider credentials invalid, expired, or lack required permissions"
),
503: OpenApiResponse(description="Provider service unavailable"),
},
)
@action(
detail=True,
methods=["get"],
url_name="events",
filter_backends=[], # Disable filters - we're calling external API, not filtering queryset
)
def events(self, request, pk=None):
"""Get events for a resource."""
resource = self.get_object()
# Validate query parameters - reject unknown parameters
for param in request.query_params.keys():
if param not in self.EVENTS_ALLOWED_PARAMS:
raise ValidationError(
[
{
"detail": f"invalid parameter '{param}'",
"status": "400",
"source": {"parameter": param},
"code": "invalid",
}
]
)
# Validate provider - currently only AWS CloudTrail is supported
if resource.provider.provider != Provider.ProviderChoices.AWS:
raise ValidationError(
[
{
"detail": "Events are only available for AWS resources",
"status": "400",
"source": {"pointer": "/data/attributes/provider"},
"code": "invalid_provider",
}
]
)
# Validate and parse lookback_days from query params
lookback_days_str = request.query_params.get("lookback_days")
if lookback_days_str is None:
lookback_days = self.EVENTS_DEFAULT_LOOKBACK_DAYS
else:
try:
lookback_days = int(lookback_days_str)
except (ValueError, TypeError):
raise ValidationError(
[
{
"detail": "lookback_days must be a valid integer",
"status": "400",
"source": {"parameter": "lookback_days"},
"code": "invalid",
}
]
)
if not (
self.EVENTS_MIN_LOOKBACK_DAYS
<= lookback_days
<= self.EVENTS_MAX_LOOKBACK_DAYS
):
raise ValidationError(
[
{
"detail": (
f"lookback_days must be between {self.EVENTS_MIN_LOOKBACK_DAYS} "
f"and {self.EVENTS_MAX_LOOKBACK_DAYS}"
),
"status": "400",
"source": {"parameter": "lookback_days"},
"code": "out_of_range",
}
]
)
# Validate and parse page[size] from query params (JSON:API pagination)
page_size_str = request.query_params.get("page[size]")
if page_size_str is None:
page_size = self.EVENTS_DEFAULT_PAGE_SIZE
else:
try:
page_size = int(page_size_str)
except (ValueError, TypeError):
raise ValidationError(
[
{
"detail": "page[size] must be a valid integer",
"status": "400",
"source": {"parameter": "page[size]"},
"code": "invalid",
}
]
)
if not (
self.EVENTS_MIN_PAGE_SIZE <= page_size <= self.EVENTS_MAX_PAGE_SIZE
):
raise ValidationError(
[
{
"detail": (
f"page[size] must be between {self.EVENTS_MIN_PAGE_SIZE} "
f"and {self.EVENTS_MAX_PAGE_SIZE}"
),
"status": "400",
"source": {"parameter": "page[size]"},
"code": "out_of_range",
}
]
)
# Parse include_read_events (default: false)
include_read_events = (
request.query_params.get("include_read_events", "").lower() == "true"
)
try:
# Initialize Prowler provider using existing utility
prowler_provider = initialize_prowler_provider(resource.provider)
# Get the boto3 session from the Prowler provider
session = prowler_provider._session.current_session
# Create timeline service (currently only AWS/CloudTrail is supported)
timeline_service = CloudTrailTimeline(
session=session,
lookback_days=lookback_days,
max_results=page_size,
write_events_only=not include_read_events,
)
# Get timeline events
events = timeline_service.get_resource_timeline(
region=resource.region,
resource_uid=resource.uid,
)
serializer = ResourceEventSerializer(events, many=True)
return Response(serializer.data)
except NoCredentialsError:
# 502 because this is an upstream auth failure, not API auth failure
raise UpstreamAuthenticationError(
detail="Credentials not found for this provider. Please reconnect the provider."
)
except AWSCredentialsError:
# Handles expired tokens, invalid keys, profile not found, etc.
raise UpstreamAuthenticationError()
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "")
logger.error(
f"Provider API error retrieving events: {str(e)}",
exc_info=True,
)
# AccessDenied means the credentials don't have the required permissions
if error_code in ("AccessDenied", "AccessDeniedException"):
raise UpstreamAccessDeniedError()
raise UpstreamServiceUnavailableError()
except Exception as e:
sentry_sdk.capture_exception(e)
raise UpstreamInternalError(detail="Failed to retrieve events")
@extend_schema_view(
list=extend_schema(
+3 -1
View File
@@ -4,12 +4,14 @@ All notable changes to the **Prowler SDK** are documented in this file.
## [5.18.0] (Prowler UNRELEASED)
### Added
### 🚀 Added
- `defender_zap_for_teams_enabled` check for M365 provider [(#9838)](https://github.com/prowler-cloud/prowler/pull/9838)
- `compute_instance_suspended_without_persistent_disks` check for GCP provider [(#9747)](https://github.com/prowler-cloud/prowler/pull/9747)
- `codebuild_project_webhook_filters_use_anchored_patterns` check for AWS provider to detect CodeBreach vulnerability [(#9840)](https://github.com/prowler-cloud/prowler/pull/9840)
- `exchange_shared_mailbox_sign_in_disabled` check for M365 provider [(#9828)](https://github.com/prowler-cloud/prowler/pull/9828)
- CloudTrail Timeline abstraction for querying resource modification history [(#9101)](https://github.com/prowler-cloud/prowler/pull/9101)
### Changed
View File
+27
View File
@@ -0,0 +1,27 @@
from datetime import datetime
from typing import Any, Dict, Optional
from pydantic.v1 import BaseModel
class TimelineEvent(BaseModel):
"""A timeline event representing a resource modification.
Provider-agnostic model that can be used by any timeline implementation
(AWS CloudTrail, Azure Activity Logs, GCP Audit Logs, etc.).
"""
event_id: str
event_time: datetime
event_name: str
event_source: str
actor: str
actor_uid: Optional[str] = None
actor_type: Optional[str] = None
source_ip_address: Optional[str] = None
user_agent: Optional[str] = None
request_data: Optional[Dict[str, Any]] = None
response_data: Optional[Dict[str, Any]] = None
error_code: Optional[str] = None
error_message: Optional[str] = None
metadata: Optional[Dict[str, Any]] = None
+36
View File
@@ -0,0 +1,36 @@
"""Abstract base class for timeline services."""
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
class TimelineService(ABC):
"""Abstract base class for provider-specific timeline implementations.
Subclasses should implement the get_resource_timeline method to query
their provider's audit/activity log service (e.g., AWS CloudTrail,
Azure Activity Logs, GCP Audit Logs).
"""
@abstractmethod
def get_resource_timeline(
self,
region: Optional[str] = None,
resource_id: Optional[str] = None,
resource_uid: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Get timeline events for a resource.
Args:
region: Region/location where the resource exists. Implementations
may provide a sensible default for global/regionless resources.
resource_id: Provider-specific resource ID (e.g., bucket name, instance ID)
resource_uid: Provider-specific unique identifier (e.g., AWS ARN, Azure Resource ID)
Returns:
List of timeline event dictionaries
Raises:
ValueError: If neither resource_id nor resource_uid is provided
"""
raise NotImplementedError
@@ -0,0 +1,218 @@
"""CloudTrail timeline service for AWS.
Queries AWS CloudTrail to retrieve timeline events for resources,
showing who performed actions and when.
"""
import json
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional
from botocore.exceptions import ClientError
from prowler.lib.logger import logger
from prowler.lib.timeline.models import TimelineEvent
from prowler.lib.timeline.timeline import TimelineService
class CloudTrailTimeline(TimelineService):
"""AWS CloudTrail implementation of TimelineService.
Args:
session: boto3.Session for AWS API calls
lookback_days: Number of days to look back (default 90, max 90 for Event History)
max_results: Maximum number of events to return
write_events_only: If True, filter out read-only events (Describe*, Get*, List*, etc.)
"""
MAX_LOOKBACK_DAYS = 90
DEFAULT_MAX_RESULTS = 50 # Default page size for CloudTrail queries
# Prefixes for read-only API operations that don't modify resources
READ_ONLY_PREFIXES = (
"Describe",
"Get",
"List",
"Head",
"Check",
"Lookup",
"Search",
"Scan",
"Query",
"BatchGet",
"Select",
)
def __init__(
self,
session,
lookback_days: int = 90,
max_results: Optional[int] = None,
write_events_only: bool = True,
):
self._session = session
self._lookback_days = min(lookback_days, self.MAX_LOOKBACK_DAYS)
self._max_results = max_results or self.DEFAULT_MAX_RESULTS
self._write_events_only = write_events_only
self._clients: Dict[str, Any] = {}
DEFAULT_REGION = "us-east-1" # Default for global resources in commercial partition
def get_resource_timeline(
self,
region: Optional[str] = None,
resource_id: Optional[str] = None,
resource_uid: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Get CloudTrail timeline events for a resource.
Args:
region: AWS region to query. Defaults to us-east-1 for global resources
(IAM, S3, Route53, etc.) in the commercial partition. Caller
should provide the correct region for regional resources.
resource_id: AWS resource ID (e.g., sg-1234567890abcdef0)
resource_uid: AWS resource ARN (unique identifier)
Returns:
List of timeline event dictionaries
Raises:
ValueError: If neither resource_id nor resource_uid is provided
ClientError: If AWS API call fails
"""
resource_identifier = resource_uid or resource_id
if not resource_identifier:
raise ValueError("Either resource_id or resource_uid must be provided")
region = region or self.DEFAULT_REGION
try:
raw_events = self._lookup_events(resource_identifier, region)
events = []
for raw_event in raw_events:
# Filter read-only events if write_events_only is enabled
if self._write_events_only:
event_name = raw_event.get("EventName", "")
if self._is_read_only_event(event_name):
continue
parsed = self._parse_event(raw_event)
if parsed:
events.append(parsed)
return events
except ClientError as e:
logger.error(
f"CloudTrail timeline error for {resource_identifier} in {region}: "
f"{e.response['Error']['Code']} - {e.response['Error']['Message']}"
)
raise
except Exception as e:
lineno = e.__traceback__.tb_lineno if e.__traceback__ else "?"
logger.error(
f"CloudTrail timeline unexpected error: "
f"{e.__class__.__name__}[{lineno}]: {e}"
)
return []
def _is_read_only_event(self, event_name: str) -> bool:
"""Check if an event is a read-only operation."""
return event_name.startswith(self.READ_ONLY_PREFIXES)
def _get_client(self, region: str):
"""Get or create a CloudTrail client for the specified region."""
if region not in self._clients:
self._clients[region] = self._session.client(
"cloudtrail", region_name=region
)
return self._clients[region]
def _lookup_events(
self, resource_identifier: str, region: str
) -> List[Dict[str, Any]]:
"""Query CloudTrail for events related to a specific resource.
Uses MaxResults to limit the number of events returned, preparing
for API-level pagination. Currently returns up to max_results events
from the first page only.
"""
client = self._get_client(region)
start_time = datetime.now(timezone.utc) - timedelta(days=self._lookback_days)
# Use direct API call with MaxResults instead of paginator
# This limits CloudTrail to return only max_results events
response = client.lookup_events(
LookupAttributes=[
{"AttributeKey": "ResourceName", "AttributeValue": resource_identifier}
],
StartTime=start_time,
MaxResults=self._max_results,
)
return response.get("Events", [])
def _parse_event(self, raw_event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""Parse a raw CloudTrail event into a TimelineEvent dictionary."""
try:
cloud_trail_event = raw_event.get("CloudTrailEvent", "{}")
if isinstance(cloud_trail_event, str):
details = json.loads(cloud_trail_event)
else:
details = cloud_trail_event
user_identity = details.get("userIdentity", {})
event = TimelineEvent(
event_id=raw_event.get("EventId"),
event_time=raw_event["EventTime"],
event_name=raw_event.get("EventName", "Unknown"),
event_source=raw_event.get("EventSource", "Unknown"),
actor=self._extract_actor(user_identity),
actor_uid=user_identity.get("arn"),
actor_type=user_identity.get("type"),
source_ip_address=details.get("sourceIPAddress"),
user_agent=details.get("userAgent"),
request_data=details.get("requestParameters"),
response_data=details.get("responseElements"),
error_code=details.get("errorCode"),
error_message=details.get("errorMessage"),
)
return event.dict()
except Exception as e:
logger.warning(
f"CloudTrail timeline: failed to parse event: "
f"{e.__class__.__name__}: {e}"
)
return None
@staticmethod
def _extract_actor(user_identity: Dict[str, Any]) -> str:
"""Extract a human-readable actor name from CloudTrail userIdentity."""
# Try ARN first - most reliable
if arn := user_identity.get("arn"):
if "/" in arn:
parts = arn.split("/")
# For assumed-role, return the role name (second-to-last part)
if "assumed-role" in arn and len(parts) >= 2:
return parts[-2]
return parts[-1]
return arn.split(":")[-1]
# Fall back to userName
if username := user_identity.get("userName"):
return username
# Fall back to principalId
if principal_id := user_identity.get("principalId"):
return principal_id
# For service-invoked actions
if invoking_service := user_identity.get("invokedBy"):
return invoking_service
return "Unknown"
View File
+163
View File
@@ -0,0 +1,163 @@
from datetime import datetime, timezone
import pytest
from prowler.lib.timeline.models import TimelineEvent
class TestTimelineEvent:
"""Tests for TimelineEvent model."""
def test_minimal_event(self):
"""Test creating an event with only required fields."""
event = TimelineEvent(
event_id="test-event-id-123",
event_time=datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
event_name="CreateResource",
event_source="service.example.com",
actor="user@example.com",
actor_type="User",
)
assert event.event_id == "test-event-id-123"
assert event.event_time == datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
assert event.event_name == "CreateResource"
assert event.event_source == "service.example.com"
assert event.actor == "user@example.com"
assert event.actor_type == "User"
# Optional fields should be None
assert event.actor_uid is None
assert event.source_ip_address is None
assert event.user_agent is None
assert event.request_data is None
assert event.response_data is None
assert event.error_code is None
assert event.error_message is None
def test_full_event(self):
"""Test creating an event with all fields populated."""
event = TimelineEvent(
event_id="full-event-id-456",
event_time=datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
event_name="ModifyResource",
event_source="storage.example.com",
actor="admin-role",
actor_uid="arn:aws:sts::123456789012:assumed-role/admin-role/session",
actor_type="AssumedRole",
source_ip_address="192.168.1.100",
user_agent="aws-cli/2.0.0",
request_data={"bucket": "my-bucket", "acl": "private"},
response_data={"status": "success"},
error_code=None,
error_message=None,
)
assert event.event_id == "full-event-id-456"
assert (
event.actor_uid
== "arn:aws:sts::123456789012:assumed-role/admin-role/session"
)
assert event.source_ip_address == "192.168.1.100"
assert event.user_agent == "aws-cli/2.0.0"
assert event.request_data == {"bucket": "my-bucket", "acl": "private"}
assert event.response_data == {"status": "success"}
def test_error_event(self):
"""Test creating an event that represents a failed operation."""
event = TimelineEvent(
event_id="error-event-id-789",
event_time=datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
event_name="DeleteResource",
event_source="storage.example.com",
actor="unauthorized-user",
actor_type="User",
error_code="AccessDenied",
error_message="User does not have permission to delete this resource",
)
assert event.error_code == "AccessDenied"
assert (
event.error_message
== "User does not have permission to delete this resource"
)
def test_event_to_dict(self):
"""Test that event can be serialized to dictionary."""
event = TimelineEvent(
event_id="dict-test-id",
event_time=datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
event_name="CreateResource",
event_source="service.example.com",
actor="user@example.com",
actor_type="User",
)
event_dict = event.dict()
assert event_dict["event_id"] == "dict-test-id"
assert event_dict["event_name"] == "CreateResource"
assert event_dict["actor"] == "user@example.com"
assert event_dict["actor_type"] == "User"
def test_event_from_dict(self):
"""Test creating an event from a dictionary."""
data = {
"event_id": "from-dict-id",
"event_time": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"event_name": "UpdateResource",
"event_source": "compute.example.com",
"actor": "service-account",
"actor_type": "ServiceAccount",
}
event = TimelineEvent(**data)
assert event.event_id == "from-dict-id"
assert event.event_name == "UpdateResource"
assert event.actor == "service-account"
assert event.actor_type == "ServiceAccount"
def test_required_fields_validation(self):
"""Test that missing required fields raise validation error."""
with pytest.raises(Exception): # Pydantic validation error
TimelineEvent(
event_id="validation-test",
event_time=datetime.now(timezone.utc),
event_name="CreateResource",
# Missing: event_source, actor, actor_type
)
def test_actor_types_are_flexible(self):
"""Test that actor_type accepts any string value (provider-agnostic)."""
# AWS-style
aws_event = TimelineEvent(
event_id="aws-event-id",
event_time=datetime.now(timezone.utc),
event_name="CreateBucket",
event_source="s3.amazonaws.com",
actor="arn:aws:iam::123456789012:user/admin",
actor_type="IAMUser",
)
assert aws_event.actor_type == "IAMUser"
# Azure-style
azure_event = TimelineEvent(
event_id="azure-event-id",
event_time=datetime.now(timezone.utc),
event_name="CreateStorageAccount",
event_source="Microsoft.Storage",
actor="user@contoso.com",
actor_type="User",
)
assert azure_event.actor_type == "User"
# GCP-style
gcp_event = TimelineEvent(
event_id="gcp-event-id",
event_time=datetime.now(timezone.utc),
event_name="storage.buckets.create",
event_source="storage.googleapis.com",
actor="service-account@project.iam.gserviceaccount.com",
actor_type="serviceAccount",
)
assert gcp_event.actor_type == "serviceAccount"
+144
View File
@@ -0,0 +1,144 @@
"""Tests for prowler.lib.timeline.timeline module."""
from typing import Any, Dict, List, Optional
import pytest
from prowler.lib.timeline.timeline import TimelineService
class ConcreteTimelineService(TimelineService):
"""Concrete implementation for testing the abstract base class."""
def __init__(self, mock_events: Optional[List[Dict[str, Any]]] = None):
self.mock_events = mock_events or []
self.last_call_args = None
def get_resource_timeline(
self,
region: Optional[str] = None,
resource_id: Optional[str] = None,
resource_uid: Optional[str] = None,
) -> List[Dict[str, Any]]:
"""Return mock events for testing."""
if not resource_id and not resource_uid:
raise ValueError("Either resource_id or resource_uid must be provided")
self.last_call_args = {
"region": region,
"resource_id": resource_id,
"resource_uid": resource_uid,
}
return self.mock_events
class TestTimelineServiceAbstract:
"""Tests for TimelineService abstract base class."""
def test_cannot_instantiate_abstract_class(self):
"""Test that TimelineService cannot be instantiated directly."""
with pytest.raises(TypeError) as exc_info:
TimelineService()
assert "abstract" in str(exc_info.value).lower()
def test_concrete_implementation_can_be_instantiated(self):
"""Test that a concrete implementation can be instantiated."""
service = ConcreteTimelineService()
assert service is not None
def test_get_resource_timeline_with_resource_id(self):
"""Test calling get_resource_timeline with resource_id."""
service = ConcreteTimelineService(mock_events=[{"event": "test"}])
result = service.get_resource_timeline(
region="us-east-1", resource_id="res-123"
)
assert result == [{"event": "test"}]
assert service.last_call_args["region"] == "us-east-1"
assert service.last_call_args["resource_id"] == "res-123"
assert service.last_call_args["resource_uid"] is None
def test_get_resource_timeline_with_resource_uid(self):
"""Test calling get_resource_timeline with resource_uid."""
service = ConcreteTimelineService(mock_events=[{"event": "test"}])
result = service.get_resource_timeline(
region="eu-west-1",
resource_uid="arn:aws:s3:::my-bucket",
)
assert result == [{"event": "test"}]
assert service.last_call_args["region"] == "eu-west-1"
assert service.last_call_args["resource_id"] is None
assert service.last_call_args["resource_uid"] == "arn:aws:s3:::my-bucket"
def test_get_resource_timeline_with_both_identifiers(self):
"""Test calling get_resource_timeline with both resource_id and resource_uid."""
service = ConcreteTimelineService(mock_events=[])
service.get_resource_timeline(
region="ap-south-1",
resource_id="res-123",
resource_uid="arn:aws:ec2:ap-south-1:123456789012:instance/i-12345",
)
assert service.last_call_args["resource_id"] == "res-123"
assert (
service.last_call_args["resource_uid"]
== "arn:aws:ec2:ap-south-1:123456789012:instance/i-12345"
)
def test_get_resource_timeline_missing_identifiers_raises(self):
"""Test that missing both identifiers raises ValueError."""
service = ConcreteTimelineService()
with pytest.raises(ValueError) as exc_info:
service.get_resource_timeline(region="us-west-2")
assert "resource_id" in str(exc_info.value) or "resource_uid" in str(
exc_info.value
)
def test_return_type_is_list_of_dicts(self):
"""Test that get_resource_timeline returns list of dicts (not TimelineEvent)."""
# The abstract interface returns list[dict] to allow flexibility
# Concrete implementations convert to TimelineEvent as needed
mock_events = [
{"event_name": "CreateBucket", "actor": "user1"},
{"event_name": "PutBucketPolicy", "actor": "user2"},
]
service = ConcreteTimelineService(mock_events=mock_events)
result = service.get_resource_timeline(
region="us-east-1", resource_id="my-bucket"
)
assert isinstance(result, list)
assert all(isinstance(event, dict) for event in result)
class TestTimelineServiceInheritance:
"""Tests for proper inheritance of TimelineService."""
def test_is_abstract_base_class(self):
"""Test that TimelineService is an ABC."""
from abc import ABC
assert issubclass(TimelineService, ABC)
def test_get_resource_timeline_is_abstract(self):
"""Test that get_resource_timeline is an abstract method."""
method = getattr(TimelineService, "get_resource_timeline")
assert getattr(method, "__isabstractmethod__", False)
def test_subclass_must_implement_abstract_method(self):
"""Test that subclass without implementation cannot be instantiated."""
class IncompleteService(TimelineService):
"""Subclass that doesn't implement abstract methods."""
with pytest.raises(TypeError):
IncompleteService()
@@ -0,0 +1,608 @@
import json
from datetime import datetime, timezone
from unittest.mock import MagicMock
import pytest
from botocore.exceptions import ClientError
from prowler.providers.aws.lib.cloudtrail_timeline.cloudtrail_timeline import (
CloudTrailTimeline,
)
class TestCloudTrailTimeline:
@pytest.fixture
def mock_session(self):
return MagicMock()
@pytest.fixture
def sample_cloudtrail_event(self):
return {
"EventId": "a1b2c3d4-e5f6-7890-abcd-ef1234567890",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/admin",
"userName": "admin",
},
"sourceIPAddress": "203.0.113.1",
"userAgent": "aws-cli/2.0.0",
"requestParameters": {"instanceType": "t3.micro"},
"responseElements": {
"instancesSet": {"items": [{"instanceId": "i-1234"}]}
},
}
),
}
def test_init_default_lookback(self, mock_session):
timeline = CloudTrailTimeline(session=mock_session)
assert timeline._lookback_days == 90
def test_init_custom_lookback(self, mock_session):
timeline = CloudTrailTimeline(session=mock_session, lookback_days=30)
assert timeline._lookback_days == 30
def test_init_lookback_capped_at_max(self, mock_session):
timeline = CloudTrailTimeline(session=mock_session, lookback_days=365)
assert timeline._lookback_days == 90
def test_init_default_max_results(self, mock_session):
timeline = CloudTrailTimeline(session=mock_session)
assert timeline._max_results == 50
def test_init_custom_max_results(self, mock_session):
timeline = CloudTrailTimeline(session=mock_session, max_results=10)
assert timeline._max_results == 10
def test_init_default_write_events_only(self, mock_session):
timeline = CloudTrailTimeline(session=mock_session)
assert timeline._write_events_only is True
def test_init_write_events_only_disabled(self, mock_session):
timeline = CloudTrailTimeline(session=mock_session, write_events_only=False)
assert timeline._write_events_only is False
def test_get_resource_timeline_defaults_to_us_east_1(self, mock_session):
"""When no region is provided, should default to us-east-1 for global resources."""
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": []}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
timeline.get_resource_timeline(
resource_uid="arn:aws:iam::123456789012:user/admin"
)
# Verify us-east-1 was used as the default region
mock_session.client.assert_called_with("cloudtrail", region_name="us-east-1")
def test_get_resource_timeline_missing_identifier_raises(self, mock_session):
timeline = CloudTrailTimeline(session=mock_session)
with pytest.raises(ValueError, match="Either resource_id or resource_uid"):
timeline.get_resource_timeline(region="us-east-1")
def test_get_resource_timeline_with_resource_id(
self, mock_session, sample_cloudtrail_event
):
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": [sample_cloudtrail_event]}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1", resource_id="i-1234567890abcdef0"
)
assert len(result) == 1
assert result[0]["event_name"] == "RunInstances"
assert result[0]["actor"] == "admin"
assert result[0]["source_ip_address"] == "203.0.113.1"
def test_get_resource_timeline_with_resource_uid(
self, mock_session, sample_cloudtrail_event
):
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": [sample_cloudtrail_event]}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1",
resource_uid="arn:aws:ec2:us-east-1:123456789012:instance/i-1234567890abcdef0",
)
assert len(result) == 1
assert result[0]["event_name"] == "RunInstances"
def test_get_resource_timeline_prefers_uid_over_id(self, mock_session):
"""When both resource_id and resource_uid are provided, UID should be used."""
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": []}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
timeline.get_resource_timeline(
region="us-east-1",
resource_id="i-1234",
resource_uid="arn:aws:ec2:us-east-1:123:instance/i-1234",
)
# Verify UID was used in the lookup
call_args = mock_client.lookup_events.call_args
lookup_attrs = call_args.kwargs["LookupAttributes"]
assert (
lookup_attrs[0]["AttributeValue"]
== "arn:aws:ec2:us-east-1:123:instance/i-1234"
)
def test_get_resource_timeline_client_error(self, mock_session):
mock_client = MagicMock()
mock_client.lookup_events.side_effect = ClientError(
{"Error": {"Code": "AccessDenied", "Message": "Access denied"}},
"LookupEvents",
)
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
with pytest.raises(ClientError):
timeline.get_resource_timeline(region="us-east-1", resource_id="i-1234")
def test_get_resource_timeline_multiple_events(self, mock_session):
events = [
{
"EventId": "event-1-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/admin",
}
}
),
},
{
"EventId": "event-2-id",
"EventTime": datetime(2024, 1, 15, 11, 30, 0, tzinfo=timezone.utc),
"EventName": "StopInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/ops",
}
}
),
},
]
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": events}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1", resource_id="i-1234"
)
assert len(result) == 2
assert result[0]["event_name"] == "RunInstances"
assert result[1]["event_name"] == "StopInstances"
def test_get_resource_timeline_uses_max_results(self, mock_session):
"""Verify MaxResults is passed to lookup_events."""
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": []}
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session, max_results=25)
timeline.get_resource_timeline(region="us-east-1", resource_id="i-1234")
# Verify MaxResults was passed to lookup_events
call_args = mock_client.lookup_events.call_args
assert call_args.kwargs["MaxResults"] == 25
def test_get_resource_timeline_filters_read_only_events(self, mock_session):
"""Verify read-only events are filtered when write_events_only=True."""
events = [
{
"EventId": "write-event-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "CreateSecurityGroup",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{"userIdentity": {"type": "IAMUser", "userName": "admin"}}
),
},
{
"EventId": "read-event-id",
"EventTime": datetime(2024, 1, 15, 10, 31, 0, tzinfo=timezone.utc),
"EventName": "DescribeSecurityGroups",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{"userIdentity": {"type": "IAMUser", "userName": "admin"}}
),
},
{
"EventId": "another-read-id",
"EventTime": datetime(2024, 1, 15, 10, 32, 0, tzinfo=timezone.utc),
"EventName": "GetSecurityGroupRules",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{"userIdentity": {"type": "IAMUser", "userName": "admin"}}
),
},
]
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": events}
mock_session.client.return_value = mock_client
# Default: write_events_only=True
timeline = CloudTrailTimeline(session=mock_session)
result = timeline.get_resource_timeline(
region="us-east-1", resource_id="sg-123"
)
# Only the write event should be returned
assert len(result) == 1
assert result[0]["event_name"] == "CreateSecurityGroup"
def test_get_resource_timeline_includes_read_events_when_disabled(
self, mock_session
):
"""Verify all events returned when write_events_only=False."""
events = [
{
"EventId": "write-event-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "CreateSecurityGroup",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{"userIdentity": {"type": "IAMUser", "userName": "admin"}}
),
},
{
"EventId": "read-event-id",
"EventTime": datetime(2024, 1, 15, 10, 31, 0, tzinfo=timezone.utc),
"EventName": "DescribeSecurityGroups",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{"userIdentity": {"type": "IAMUser", "userName": "admin"}}
),
},
]
mock_client = MagicMock()
mock_client.lookup_events.return_value = {"Events": events}
mock_session.client.return_value = mock_client
# Disable filtering
timeline = CloudTrailTimeline(session=mock_session, write_events_only=False)
result = timeline.get_resource_timeline(
region="us-east-1", resource_id="sg-123"
)
# All events should be returned
assert len(result) == 2
assert result[0]["event_name"] == "CreateSecurityGroup"
assert result[1]["event_name"] == "DescribeSecurityGroups"
class TestExtractActor:
def test_extract_actor_iam_user(self):
user_identity = {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/alice",
"userName": "alice",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "alice"
def test_extract_actor_assumed_role(self):
user_identity = {
"type": "AssumedRole",
"arn": "arn:aws:sts::123456789012:assumed-role/MyRole/session-name",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "MyRole"
def test_extract_actor_root(self):
user_identity = {"type": "Root", "arn": "arn:aws:iam::123456789012:root"}
assert CloudTrailTimeline._extract_actor(user_identity) == "root"
def test_extract_actor_service(self):
user_identity = {
"type": "AWSService",
"invokedBy": "elasticloadbalancing.amazonaws.com",
}
assert (
CloudTrailTimeline._extract_actor(user_identity)
== "elasticloadbalancing.amazonaws.com"
)
def test_extract_actor_fallback_to_principal_id(self):
user_identity = {"type": "Unknown", "principalId": "AROAEXAMPLEID:session"}
assert (
CloudTrailTimeline._extract_actor(user_identity) == "AROAEXAMPLEID:session"
)
def test_extract_actor_unknown(self):
assert CloudTrailTimeline._extract_actor({}) == "Unknown"
def test_extract_actor_federated_user(self):
user_identity = {
"type": "FederatedUser",
"arn": "arn:aws:sts::123456789012:federated-user/developer",
}
assert CloudTrailTimeline._extract_actor(user_identity) == "developer"
class TestParseEvent:
@pytest.fixture
def mock_session(self):
return MagicMock()
@pytest.fixture
def sample_cloudtrail_event(self):
return {
"EventId": "b2c3d4e5-f6a7-8901-bcde-f23456789012",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {
"type": "IAMUser",
"arn": "arn:aws:iam::123456789012:user/admin",
"userName": "admin",
},
"sourceIPAddress": "203.0.113.1",
"userAgent": "aws-cli/2.0.0",
"requestParameters": {"instanceType": "t3.micro"},
"responseElements": {
"instancesSet": {"items": [{"instanceId": "i-1234"}]}
},
}
),
}
def test_parse_event_success(self, mock_session, sample_cloudtrail_event):
timeline = CloudTrailTimeline(session=mock_session)
result = timeline._parse_event(sample_cloudtrail_event)
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["event_source"] == "ec2.amazonaws.com"
assert result["actor"] == "admin"
assert result["actor_uid"] == "arn:aws:iam::123456789012:user/admin"
assert result["actor_type"] == "IAMUser"
def test_parse_event_malformed_json(self, mock_session):
event = {
"EventId": "malformed-event-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": "not valid json",
}
timeline = CloudTrailTimeline(session=mock_session)
assert timeline._parse_event(event) is None
def test_parse_event_with_error_fields(self, mock_session):
event = {
"EventId": "error-event-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "CreateBucket",
"EventSource": "s3.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {"type": "IAMUser", "userName": "developer"},
"errorCode": "AccessDenied",
"errorMessage": "Access Denied",
}
),
}
timeline = CloudTrailTimeline(session=mock_session)
result = timeline._parse_event(event)
assert result is not None
assert result["error_code"] == "AccessDenied"
assert result["error_message"] == "Access Denied"
def test_parse_event_dict_cloud_trail_event(self, mock_session):
"""Test parsing when CloudTrailEvent is already a dict (not JSON string)."""
event = {
"EventId": "dict-event-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": {
"userIdentity": {"type": "IAMUser", "userName": "admin"},
},
}
timeline = CloudTrailTimeline(session=mock_session)
result = timeline._parse_event(event)
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
def test_parse_event_missing_event_id(self, mock_session):
"""Test parsing event without EventId returns None (event_id is required)."""
event = {
# Missing EventId
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{"userIdentity": {"type": "IAMUser", "userName": "admin"}}
),
}
timeline = CloudTrailTimeline(session=mock_session)
result = timeline._parse_event(event)
# Should return None because event_id is required by TimelineEvent model
assert result is None
def test_parse_event_uses_request_data_and_response_data_fields(self, mock_session):
"""Test that parsed event uses request_data and response_data field names."""
event = {
"EventId": "field-names-test-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "CreateBucket",
"EventSource": "s3.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {"type": "IAMUser", "userName": "admin"},
"requestParameters": {"bucketName": "my-bucket", "acl": "private"},
"responseElements": {
"location": "http://my-bucket.s3.amazonaws.com"
},
}
),
}
timeline = CloudTrailTimeline(session=mock_session)
result = timeline._parse_event(event)
assert result is not None
# Verify field names are request_data/response_data (not request_parameters/response_elements)
assert "request_data" in result
assert "response_data" in result
assert "request_parameters" not in result
assert "response_elements" not in result
# Verify the data is correctly mapped
assert result["request_data"] == {"bucketName": "my-bucket", "acl": "private"}
assert result["response_data"] == {
"location": "http://my-bucket.s3.amazonaws.com"
}
def test_parse_event_missing_actor_type(self, mock_session):
"""Test parsing event where userIdentity has no type field."""
event = {
"EventId": "no-actor-type-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "RunInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {
# No "type" field
"arn": "arn:aws:iam::123456789012:user/admin",
"userName": "admin",
},
"sourceIPAddress": "203.0.113.1",
}
),
}
timeline = CloudTrailTimeline(session=mock_session)
result = timeline._parse_event(event)
assert result is not None
assert result["event_name"] == "RunInstances"
assert result["actor"] == "admin"
# actor_type should be None when not present in userIdentity
assert result["actor_type"] is None
def test_parse_event_empty_request_response(self, mock_session):
"""Test parsing event with no requestParameters or responseElements."""
event = {
"EventId": "no-params-id",
"EventTime": datetime(2024, 1, 15, 10, 30, 0, tzinfo=timezone.utc),
"EventName": "DescribeInstances",
"EventSource": "ec2.amazonaws.com",
"CloudTrailEvent": json.dumps(
{
"userIdentity": {"type": "IAMUser", "userName": "reader"},
# No requestParameters or responseElements
}
),
}
timeline = CloudTrailTimeline(session=mock_session)
result = timeline._parse_event(event)
assert result is not None
assert result["request_data"] is None
assert result["response_data"] is None
class TestClientCaching:
def test_client_cached_per_region(self):
mock_session = MagicMock()
mock_client = MagicMock()
mock_session.client.return_value = mock_client
timeline = CloudTrailTimeline(session=mock_session)
# Get client twice for same region
client1 = timeline._get_client("us-east-1")
client2 = timeline._get_client("us-east-1")
# Should only create client once
assert mock_session.client.call_count == 1
assert client1 is client2
def test_different_clients_per_region(self):
mock_session = MagicMock()
timeline = CloudTrailTimeline(session=mock_session)
timeline._get_client("us-east-1")
timeline._get_client("eu-west-1")
# Should create client for each region
assert mock_session.client.call_count == 2
class TestIsReadOnlyEvent:
"""Tests for _is_read_only_event method."""
@pytest.fixture
def mock_session(self):
return MagicMock()
@pytest.mark.parametrize(
"event_name",
[
"DescribeSecurityGroups",
"GetBucketPolicy",
"ListBuckets",
"HeadObject",
"CheckAccessNotGranted",
"LookupEvents",
"SearchResources",
"ScanOnDemand",
"QueryObjects",
"BatchGetItem",
"SelectObjectContent",
],
)
def test_read_only_events_detected(self, mock_session, event_name):
"""Verify various read-only event prefixes are correctly identified."""
timeline = CloudTrailTimeline(session=mock_session)
assert timeline._is_read_only_event(event_name) is True
@pytest.mark.parametrize(
"event_name",
[
"CreateSecurityGroup",
"DeleteSecurityGroup",
"ModifySecurityGroupRules",
"PutBucketPolicy",
"RunInstances",
"TerminateInstances",
"UpdateFunction",
"AttachRolePolicy",
"AuthorizeSecurityGroupIngress",
],
)
def test_write_events_not_filtered(self, mock_session, event_name):
"""Verify write events are not marked as read-only."""
timeline = CloudTrailTimeline(session=mock_session)
assert timeline._is_read_only_event(event_name) is False