feat(api-keys): Add API Key support for the Prowler API (#8805)

This commit is contained in:
Víctor Fernández Poyatos
2025-10-03 13:42:43 +02:00
committed by GitHub
parent 04177db648
commit 9a4fc784db
26 changed files with 2909 additions and 141 deletions
+1
View File
@@ -7,6 +7,7 @@ All notable changes to the **Prowler API** are documented in this file.
### Added
- Default JWT keys are generated and stored if they are missing from configuration [(#8655)](https://github.com/prowler-cloud/prowler/pull/8655)
- `compliance_name` for each compliance [(#7920)](https://github.com/prowler-cloud/prowler/pull/7920)
- API Key support [(#8805)](https://github.com/prowler-cloud/prowler/pull/8805)
### Changed
- Now the MANAGE_ACCOUNT permission is required to modify or read user permissions instead of MANAGE_USERS [(#8281)](https://github.com/prowler-cloud/prowler/pull/8281)
+23 -7
View File
@@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 2.1.4 and should not be changed by hand.
# This file is automatically @generated by Poetry 2.1.1 and should not be changed by hand.
[[package]]
name = "about-time"
@@ -1924,6 +1924,27 @@ files = [
Django = ">=4.2"
djangorestframework = ">=3.15.0"
[[package]]
name = "drf-simple-apikey"
version = "2.2.1"
description = "API Key authentication and permissions for Django REST."
optional = false
python-versions = "*"
groups = ["main"]
files = [
{file = "drf_simple_apikey-2.2.1-py2.py3-none-any.whl", hash = "sha256:2a60b35676d14f907c47dee179dd0fa7425a84c34d6ff5b48d08d3b87ff32809"},
{file = "drf_simple_apikey-2.2.1.tar.gz", hash = "sha256:e5a52804bbac12c8db80c10a3d51a8514fc59fc8385b5e751099a2bc944ad25d"},
]
[package.dependencies]
cryptography = ">=38.0.4"
django = ">=4.2"
djangorestframework = ">=3.14.0"
[package.extras]
test = ["coverage", "pytest", "pytest-django"]
tooling = ["black (==22.3.0)", "bump2version", "pylint"]
[[package]]
name = "drf-spectacular"
version = "0.27.2"
@@ -5296,7 +5317,6 @@ files = [
{file = "ruamel.yaml.clib-0.2.12-cp310-cp310-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f66efbc1caa63c088dead1c4170d148eabc9b80d95fb75b6c92ac0aad2437d76"},
{file = "ruamel.yaml.clib-0.2.12-cp310-cp310-musllinux_1_1_i686.whl", hash = "sha256:22353049ba4181685023b25b5b51a574bce33e7f51c759371a7422dcae5402a6"},
{file = "ruamel.yaml.clib-0.2.12-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:932205970b9f9991b34f55136be327501903f7c66830e9760a8ffb15b07f05cd"},
{file = "ruamel.yaml.clib-0.2.12-cp310-cp310-musllinux_1_2_aarch64.whl", hash = "sha256:a52d48f4e7bf9005e8f0a89209bf9a73f7190ddf0489eee5eb51377385f59f2a"},
{file = "ruamel.yaml.clib-0.2.12-cp310-cp310-win32.whl", hash = "sha256:3eac5a91891ceb88138c113f9db04f3cebdae277f5d44eaa3651a4f573e6a5da"},
{file = "ruamel.yaml.clib-0.2.12-cp310-cp310-win_amd64.whl", hash = "sha256:ab007f2f5a87bd08ab1499bdf96f3d5c6ad4dcfa364884cb4549aa0154b13a28"},
{file = "ruamel.yaml.clib-0.2.12-cp311-cp311-macosx_13_0_arm64.whl", hash = "sha256:4a6679521a58256a90b0d89e03992c15144c5f3858f40d7c18886023d7943db6"},
@@ -5305,7 +5325,6 @@ files = [
{file = "ruamel.yaml.clib-0.2.12-cp311-cp311-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:811ea1594b8a0fb466172c384267a4e5e367298af6b228931f273b111f17ef52"},
{file = "ruamel.yaml.clib-0.2.12-cp311-cp311-musllinux_1_1_i686.whl", hash = "sha256:cf12567a7b565cbf65d438dec6cfbe2917d3c1bdddfce84a9930b7d35ea59642"},
{file = "ruamel.yaml.clib-0.2.12-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:7dd5adc8b930b12c8fc5b99e2d535a09889941aa0d0bd06f4749e9a9397c71d2"},
{file = "ruamel.yaml.clib-0.2.12-cp311-cp311-musllinux_1_2_aarch64.whl", hash = "sha256:1492a6051dab8d912fc2adeef0e8c72216b24d57bd896ea607cb90bb0c4981d3"},
{file = "ruamel.yaml.clib-0.2.12-cp311-cp311-win32.whl", hash = "sha256:bd0a08f0bab19093c54e18a14a10b4322e1eacc5217056f3c063bd2f59853ce4"},
{file = "ruamel.yaml.clib-0.2.12-cp311-cp311-win_amd64.whl", hash = "sha256:a274fb2cb086c7a3dea4322ec27f4cb5cc4b6298adb583ab0e211a4682f241eb"},
{file = "ruamel.yaml.clib-0.2.12-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:20b0f8dc160ba83b6dcc0e256846e1a02d044e13f7ea74a3d1d56ede4e48c632"},
@@ -5314,7 +5333,6 @@ files = [
{file = "ruamel.yaml.clib-0.2.12-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:749c16fcc4a2b09f28843cda5a193e0283e47454b63ec4b81eaa2242f50e4ccd"},
{file = "ruamel.yaml.clib-0.2.12-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:bf165fef1f223beae7333275156ab2022cffe255dcc51c27f066b4370da81e31"},
{file = "ruamel.yaml.clib-0.2.12-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:32621c177bbf782ca5a18ba4d7af0f1082a3f6e517ac2a18b3974d4edf349680"},
{file = "ruamel.yaml.clib-0.2.12-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:b82a7c94a498853aa0b272fd5bc67f29008da798d4f93a2f9f289feb8426a58d"},
{file = "ruamel.yaml.clib-0.2.12-cp312-cp312-win32.whl", hash = "sha256:e8c4ebfcfd57177b572e2040777b8abc537cdef58a2120e830124946aa9b42c5"},
{file = "ruamel.yaml.clib-0.2.12-cp312-cp312-win_amd64.whl", hash = "sha256:0467c5965282c62203273b838ae77c0d29d7638c8a4e3a1c8bdd3602c10904e4"},
{file = "ruamel.yaml.clib-0.2.12-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:4c8c5d82f50bb53986a5e02d1b3092b03622c02c2eb78e29bec33fd9593bae1a"},
@@ -5323,7 +5341,6 @@ files = [
{file = "ruamel.yaml.clib-0.2.12-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:96777d473c05ee3e5e3c3e999f5d23c6f4ec5b0c38c098b3a5229085f74236c6"},
{file = "ruamel.yaml.clib-0.2.12-cp313-cp313-musllinux_1_1_i686.whl", hash = "sha256:3bc2a80e6420ca8b7d3590791e2dfc709c88ab9152c00eeb511c9875ce5778bf"},
{file = "ruamel.yaml.clib-0.2.12-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:e188d2699864c11c36cdfdada94d781fd5d6b0071cd9c427bceb08ad3d7c70e1"},
{file = "ruamel.yaml.clib-0.2.12-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:4f6f3eac23941b32afccc23081e1f50612bdbe4e982012ef4f5797986828cd01"},
{file = "ruamel.yaml.clib-0.2.12-cp313-cp313-win32.whl", hash = "sha256:6442cb36270b3afb1b4951f060eccca1ce49f3d087ca1ca4563a6eb479cb3de6"},
{file = "ruamel.yaml.clib-0.2.12-cp313-cp313-win_amd64.whl", hash = "sha256:e5b8daf27af0b90da7bb903a876477a9e6d7270be6146906b276605997c7e9a3"},
{file = "ruamel.yaml.clib-0.2.12-cp39-cp39-macosx_12_0_arm64.whl", hash = "sha256:fc4b630cd3fa2cf7fce38afa91d7cfe844a9f75d7f0f36393fa98815e911d987"},
@@ -5332,7 +5349,6 @@ files = [
{file = "ruamel.yaml.clib-0.2.12-cp39-cp39-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:e2f1c3765db32be59d18ab3953f43ab62a761327aafc1594a2a1fbe038b8b8a7"},
{file = "ruamel.yaml.clib-0.2.12-cp39-cp39-musllinux_1_1_i686.whl", hash = "sha256:d85252669dc32f98ebcd5d36768f5d4faeaeaa2d655ac0473be490ecdae3c285"},
{file = "ruamel.yaml.clib-0.2.12-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:e143ada795c341b56de9418c58d028989093ee611aa27ffb9b7f609c00d813ed"},
{file = "ruamel.yaml.clib-0.2.12-cp39-cp39-musllinux_1_2_aarch64.whl", hash = "sha256:2c59aa6170b990d8d2719323e628aaf36f3bfbc1c26279c0eeeb24d05d2d11c7"},
{file = "ruamel.yaml.clib-0.2.12-cp39-cp39-win32.whl", hash = "sha256:beffaed67936fbbeffd10966a4eb53c402fafd3d6833770516bf7314bc6ffa12"},
{file = "ruamel.yaml.clib-0.2.12-cp39-cp39-win_amd64.whl", hash = "sha256:040ae85536960525ea62868b642bdb0c2cc6021c9f9d507810c0c604e66f5a7b"},
{file = "ruamel.yaml.clib-0.2.12.tar.gz", hash = "sha256:6c8fbb13ec503f99a91901ab46e0b07ae7941cd527393187039aec586fdfd36f"},
@@ -6238,4 +6254,4 @@ type = ["pytest-mypy"]
[metadata]
lock-version = "2.1"
python-versions = ">=3.11,<3.13"
content-hash = "91058a14382b76136a82f45624a30aece7a6d77c8b36c290bb4c40ea60c8850b"
content-hash = "ea50c84bc5c6e46b101614b78f85d3408337cc61836fa04ae4b4597512c98055"
+2 -1
View File
@@ -32,7 +32,8 @@ dependencies = [
"openai (>=1.82.0,<2.0.0)",
"xmlsec==1.3.14",
"h2 (==4.3.0)",
"markdown (>=3.9,<4.0)"
"markdown (>=3.9,<4.0)",
"drf-simple-apikey (==2.2.1)"
]
description = "Prowler's API (Django/DRF)"
license = "Apache-2.0"
+4 -5
View File
@@ -1,14 +1,12 @@
import logging
import os
from pathlib import Path
import sys
from django.apps import AppConfig
from django.conf import settings
from pathlib import Path
from config.custom_logging import BackendLogger
from config.env import env
from django.apps import AppConfig
from django.conf import settings
logger = logging.getLogger(BackendLogger.API)
@@ -30,6 +28,7 @@ class ApiConfig(AppConfig):
name = "api"
def ready(self):
from api import schema_extensions # noqa: F401
from api import signals # noqa: F401
from api.compliance import load_prowler_compliance
+76
View File
@@ -0,0 +1,76 @@
from typing import Optional, Tuple
from uuid import UUID
from cryptography.fernet import InvalidToken
from django.utils import timezone
from drf_simple_apikey.backends import APIKeyAuthentication as BaseAPIKeyAuth
from drf_simple_apikey.crypto import get_crypto
from rest_framework.authentication import BaseAuthentication
from rest_framework.exceptions import AuthenticationFailed
from rest_framework.request import Request
from rest_framework_simplejwt.authentication import JWTAuthentication
from api.models import TenantAPIKey, TenantAPIKeyManager
class TenantAPIKeyAuthentication(BaseAPIKeyAuth):
model = TenantAPIKey
def __init__(self):
self.key_crypto = get_crypto()
def authenticate(self, request: Request):
prefixed_key = self.get_key(request)
# Split prefix from key (format: pk_xxxxxxxx.encrypted_key)
try:
prefix, key = prefixed_key.split(TenantAPIKeyManager.separator, 1)
except ValueError:
raise AuthenticationFailed("Invalid API Key.")
try:
entity, _ = self._authenticate_credentials(request, key)
except InvalidToken:
raise AuthenticationFailed("Invalid API Key.")
# Get the API key instance to update last_used_at and retrieve tenant info
# We need to decrypt again to get the pk (already validated by _authenticate_credentials)
payload = self.key_crypto.decrypt(key)
api_key_pk = payload["_pk"]
# Convert string UUID back to UUID object for lookup
if isinstance(api_key_pk, str):
api_key_pk = UUID(api_key_pk)
try:
api_key_instance = TenantAPIKey.objects.get(id=api_key_pk, prefix=prefix)
except TenantAPIKey.DoesNotExist:
raise AuthenticationFailed("Invalid API Key.")
# Update last_used_at
api_key_instance.last_used_at = timezone.now()
api_key_instance.save(update_fields=["last_used_at"])
return entity, {
"tenant_id": str(api_key_instance.tenant_id),
"sub": str(api_key_instance.entity.id),
"api_key_prefix": prefix,
}
class CombinedJWTOrAPIKeyAuthentication(BaseAuthentication):
jwt_auth = JWTAuthentication()
api_key_auth = TenantAPIKeyAuthentication()
def authenticate(self, request: Request) -> Optional[Tuple[object, dict]]:
auth_header = request.headers.get("Authorization", "")
# Prioritize JWT authentication if both are present
if auth_header.startswith("Bearer "):
return self.jwt_auth.authenticate(request)
if auth_header.startswith("Api-Key "):
return self.api_key_auth.authenticate(request)
# Default fallback
return self.jwt_auth.authenticate(request)
+2 -2
View File
@@ -5,8 +5,8 @@ from rest_framework.exceptions import NotAuthenticated
from rest_framework.filters import SearchFilter
from rest_framework_json_api import filters
from rest_framework_json_api.views import ModelViewSet
from rest_framework_simplejwt.authentication import JWTAuthentication
from api.authentication import CombinedJWTOrAPIKeyAuthentication
from api.db_router import MainRouter
from api.db_utils import POSTGRES_USER_VAR, rls_transaction
from api.filters import CustomDjangoFilterBackend
@@ -15,7 +15,7 @@ from api.rbac.permissions import HasPermissions
class BaseViewSet(ModelViewSet):
authentication_classes = [JWTAuthentication]
authentication_classes = [CombinedJWTOrAPIKeyAuthentication]
required_permissions = []
permission_classes = [permissions.IsAuthenticated, HasPermissions]
filter_backends = [
+7 -1
View File
@@ -61,7 +61,7 @@ def rls_transaction(value: str, parameter: str = POSTGRES_TENANT_VAR):
with transaction.atomic():
with connection.cursor() as cursor:
try:
# just in case the value is an UUID object
# just in case the value is a UUID object
uuid.UUID(str(value))
except ValueError:
raise ValidationError("Must be a valid UUID")
@@ -434,6 +434,12 @@ def drop_index_on_partitions(
schema_editor.execute(sql)
def generate_api_key_prefix():
"""Generate a random 8-character prefix for API keys (e.g., 'pk_abc123de')."""
random_chars = generate_random_token(length=8)
return f"pk_{random_chars}"
# Postgres enum definition for member role
+17 -10
View File
@@ -1,6 +1,10 @@
from django.core.exceptions import ValidationError as django_validation_error
from rest_framework import status
from rest_framework.exceptions import APIException
from rest_framework.exceptions import (
APIException,
AuthenticationFailed,
NotAuthenticated,
)
from rest_framework_json_api.exceptions import exception_handler
from rest_framework_json_api.serializers import ValidationError
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
@@ -68,15 +72,18 @@ def custom_exception_handler(exc, context):
exc = ValidationError(exc.message_dict)
else:
exc = ValidationError(detail=exc.messages[0], code=exc.code)
elif isinstance(exc, (TokenError, InvalidToken)):
if (
hasattr(exc, "detail")
and isinstance(exc.detail, dict)
and "messages" in exc.detail
):
exc.detail["messages"] = [
message_item["message"] for message_item in exc.detail["messages"]
]
# Force 401 status for AuthenticationFailed exceptions regardless of the authentication backend
elif isinstance(exc, (AuthenticationFailed, NotAuthenticated, TokenError)):
exc.status_code = status.HTTP_401_UNAUTHORIZED
if isinstance(exc, (TokenError, InvalidToken)):
if (
hasattr(exc, "detail")
and isinstance(exc.detail, dict)
and "messages" in exc.detail
):
exc.detail["messages"] = [
message_item["message"] for message_item in exc.detail["messages"]
]
return exception_handler(exc, context)
+18
View File
@@ -43,6 +43,7 @@ from api.models import (
StateChoices,
StatusChoices,
Task,
TenantAPIKey,
User,
)
from api.rls import Tenant
@@ -880,3 +881,20 @@ class IntegrationJiraFindingsFilter(FilterSet):
}
)
return super().filter_queryset(queryset)
class TenantApiKeyFilter(FilterSet):
inserted_at = DateFilter(field_name="created", lookup_expr="date")
inserted_at__gte = DateFilter(field_name="created", lookup_expr="gte")
inserted_at__lte = DateFilter(field_name="created", lookup_expr="lte")
expires_at = DateFilter(field_name="expiry_date", lookup_expr="date")
expires_at__gte = DateFilter(field_name="expiry_date", lookup_expr="gte")
expires_at__lte = DateFilter(field_name="expiry_date", lookup_expr="lte")
class Meta:
model = TenantAPIKey
fields = {
"prefix": ["exact", "icontains"],
"revoked": ["exact"],
"name": ["exact", "icontains"],
}
+8 -2
View File
@@ -8,9 +8,14 @@ def extract_auth_info(request) -> dict:
if getattr(request, "auth", None) is not None:
tenant_id = request.auth.get("tenant_id", "N/A")
user_id = request.auth.get("sub", "N/A")
api_key_prefix = request.auth.get("api_key_prefix", "N/A")
else:
tenant_id, user_id = "N/A", "N/A"
return {"tenant_id": tenant_id, "user_id": user_id}
tenant_id, user_id, api_key_prefix = "N/A", "N/A", "N/A"
return {
"tenant_id": tenant_id,
"user_id": user_id,
"api_key_prefix": api_key_prefix,
}
class APILoggingMiddleware:
@@ -38,6 +43,7 @@ class APILoggingMiddleware:
extra={
"user_id": auth_info["user_id"],
"tenant_id": auth_info["tenant_id"],
"api_key_prefix": auth_info["api_key_prefix"],
"method": request.method,
"path": request.path,
"query_params": request.GET.dict(),
@@ -0,0 +1,124 @@
# Generated by Django 5.1.12 on 2025-09-30 13:10
import uuid
import django.db.models.deletion
import drf_simple_apikey.models
from django.conf import settings
from django.db import migrations, models
import api.db_utils
import api.rls
class Migration(migrations.Migration):
dependencies = [
("api", "0047_remove_integration_unique_configuration_per_tenant"),
]
operations = [
migrations.CreateModel(
name="TenantAPIKey",
fields=[
("name", models.CharField(blank=True, max_length=255, null=True)),
(
"expiry_date",
models.DateTimeField(
default=drf_simple_apikey.models._expiry_date,
help_text="Once API key expires, entities cannot use it anymore.",
verbose_name="Expires",
),
),
(
"revoked",
models.BooleanField(
blank=True,
default=False,
help_text="If the API key is revoked, entities cannot use it anymore. (This cannot be undone.)",
),
),
("created", models.DateTimeField(auto_now=True)),
(
"whitelisted_ips",
models.JSONField(
blank=True,
help_text="List of allowed IP addresses for this API key.",
null=True,
),
),
(
"blacklisted_ips",
models.JSONField(
blank=True,
help_text="List of denied IP addresses for this API key.",
null=True,
),
),
(
"id",
models.UUIDField(
default=uuid.uuid4,
editable=False,
primary_key=True,
serialize=False,
),
),
(
"prefix",
models.CharField(
default=api.db_utils.generate_api_key_prefix,
editable=False,
help_text="Unique prefix to identify the API key",
max_length=11,
unique=True,
),
),
(
"last_used_at",
models.DateTimeField(
blank=True,
help_text="Last time this API key was used for authentication",
null=True,
),
),
(
"entity",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="user_api_keys",
to=settings.AUTH_USER_MODEL,
),
),
(
"tenant",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, to="api.tenant"
),
),
],
options={
"db_table": "api_keys",
"abstract": False,
"indexes": [
models.Index(
fields=["tenant_id", "prefix"],
name="api_keys_tenant_prefix_idx",
)
],
"constraints": [
models.UniqueConstraint(
fields=("tenant_id", "prefix"), name="unique_api_key_prefixes"
)
],
},
),
migrations.AddConstraint(
model_name="tenantapikey",
constraint=api.rls.BaseSecurityConstraint(
name="statements_on_tenantapikey",
statements=["SELECT", "INSERT", "UPDATE", "DELETE"],
),
),
]
+63
View File
@@ -22,6 +22,8 @@ from django.db.models import Q
from django.utils.translation import gettext_lazy as _
from django_celery_beat.models import PeriodicTask
from django_celery_results.models import TaskResult
from drf_simple_apikey.crypto import get_crypto
from drf_simple_apikey.models import AbstractAPIKey, AbstractAPIKeyManager
from psqlextra.manager import PostgresManager
from psqlextra.models import PostgresPartitionedModel
from psqlextra.types import PostgresPartitioningMethod
@@ -42,6 +44,7 @@ from api.db_utils import (
StateEnumField,
StatusEnumField,
enum_to_choices,
generate_api_key_prefix,
generate_random_token,
one_week_from_now,
)
@@ -125,6 +128,17 @@ class ActiveProviderPartitionedManager(PostgresManager, ActiveProviderManager):
return super().get_queryset().filter(self.active_provider_filter())
class TenantAPIKeyManager(AbstractAPIKeyManager):
separator = "."
def assign_api_key(self, obj) -> str:
payload = {"_pk": str(obj.pk), "_exp": obj.expiry_date.timestamp()}
key = get_crypto().generate(payload)
prefixed_key = f"{obj.prefix}{self.separator}{key}"
return prefixed_key
class User(AbstractBaseUser):
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
name = models.CharField(max_length=150, validators=[MinLengthValidator(3)])
@@ -204,6 +218,55 @@ class Membership(models.Model):
resource_name = "memberships"
class TenantAPIKey(AbstractAPIKey, RowLevelSecurityProtectedModel):
id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
prefix = models.CharField(
max_length=11,
unique=True,
default=generate_api_key_prefix,
editable=False,
help_text="Unique prefix to identify the API key",
)
last_used_at = models.DateTimeField(
null=True,
blank=True,
help_text="Last time this API key was used for authentication",
)
entity = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="user_api_keys",
)
objects = TenantAPIKeyManager()
class Meta(RowLevelSecurityProtectedModel.Meta):
db_table = "api_keys"
constraints = [
RowLevelSecurityConstraint(
field="tenant_id",
name="rls_on_%(class)s",
statements=["SELECT", "INSERT", "UPDATE", "DELETE"],
),
models.UniqueConstraint(
fields=("tenant_id", "prefix"),
name="unique_api_key_prefixes",
),
]
indexes = [
models.Index(
fields=["tenant_id", "prefix"], name="api_keys_tenant_prefix_idx"
),
]
class JSONAPIMeta:
resource_name = "api-keys"
class Provider(RowLevelSecurityProtectedModel):
objects = ActiveProviderManager()
all_objects = models.Manager()
+16
View File
@@ -0,0 +1,16 @@
from drf_spectacular.extensions import OpenApiAuthenticationExtension
from drf_spectacular.openapi import AutoSchema
class CombinedJWTOrAPIKeyAuthenticationScheme(OpenApiAuthenticationExtension):
target_class = "api.authentication.CombinedJWTOrAPIKeyAuthentication"
name = "JWT or API Key"
def get_security_definition(self, auto_schema: AutoSchema): # noqa: F841
return {
"type": "http",
"scheme": "bearer",
"bearerFormat": "JWT",
"description": "Supports both JWT Bearer tokens and API Key authentication. "
"Use `Bearer <token>` for JWT or `Api-Key <key>` for API keys.",
}
+26 -2
View File
@@ -1,12 +1,12 @@
from celery import states
from celery.signals import before_task_publish
from config.celery import celery_app
from django.db.models.signals import post_delete
from django.db.models.signals import post_delete, pre_delete
from django.dispatch import receiver
from django_celery_results.backends.database import DatabaseBackend
from api.db_utils import delete_related_daily_task
from api.models import Provider
from api.models import Membership, Provider, TenantAPIKey, User
def create_task_result_on_publish(sender=None, headers=None, **kwargs): # noqa: F841
@@ -32,3 +32,27 @@ before_task_publish.connect(
def delete_provider_scan_task(sender, instance, **kwargs): # noqa: F841
# Delete the associated periodic task when the provider is deleted
delete_related_daily_task(instance.id)
@receiver(pre_delete, sender=User)
def revoke_user_api_keys(sender, instance, **kwargs): # noqa: F841
"""
Revoke all API keys associated with a user before deletion.
The entity field will be set to NULL by on_delete=SET_NULL,
but we explicitly revoke the keys to prevent further use.
"""
TenantAPIKey.objects.filter(entity=instance).update(revoked=True)
@receiver(post_delete, sender=Membership)
def revoke_membership_api_keys(sender, instance, **kwargs): # noqa: F841
"""
Revoke all API keys when a user is removed from a tenant.
When a membership is deleted, all API keys created by that user
in that tenant should be revoked to prevent further access.
"""
TenantAPIKey.objects.filter(
entity=instance.user, tenant_id=instance.tenant.id
).update(revoked=True)
File diff suppressed because it is too large Load Diff
@@ -1,9 +1,14 @@
import time
from datetime import datetime, timedelta, timezone
from uuid import uuid4
import pytest
from conftest import TEST_PASSWORD, get_api_tokens, get_authorization_header
from django.urls import reverse
from drf_simple_apikey.crypto import get_crypto
from rest_framework.test import APIClient
from api.models import Membership, User
from api.models import Membership, Role, TenantAPIKey, User, UserRoleRelationship
@pytest.mark.django_db
@@ -298,3 +303,706 @@ class TestTokenSwitchTenant:
assert invalid_tenant_response.json()["errors"][0]["detail"] == (
"Tenant does not exist or user is not a " "member."
)
@pytest.mark.django_db
class TestAPIKeyAuthentication:
def test_successful_authentication_with_api_key(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""Verify API key can authenticate and access protected endpoints."""
client = APIClient()
api_key = api_keys_fixture[0]
# Use API key to authenticate and access protected endpoint
api_key_headers = get_api_key_header(api_key._raw_key)
response = client.get(reverse("provider-list"), headers=api_key_headers)
assert response.status_code == 200
assert "data" in response.json()
def test_api_key_one_time_display_on_creation(
self, create_test_user_rbac, tenants_fixture
):
"""Verify full key only returned on creation, subsequent retrieval shows prefix only."""
client = APIClient()
# Authenticate with JWT to create API key
access_token, _ = get_api_tokens(
client, create_test_user_rbac.email, TEST_PASSWORD
)
jwt_headers = get_authorization_header(access_token)
# Create API key
api_key_name = "Test One-Time Key"
create_response = client.post(
reverse("api-key-list"),
data={
"data": {
"type": "api-keys",
"attributes": {
"name": api_key_name,
},
}
},
format="vnd.api+json",
headers=jwt_headers,
)
assert create_response.status_code == 201
created_data = create_response.json()["data"]
api_key_id = created_data["id"]
# Verify full key is present in creation response
assert "api_key" in created_data["attributes"]
full_key = created_data["attributes"]["api_key"]
assert full_key.startswith("pk_")
assert "." in full_key
# Retrieve the same API key
retrieve_response = client.get(
reverse("api-key-detail", kwargs={"pk": api_key_id}),
headers=jwt_headers,
)
assert retrieve_response.status_code == 200
retrieved_data = retrieve_response.json()["data"]
# Verify full key is NOT present in retrieval response
assert "api_key" not in retrieved_data["attributes"]
# Only prefix should be visible
assert "prefix" in retrieved_data["attributes"]
assert retrieved_data["attributes"]["prefix"].startswith("pk_")
def test_last_used_at_tracking(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""Verify last_used_at timestamp updates on each authentication."""
client = APIClient()
api_key = api_keys_fixture[0]
# Verify initially last_used_at is None
assert api_key.last_used_at is None
# Use API key to authenticate
api_key_headers = get_api_key_header(api_key._raw_key)
first_response = client.get(reverse("provider-list"), headers=api_key_headers)
assert first_response.status_code == 200
# Reload from database and check last_used_at is set
api_key.refresh_from_db()
first_used_at = api_key.last_used_at
assert first_used_at is not None
# Use the same key again after a small delay
time.sleep(0.1)
second_response = client.get(reverse("provider-list"), headers=api_key_headers)
assert second_response.status_code == 200
# Reload and verify last_used_at was updated
api_key.refresh_from_db()
second_used_at = api_key.last_used_at
assert second_used_at is not None
assert second_used_at > first_used_at
@pytest.mark.django_db
class TestAPIKeyErrors:
def test_invalid_api_key_format_missing_separator(
self, create_test_user, tenants_fixture
):
"""Malformed key without . separator."""
client = APIClient()
# Create malformed key without separator
malformed_key = "pk_12345678abcdefgh"
api_key_headers = get_api_key_header(malformed_key)
response = client.get(reverse("provider-list"), headers=api_key_headers)
assert response.status_code == 401
assert "Invalid API Key." in response.json()["errors"][0]["detail"]
def test_invalid_api_key_format_malformed(self, create_test_user, tenants_fixture):
"""Completely invalid format."""
client = APIClient()
# Various malformed keys
malformed_keys = [
"invalid_key",
"Bearer some_token",
"",
"pk_.",
".encrypted_part",
]
for malformed_key in malformed_keys:
api_key_headers = get_api_key_header(malformed_key)
response = client.get(reverse("provider-list"), headers=api_key_headers)
assert response.status_code == 401
assert "Invalid API Key." in response.json()["errors"][0]["detail"]
def test_expired_api_key_rejected(self, create_test_user, tenants_fixture):
"""Key past expiry date returns 401."""
client = APIClient()
# Create API key with past expiry date
expired_key, raw_key = TenantAPIKey.objects.create_api_key(
name="Expired Key",
tenant_id=tenants_fixture[0].id,
entity=create_test_user,
expiry_date=datetime.now(timezone.utc) - timedelta(days=1),
)
api_key_headers = get_api_key_header(raw_key)
response = client.get(reverse("provider-list"), headers=api_key_headers)
assert response.status_code == 401
assert "API Key has already expired." in response.json()["errors"][0]["detail"]
def test_revoked_api_key_rejected(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""Revoked key returns 401."""
client = APIClient()
# Use the revoked key from fixture
revoked_key = api_keys_fixture[2]
assert revoked_key.revoked is True
api_key_headers = get_api_key_header(revoked_key._raw_key)
response = client.get(reverse("provider-list"), headers=api_key_headers)
assert response.status_code == 401
assert "API Key has been revoked." in response.json()["errors"][0]["detail"]
def test_non_existent_api_key(self, create_test_user, tenants_fixture):
"""Key UUID doesn't exist in database."""
client = APIClient()
# Create a valid-looking key with non-existent UUID
crypto = get_crypto()
fake_uuid = str(uuid4())
fake_expiry = (datetime.now(timezone.utc) + timedelta(days=30)).timestamp()
payload = {"_pk": fake_uuid, "_exp": fake_expiry}
encrypted_payload = crypto.generate(payload)
fake_key = f"pk_fakepfx.{encrypted_payload}"
api_key_headers = get_api_key_header(fake_key)
response = client.get(reverse("provider-list"), headers=api_key_headers)
assert response.status_code == 401
assert (
"No entity matching this api key." in response.json()["errors"][0]["detail"]
)
def test_corrupted_payload(self, create_test_user, tenants_fixture):
"""Tampered/corrupted encrypted payload."""
client = APIClient()
# Create key with corrupted encrypted portion
corrupted_key = "pk_12345678.corrupted_encrypted_data_here"
api_key_headers = get_api_key_header(corrupted_key)
response = client.get(reverse("provider-list"), headers=api_key_headers)
assert response.status_code == 401
assert "Invalid API Key." in response.json()["errors"][0]["detail"]
@pytest.mark.django_db
class TestAPIKeyTenantIsolation:
def test_api_key_tenant_isolation(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""User in tenant A cannot use API key from tenant B."""
client = APIClient()
# Create a second user in a different tenant
second_user = User.objects.create_user(
name="second_user",
email="second_user@prowler.com",
password="Test_password@1",
)
second_tenant = tenants_fixture[1]
Membership.objects.create(user=second_user, tenant=second_tenant)
# Create and assign role to second_user
second_role = Role.objects.create(
tenant_id=second_tenant.id,
name="Second Tenant Role",
unlimited_visibility=True,
manage_account=True,
)
UserRoleRelationship.objects.create(
user=second_user,
role=second_role,
tenant_id=second_tenant.id,
)
# Create API key for second user in second tenant
second_key, raw_key = TenantAPIKey.objects.create_api_key(
name="Second Tenant Key",
tenant_id=second_tenant.id,
entity=second_user,
)
# First user's API key from first tenant
first_key = api_keys_fixture[0]
tenants_fixture[0]
# Verify both keys are from different tenants
assert first_key.tenant_id != second_key.tenant_id
# Each key should only access resources in its own tenant
# This is enforced by RLS at the database level
first_headers = get_api_key_header(first_key._raw_key)
second_headers = get_api_key_header(raw_key)
# Both should work for their respective tenants
first_response = client.get(reverse("provider-list"), headers=first_headers)
assert first_response.status_code == 200
second_response = client.get(reverse("provider-list"), headers=second_headers)
assert second_response.status_code == 200
# Verify tenant context is correct in each response
# The responses should contain only data for their respective tenants
def test_api_key_filters_by_tenant(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""List endpoint only shows keys for current tenant."""
client = APIClient()
# Create JWT token for first tenant
access_token, _ = get_api_tokens(client, create_test_user.email, TEST_PASSWORD)
jwt_headers = get_authorization_header(access_token)
# List API keys
list_response = client.get(reverse("api-key-list"), headers=jwt_headers)
assert list_response.status_code == 200
keys_data = list_response.json()["data"]
# Verify all returned keys belong to the current tenant
tenants_fixture[0].id
for key_data in keys_data:
# We can't directly see tenant_id in response, but all keys should be from fixtures
# which are created in first tenant
assert key_data["type"] == "api-keys"
# Count should match the number of non-revoked keys in api_keys_fixture for this tenant
# api_keys_fixture creates 3 keys (1 normal, 1 with expiry, 1 revoked)
assert len(keys_data) == 3
def test_api_key_revoked_when_user_removed_from_tenant(self, tenants_fixture):
"""When user membership is deleted, all user's API keys for that tenant are revoked."""
client = APIClient()
tenant = tenants_fixture[0]
# Create a fresh user for this test
test_user = User.objects.create_user(
name="test_membership_removal",
email="membership_removal@prowler.com",
password=TEST_PASSWORD,
)
# Create membership between user and tenant
Membership.objects.create(
user=test_user,
tenant=tenant,
role=Membership.RoleChoices.OWNER,
)
# Create role with manage_account permission
role = Role.objects.create(
tenant_id=tenant.id,
name="Membership Removal Role",
unlimited_visibility=True,
manage_account=True,
)
# Assign role to user
UserRoleRelationship.objects.create(
user=test_user,
role=role,
tenant_id=tenant.id,
)
# Create API key for this user in this tenant
api_key, raw_key = TenantAPIKey.objects.create_api_key(
name="Test Key for Membership Removal",
tenant_id=tenant.id,
entity=test_user,
)
# Verify API key works initially
api_key_headers = get_api_key_header(raw_key)
initial_response = client.get(reverse("provider-list"), headers=api_key_headers)
assert initial_response.status_code == 200
# Store API key ID for later verification
api_key_id = api_key.id
# Remove user from tenant by deleting membership
Membership.objects.filter(user=test_user, tenant=tenant).delete()
# Reload API key from database
api_key.refresh_from_db()
# Verify API key still exists in database
assert TenantAPIKey.objects.filter(id=api_key_id).exists()
# Verify API key is now revoked
assert api_key.revoked is True
# Verify authentication with this API key now fails with 401
auth_response = client.get(reverse("provider-list"), headers=api_key_headers)
assert auth_response.status_code == 401
# Verify error message indicates revocation
response_json = auth_response.json()
assert "errors" in response_json
error_detail = response_json["errors"][0]["detail"]
assert "revoked" in error_detail.lower()
@pytest.mark.django_db
class TestAPIKeyLifecycle:
def test_create_api_key(self, create_test_user_rbac, tenants_fixture):
"""Create via POST with name and optional expiry."""
client = APIClient()
# Authenticate with JWT
access_token, _ = get_api_tokens(
client, create_test_user_rbac.email, TEST_PASSWORD
)
jwt_headers = get_authorization_header(access_token)
# Create API key without expiry
key_name = "Test Lifecycle Key"
create_response = client.post(
reverse("api-key-list"),
data={
"data": {
"type": "api-keys",
"attributes": {
"name": key_name,
},
}
},
format="vnd.api+json",
headers=jwt_headers,
)
assert create_response.status_code == 201
created_data = create_response.json()["data"]
assert created_data["attributes"]["name"] == key_name
assert "api_key" in created_data["attributes"]
assert "prefix" in created_data["attributes"]
assert created_data["attributes"]["revoked"] is False
# Create API key with expiry
future_expiry = (datetime.now(timezone.utc) + timedelta(days=90)).isoformat()
create_with_expiry_response = client.post(
reverse("api-key-list"),
data={
"data": {
"type": "api-keys",
"attributes": {
"name": "Key with Expiry",
"expires_at": future_expiry,
},
}
},
format="vnd.api+json",
headers=jwt_headers,
)
assert create_with_expiry_response.status_code == 201
expiry_data = create_with_expiry_response.json()["data"]
assert expiry_data["attributes"]["expires_at"] is not None
def test_update_api_key_name_only(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""PATCH only allows name changes."""
client = APIClient()
# Authenticate with JWT
access_token, _ = get_api_tokens(client, create_test_user.email, TEST_PASSWORD)
jwt_headers = get_authorization_header(access_token)
api_key = api_keys_fixture[0]
api_key.name
new_name = "Updated API Key Name"
# Update name
update_response = client.patch(
reverse("api-key-detail", kwargs={"pk": api_key.id}),
data={
"data": {
"type": "api-keys",
"id": str(api_key.id),
"attributes": {
"name": new_name,
},
}
},
format="vnd.api+json",
headers=jwt_headers,
)
assert update_response.status_code == 200
updated_data = update_response.json()["data"]
assert updated_data["attributes"]["name"] == new_name
# Verify name was actually updated in database
api_key.refresh_from_db()
assert api_key.name == new_name
# Verify other fields remain unchanged
assert api_key.prefix == updated_data["attributes"]["prefix"]
assert api_key.revoked is False
def test_delete_api_key(self, create_test_user, tenants_fixture, api_keys_fixture):
"""DELETE revokes key (sets revoked=True)."""
client = APIClient()
# Authenticate with JWT
access_token, _ = get_api_tokens(client, create_test_user.email, TEST_PASSWORD)
jwt_headers = get_authorization_header(access_token)
api_key = api_keys_fixture[1]
api_key_id = api_key.id
# Revoke API key using the revoke endpoint
revoke_response = client.delete(
reverse("api-key-revoke", kwargs={"pk": api_key_id}),
headers=jwt_headers,
)
assert revoke_response.status_code == 200
# Verify key still exists but is revoked
api_key.refresh_from_db()
assert api_key.revoked is True
# Verify revoked key can no longer authenticate
api_key_headers = get_api_key_header(api_key._raw_key)
auth_response = client.get(reverse("provider-list"), headers=api_key_headers)
assert auth_response.status_code == 401
def test_multiple_keys_per_user(self, create_test_user_rbac, tenants_fixture):
"""User can have multiple active keys."""
client = APIClient()
# Authenticate with JWT
access_token, _ = get_api_tokens(
client, create_test_user_rbac.email, TEST_PASSWORD
)
jwt_headers = get_authorization_header(access_token)
# Create multiple API keys
key_names = ["Key One", "Key Two", "Key Three"]
created_keys = []
for name in key_names:
create_response = client.post(
reverse("api-key-list"),
data={
"data": {
"type": "api-keys",
"attributes": {
"name": name,
},
}
},
format="vnd.api+json",
headers=jwt_headers,
)
assert create_response.status_code == 201
created_keys.append(create_response.json()["data"])
# Verify all keys were created
assert len(created_keys) == 3
# List all keys and verify count
list_response = client.get(reverse("api-key-list"), headers=jwt_headers)
assert list_response.status_code == 200
# Should include the 3 new keys plus the ones from api_keys_fixture
keys_list = list_response.json()["data"]
assert len(keys_list) >= 3
# Verify each created key can authenticate independently
for key_data in created_keys:
full_key = key_data["attributes"]["api_key"]
api_key_headers = get_api_key_header(full_key)
auth_response = client.get(
reverse("provider-list"), headers=api_key_headers
)
assert auth_response.status_code == 200
def test_api_key_becomes_invalid_when_user_deleted(self, tenants_fixture):
"""When user is deleted, API key entity is set to None and authentication fails."""
client = APIClient()
tenant = tenants_fixture[0]
# Create a fresh user for this test to avoid affecting other tests
test_user = User.objects.create_user(
name="test_deletion_user",
email="deletion_test@prowler.com",
password=TEST_PASSWORD,
)
Membership.objects.create(
user=test_user,
tenant=tenant,
role=Membership.RoleChoices.OWNER,
)
# Create role for the user
role = Role.objects.create(
tenant_id=tenant.id,
name="Deletion Test Role",
unlimited_visibility=True,
manage_account=True,
)
UserRoleRelationship.objects.create(
user=test_user,
role=role,
tenant_id=tenant.id,
)
# Create API key for this user
api_key, raw_key = TenantAPIKey.objects.create_api_key(
name="Test Key for Deletion",
tenant_id=tenant.id,
entity=test_user,
)
# Verify the API key works initially
api_key_headers = get_api_key_header(raw_key)
initial_response = client.get(reverse("provider-list"), headers=api_key_headers)
assert initial_response.status_code == 200
# Store the API key ID for later verification
api_key_id = api_key.id
# Delete the user
test_user.delete()
# Reload the API key from database
api_key.refresh_from_db()
# Verify the API key still exists in database (not cascade deleted)
assert TenantAPIKey.objects.filter(id=api_key_id).exists()
# Verify entity field is now None (CASCADE behavior is SET_NULL)
assert api_key.entity is None
# Verify authentication with this API key now fails
auth_response = client.get(reverse("provider-list"), headers=api_key_headers)
# Must return 401 Unauthorized, not 500 Internal Server Error
assert auth_response.status_code == 401, (
f"Expected 401 but got {auth_response.status_code}: "
f"{auth_response.json()}"
)
# Verify error message is present
response_json = auth_response.json()
assert "errors" in response_json
error_detail = response_json["errors"][0]["detail"]
# The error should indicate authentication failed due to invalid/orphaned key
assert (
"API Key" in error_detail
or "Invalid" in error_detail
or "entity" in error_detail.lower()
)
@pytest.mark.django_db
class TestCombinedAuthentication:
def test_jwt_takes_priority_over_api_key(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""When Bearer token present, JWT is used."""
client = APIClient()
# Get JWT token
access_token, _ = get_api_tokens(client, create_test_user.email, TEST_PASSWORD)
# Create headers with both Bearer (JWT) and API key would conflict
# But we'll test that Bearer takes priority by setting Authorization to Bearer
jwt_headers = {"Authorization": f"Bearer {access_token}"}
response = client.get(reverse("provider-list"), headers=jwt_headers)
assert response.status_code == 200
# The authentication should have used JWT, not API key
# We can verify this worked as JWT authentication
def test_api_key_header_format_validation(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""Verify Authorization: Api-Key <key> format."""
client = APIClient()
api_key = api_keys_fixture[0]
# Correct format
correct_headers = {"Authorization": f"Api-Key {api_key._raw_key}"}
correct_response = client.get(reverse("provider-list"), headers=correct_headers)
assert correct_response.status_code == 200
# Wrong format - using Bearer instead of Api-Key
wrong_format_headers = {"Authorization": f"Bearer {api_key._raw_key}"}
wrong_response = client.get(
reverse("provider-list"), headers=wrong_format_headers
)
# Should fail because it tries to parse as JWT
assert wrong_response.status_code == 401
# Wrong format - missing Api-Key prefix
no_prefix_headers = {"Authorization": api_key._raw_key}
no_prefix_response = client.get(
reverse("provider-list"), headers=no_prefix_headers
)
assert no_prefix_response.status_code == 401
def test_concurrent_api_key_usage(
self, create_test_user, tenants_fixture, api_keys_fixture
):
"""Same key can be used multiple times concurrently."""
client = APIClient()
api_key = api_keys_fixture[0]
api_key_headers = get_api_key_header(api_key._raw_key)
# Make multiple concurrent requests with the same key
responses = []
for _ in range(5):
response = client.get(reverse("provider-list"), headers=api_key_headers)
responses.append(response)
# All requests should succeed
for response in responses:
assert response.status_code == 200
# Verify last_used_at was updated
api_key.refresh_from_db()
assert api_key.last_used_at is not None
def get_api_key_header(api_key: str) -> dict:
"""Helper to create API key authorization header."""
return {"Authorization": f"Api-Key {api_key}"}
@@ -11,6 +11,7 @@ from api.db_utils import (
batch_delete,
create_objects_in_batches,
enum_to_choices,
generate_api_key_prefix,
generate_random_token,
one_week_from_now,
update_objects_in_batches,
@@ -313,3 +314,28 @@ class TestUpdateObjectsInBatches:
qs = Provider.objects.filter(tenant=tenant, uid__endswith="_upd")
assert qs.count() == total
class TestGenerateApiKeyPrefix:
def test_prefix_format(self):
"""Test that generated prefix starts with 'pk_'."""
prefix = generate_api_key_prefix()
assert prefix.startswith("pk_")
def test_prefix_length(self):
"""Test that prefix has correct length (pk_ + 8 random chars = 11)."""
prefix = generate_api_key_prefix()
assert len(prefix) == 11
def test_prefix_uniqueness(self):
"""Test that multiple generations produce unique prefixes."""
prefixes = {generate_api_key_prefix() for _ in range(100)}
assert len(prefixes) == 100
def test_prefix_character_set(self):
"""Test that random part uses only allowed characters."""
allowed_chars = "23456789ABCDEFGHJKMNPQRSTVWXYZ"
for _ in range(50):
prefix = generate_api_key_prefix()
random_part = prefix[3:] # Strip 'pk_'
assert all(char in allowed_chars for char in random_part)
@@ -24,6 +24,7 @@ def test_api_logging_middleware_logging(mock_logger):
mock_extract_auth_info.return_value = {
"user_id": "user123",
"tenant_id": "tenant456",
"api_key_prefix": "pk_test",
}
with patch("api.middleware.logging.getLogger") as mock_get_logger:
@@ -44,6 +45,7 @@ def test_api_logging_middleware_logging(mock_logger):
expected_extra = {
"user_id": "user123",
"tenant_id": "tenant456",
"api_key_prefix": "pk_test",
"method": "GET",
"path": "/test-path",
"query_params": {"param1": "value1", "param2": "value2"},
+747 -1
View File
@@ -12,8 +12,8 @@ from uuid import uuid4
import jwt
import pytest
from allauth.socialaccount.models import SocialAccount, SocialApp
from allauth.account.models import EmailAddress
from allauth.socialaccount.models import SocialAccount, SocialApp
from botocore.exceptions import ClientError, NoCredentialsError
from conftest import (
API_JSON_CONTENT_TYPE,
@@ -48,6 +48,7 @@ from api.models import (
Scan,
StateChoices,
Task,
TenantAPIKey,
User,
UserRoleRelationship,
)
@@ -7405,3 +7406,748 @@ class TestProcessorViewSet:
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
@pytest.mark.django_db
class TestTenantApiKeyViewSet:
"""Tests for TenantAPIKey endpoints."""
def test_api_keys_list(self, authenticated_client, api_keys_fixture):
"""Test listing all API keys for the tenant."""
response = authenticated_client.get(reverse("api-key-list"))
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert len(data) == len(api_keys_fixture)
# Verify keys are ordered by -created (newest first)
assert data[0]["attributes"]["name"] == TenantAPIKey.objects.first().name
def test_api_keys_list_empty(self, authenticated_client, tenants_fixture):
"""Test listing API keys when none exist returns empty list."""
response = authenticated_client.get(reverse("api-key-list"))
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert len(data) == 0
assert isinstance(data, list)
def test_api_keys_list_default_ordering(
self, authenticated_client, api_keys_fixture
):
"""Test that API keys are ordered by -created (newest first) by default."""
response = authenticated_client.get(reverse("api-key-list"))
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
# Verify ordering by comparing inserted_at timestamps
# (newest should be first since ordering = ["-created"])
if len(data) >= 2:
first_date = data[0]["attributes"]["inserted_at"]
second_date = data[1]["attributes"]["inserted_at"]
assert first_date >= second_date
def test_api_keys_list_pagination_page_size(
self, authenticated_client, api_keys_fixture
):
"""Test pagination with custom page size."""
page_size = 1
response = authenticated_client.get(
reverse("api-key-list"), {"page[size]": page_size}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert len(data) == page_size
assert response.json()["meta"]["pagination"]["page"] == 1
assert response.json()["meta"]["pagination"]["pages"] == 3
def test_api_keys_list_pagination_page_number(
self, authenticated_client, api_keys_fixture
):
"""Test pagination with specific page number."""
page_size = 1
page_number = 2
response = authenticated_client.get(
reverse("api-key-list"),
{"page[size]": page_size, "page[number]": page_number},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert len(data) == page_size
assert response.json()["meta"]["pagination"]["page"] == page_number
def test_api_keys_list_pagination_invalid_page(self, authenticated_client):
"""Test pagination with invalid page number returns 404."""
response = authenticated_client.get(
reverse("api-key-list"), {"page[number]": 999}
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_api_keys_retrieve(self, authenticated_client, api_keys_fixture):
"""Test retrieving a single API key by ID."""
api_key = api_keys_fixture[0]
response = authenticated_client.get(
reverse("api-key-detail", kwargs={"pk": api_key.id})
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert data["id"] == str(api_key.id)
assert data["attributes"]["name"] == api_key.name
assert data["attributes"]["prefix"] == api_key.prefix
assert data["attributes"]["revoked"] == api_key.revoked
assert "expires_at" in data["attributes"]
assert "inserted_at" in data["attributes"]
assert "last_used_at" in data["attributes"]
# Verify api_key field is NOT in response (only on creation)
assert "api_key" not in data["attributes"]
def test_api_keys_retrieve_invalid(self, authenticated_client):
"""Test retrieving non-existent API key returns 404."""
response = authenticated_client.get(
reverse(
"api-key-detail",
kwargs={"pk": "f498b103-c760-4785-9a3e-e23fafbb7b02"},
)
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_api_keys_retrieve_field_mapping(
self, authenticated_client, api_keys_fixture
):
"""Test that field names are correctly mapped (expires_at, inserted_at)."""
api_key = api_keys_fixture[0]
response = authenticated_client.get(
reverse("api-key-detail", kwargs={"pk": api_key.id})
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]["attributes"]
# Verify field mapping: expires_at -> expiry_date
assert "expires_at" in data
assert "expiry_date" not in data
# Verify field mapping: inserted_at -> created
assert "inserted_at" in data
assert "created" not in data
@pytest.mark.parametrize(
"api_key_payload",
(
[
{"name": "New API Key"},
{"name": ""},
{},
]
),
)
def test_api_keys_create_valid(
self, authenticated_client, create_test_user, api_key_payload
):
data = {
"data": {
"type": "api-keys",
"attributes": api_key_payload,
}
}
response = authenticated_client.post(
reverse("api-key-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_201_CREATED
response_data = response.json()["data"]
assert "prefix" in response_data["attributes"]
assert "api_key" in response_data["attributes"]
assert response_data["attributes"]["api_key"] is not None
# Verify the raw API key is returned (only on creation)
assert (
response_data["attributes"]["prefix"]
in response_data["attributes"]["api_key"]
)
# Verify entity is set to current user
assert response_data["relationships"]["entity"]["data"]["id"] == str(
create_test_user.id
)
@pytest.mark.parametrize(
"api_key_payload, error_pointer",
(
[
(
{"name": "Invalid Expiry", "expires_at": "not-a-date"},
"expires_at",
),
]
),
)
def test_api_keys_create_invalid(
self,
authenticated_client,
create_test_user,
api_key_payload,
error_pointer,
):
data = {
"data": {
"type": "api-keys",
"attributes": api_key_payload,
}
}
response = authenticated_client.post(
reverse("api-key-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "errors" in response.json()
assert (
response.json()["errors"][0]["source"]["pointer"]
== f"/data/attributes/{error_pointer}"
)
def test_api_keys_create_multiple_unique_prefixes(
self, authenticated_client, api_keys_fixture
):
"""Test creating multiple API keys generates unique prefixes."""
prefixes = set()
for i in range(3):
data = {
"data": {
"type": "api-keys",
"attributes": {
"name": f"Unique Key {i}",
},
}
}
response = authenticated_client.post(
reverse("api-key-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_201_CREATED
prefix = response.json()["data"]["attributes"]["prefix"]
prefixes.add(prefix)
# Verify all prefixes are unique
assert len(prefixes) == 3
def test_api_keys_create_invalid_content_type(
self, authenticated_client, create_test_user
):
"""Test creating an API key with wrong content type returns 415."""
data = {"name": "Test Key"}
response = authenticated_client.post(
reverse("api-key-list"),
data=data,
content_type="application/json",
)
assert response.status_code == status.HTTP_415_UNSUPPORTED_MEDIA_TYPE
def test_api_keys_create_malformed_json(
self, authenticated_client, create_test_user
):
"""Test creating an API key with malformed JSON returns 400."""
response = authenticated_client.post(
reverse("api-key-list"),
data="not valid json",
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_api_keys_create_invalid_structure(
self, authenticated_client, create_test_user
):
"""Test creating an API key with invalid JSON:API structure."""
data = {"invalid": "structure"}
response = authenticated_client.post(
reverse("api-key-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "errors" in response.json()
def test_api_keys_revoke(self, authenticated_client, api_keys_fixture):
"""Test revoking an API key."""
api_key = api_keys_fixture[0] # Not revoked
assert api_key.revoked is False
response = authenticated_client.delete(
reverse("api-key-revoke", kwargs={"pk": api_key.id})
)
assert response.status_code == status.HTTP_200_OK
response_data = response.json()["data"]
assert response_data["attributes"]["revoked"] is True
# Verify in database
api_key.refresh_from_db()
assert api_key.revoked is True
def test_api_keys_revoke_already_revoked(
self, authenticated_client, api_keys_fixture
):
"""Test revoking an already revoked API key returns validation error."""
api_key = api_keys_fixture[2] # Already revoked
api_key.refresh_from_db()
assert api_key.revoked is True
response = authenticated_client.delete(
reverse("api-key-revoke", kwargs={"pk": api_key.id})
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
assert "already revoked" in response.json()["errors"][0]["detail"]
def test_api_keys_revoke_nonexistent(self, authenticated_client):
"""Test revoking non-existent API key returns 404."""
response = authenticated_client.delete(
reverse(
"api-key-revoke",
kwargs={"pk": "f498b103-c760-4785-9a3e-e23fafbb7b02"},
)
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_api_keys_destroy_not_allowed(self, authenticated_client, api_keys_fixture):
"""Test that DELETE (destroy) endpoint is disabled."""
api_key = api_keys_fixture[0]
response = authenticated_client.delete(
reverse("api-key-detail", kwargs={"pk": api_key.id})
)
assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED
def test_api_keys_put_not_allowed(self, authenticated_client, api_keys_fixture):
"""Test that PUT is not allowed."""
api_key = api_keys_fixture[0]
data = {
"data": {
"type": "api-keys",
"id": str(api_key.id),
"attributes": {
"name": "Updated Name",
},
}
}
response = authenticated_client.put(
reverse("api-key-detail", kwargs={"pk": api_key.id}),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_405_METHOD_NOT_ALLOWED
@pytest.mark.parametrize(
"filter_name, filter_value, expected_min_count",
(
[
("name", "Test API Key 1", 1),
("name__icontains", "test", 2),
("revoked", "true", 1),
("revoked", "false", 2),
("inserted_at", TODAY, 1),
("inserted_at__gte", "2024-01-01", 3),
("inserted_at__lte", "2099-12-31", 3),
("expires_at__gte", today_after_n_days(50), 1),
]
),
)
def test_api_keys_filters(
self,
authenticated_client,
api_keys_fixture,
filter_name,
filter_value,
expected_min_count,
):
response = authenticated_client.get(
reverse("api-key-list"),
{f"filter[{filter_name}]": filter_value},
)
assert response.status_code == status.HTTP_200_OK
assert len(response.json()["data"]) >= expected_min_count
def test_api_keys_filter_combined(self, authenticated_client, api_keys_fixture):
"""Test combining multiple filters."""
response = authenticated_client.get(
reverse("api-key-list"),
{
"filter[revoked]": "false",
"filter[name__icontains]": "test",
},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert all(item["attributes"]["revoked"] is False for item in data)
assert all("test" in item["attributes"]["name"].lower() for item in data)
@pytest.mark.parametrize(
"filter_name",
(
[
"invalid_field",
"nonexistent",
]
),
)
def test_api_keys_filters_invalid(self, authenticated_client, filter_name):
response = authenticated_client.get(
reverse("api-key-list"),
{f"filter[{filter_name}]": "whatever"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_api_keys_filter_invalid_date_format(self, authenticated_client):
"""Test filtering with invalid date format returns 400."""
response = authenticated_client.get(
reverse("api-key-list"),
{"filter[inserted_at]": "not-a-date"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_api_keys_filter_empty_result(self, authenticated_client, api_keys_fixture):
"""Test filter that returns no results."""
response = authenticated_client.get(
reverse("api-key-list"),
{"filter[name]": "NonExistent Key Name"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert len(data) == 0
assert isinstance(data, list)
@pytest.mark.parametrize(
"sort_field",
(
[
"name",
"prefix",
"revoked",
"inserted_at",
"expires_at",
"-name",
"-inserted_at",
]
),
)
def test_api_keys_sort(self, authenticated_client, api_keys_fixture, sort_field):
response = authenticated_client.get(
reverse("api-key-list"), {"sort": sort_field}
)
assert response.status_code == status.HTTP_200_OK
def test_api_keys_sort_invalid(self, authenticated_client):
"""Test invalid sort parameter returns 400."""
response = authenticated_client.get(
reverse("api-key-list"),
{"sort": "invalid_field"},
)
assert response.status_code == status.HTTP_400_BAD_REQUEST
def test_api_keys_rbac_manage_account_required(
self, authenticated_client_rbac_manage_users_only, api_keys_fixture
):
"""Test that users without MANAGE_ACCOUNT permission are denied."""
response = authenticated_client_rbac_manage_users_only.get(
reverse("api-key-list")
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_api_keys_rbac_manage_account_allowed(
self, authenticated_client_rbac_manage_account, tenants_fixture
):
"""Test that users with MANAGE_ACCOUNT permission can access API keys."""
response = authenticated_client_rbac_manage_account.get(reverse("api-key-list"))
assert response.status_code == status.HTTP_200_OK
def test_api_keys_rbac_create_requires_permission(
self, authenticated_client_rbac_manage_users_only
):
"""Test that creating API keys requires MANAGE_ACCOUNT permission."""
data = {
"data": {
"type": "api-keys",
"attributes": {
"name": "Test Key",
},
}
}
response = authenticated_client_rbac_manage_users_only.post(
reverse("api-key-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_api_keys_rbac_revoke_requires_permission(
self, authenticated_client_rbac_manage_users_only, api_keys_fixture
):
"""Test that revoking API keys requires MANAGE_ACCOUNT permission."""
api_key = api_keys_fixture[0]
response = authenticated_client_rbac_manage_users_only.delete(
reverse("api-key-revoke", kwargs={"pk": api_key.id})
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_api_keys_tenant_isolation(
self, authenticated_client, api_keys_fixture, tenants_fixture
):
"""Test that API keys are isolated by tenant (RLS enforcement)."""
# Create a second tenant with different user
tenant2 = Tenant.objects.create(name="Another Tenant")
user2 = User.objects.create_user(
name="Another User",
email="another@example.com",
password=TEST_PASSWORD,
)
Membership.objects.create(
user=user2,
tenant=tenant2,
role=Membership.RoleChoices.OWNER,
)
# Create API key for tenant2
TenantAPIKey.objects.create_api_key(
name="Tenant 2 Key",
tenant_id=tenant2.id,
entity=user2,
)
# Authenticate as user from tenant 1
response = authenticated_client.get(reverse("api-key-list"))
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
# Should only see keys from tenant 1
assert len(data) == len(api_keys_fixture)
assert all(item["attributes"]["name"] != "Tenant 2 Key" for item in data)
def test_api_keys_tenant_isolation_retrieve(
self, authenticated_client, tenants_fixture
):
"""Test that retrieving API key from another tenant returns 404."""
# Create a second tenant with API key
tenant2 = Tenant.objects.create(name="Another Tenant")
user2 = User.objects.create_user(
name="Another User",
email="another2@example.com",
password=TEST_PASSWORD,
)
Membership.objects.create(
user=user2,
tenant=tenant2,
role=Membership.RoleChoices.OWNER,
)
api_key2, _ = TenantAPIKey.objects.create_api_key(
name="Tenant 2 Key",
tenant_id=tenant2.id,
entity=user2,
)
# Try to retrieve tenant2's API key as tenant1 user
response = authenticated_client.get(
reverse("api-key-detail", kwargs={"pk": api_key2.id})
)
# Should return 404 due to RLS filtering
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_api_keys_tenant_isolation_revoke(
self, authenticated_client, tenants_fixture
):
"""Test that revoking API key from another tenant returns 404."""
# Create a second tenant with API key
tenant2 = Tenant.objects.create(name="Another Tenant")
user2 = User.objects.create_user(
name="Another User",
email="another3@example.com",
password=TEST_PASSWORD,
)
Membership.objects.create(
user=user2,
tenant=tenant2,
role=Membership.RoleChoices.OWNER,
)
api_key2, _ = TenantAPIKey.objects.create_api_key(
name="Tenant 2 Key",
tenant_id=tenant2.id,
entity=user2,
)
# Try to revoke tenant2's API key as tenant1 user
response = authenticated_client.delete(
reverse("api-key-revoke", kwargs={"pk": api_key2.id})
)
# Should return 404 due to RLS filtering
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_api_keys_read_only_fields_on_create(
self, authenticated_client, create_test_user
):
"""Test that read-only fields are ignored during creation."""
# Note: Fields not in serializer (like 'prefix', 'revoked') will cause 400
# So we only test that the response has correct read-only values
data = {
"data": {
"type": "api-keys",
"attributes": {
"name": "Test Read-Only",
},
}
}
response = authenticated_client.post(
reverse("api-key-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_201_CREATED
response_data = response.json()["data"]
# Verify read-only fields have correct default/auto-generated values
# Prefix should be auto-generated (not empty, not None)
assert response_data["attributes"]["prefix"] is not None
assert len(response_data["attributes"]["prefix"]) > 0
# Revoked should be False (default)
assert response_data["attributes"]["revoked"] is False
# Entity should be set to current user (auto-assigned)
assert response_data["relationships"]["entity"]["data"]["id"] == str(
create_test_user.id
)
def test_api_keys_entity_relationship_included(
self, authenticated_client, api_keys_fixture
):
"""Test that entity (user) relationship is included correctly."""
api_key = api_keys_fixture[0]
response = authenticated_client.get(
reverse("api-key-detail", kwargs={"pk": api_key.id})
)
assert response.status_code == status.HTTP_200_OK
data = response.json()["data"]
assert "entity" in data["relationships"]
assert data["relationships"]["entity"]["data"]["type"] == "users"
assert data["relationships"]["entity"]["data"]["id"] == str(api_key.entity.id)
def test_api_keys_entity_auto_assigned_on_create(
self, authenticated_client, create_test_user
):
"""Test that entity is automatically assigned to current user on creation."""
data = {
"data": {
"type": "api-keys",
"attributes": {
"name": "Auto Entity Key",
},
}
}
response = authenticated_client.post(
reverse("api-key-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_201_CREATED
response_data = response.json()["data"]
# Entity should be set to authenticated user
assert response_data["relationships"]["entity"]["data"]["id"] == str(
create_test_user.id
)
# Verify in database
api_key_id = response_data["id"]
api_key = TenantAPIKey.objects.get(id=api_key_id)
assert api_key.entity.id == create_test_user.id
def test_api_keys_list_response_structure(
self, authenticated_client, api_keys_fixture
):
"""Test that list response follows JSON:API structure."""
response = authenticated_client.get(reverse("api-key-list"))
assert response.status_code == status.HTTP_200_OK
response_data = response.json()
# Verify top-level structure
assert "data" in response_data
assert "meta" in response_data
assert isinstance(response_data["data"], list)
# Verify pagination meta
assert "pagination" in response_data["meta"]
assert "count" in response_data["meta"]["pagination"]
assert "page" in response_data["meta"]["pagination"]
assert "pages" in response_data["meta"]["pagination"]
def test_api_keys_retrieve_response_structure(
self, authenticated_client, api_keys_fixture
):
"""Test that retrieve response follows JSON:API structure."""
api_key = api_keys_fixture[0]
response = authenticated_client.get(
reverse("api-key-detail", kwargs={"pk": api_key.id})
)
assert response.status_code == status.HTTP_200_OK
response_data = response.json()
# Verify top-level structure
assert "data" in response_data
data = response_data["data"]
# Verify resource object structure
assert "type" in data
assert data["type"] == "api-keys"
assert "id" in data
assert "attributes" in data
assert "relationships" in data
def test_api_keys_create_response_structure(
self, authenticated_client, create_test_user
):
"""Test that create response follows JSON:API structure."""
data = {
"data": {
"type": "api-keys",
"attributes": {
"name": "Structure Test Key",
},
}
}
response = authenticated_client.post(
reverse("api-key-list"),
data=json.dumps(data),
content_type="application/vnd.api+json",
)
assert response.status_code == status.HTTP_201_CREATED
response_data = response.json()
# Verify top-level structure
assert "data" in response_data
data = response_data["data"]
# Verify resource object structure
assert "type" in data
assert data["type"] == "api-keys"
assert "id" in data
assert "attributes" in data
assert "relationships" in data
# Verify api_key is included in creation response only
assert "api_key" in data["attributes"]
assert data["attributes"]["api_key"] is not None
def test_api_keys_error_response_structure(self, authenticated_client):
"""Test that error responses follow JSON:API structure."""
response = authenticated_client.get(
reverse(
"api-key-detail",
kwargs={"pk": "f498b103-c760-4785-9a3e-e23fafbb7b02"},
)
)
assert response.status_code == status.HTTP_404_NOT_FOUND
response_data = response.json()
# Verify error structure
assert "errors" in response_data
assert isinstance(response_data["errors"], list)
assert len(response_data["errors"]) > 0
# Verify error object structure
error = response_data["errors"][0]
assert "detail" in error or "title" in error
+101
View File
@@ -39,6 +39,7 @@ from api.models import (
StateChoices,
StatusChoices,
Task,
TenantAPIKey,
User,
UserRoleRelationship,
)
@@ -2735,3 +2736,103 @@ class LighthouseConfigUpdateSerializer(BaseWriteSerializer):
instance.api_key_decoded = api_key
instance.save()
return instance
# API Keys
class TenantApiKeySerializer(RLSSerializer):
"""
Serializer for the TenantApiKey model.
"""
# Map database field names to API field names for consistency
expires_at = serializers.DateTimeField(source="expiry_date", read_only=True)
inserted_at = serializers.DateTimeField(source="created", read_only=True)
class Meta:
model = TenantAPIKey
fields = [
"id",
"name",
"prefix",
"expires_at",
"revoked",
"inserted_at",
"last_used_at",
"entity",
]
class TenantApiKeyCreateSerializer(RLSSerializer, BaseWriteSerializer):
"""Serializer for creating new API keys."""
# Map database field names to API field names for consistency
expires_at = serializers.DateTimeField(source="expiry_date", required=False)
inserted_at = serializers.DateTimeField(source="created", read_only=True)
api_key = serializers.SerializerMethodField()
class Meta:
model = TenantAPIKey
fields = [
"id",
"name",
"prefix",
"expires_at",
"revoked",
"entity",
"inserted_at",
"last_used_at",
"api_key",
]
extra_kwargs = {
"id": {"read_only": True},
"prefix": {"read_only": True},
"revoked": {"read_only": True},
"entity": {"read_only": True},
"inserted_at": {"read_only": True},
"last_used_at": {"read_only": True},
"api_key": {"read_only": True},
}
def get_api_key(self, obj):
"""Return the raw API key if it was stored during creation."""
return getattr(obj, "_raw_api_key", None)
def create(self, validated_data):
instance, raw_api_key = TenantAPIKey.objects.create_api_key(
**validated_data,
tenant_id=self.context.get("tenant_id"),
entity=self.context.get("request").user,
)
# Store the raw API key temporarily on the instance for the serializer
instance._raw_api_key = raw_api_key
return instance
class TenantApiKeyUpdateSerializer(RLSSerializer, BaseWriteSerializer):
"""Serializer for updating API keys - only allows changing the name."""
# Map database field names to API field names for consistency
expires_at = serializers.DateTimeField(source="expiry_date", read_only=True)
inserted_at = serializers.DateTimeField(source="created", read_only=True)
class Meta:
model = TenantAPIKey
fields = [
"id",
"name",
"prefix",
"expires_at",
"entity",
"inserted_at",
"last_used_at",
]
extra_kwargs = {
"id": {"read_only": True},
"prefix": {"read_only": True},
"entity": {"read_only": True},
"expires_at": {"read_only": True},
"inserted_at": {"read_only": True},
"last_used_at": {"read_only": True},
}
+2
View File
@@ -39,6 +39,7 @@ from api.v1.views import (
TenantViewSet,
UserRoleRelationshipView,
UserViewSet,
TenantApiKeyViewSet,
)
router = routers.DefaultRouter(trailing_slash=False)
@@ -65,6 +66,7 @@ router.register(
LighthouseConfigViewSet,
basename="lighthouseconfiguration",
)
router.register(r"api-keys", TenantApiKeyViewSet, basename="api-key")
tenants_router = routers.NestedSimpleRouter(router, r"tenants", lookup="tenant")
tenants_router.register(
+91
View File
@@ -95,6 +95,7 @@ from api.filters import (
ScanSummarySeverityFilter,
ServiceOverviewFilter,
TaskFilter,
TenantApiKeyFilter,
TenantFilter,
UserFilter,
)
@@ -124,6 +125,7 @@ from api.models import (
SeverityChoices,
StateChoices,
Task,
TenantAPIKey,
User,
UserRoleRelationship,
)
@@ -189,6 +191,9 @@ from api.v1.serializers import (
ScanUpdateSerializer,
ScheduleDailyCreateSerializer,
TaskSerializer,
TenantApiKeyCreateSerializer,
TenantApiKeySerializer,
TenantApiKeyUpdateSerializer,
TenantSerializer,
TokenRefreshSerializer,
TokenSerializer,
@@ -387,6 +392,11 @@ class SchemaView(SpectacularAPIView):
"description": "Endpoints for Single Sign-On authentication management via SAML for seamless user "
"authentication.",
},
{
"name": "API Keys",
"description": "Endpoints for API keys management. These can be used as an alternative to JWT "
"authorization.",
},
]
return super().get(request, *args, **kwargs)
@@ -4190,3 +4200,84 @@ class ProcessorViewSet(BaseRLSViewSet):
elif self.action == "partial_update":
return ProcessorUpdateSerializer
return super().get_serializer_class()
@extend_schema_view(
list=extend_schema(
tags=["API Keys"],
summary="List API keys",
description="Retrieve a list of API keys for the tenant, with filtering support.",
),
retrieve=extend_schema(
tags=["API Keys"],
summary="Retrieve API key details",
description="Fetch detailed information about a specific API key by its ID.",
),
create=extend_schema(
tags=["API Keys"],
summary="Create a new API key",
description="Create a new API key for the tenant.",
),
partial_update=extend_schema(
tags=["API Keys"],
summary="Partially update an API key",
description="Modify certain fields of an existing API key without affecting other settings.",
),
revoke=extend_schema(
tags=["API Keys"],
summary="Revoke an API key",
description="Revoke an API key by its ID. This action is irreversible and will prevent the key from being "
"used.",
request=None,
responses={
200: OpenApiResponse(
response=TenantApiKeySerializer,
description="API key was successfully revoked",
)
},
),
)
class TenantApiKeyViewSet(BaseRLSViewSet):
queryset = TenantAPIKey.objects.all()
serializer_class = TenantApiKeySerializer
filterset_class = TenantApiKeyFilter
http_method_names = ["get", "post", "patch", "delete"]
ordering = ["revoked", "-created"]
ordering_fields = ["name", "prefix", "revoked", "inserted_at", "expires_at"]
# RBAC required permissions
required_permissions = [Permissions.MANAGE_ACCOUNT]
def get_queryset(self):
queryset = TenantAPIKey.objects.filter(
tenant_id=self.request.tenant_id
).annotate(inserted_at=F("created"), expires_at=F("expiry_date"))
return queryset
def get_serializer_class(self):
if self.action == "create":
return TenantApiKeyCreateSerializer
elif self.action == "partial_update":
return TenantApiKeyUpdateSerializer
return super().get_serializer_class()
@extend_schema(exclude=True)
def destroy(self, request, *args, **kwargs):
raise MethodNotAllowed(method="DESTROY")
@action(detail=True, methods=["delete"])
def revoke(self, request, *args, **kwargs):
instance = self.get_object()
# Check if already revoked
if instance.revoked:
raise ValidationError(
{
"detail": "API key is already revoked",
}
)
TenantAPIKey.objects.revoke_api_key(instance.pk)
instance.refresh_from_db()
serializer = self.get_serializer(instance)
return Response(data=serializer.data, status=status.HTTP_200_OK)
+7
View File
@@ -48,6 +48,10 @@ class NDJSONFormatter(logging.Formatter):
log_record["user_id"] = record.user_id
if hasattr(record, "tenant_id"):
log_record["tenant_id"] = record.tenant_id
if hasattr(record, "api_key_prefix"):
log_record["api_key_prefix"] = (
record.api_key_prefix if record.api_key_prefix != "N/A" else None
)
if hasattr(record, "method"):
log_record["method"] = record.method
if hasattr(record, "path"):
@@ -90,6 +94,9 @@ class HumanReadableFormatter(logging.Formatter):
# Add REST API extra fields
if hasattr(record, "user_id"):
log_components.append(f"({record.user_id})")
if hasattr(record, "api_key_prefix"):
if record.api_key_prefix != "N/A":
log_components.append(f"(API-Key {record.api_key_prefix})")
if hasattr(record, "tenant_id"):
log_components.append(f"[{record.tenant_id}]")
if hasattr(record, "method"):
+11 -2
View File
@@ -43,6 +43,7 @@ INSTALLED_APPS = [
"allauth.socialaccount.providers.saml",
"dj_rest_auth.registration",
"rest_framework.authtoken",
"drf_simple_apikey",
]
MIDDLEWARE = [
@@ -84,7 +85,7 @@ TEMPLATES = [
REST_FRAMEWORK = {
"DEFAULT_SCHEMA_CLASS": "drf_spectacular_jsonapi.schemas.openapi.JsonApiAutoSchema",
"DEFAULT_AUTHENTICATION_CLASSES": (
"rest_framework_simplejwt.authentication.JWTAuthentication",
"api.authentication.CombinedJWTOrAPIKeyAuthentication",
),
"PAGE_SIZE": 10,
"EXCEPTION_HANDLER": "api.exceptions.custom_exception_handler",
@@ -220,7 +221,8 @@ SIMPLE_JWT = {
"JTI_CLAIM": "jti",
"USER_ID_FIELD": "id",
"USER_ID_CLAIM": "sub",
# Issuer and Audience claims, for the moment we will keep these values as default values, they may change in the future.
# Issuer and Audience claims, for the moment we will keep these values as default values, they may change in the
# future.
"AUDIENCE": env.str("DJANGO_JWT_AUDIENCE", "https://api.prowler.com"),
"ISSUER": env.str("DJANGO_JWT_ISSUER", "https://api.prowler.com"),
# Additional security settings
@@ -229,6 +231,13 @@ SIMPLE_JWT = {
SECRETS_ENCRYPTION_KEY = env.str("DJANGO_SECRETS_ENCRYPTION_KEY", "")
# DRF Simple API Key settings
DRF_API_KEY = {
"FERNET_SECRET": SECRETS_ENCRYPTION_KEY,
"API_KEY_LIFETIME": 365,
"AUTHENTICATION_KEYWORD_HEADER": "Api-Key",
}
# Internationalization
# https://docs.djangoproject.com/en/5.0/topics/i18n/
+7
View File
@@ -20,6 +20,13 @@ DATABASE_ROUTERS = []
TESTING = True
SECRETS_ENCRYPTION_KEY = "ZMiYVo7m4Fbe2eXXPyrwxdJss2WSalXSv3xHBcJkPl0="
# DRF Simple API Key settings
DRF_API_KEY = {
"FERNET_SECRET": SECRETS_ENCRYPTION_KEY,
"API_KEY_LIFETIME": 365,
"AUTHENTICATION_KEYWORD_HEADER": "Api-Key",
}
# JWT
SIMPLE_JWT["ALGORITHM"] = "HS256" # noqa: F405
+51
View File
@@ -38,6 +38,7 @@ from api.models import (
StateChoices,
StatusChoices,
Task,
TenantAPIKey,
User,
UserRoleRelationship,
)
@@ -1368,6 +1369,56 @@ def saml_sociallogin(users_fixture):
return sociallogin
@pytest.fixture
def api_keys_fixture(tenants_fixture, create_test_user):
"""Create test API keys for testing."""
tenant = tenants_fixture[0]
user = create_test_user
# Create and assign role to user for API key authentication
role = Role.objects.create(
tenant_id=tenant.id,
name="Test API Key Role",
unlimited_visibility=True,
manage_account=True,
)
UserRoleRelationship.objects.create(
user=user,
role=role,
tenant_id=tenant.id,
)
# Create API keys with different states
api_key1, raw_key1 = TenantAPIKey.objects.create_api_key(
name="Test API Key 1",
tenant_id=tenant.id,
entity=user,
)
api_key2, raw_key2 = TenantAPIKey.objects.create_api_key(
name="Test API Key 2",
tenant_id=tenant.id,
entity=user,
expiry_date=datetime.now(timezone.utc) + timedelta(days=60),
)
# Revoked API key
api_key3, raw_key3 = TenantAPIKey.objects.create_api_key(
name="Revoked API Key",
tenant_id=tenant.id,
entity=user,
)
api_key3.revoked = True
api_key3.save()
# Store raw keys on instances for testing
api_key1._raw_key = raw_key1
api_key2._raw_key = raw_key2
api_key3._raw_key = raw_key3
return [api_key1, api_key2, api_key3]
def get_authorization_header(access_token: str) -> dict:
return {"Authorization": f"Bearer {access_token}"}