Compare commits

...

4 Commits

Author SHA1 Message Date
Víctor Fernández Poyatos
e1ba7bce57 fix(api): align cron validator with celery parser 2026-02-17 12:03:59 +01:00
Víctor Fernández Poyatos
4d44140ab1 fix(api): harden cron validator and migration backfill 2026-02-17 11:45:25 +01:00
Víctor Fernández Poyatos
deb17f4900 test(api): add cron expression validator tests 2026-02-16 18:09:00 +01:00
Víctor Fernández Poyatos
b49e3f59ae feat(api): add cron-based scan schedule data layer 2026-02-16 18:08:22 +01:00
4 changed files with 353 additions and 0 deletions

View File

@@ -0,0 +1,185 @@
import json
import uuid
from datetime import timezone as datetime_timezone
import django.db.models.deletion
from django.db import migrations, models
import api.rls
import api.validators
def _build_daily_cron_expression(start_time):
if start_time is None:
return "0 0 * * *"
if start_time.tzinfo is None:
start_time = start_time.replace(tzinfo=datetime_timezone.utc)
start_time_utc = start_time.astimezone(datetime_timezone.utc)
return f"{start_time_utc.minute} {start_time_utc.hour} * * *"
def _extract_task_target(raw_kwargs):
    """Return ``(tenant_id, provider_id)`` UUIDs from a task's JSON kwargs.

    Returns ``None`` when the payload is empty, is not valid JSON, is not a
    JSON object, lacks either id, or carries an id that is not a valid UUID.
    """
    try:
        payload = json.loads(raw_kwargs or "")
    except (TypeError, json.JSONDecodeError):
        return None
    if not isinstance(payload, dict):
        return None

    raw_tenant = payload.get("tenant_id")
    raw_provider = payload.get("provider_id")
    if not (raw_tenant and raw_provider):
        return None

    try:
        return uuid.UUID(str(raw_tenant)), uuid.UUID(str(raw_provider))
    except (TypeError, ValueError, AttributeError):
        return None


def backfill_legacy_daily_scan_schedules(apps, schema_editor):  # noqa: ARG001
    """Create a ``ScanSchedule`` for each provider with an enabled legacy task.

    Walks enabled ``scan-perform-scheduled`` periodic tasks, most recently
    changed first. For each task whose kwargs point at an existing,
    non-deleted provider that has no schedule yet, it:

    1. creates a ``ScanSchedule`` whose cron expression is derived from the
       task's ``start_time`` and which references the task,
    2. links the provider to that schedule, and
    3. links the provider's scheduled scans that reference the same task.

    Tasks with unusable kwargs or unknown providers are skipped silently.

    Args:
        apps: Historical app registry supplied by the migration framework.
        schema_editor: Unused; required by the ``RunPython`` signature.
    """
    PeriodicTask = apps.get_model("django_celery_beat", "PeriodicTask")
    Provider = apps.get_model("api", "Provider")
    Scan = apps.get_model("api", "Scan")
    ScanSchedule = apps.get_model("api", "ScanSchedule")

    legacy_tasks = PeriodicTask.objects.filter(
        task="scan-perform-scheduled", enabled=True
    ).order_by("-date_changed", "-id")

    for legacy_task in legacy_tasks.iterator():
        target = _extract_task_target(legacy_task.kwargs)
        if target is None:
            continue
        tenant_id, provider_id = target

        provider = Provider.objects.filter(
            id=provider_id,
            tenant_id=tenant_id,
            is_deleted=False,
        ).first()
        # Preserve a single active migrated schedule per provider: because of
        # the ordering above, the most recently changed task wins and any
        # older task for the same provider is ignored.
        if provider is None or provider.scan_schedule_id:
            continue

        new_schedule = ScanSchedule.objects.create(
            tenant_id=tenant_id,
            cron_expression=_build_daily_cron_expression(legacy_task.start_time),
            enabled=True,
            scheduler_task_id=legacy_task.id,
        )

        Provider.objects.filter(id=provider_id, tenant_id=tenant_id).update(
            scan_schedule_id=new_schedule.id
        )
        Scan.objects.filter(
            tenant_id=tenant_id,
            provider_id=provider_id,
            scheduler_task_id=legacy_task.id,
            trigger="scheduled",
        ).update(scan_schedule_id=new_schedule.id)
def noop_reverse(apps, schema_editor):  # noqa: ARG001
    """Reverse step of the forward-only data migration; does nothing.

    Both arguments are ignored; they exist only to satisfy the ``RunPython``
    callable signature.
    """
    return None
class Migration(migrations.Migration):
    """Add the ``ScanSchedule`` model and backfill it from legacy tasks.

    Operations, in order: create the ``scan_schedules`` table, attach its
    row-level-security constraint and index, add nullable ``scan_schedule``
    foreign keys to ``Provider`` and ``Scan``, then run the data backfill.
    """

    # Do not wrap the whole migration in a single transaction.
    # NOTE(review): the reason is not visible in this file - confirm before
    # changing, since a mid-run failure can leave partially applied state.
    atomic = False

    dependencies = [
        ("api", "0078_remove_attackpathsscan_graph_database_fields"),
        # The backfill reads django_celery_beat's PeriodicTask model.
        ("django_celery_beat", "0019_alter_periodictasks_options"),
    ]

    operations = [
        # 1. New table holding one cron schedule per row.
        migrations.CreateModel(
            name="ScanSchedule",
            fields=[
                (
                    "id",
                    models.UUIDField(
                        default=uuid.uuid4,
                        editable=False,
                        primary_key=True,
                        serialize=False,
                    ),
                ),
                ("inserted_at", models.DateTimeField(auto_now_add=True)),
                ("updated_at", models.DateTimeField(auto_now=True)),
                (
                    "cron_expression",
                    models.CharField(
                        max_length=100,
                        validators=[api.validators.cron_5_fields_validator],
                    ),
                ),
                ("enabled", models.BooleanField(default=True)),
                (
                    # Optional link to a Celery beat task; cleared (not
                    # cascaded) when that task is deleted.
                    "scheduler_task",
                    models.ForeignKey(
                        blank=True,
                        null=True,
                        on_delete=django.db.models.deletion.SET_NULL,
                        to="django_celery_beat.periodictask",
                    ),
                ),
                (
                    # Owning tenant; schedules are removed with the tenant.
                    "tenant",
                    models.ForeignKey(
                        on_delete=django.db.models.deletion.CASCADE,
                        to="api.tenant",
                    ),
                ),
            ],
            options={
                "db_table": "scan_schedules",
                "abstract": False,
            },
        ),
        # 2. Row-level security keyed on tenant_id for all four statements.
        migrations.AddConstraint(
            model_name="scanschedule",
            constraint=api.rls.RowLevelSecurityConstraint(
                "tenant_id",
                name="rls_on_scanschedule",
                statements=["SELECT", "INSERT", "UPDATE", "DELETE"],
            ),
        ),
        # 3. Composite index on (tenant_id, enabled).
        migrations.AddIndex(
            model_name="scanschedule",
            index=models.Index(
                fields=["tenant_id", "enabled"],
                name="scansch_tenant_enabled_idx",
            ),
        ),
        # 4. Nullable link from a provider to its schedule; deleting the
        #    schedule nulls the reference instead of deleting the provider.
        migrations.AddField(
            model_name="provider",
            name="scan_schedule",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name="providers",
                to="api.scanschedule",
            ),
        ),
        # 5. Nullable link from a scan to the schedule it is associated with.
        migrations.AddField(
            model_name="scan",
            name="scan_schedule",
            field=models.ForeignKey(
                blank=True,
                null=True,
                on_delete=django.db.models.deletion.SET_NULL,
                related_name="scans",
                related_query_name="scan",
                to="api.scanschedule",
            ),
        ),
        # 6. Data backfill; must run after the columns above exist. The
        #    reverse step is a no-op, so unapplying does not undo the data.
        migrations.RunPython(
            code=backfill_legacy_daily_scan_schedules,
            reverse_code=noop_reverse,
        ),
    ]

View File

@@ -54,6 +54,7 @@ from api.rls import (
RowLevelSecurityProtectedModel,
Tenant,
)
from api.validators import cron_5_fields_validator
from prowler.lib.check.models import Severity
fernet = Fernet(settings.SECRETS_ENCRYPTION_KEY.encode())
@@ -440,6 +441,13 @@ class Provider(RowLevelSecurityProtectedModel):
connection_last_checked_at = models.DateTimeField(null=True, blank=True)
metadata = models.JSONField(default=dict, blank=True)
scanner_args = models.JSONField(default=dict, blank=True)
scan_schedule = models.ForeignKey(
"ScanSchedule",
on_delete=models.SET_NULL,
null=True,
blank=True,
related_name="providers",
)
def clean(self):
super().clean()
@@ -554,6 +562,41 @@ class Task(RowLevelSecurityProtectedModel):
resource_name = "tasks"
class ScanSchedule(RowLevelSecurityProtectedModel):
    """Cron-based scan schedule, stored in the ``scan_schedules`` table.

    Referenced by the nullable ``scan_schedule`` foreign keys on ``Provider``
    (reverse name ``providers``) and ``Scan`` (reverse name ``scans``).
    The tenant column is not declared here; presumably it is inherited from
    ``RowLevelSecurityProtectedModel`` - confirm against ``api.rls``.
    """

    id = models.UUIDField(primary_key=True, default=uuid4, editable=False)
    inserted_at = models.DateTimeField(auto_now_add=True, editable=False)
    updated_at = models.DateTimeField(auto_now=True, editable=False)

    # Five-field cron expression; checked by ``cron_5_fields_validator``
    # whenever model validation runs.
    cron_expression = models.CharField(
        max_length=100,
        validators=[cron_5_fields_validator],
    )
    enabled = models.BooleanField(default=True)
    # Optional link to a Celery beat ``PeriodicTask``; set to NULL rather
    # than cascading when that task is deleted.
    scheduler_task = models.ForeignKey(
        PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
    )

    class Meta(RowLevelSecurityProtectedModel.Meta):
        db_table = "scan_schedules"

        constraints = [
            # Row-level security keyed on tenant_id for every statement type.
            RowLevelSecurityConstraint(
                field="tenant_id",
                name="rls_on_%(class)s",
                statements=["SELECT", "INSERT", "UPDATE", "DELETE"],
            ),
        ]

        indexes = [
            models.Index(
                fields=["tenant_id", "enabled"],
                name="scansch_tenant_enabled_idx",
            ),
        ]

    class JSONAPIMeta:
        # Resource type name used for this model in JSON:API payloads.
        resource_name = "scan-schedules"
class Scan(RowLevelSecurityProtectedModel):
objects = ActiveProviderManager()
all_objects = models.Manager()
@@ -583,6 +626,14 @@ class Scan(RowLevelSecurityProtectedModel):
scheduler_task = models.ForeignKey(
PeriodicTask, on_delete=models.SET_NULL, null=True, blank=True
)
scan_schedule = models.ForeignKey(
ScanSchedule,
on_delete=models.SET_NULL,
related_name="scans",
related_query_name="scan",
null=True,
blank=True,
)
output_location = models.CharField(blank=True, null=True, max_length=4096)
provider = models.ForeignKey(
Provider,

View File

@@ -0,0 +1,37 @@
import pytest
from django.core.exceptions import ValidationError
from api.validators import cron_5_fields_validator
class TestCron5FieldsValidator:
    """Acceptance and rejection cases for ``cron_5_fields_validator``."""

    # Wildcards, steps on a wildcard, ranges and comma-separated lists.
    VALID_EXPRESSIONS = (
        "* * * * *",
        "*/30 * * * *",
        "0 2 * * 1-5",
        "15,45 8-18 * 1,6,12 1-5",
    )

    INVALID_EXPRESSIONS = (
        "*/30 * * *",  # only four fields
        "@daily",  # macro, not five fields
        "0 24 * * *",  # hour above 23
        "0 2 0 * *",  # day-of-month below 1
        "0 2 * 13 *",  # month above 12
        "* * * * 7",  # day-of-week above 6
        "5/15 * * * *",  # step applied to a bare number
        "0 2 * * 9",  # day-of-week above 6
        "*/0 * * * *",  # zero step
        "",  # no fields at all
    )

    @pytest.mark.parametrize("expression", VALID_EXPRESSIONS)
    def test_accepts_valid_expressions(self, expression):
        """A valid expression passes without raising."""
        cron_5_fields_validator(expression)

    @pytest.mark.parametrize("expression", INVALID_EXPRESSIONS)
    def test_rejects_invalid_expressions(self, expression):
        """An invalid expression raises ``ValidationError``."""
        with pytest.raises(ValidationError):
            cron_5_fields_validator(expression)

View File

@@ -1,5 +1,7 @@
import re
import string

from celery.schedules import ParseException, crontab
from django.core.exceptions import ValidationError
from django.utils.translation import gettext as _
@@ -108,3 +110,81 @@ class NumericValidator:
return _(
f"Your password must contain at least {self.min_numeric} numeric character."
)
def _parse_cron_base(value: str, min_value: int, max_value: int) -> None:
if value == "*":
return
if "-" in value:
range_parts = value.split("-", 1)
if len(range_parts) != 2 or not range_parts[0] or not range_parts[1]:
raise ValidationError("Invalid cron expression.")
if not range_parts[0].isdigit() or not range_parts[1].isdigit():
raise ValidationError("Invalid cron expression.")
start = int(range_parts[0])
end = int(range_parts[1])
if start > end or start < min_value or end > max_value:
raise ValidationError("Invalid cron expression.")
return
if not value.isdigit():
raise ValidationError("Invalid cron expression.")
number = int(value)
if number < min_value or number > max_value:
raise ValidationError("Invalid cron expression.")
def _validate_cron_field(value: str, min_value: int, max_value: int) -> None:
    """Validate one whitespace-delimited field of a cron expression.

    A field is a comma-separated list of items. Each item is ``*``, a number
    or a ``start-end`` range, optionally followed by ``/step`` where ``step``
    is a positive integer.

    Args:
        value: The raw field text, e.g. ``"*/15"`` or ``"1,6,12"``.
        min_value: Smallest number allowed for this field.
        max_value: Largest number allowed for this field.

    Raises:
        ValidationError: If the field is empty, contains a character other
            than ASCII digits, ``*``, ``/``, ``,`` or ``-``, has an empty
            item, has a missing or non-positive step, or has a base that
            ``_parse_cron_base`` rejects.
    """
    # ASCII digits only: on a str pattern ``\d`` also matches non-ASCII
    # decimal digits (e.g. Arabic-Indic), which ``str.isdigit`` and ``int``
    # would then accept, letting a non-canonical expression be stored.
    if not value or not re.fullmatch(r"[0-9*/,\-]+", value):
        raise ValidationError("Invalid cron expression.")

    for item in value.split(","):
        if not item:
            # Leading, trailing or doubled comma.
            raise ValidationError("Invalid cron expression.")

        base, slash, step = item.partition("/")
        if slash:
            # ``isdigit`` is False for an empty step and for a step holding a
            # second "/", so "*/", "*/a" and "1/2/3" are all rejected here.
            if not base or not step.isdigit() or int(step) <= 0:
                raise ValidationError("Invalid cron expression.")

        _parse_cron_base(base, min_value, max_value)
def cron_5_fields_validator(value: str) -> None:
    """Validate a five-field cron expression for use with Celery crontab.

    The expression must have exactly five whitespace-separated fields in the
    order minute, hour, day-of-month, month, day-of-week. Each field is first
    checked against its numeric range, then the whole expression is handed to
    Celery's ``crontab`` so that anything Celery cannot parse is rejected too.

    Args:
        value: The cron expression to validate.

    Raises:
        ValidationError: If ``value`` is not a string, does not have five
            fields, has a field that fails the range/syntax checks, or is
            rejected by Celery's crontab parser.
    """
    if not isinstance(value, str):
        raise ValidationError("Invalid cron expression.")

    # ``split()`` with no argument already ignores leading/trailing blanks.
    parts = value.split()
    if len(parts) != 5:
        raise ValidationError("Cron expression must contain exactly 5 fields in UTC.")

    # minute hour day-of-month month day-of-week (Celery: 0-6)
    field_ranges = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6))
    # Both sequences are known to have five entries at this point.
    for part, (min_value, max_value) in zip(parts, field_ranges, strict=True):
        _validate_cron_field(part, min_value, max_value)

    # Keep model-level validation aligned with Celery crontab parsing.
    try:
        crontab(
            minute=parts[0],
            hour=parts[1],
            day_of_month=parts[2],
            month_of_year=parts[3],
            day_of_week=parts[4],
        )
    except (ValueError, ParseException) as exc:
        # Celery reports out-of-range values with ValueError but malformed
        # specs with ParseException, which is not a ValueError subclass.
        raise ValidationError("Invalid cron expression.") from exc