fix(scan): fix deadlock on resource transactions during concurrent scans (#5968)

Víctor Fernández Poyatos
2024-11-29 15:41:03 +01:00
committed by GitHub
parent 89a7128236
commit 9794b5cf27
3 changed files with 65 additions and 47 deletions

@@ -21,6 +21,7 @@ DJANGO_STALE_WHILE_REVALIDATE=60
 DJANGO_SECRETS_ENCRYPTION_KEY=""
 # Decide whether to allow Django manage database table partitions
 DJANGO_MANAGE_DB_PARTITIONS=[True|False]
+DJANGO_CELERY_DEADLOCK_ATTEMPTS=5
 
 # PostgreSQL settings
 # If running django and celery on host, use 'localhost', else use 'postgres-db'

@@ -9,3 +9,5 @@ CELERY_RESULT_BACKEND = "django-db"
 CELERY_TASK_TRACK_STARTED = True
 CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
+
+CELERY_DEADLOCK_ATTEMPTS = env.int("DJANGO_CELERY_DEADLOCK_ATTEMPTS", default=5)
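For scale: with the default of 5 attempts, the retry loop added to the scan task below sleeps 0.1 * 2**attempt seconds after each failed attempt and re-raises on the last one, so the worst-case extra wait per finding before the error propagates is roughly 1.5 seconds:

# Worst-case backoff for the default DJANGO_CELERY_DEADLOCK_ATTEMPTS=5:
# the task sleeps after attempts 0-3 and re-raises on attempt 4.
attempts = 5
delays = [0.1 * 2**attempt for attempt in range(attempts - 1)]
print(delays)                 # [0.1, 0.2, 0.4, 0.8]
print(round(sum(delays), 2))  # 1.5 seconds of added delay before giving up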

@@ -3,6 +3,8 @@ from copy import deepcopy
 from datetime import datetime, timezone
 
 from celery.utils.log import get_task_logger
+from config.settings.celery import CELERY_DEADLOCK_ATTEMPTS
+from django.db import IntegrityError, OperationalError
 from django.db.models import Case, Count, IntegerField, Sum, When
 
 from api.compliance import (
@@ -151,43 +153,57 @@ def perform_prowler_scan(
         last_status_cache = {}
 
         for progress, findings in prowler_scan.scan():
-            with tenant_transaction(tenant_id):
-                for finding in findings:
-                    # Process resource
-                    resource_uid = finding.resource_uid
-                    if resource_uid not in resource_cache:
-                        # Get or create the resource
-                        resource_instance, _ = Resource.objects.get_or_create(
-                            tenant_id=tenant_id,
-                            provider=provider_instance,
-                            uid=resource_uid,
-                            defaults={
-                                "region": finding.region,
-                                "service": finding.service_name,
-                                "type": finding.resource_type,
-                                "name": finding.resource_name,
-                            },
-                        )
-                        resource_cache[resource_uid] = resource_instance
-                    else:
-                        resource_instance = resource_cache[resource_uid]
+            for finding in findings:
+                for attempt in range(CELERY_DEADLOCK_ATTEMPTS):
+                    try:
+                        with tenant_transaction(tenant_id):
+                            # Process resource
+                            resource_uid = finding.resource_uid
+                            if resource_uid not in resource_cache:
+                                # Get or create the resource
+                                resource_instance, _ = Resource.objects.get_or_create(
+                                    tenant_id=tenant_id,
+                                    provider=provider_instance,
+                                    uid=resource_uid,
+                                    defaults={
+                                        "region": finding.region,
+                                        "service": finding.service_name,
+                                        "type": finding.resource_type,
+                                        "name": finding.resource_name,
+                                    },
+                                )
+                                resource_cache[resource_uid] = resource_instance
+                            else:
+                                resource_instance = resource_cache[resource_uid]
 
-                    # Update resource fields if necessary
-                    updated_fields = []
-                    if resource_instance.region != finding.region:
-                        resource_instance.region = finding.region
-                        updated_fields.append("region")
-                    if resource_instance.service != finding.service_name:
-                        resource_instance.service = finding.service_name
-                        updated_fields.append("service")
-                    if resource_instance.type != finding.resource_type:
-                        resource_instance.type = finding.resource_type
-                        updated_fields.append("type")
-                    if updated_fields:
-                        resource_instance.save(update_fields=updated_fields)
+                        # Update resource fields if necessary
+                        updated_fields = []
+                        if resource_instance.region != finding.region:
+                            resource_instance.region = finding.region
+                            updated_fields.append("region")
+                        if resource_instance.service != finding.service_name:
+                            resource_instance.service = finding.service_name
+                            updated_fields.append("service")
+                        if resource_instance.type != finding.resource_type:
+                            resource_instance.type = finding.resource_type
+                            updated_fields.append("type")
+                        if updated_fields:
+                            with tenant_transaction(tenant_id):
+                                resource_instance.save(update_fields=updated_fields)
+                    except (OperationalError, IntegrityError) as db_err:
+                        if attempt < CELERY_DEADLOCK_ATTEMPTS - 1:
+                            logger.warning(
+                                f"{'Deadlock error' if isinstance(db_err, OperationalError) else 'Integrity error'} "
+                                f"detected when processing resource {resource_uid} on scan {scan_id}. Retrying..."
+                            )
+                            time.sleep(0.1 * (2**attempt))
+                            continue
+                        else:
+                            raise db_err
 
-                    # Update tags
-                    tags = []
+                # Update tags
+                tags = []
+                with tenant_transaction(tenant_id):
                     for key, value in finding.resource_tags.items():
                         tag_key = (key, value)
                         if tag_key not in tag_cache:
@@ -200,11 +216,10 @@ def perform_prowler_scan(
                         tags.append(tag_instance)
                     resource_instance.upsert_or_delete_tags(tags=tags)
 
-                unique_resources.add(
-                    (resource_instance.uid, resource_instance.region)
-                )
+            unique_resources.add((resource_instance.uid, resource_instance.region))
 
-                # Process finding
+            # Process finding
+            with tenant_transaction(tenant_id):
                 finding_uid = finding.uid
                 if finding_uid not in last_status_cache:
                     most_recent_finding = (
@@ -241,15 +256,15 @@ def perform_prowler_scan(
                 )
                 finding_instance.add_resources([resource_instance])
 
-                # Update compliance data if applicable
-                if not generate_compliance or finding.status.value == "MUTED":
-                    continue
+            # Update compliance data if applicable
+            if not generate_compliance or finding.status.value == "MUTED":
+                continue
 
-                region_dict = check_status_by_region.setdefault(finding.region, {})
-                current_status = region_dict.get(finding.check_id)
-                if current_status == "FAIL":
-                    continue
-                region_dict[finding.check_id] = finding.status.value
+            region_dict = check_status_by_region.setdefault(finding.region, {})
+            current_status = region_dict.get(finding.check_id)
+            if current_status == "FAIL":
+                continue
+            region_dict[finding.check_id] = finding.status.value
 
         # Update scan progress
         with tenant_transaction(tenant_id):
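
Distilled from the diff above: instead of holding one transaction open across the whole findings batch, each resource write now runs in its own short tenant transaction and is retried with exponential backoff when PostgreSQL raises a deadlock (OperationalError) or a concurrent-insert conflict (IntegrityError). Below is a minimal sketch of that pattern as a reusable helper; the helper name run_with_deadlock_retry is illustrative and plain transaction.atomic() stands in for Prowler's tenant_transaction, since the commit inlines this logic per finding rather than using a wrapper.

import logging
import time
from typing import Callable, TypeVar

from django.db import IntegrityError, OperationalError, transaction

logger = logging.getLogger(__name__)

T = TypeVar("T")

# Illustrative constant; in the commit above the value comes from
# CELERY_DEADLOCK_ATTEMPTS in config.settings.celery.
DEADLOCK_ATTEMPTS = 5


def run_with_deadlock_retry(write: Callable[[], T], attempts: int = DEADLOCK_ATTEMPTS) -> T:
    """Run `write` inside its own short transaction, retrying on DB conflicts."""
    for attempt in range(attempts):
        try:
            # The commit scopes this per tenant via tenant_transaction(tenant_id);
            # plain transaction.atomic() stands in for it here.
            with transaction.atomic():
                return write()
        except (OperationalError, IntegrityError) as db_err:
            if attempt < attempts - 1:
                logger.warning("%s, retrying...", type(db_err).__name__)
                time.sleep(0.1 * (2**attempt))  # exponential backoff: 0.1s, 0.2s, 0.4s, ...
                continue
            raise
    raise ValueError("attempts must be >= 1")  # only reached if attempts <= 0

A caller would wrap each individual write, for example run_with_deadlock_retry(lambda: resource_instance.save(update_fields=updated_fields)), so a conflict aborts and retries only that one small transaction instead of the whole batch.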