fix(api): handle AccessDenied during AssumeRole in events endpoint (#9899)

This commit is contained in:
Pepe Fagoaga
2026-01-27 15:32:51 +01:00
committed by GitHub
parent 727fafb147
commit b2c18b69ee
2 changed files with 76 additions and 5 deletions
+62
View File
@@ -4851,6 +4851,68 @@ class TestResourceViewSet:
assert error["status"] == "503" # Must be string per JSON:API spec
assert "detail" in error
@patch("api.v1.views.initialize_prowler_provider")
def test_events_assume_role_access_denied(
self,
mock_initialize_provider,
authenticated_client,
providers_fixture,
):
"""Test events handles AWSAssumeRoleError during provider init.
This tests the scenario from CLOUD-API-3HJ where the API task role
cannot assume the customer's ProwlerScan role due to IAM permissions.
The error happens during initialize_prowler_provider, which wraps
the ClientError in AWSAssumeRoleError.
"""
from api.models import Resource
from prowler.providers.aws.exceptions.exceptions import AWSAssumeRoleError
aws_provider = providers_fixture[0] # AWS provider from fixture
resource = Resource.objects.create(
uid="arn:aws:lambda:eu-west-1:123456789012:function:assume-role-test",
name="AssumeRole Test Function",
type="function",
region="eu-west-1",
service="lambda",
provider=aws_provider,
tenant_id=aws_provider.tenant_id,
)
# Mock initialize_prowler_provider raising AWSAssumeRoleError
# (this is what aws_provider.py actually raises when AssumeRole fails)
original_error = ClientError(
{
"Error": {
"Code": "AccessDenied",
"Message": (
"User: arn:aws:sts::123456789012:assumed-role/api-task-role/xxx "
"is not authorized to perform: sts:AssumeRole on resource: "
"arn:aws:iam::123456789012:role/ProwlerScan"
),
}
},
"AssumeRole",
)
mock_initialize_provider.side_effect = AWSAssumeRoleError(
original_exception=original_error,
file="aws_provider.py",
)
response = authenticated_client.get(
reverse("resource-events", kwargs={"pk": resource.id})
)
# AWSAssumeRoleError returns 502 (upstream auth failure)
assert response.status_code == status.HTTP_502_BAD_GATEWAY
# Verify JSON:API error structure
error = response.json()["errors"][0]
assert error["code"] == "upstream_access_denied"
assert error["status"] == "502"
assert "detail" in error
def test_events_unauthenticated_returns_401(self, providers_fixture):
"""Test events endpoint returns 401 when no credentials are provided.
+14 -5
View File
@@ -284,7 +284,10 @@ from api.v1.serializers import (
UserSerializer,
UserUpdateSerializer,
)
from prowler.providers.aws.exceptions.exceptions import AWSCredentialsError
from prowler.providers.aws.exceptions.exceptions import (
AWSAssumeRoleError,
AWSCredentialsError,
)
from prowler.providers.aws.lib.cloudtrail_timeline.cloudtrail_timeline import (
CloudTrailTimeline,
)
@@ -3044,19 +3047,25 @@ class ResourceViewSet(PaginateByPkMixin, BaseRLSViewSet):
raise UpstreamAuthenticationError(
detail="Credentials not found for this provider. Please reconnect the provider."
)
except AWSAssumeRoleError:
# AssumeRole failed - usually IAM permission issue (not authorized to sts:AssumeRole)
raise UpstreamAccessDeniedError(
detail="Cannot assume role for this provider. Check IAM Role permissions and trust relationship."
)
except AWSCredentialsError:
# Handles expired tokens, invalid keys, profile not found, etc.
raise UpstreamAuthenticationError()
except ClientError as e:
error_code = e.response.get("Error", {}).get("Code", "")
# AccessDenied is expected when credentials lack permissions - don't log as error
if error_code in ("AccessDenied", "AccessDeniedException"):
raise UpstreamAccessDeniedError()
# Unexpected ClientErrors should be logged for debugging
logger.error(
f"Provider API error retrieving events: {str(e)}",
exc_info=True,
)
# AccessDenied means the credentials don't have the required permissions
if error_code in ("AccessDenied", "AccessDeniedException"):
raise UpstreamAccessDeniedError()
raise UpstreamServiceUnavailableError()
except Exception as e:
sentry_sdk.capture_exception(e)