Compare commits

...

4 Commits

Author SHA1 Message Date
Pepe Fagoaga
f6cb43e4be fix: typo
Co-authored-by: Víctor Fernández Poyatos <victor@prowler.com>
2025-04-22 21:21:08 +05:45
Pepe Fagoaga
1f0c8ecbd6 chore: remove comment since partitioned tables inherits RLS 2024-12-13 15:29:30 +01:00
Pepe Fagoaga
a2e580167a chore: add docs 2024-12-12 14:28:45 +01:00
Pepe Fagoaga
a0fc4e2692 chore(rls): Improve policy comparison 2024-12-09 11:16:51 +01:00
2 changed files with 9 additions and 8 deletions

View File

@@ -1155,7 +1155,6 @@ class Migration(migrations.Migration):
model_name="Finding",
name="default",
),
# NOTE: the RLS policy needs to be explicitly set on the partitions
migrations.AddConstraint(
model_name="finding",
constraint=api.rls.RowLevelSecurityConstraint(

View File

@@ -2,8 +2,7 @@ from typing import Any
from uuid import uuid4
from django.core.exceptions import ValidationError
from django.db import DEFAULT_DB_ALIAS
from django.db import models
from django.db import DEFAULT_DB_ALIAS, models
from django.db.backends.ddl_references import Statement, Table
from api.db_utils import DB_USER, POSTGRES_TENANT_VAR
@@ -45,10 +44,7 @@ class RowLevelSecurityConstraint(models.BaseConstraint):
FOR {statement}
TO %(db_user)s
{clause} (
CASE
WHEN current_setting('%(tenant_setting)s', True) IS NULL THEN FALSE
ELSE %(field_column)s = current_setting('%(tenant_setting)s')::uuid
END
(SELECT current_setting('%(tenant_setting)s', True)::uuid) = %(field_column)s
);
"""
@@ -82,6 +78,10 @@ class RowLevelSecurityConstraint(models.BaseConstraint):
policy_queries = ""
grant_queries = ""
for statement in self.statements:
# SELECT and DELETE uses USING since applies to rows selected
# INSERT uses WITH CHECK because only applies in cases where records are being added
# UPDATE requires USING and WITH CHECK but if only a USING clause is specified, then that clause will be used for both USING and WITH CHECK cases.
# https://www.postgresql.org/docs/current/sql-createpolicy.html
clause = f"{'WITH CHECK' if statement == 'INSERT' else 'USING'}"
policy_queries = f"{policy_queries}{self.policy_sql_query.format(statement=statement, clause=clause)}"
grant_queries = (
@@ -131,7 +131,9 @@ class RowLevelSecurityConstraint(models.BaseConstraint):
path, _, kwargs = super().deconstruct()
return (path, (self.target_field,), kwargs)
def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS): # noqa: F841
def validate(
self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS
): # noqa: F841
if not hasattr(instance, "tenant_id"):
raise ValidationError(f"{model.__name__} does not have a tenant_id field.")