fix(api): avoid mutating API key manager during auth (#11686)

This commit is contained in:
Josema Camacho
2026-06-24 16:50:55 +02:00
committed by GitHub
parent 917e5d07ff
commit 4e00cfd1b6
3 changed files with 100 additions and 9 deletions
+8
View File
@@ -2,6 +2,14 @@
All notable changes to the **Prowler API** are documented in this file.
## [1.32.1] (Prowler UNRELEASED)
### 🐞 Fixed
- API key auth no longer mutates `TenantAPIKey.objects` during admin DB lookups [(#11686)](https://github.com/prowler-cloud/prowler/pull/11686)
---
## [1.32.0] (Prowler v5.31.0)
### 🚀 Added
+43 -9
View File
@@ -1,11 +1,14 @@
from math import isfinite
from uuid import UUID
from api.db_router import MainRouter
from api.models import TenantAPIKey, TenantAPIKeyManager
from cryptography.fernet import InvalidToken
from django.core.exceptions import ObjectDoesNotExist
from django.utils import timezone
from drf_simple_apikey.backends import APIKeyAuthentication as BaseAPIKeyAuth
from drf_simple_apikey.crypto import get_crypto
from drf_simple_apikey.settings import package_settings
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.request import Request
@@ -21,18 +24,49 @@ class TenantAPIKeyAuthentication(BaseAPIKeyAuth):
def _authenticate_credentials(self, request, key):
"""
Override to use admin connection, bypassing RLS during authentication.
Delegates to parent after temporarily routing model queries to admin DB.
"""
# Temporarily point the model's manager to admin database
original_objects = self.model.objects
self.model.objects = self.model.objects.using(MainRouter.admin_db)
try:
payload = self.key_crypto.decrypt(key)
except ValueError:
raise AuthenticationFailed("Invalid API Key.")
if not isinstance(payload, dict):
raise AuthenticationFailed("Invalid API Key.")
payload_pk = payload.get("_pk")
payload_exp = payload.get("_exp")
if (
not isinstance(payload_pk, str)
or isinstance(payload_exp, bool)
or not isinstance(payload_exp, (int, float))
or not isfinite(payload_exp)
):
raise AuthenticationFailed("Invalid API Key.")
try:
# Call parent method which will now use admin database
return super()._authenticate_credentials(request, key)
finally:
# Restore original manager
self.model.objects = original_objects
api_key_pk = UUID(payload_pk)
except ValueError:
raise AuthenticationFailed("Invalid API Key.")
if payload_exp < timezone.now().timestamp():
raise AuthenticationFailed("API Key has already expired.")
try:
api_key = self.model.objects.using(MainRouter.admin_db).get(id=api_key_pk)
except ObjectDoesNotExist:
raise AuthenticationFailed("No entity matching this api key.")
if api_key.revoked:
raise AuthenticationFailed("This API Key has been revoked.")
client_ip = request.META.get(package_settings.IP_ADDRESS_HEADER)
if api_key.blacklisted_ips and client_ip in api_key.blacklisted_ips:
raise AuthenticationFailed("Access denied from blacklisted IP.")
if api_key.whitelisted_ips and client_ip not in api_key.whitelisted_ips:
raise AuthenticationFailed("Access restricted to specific IP addresses.")
return api_key.entity, key
def authenticate(self, request: Request):
prefixed_key = self.get_key(request)
@@ -7,6 +7,7 @@ import pytest
from api.authentication import SSEAuthentication, TenantAPIKeyAuthentication
from api.db_router import MainRouter
from api.models import TenantAPIKey
from django.db.models.query import QuerySet
from django.test import RequestFactory
from rest_framework.exceptions import AuthenticationFailed
@@ -64,6 +65,54 @@ class TestTenantAPIKeyAuthentication:
# Verify the manager was restored
assert TenantAPIKey.objects == original_manager
def test_authenticate_credentials_keeps_manager_during_lookup(
self, auth_backend, api_keys_fixture, request_factory
):
"""Authentication must not expose a QuerySet as the model manager."""
api_key = api_keys_fixture[0]
raw_key = api_key._raw_key
_, encrypted_key = raw_key.split(TenantAPIKey.objects.separator, 1)
original_get = QuerySet.get
manager_has_create_api_key = []
def observe_manager(queryset, *args, **kwargs):
manager_has_create_api_key.append(
hasattr(TenantAPIKey.objects, "create_api_key")
)
return original_get(queryset, *args, **kwargs)
request = request_factory.get("/")
with patch.object(QuerySet, "get", observe_manager):
auth_backend._authenticate_credentials(request, encrypted_key)
assert manager_has_create_api_key
assert all(manager_has_create_api_key)
@pytest.mark.parametrize(
"payload",
[
{"_pk": str(uuid4()), "_exp": "not-a-timestamp"},
{
"_pk": "not-a-uuid",
"_exp": (datetime.now(UTC) + timedelta(days=1)).timestamp(),
},
{"_pk": str(uuid4()), "_exp": True},
],
)
def test_authenticate_credentials_rejects_malformed_payloads(
self, auth_backend, request_factory, payload
):
"""Malformed decrypted payloads fail as authentication errors."""
request = request_factory.get("/")
encrypted_key = auth_backend.key_crypto.generate(payload)
with pytest.raises(AuthenticationFailed) as exc_info:
auth_backend._authenticate_credentials(request, encrypted_key)
assert str(exc_info.value.detail) == "Invalid API Key."
def test_authenticate_credentials_restores_manager_on_exception(
self, auth_backend, request_factory
):