feat(integrations): implement AWS Security Hub integration (#8365)

Adrián Jesús Peña Rodríguez
2025-08-25 15:53:48 +02:00
committed by GitHub
parent d457166a0c
commit 83242da0ab
15 changed files with 1670 additions and 104 deletions


@@ -6,6 +6,7 @@ All notable changes to the **Prowler API** are documented in this file.
### Added
- Lighthouse support for OpenAI GPT-5 [(#8527)](https://github.com/prowler-cloud/prowler/pull/8527)
- Integration with Amazon Security Hub, enabling findings to be sent to Security Hub [(#8365)](https://github.com/prowler-cloud/prowler/pull/8365)
## [1.11.0] (Prowler 5.10.0)


@@ -38,7 +38,7 @@ name = "prowler-api"
package-mode = false
# Needed for the SDK compatibility
requires-python = ">=3.11,<3.13"
version = "1.11.0"
version = "1.12.0"
[project.scripts]
celery = "src.backend.config.settings.celery"


@@ -0,0 +1,16 @@
# Generated by Django 5.1.10 on 2025-08-20 08:24
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("api", "0046_lighthouse_gpt5"),
]
operations = [
migrations.RemoveConstraint(
model_name="integration",
name="unique_configuration_per_tenant",
),
]


@@ -1372,10 +1372,6 @@ class Integration(RowLevelSecurityProtectedModel):
db_table = "integrations"
constraints = [
models.UniqueConstraint(
fields=("configuration", "tenant"),
name="unique_configuration_per_tenant",
),
RowLevelSecurityConstraint(
field="tenant_id",
name="rls_on_%(class)s",


@@ -1,7 +1,7 @@
openapi: 3.0.3
info:
title: Prowler API
version: 1.11.0
version: 1.12.0
description: |-
Prowler API specification.
@@ -8918,6 +8918,19 @@ components:
default: output
required:
- bucket_name
- type: object
title: AWS Security Hub
properties:
send_only_fails:
type: boolean
default: false
description: If true, only findings with status 'FAIL' will
be sent to Security Hub.
archive_previous_findings:
type: boolean
default: false
description: If true, archives findings that are not present in
the current execution.
credentials:
oneOf:
- type: object
@@ -9064,6 +9077,19 @@ components:
default: output
required:
- bucket_name
- type: object
title: AWS Security Hub
properties:
send_only_fails:
type: boolean
default: false
description: If true, only findings with status 'FAIL' will
be sent to Security Hub.
archive_previous_findings:
type: boolean
default: false
description: If true, archives findings that are not present
in the current execution.
credentials:
oneOf:
- type: object
@@ -9225,6 +9251,19 @@ components:
default: output
required:
- bucket_name
- type: object
title: AWS Security Hub
properties:
send_only_fails:
type: boolean
default: false
description: If true, only findings with status 'FAIL' will
be sent to Security Hub.
archive_previous_findings:
type: boolean
default: false
description: If true, archives findings that are not present in
the current execution.
credentials:
oneOf:
- type: object
@@ -9777,6 +9816,10 @@ components:
- gpt-4o
- gpt-4o-mini-2024-07-18
- gpt-4o-mini
- gpt-5-2025-08-07
- gpt-5
- gpt-5-mini-2025-08-07
- gpt-5-mini
type: string
description: |-
Must be one of the supported model names
@@ -9787,6 +9830,10 @@ components:
* `gpt-4o` - GPT-4o Default
* `gpt-4o-mini-2024-07-18` - GPT-4o Mini v2024-07-18
* `gpt-4o-mini` - GPT-4o Mini Default
* `gpt-5-2025-08-07` - GPT-5 v2025-08-07
* `gpt-5` - GPT-5 Default
* `gpt-5-mini-2025-08-07` - GPT-5 Mini v2025-08-07
* `gpt-5-mini` - GPT-5 Mini Default
temperature:
type: number
format: double
@@ -9843,6 +9890,10 @@ components:
- gpt-4o
- gpt-4o-mini-2024-07-18
- gpt-4o-mini
- gpt-5-2025-08-07
- gpt-5
- gpt-5-mini-2025-08-07
- gpt-5-mini
type: string
description: |-
Must be one of the supported model names
@@ -9853,6 +9904,10 @@ components:
* `gpt-4o` - GPT-4o Default
* `gpt-4o-mini-2024-07-18` - GPT-4o Mini v2024-07-18
* `gpt-4o-mini` - GPT-4o Mini Default
* `gpt-5-2025-08-07` - GPT-5 v2025-08-07
* `gpt-5` - GPT-5 Default
* `gpt-5-mini-2025-08-07` - GPT-5 Mini v2025-08-07
* `gpt-5-mini` - GPT-5 Mini Default
temperature:
type: number
format: double
@@ -9915,6 +9970,10 @@ components:
- gpt-4o
- gpt-4o-mini-2024-07-18
- gpt-4o-mini
- gpt-5-2025-08-07
- gpt-5
- gpt-5-mini-2025-08-07
- gpt-5-mini
type: string
description: |-
Must be one of the supported model names
@@ -9925,6 +9984,10 @@ components:
* `gpt-4o` - GPT-4o Default
* `gpt-4o-mini-2024-07-18` - GPT-4o Mini v2024-07-18
* `gpt-4o-mini` - GPT-4o Mini Default
* `gpt-5-2025-08-07` - GPT-5 v2025-08-07
* `gpt-5` - GPT-5 Default
* `gpt-5-mini-2025-08-07` - GPT-5 Mini v2025-08-07
* `gpt-5-mini` - GPT-5 Mini Default
temperature:
type: number
format: double
@@ -9995,6 +10058,10 @@ components:
- gpt-4o
- gpt-4o-mini-2024-07-18
- gpt-4o-mini
- gpt-5-2025-08-07
- gpt-5
- gpt-5-mini-2025-08-07
- gpt-5-mini
type: string
description: |-
Must be one of the supported model names
@@ -10005,6 +10072,10 @@ components:
* `gpt-4o` - GPT-4o Default
* `gpt-4o-mini-2024-07-18` - GPT-4o Mini v2024-07-18
* `gpt-4o-mini` - GPT-4o Mini Default
* `gpt-5-2025-08-07` - GPT-5 v2025-08-07
* `gpt-5` - GPT-5 Default
* `gpt-5-mini-2025-08-07` - GPT-5 Mini v2025-08-07
* `gpt-5-mini` - GPT-5 Mini Default
temperature:
type: number
format: double
@@ -10584,6 +10655,19 @@ components:
default: output
required:
- bucket_name
- type: object
title: AWS Security Hub
properties:
send_only_fails:
type: boolean
default: false
description: If true, only findings with status 'FAIL' will
be sent to Security Hub.
archive_previous_findings:
type: boolean
default: false
description: If true, archives findings that are not present
in the current execution.
credentials:
oneOf:
- type: object
@@ -10779,6 +10863,10 @@ components:
- gpt-4o
- gpt-4o-mini-2024-07-18
- gpt-4o-mini
- gpt-5-2025-08-07
- gpt-5
- gpt-5-mini-2025-08-07
- gpt-5-mini
type: string
description: |-
Must be one of the supported model names
@@ -10789,6 +10877,10 @@ components:
* `gpt-4o` - GPT-4o Default
* `gpt-4o-mini-2024-07-18` - GPT-4o Mini v2024-07-18
* `gpt-4o-mini` - GPT-4o Mini Default
* `gpt-5-2025-08-07` - GPT-5 v2025-08-07
* `gpt-5` - GPT-5 Default
* `gpt-5-mini-2025-08-07` - GPT-5 Mini v2025-08-07
* `gpt-5-mini` - GPT-5 Mini Default
temperature:
type: number
format: double


@@ -11,6 +11,7 @@ from api.models import Integration, Invitation, Processor, Provider, Resource
from api.v1.serializers import FindingMetadataSerializer
from prowler.providers.aws.aws_provider import AwsProvider
from prowler.providers.aws.lib.s3.s3 import S3
from prowler.providers.aws.lib.security_hub.security_hub import SecurityHub
from prowler.providers.azure.azure_provider import AzureProvider
from prowler.providers.common.models import Connection
from prowler.providers.gcp.gcp_provider import GcpProvider
@@ -196,7 +197,34 @@ def prowler_integration_connection_test(integration: Integration) -> Connection:
elif (
integration.integration_type == Integration.IntegrationChoices.AWS_SECURITY_HUB
):
pass
# Get the provider associated with this integration
provider_relationship = integration.integrationproviderrelationship_set.first()
if not provider_relationship:
return Connection(
is_connected=False, error="No provider associated with this integration"
)
credentials = (
integration.credentials
if integration.credentials
else provider_relationship.provider.secret.secret
)
connection = SecurityHub.test_connection(
aws_account_id=provider_relationship.provider.uid,
raise_on_exception=False,
**credentials,
)
# Only save regions if connection is successful
if connection.is_connected:
regions_status = {r: True for r in connection.enabled_regions}
regions_status.update({r: False for r in connection.disabled_regions})
# Save regions information in the integration configuration
integration.configuration["regions"] = regions_status
integration.save()
return connection
elif integration.integration_type == Integration.IntegrationChoices.JIRA:
pass
elif integration.integration_type == Integration.IntegrationChoices.SLACK:
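For reference, a minimal sketch of how the connection test above resolves credentials and calls `SecurityHub.test_connection`. The credential keys (`role_arn`, `external_id`) mirror `AWSCredentialSerializer` in this commit; all concrete values are hypothetical.

```python
# Sketch only: values are hypothetical; credential keys follow AWSCredentialSerializer.
from prowler.providers.aws.lib.security_hub.security_hub import SecurityHub

credentials = {
    "role_arn": "arn:aws:iam::123456789012:role/ProwlerIntegration",  # hypothetical
    "external_id": "example-external-id",  # hypothetical
}

connection = SecurityHub.test_connection(
    aws_account_id="123456789012",  # the linked AWS provider's UID
    raise_on_exception=False,
    **credentials,
)

if connection.is_connected:
    # The view above persists this mapping into integration.configuration["regions"]
    regions_status = {r: True for r in connection.enabled_regions}
    regions_status.update({r: False for r in connection.disabled_regions})
```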


@@ -52,6 +52,21 @@ class S3ConfigSerializer(BaseValidateSerializer):
resource_name = "integrations"
class SecurityHubConfigSerializer(BaseValidateSerializer):
send_only_fails = serializers.BooleanField(default=False)
archive_previous_findings = serializers.BooleanField(default=False)
regions = serializers.DictField(default=dict, read_only=True)
def to_internal_value(self, data):
validated_data = super().to_internal_value(data)
# Always initialize regions as empty dict
validated_data["regions"] = {}
return validated_data
class Meta:
resource_name = "integrations"
class AWSCredentialSerializer(BaseValidateSerializer):
role_arn = serializers.CharField(required=False)
external_id = serializers.CharField(required=False)
@@ -146,6 +161,22 @@ class IntegrationCredentialField(serializers.JSONField):
},
"required": ["bucket_name"],
},
{
"type": "object",
"title": "AWS Security Hub",
"properties": {
"send_only_fails": {
"type": "boolean",
"default": False,
"description": "If true, only findings with status 'FAIL' will be sent to Security Hub.",
},
"archive_previous_findings": {
"type": "boolean",
"default": False,
"description": "If true, archives findings that are not present in the current execution.",
},
},
},
]
}
)
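A small illustrative sketch (not a test included in this commit) of how `SecurityHubConfigSerializer` above handles input: only the two boolean flags are writable, and `regions` is read-only and always re-initialized to an empty dict by the `to_internal_value` override.

```python
from api.v1.serializer_utils.integrations import SecurityHubConfigSerializer

serializer = SecurityHubConfigSerializer(data={"send_only_fails": True})
serializer.is_valid(raise_exception=True)
# Expected shape, per the field defaults and the to_internal_value override above:
# {"send_only_fails": True, "archive_previous_findings": False, "regions": {}}
print(serializer.validated_data)
```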


@@ -46,6 +46,7 @@ from api.v1.serializer_utils.integrations import (
IntegrationConfigField,
IntegrationCredentialField,
S3ConfigSerializer,
SecurityHubConfigSerializer,
)
from api.v1.serializer_utils.processors import ProcessorConfigField
from api.v1.serializer_utils.providers import ProviderSecretField
@@ -1951,11 +1952,42 @@ class ScheduleDailyCreateSerializer(serializers.Serializer):
class BaseWriteIntegrationSerializer(BaseWriteSerializer):
def validate(self, attrs):
if Integration.objects.filter(
if (
attrs.get("integration_type") == Integration.IntegrationChoices.AMAZON_S3
and Integration.objects.filter(
configuration=attrs.get("configuration")
).exists():
).exists()
):
raise serializers.ValidationError(
{"name": "This integration already exists."}
{"configuration": "This integration already exists."}
)
# Check if any provider already has a SecurityHub integration
integration_type = attrs.get("integration_type")
if hasattr(self, "instance") and self.instance and not integration_type:
integration_type = self.instance.integration_type
if (
integration_type == Integration.IntegrationChoices.AWS_SECURITY_HUB
and "providers" in attrs
):
providers = attrs.get("providers", [])
tenant_id = self.context.get("tenant_id")
for provider in providers:
# For updates, exclude the current instance from the check
query = IntegrationProviderRelationship.objects.filter(
provider=provider,
integration__integration_type=Integration.IntegrationChoices.AWS_SECURITY_HUB,
tenant_id=tenant_id,
)
if hasattr(self, "instance") and self.instance:
query = query.exclude(integration=self.instance)
if query.exists():
raise serializers.ValidationError(
{
"providers": f"Provider {provider.id} already has a Security Hub integration. Only one Security Hub integration is allowed per provider."
}
)
return super().validate(attrs)
@@ -1970,14 +2002,22 @@ class BaseWriteIntegrationSerializer(BaseWriteSerializer):
if integration_type == Integration.IntegrationChoices.AMAZON_S3:
config_serializer = S3ConfigSerializer
credentials_serializers = [AWSCredentialSerializer]
# TODO: This will be required for AWS Security Hub
# if providers and not all(
# provider.provider == Provider.ProviderChoices.AWS
# for provider in providers
# ):
# raise serializers.ValidationError(
# {"providers": "All providers must be AWS for the S3 integration."}
# )
elif integration_type == Integration.IntegrationChoices.AWS_SECURITY_HUB:
if providers:
if len(providers) > 1:
raise serializers.ValidationError(
{
"providers": "Only one provider is supported for the Security Hub integration."
}
)
if providers[0].provider != Provider.ProviderChoices.AWS:
raise serializers.ValidationError(
{
"providers": "The provider must be AWS type for the Security Hub integration."
}
)
config_serializer = SecurityHubConfigSerializer
credentials_serializers = [AWSCredentialSerializer]
else:
raise serializers.ValidationError(
{
@@ -2077,6 +2117,16 @@ class IntegrationCreateSerializer(BaseWriteIntegrationSerializer):
configuration = attrs.get("configuration")
credentials = attrs.get("credentials")
if (
not providers
and integration_type == Integration.IntegrationChoices.AWS_SECURITY_HUB
):
raise serializers.ValidationError(
{
"providers": "At least one provider is required for the Security Hub integration."
}
)
self.validate_integration_data(
integration_type, providers, configuration, credentials
)
@@ -2131,16 +2181,15 @@ class IntegrationUpdateSerializer(BaseWriteIntegrationSerializer):
}
def validate(self, attrs):
super().validate(attrs)
integration_type = self.instance.integration_type
providers = attrs.get("providers")
configuration = attrs.get("configuration") or self.instance.configuration
credentials = attrs.get("credentials") or self.instance.credentials
validated_attrs = super().validate(attrs)
self.validate_integration_data(
integration_type, providers, configuration, credentials
)
validated_attrs = super().validate(attrs)
return validated_attrs
def update(self, instance, validated_data):
@@ -2155,6 +2204,13 @@ class IntegrationUpdateSerializer(BaseWriteIntegrationSerializer):
]
IntegrationProviderRelationship.objects.bulk_create(new_relationships)
# Preserve regions field for Security Hub integrations
if instance.integration_type == Integration.IntegrationChoices.AWS_SECURITY_HUB:
if "configuration" in validated_data:
# Preserve the existing regions field if it exists
existing_regions = instance.configuration.get("regions", {})
validated_data["configuration"]["regions"] = existing_regions
return super().update(instance, validated_data)


@@ -293,7 +293,7 @@ class SchemaView(SpectacularAPIView):
def get(self, request, *args, **kwargs):
spectacular_settings.TITLE = "Prowler API"
spectacular_settings.VERSION = "1.11.0"
spectacular_settings.VERSION = "1.12.0"
spectacular_settings.DESCRIPTION = (
"Prowler API specification.\n\nThis file is auto-generated."
)


@@ -2,15 +2,21 @@ import os
from glob import glob
from celery.utils.log import get_task_logger
from config.django.base import DJANGO_FINDINGS_BATCH_SIZE
from tasks.utils import batched
from api.db_utils import rls_transaction
from api.models import Integration
from api.models import Finding, Integration, Provider
from api.utils import initialize_prowler_provider
from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.compliance.generic.generic import GenericCompliance
from prowler.lib.outputs.csv.csv import CSV
from prowler.lib.outputs.finding import Finding as FindingOutput
from prowler.lib.outputs.html.html import HTML
from prowler.lib.outputs.ocsf.ocsf import OCSF
from prowler.providers.aws.aws_provider import AwsProvider
from prowler.providers.aws.lib.s3.s3 import S3
from prowler.providers.aws.lib.security_hub.security_hub import SecurityHub
from prowler.providers.common.models import Connection
logger = get_task_logger(__name__)
@@ -154,3 +160,265 @@ def upload_s3_integration(
except Exception as e:
logger.error(f"S3 integrations failed for provider {provider_id}: {str(e)}")
return False
def get_security_hub_client_from_integration(
integration: Integration, tenant_id: str, findings: list
) -> tuple[bool, SecurityHub | Connection]:
"""
Create and return a SecurityHub client using AWS credentials from an integration.
Args:
integration (Integration): The integration to get the Security Hub client from.
tenant_id (str): The tenant identifier.
findings (list): List of findings in ASFF format to send to Security Hub.
Returns:
tuple[bool, SecurityHub | Connection]: A tuple containing a boolean indicating
if the connection was successful and the SecurityHub client or connection object.
"""
# Get the provider associated with this integration
with rls_transaction(tenant_id):
provider_relationship = integration.integrationproviderrelationship_set.first()
if not provider_relationship:
return False, Connection(
is_connected=False, error="No provider associated with this integration"
)
provider_uid = provider_relationship.provider.uid
provider_secret = provider_relationship.provider.secret.secret
credentials = (
integration.credentials if integration.credentials else provider_secret
)
connection = SecurityHub.test_connection(
aws_account_id=provider_uid,
raise_on_exception=False,
**credentials,
)
if connection.is_connected:
all_security_hub_regions = AwsProvider.get_available_aws_service_regions(
"securityhub", connection.partition
)
# Create regions status dictionary
regions_status = {}
for region in set(all_security_hub_regions):
regions_status[region] = region in connection.enabled_regions
# Save regions information in the integration configuration
with rls_transaction(tenant_id):
integration.configuration["regions"] = regions_status
integration.save()
# Create SecurityHub client with all necessary parameters
security_hub = SecurityHub(
aws_account_id=provider_uid,
findings=findings,
send_only_fails=integration.configuration.get("send_only_fails", False),
aws_security_hub_available_regions=list(connection.enabled_regions),
**credentials,
)
return True, security_hub
return False, connection
def upload_security_hub_integration(
tenant_id: str, provider_id: str, scan_id: str
) -> bool:
"""
Upload findings to AWS Security Hub using configured integrations.
This function retrieves findings from the database, transforms them to ASFF format,
and sends them to AWS Security Hub using the configured integration credentials.
Args:
tenant_id (str): The tenant identifier.
provider_id (str): The provider identifier.
scan_id (str): The scan identifier for which to send findings.
Returns:
bool: True if all integrations executed successfully, False otherwise.
"""
logger.info(f"Processing Security Hub integrations for provider {provider_id}")
try:
with rls_transaction(tenant_id):
# Get Security Hub integrations for this provider
integrations = list(
Integration.objects.filter(
integrationproviderrelationship__provider_id=provider_id,
integration_type=Integration.IntegrationChoices.AWS_SECURITY_HUB,
enabled=True,
)
)
if not integrations:
logger.error(
f"No Security Hub integrations found for provider {provider_id}"
)
return False
# Get the provider object
provider = Provider.objects.get(id=provider_id)
# Initialize prowler provider for finding transformation
prowler_provider = initialize_prowler_provider(provider)
# Process each Security Hub integration
integration_executions = 0
total_findings_sent = {} # Track findings sent per integration
for integration in integrations:
try:
# Initialize Security Hub client for this integration
# We'll create the client once and reuse it for all batches
security_hub_client = None
send_only_fails = integration.configuration.get(
"send_only_fails", False
)
total_findings_sent[integration.id] = 0
# Process findings in batches to avoid memory issues
has_findings = False
batch_number = 0
with rls_transaction(tenant_id):
qs = (
Finding.all_objects.filter(tenant_id=tenant_id, scan_id=scan_id)
.order_by("uid")
.iterator()
)
for batch, _ in batched(qs, DJANGO_FINDINGS_BATCH_SIZE):
batch_number += 1
has_findings = True
# Transform findings for this batch
transformed_findings = [
FindingOutput.transform_api_finding(
finding, prowler_provider
)
for finding in batch
]
# Convert to ASFF format
asff_transformer = ASFF(
findings=transformed_findings,
file_path="",
file_extension="json",
)
asff_transformer.transform(transformed_findings)
# Get the batch of ASFF findings
batch_asff_findings = asff_transformer.data
if batch_asff_findings:
# Create Security Hub client for first batch or reuse existing
if not security_hub_client:
connected, security_hub = (
get_security_hub_client_from_integration(
integration, tenant_id, batch_asff_findings
)
)
if not connected:
logger.error(
f"Security Hub connection failed for integration {integration.id}: {security_hub.error}"
)
integration.connected = False
integration.save()
break # Skip this integration
security_hub_client = security_hub
logger.info(
f"Sending {'fail' if send_only_fails else 'all'} findings to Security Hub via integration {integration.id}"
)
else:
# Update findings in existing client for this batch
security_hub_client._findings_per_region = (
security_hub_client.filter(
batch_asff_findings, send_only_fails
)
)
# Send this batch to Security Hub
try:
findings_sent = (
security_hub_client.batch_send_to_security_hub()
)
total_findings_sent[integration.id] += findings_sent
if findings_sent > 0:
logger.debug(
f"Sent batch {batch_number} with {findings_sent} findings to Security Hub"
)
except Exception as batch_error:
logger.error(
f"Failed to send batch {batch_number} to Security Hub: {str(batch_error)}"
)
# Clear memory after processing each batch
asff_transformer._data.clear()
del batch_asff_findings
del transformed_findings
if not has_findings:
logger.info(
f"No findings to send to Security Hub for scan {scan_id}"
)
integration_executions += 1
elif security_hub_client:
if total_findings_sent[integration.id] > 0:
logger.info(
f"Successfully sent {total_findings_sent[integration.id]} total findings to Security Hub via integration {integration.id}"
)
integration_executions += 1
else:
logger.warning(
f"No findings were sent to Security Hub via integration {integration.id}"
)
# Archive previous findings if configured to do so
if integration.configuration.get(
"archive_previous_findings", False
):
logger.info(
f"Archiving previous findings in Security Hub via integration {integration.id}"
)
try:
findings_archived = (
security_hub_client.archive_previous_findings()
)
logger.info(
f"Successfully archived {findings_archived} previous findings in Security Hub"
)
except Exception as archive_error:
logger.warning(
f"Failed to archive previous findings: {str(archive_error)}"
)
except Exception as e:
logger.error(
f"Security Hub integration {integration.id} failed: {str(e)}"
)
continue
result = integration_executions == len(integrations)
if result:
logger.info(
f"All Security Hub integrations completed successfully for provider {provider_id}"
)
else:
logger.error(
f"Some Security Hub integrations failed for provider {provider_id}"
)
return result
except Exception as e:
logger.error(
f"Security Hub integrations failed for provider {provider_id}: {str(e)}"
)
return False
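Condensed, illustrative summary of the per-batch pipeline implemented in `upload_security_hub_integration` above; client creation and error handling are omitted, and names follow the code in this file.

```python
from prowler.lib.outputs.asff.asff import ASFF
from prowler.lib.outputs.finding import Finding as FindingOutput


def send_batch(batch, prowler_provider, security_hub_client, send_only_fails):
    # 1. Transform API findings into Prowler finding outputs
    transformed = [
        FindingOutput.transform_api_finding(finding, prowler_provider)
        for finding in batch
    ]
    # 2. Convert the batch to ASFF, the format Security Hub ingests
    asff = ASFF(findings=transformed, file_path="", file_extension="json")
    asff.transform(transformed)
    if not asff.data:
        return 0
    # 3. Refresh the client's per-region findings for this batch and send it
    security_hub_client._findings_per_region = security_hub_client.filter(
        asff.data, send_only_fails
    )
    return security_hub_client.batch_send_to_security_hub()
```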


@@ -21,7 +21,10 @@ from tasks.jobs.export import (
_generate_output_directory,
_upload_to_s3,
)
from tasks.jobs.integrations import upload_s3_integration
from tasks.jobs.integrations import (
upload_s3_integration,
upload_security_hub_integration,
)
from tasks.jobs.scan import (
aggregate_findings,
create_compliance_requirements,
@@ -62,6 +65,7 @@ def _perform_scan_complete_tasks(tenant_id: str, scan_id: str, provider_id: str)
check_integrations_task.si(
tenant_id=tenant_id,
provider_id=provider_id,
scan_id=scan_id,
),
).apply_async()
@@ -323,7 +327,11 @@ def generate_outputs_task(scan_id: str, provider_id: str, tenant_id: str):
ScanSummary.objects.filter(scan_id=scan_id)
)
qs = Finding.all_objects.filter(scan_id=scan_id).order_by("uid").iterator()
qs = (
Finding.all_objects.filter(tenant_id=tenant_id, scan_id=scan_id)
.order_by("uid")
.iterator()
)
for batch, is_last in batched(qs, DJANGO_FINDINGS_BATCH_SIZE):
fos = [FindingOutput.transform_api_finding(f, prowler_provider) for f in batch]
@@ -474,17 +482,19 @@ def check_lighthouse_connection_task(lighthouse_config_id: str, tenant_id: str =
@shared_task(name="integration-check")
def check_integrations_task(tenant_id: str, provider_id: str):
def check_integrations_task(tenant_id: str, provider_id: str, scan_id: str = None):
"""
Check and execute all configured integrations for a provider.
Args:
tenant_id (str): The tenant identifier
provider_id (str): The provider identifier
scan_id (str, optional): The scan identifier for integrations that need scan data
"""
logger.info(f"Checking integrations for provider {provider_id}")
try:
integration_tasks = []
with rls_transaction(tenant_id):
integrations = Integration.objects.filter(
integrationproviderrelationship__provider_id=provider_id,
@@ -495,7 +505,16 @@ def check_integrations_task(tenant_id: str, provider_id: str):
logger.info(f"No integrations configured for provider {provider_id}")
return {"integrations_processed": 0}
integration_tasks = []
# Security Hub integration
security_hub_integrations = integrations.filter(
integration_type=Integration.IntegrationChoices.AWS_SECURITY_HUB
)
if security_hub_integrations.exists():
integration_tasks.append(
security_hub_integration_task.s(
tenant_id=tenant_id, provider_id=provider_id, scan_id=scan_id
)
)
# TODO: Add other integration types here
# slack_integrations = integrations.filter(
@@ -541,3 +560,24 @@ def s3_integration_task(
output_directory (str): Path to the directory containing output files
"""
return upload_s3_integration(tenant_id, provider_id, output_directory)
@shared_task(
base=RLSTask,
name="integration-security-hub",
queue="integrations",
)
def security_hub_integration_task(
tenant_id: str,
provider_id: str,
scan_id: str,
):
"""
Process Security Hub integrations for a provider.
Args:
tenant_id (str): The tenant identifier
provider_id (str): The provider identifier
scan_id (str): The scan identifier
"""
return upload_security_hub_integration(tenant_id, provider_id, scan_id)
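A minimal sketch of how `check_integrations_task` dispatches the collected integration signatures, mirroring the `group(...).apply_async()` expectations asserted in the tests further below; the import of `security_hub_integration_task` is assumed from `tasks.tasks`.

```python
from celery import group

from tasks.tasks import security_hub_integration_task


def dispatch_integration_tasks(tenant_id: str, provider_id: str, scan_id: str) -> int:
    # Collect one signature per enabled integration type; only Security Hub shown here.
    integration_tasks = [
        security_hub_integration_task.s(
            tenant_id=tenant_id, provider_id=provider_id, scan_id=scan_id
        )
    ]
    if integration_tasks:
        # Run all integration tasks concurrently on the "integrations" queue
        group(integration_tasks).apply_async()
    return len(integration_tasks)
```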

File diff suppressed because it is too large.


@@ -7,6 +7,7 @@ from tasks.tasks import (
check_integrations_task,
generate_outputs_task,
s3_integration_task,
security_hub_integration_task,
)
from api.models import Integration
@@ -521,31 +522,68 @@ class TestCheckIntegrationsTask:
enabled=True,
)
@patch("tasks.tasks.security_hub_integration_task")
@patch("tasks.tasks.group")
@patch("tasks.tasks.rls_transaction")
@patch("tasks.tasks.Integration.objects.filter")
def test_check_integrations_s3_success(
self, mock_integration_filter, mock_rls, mock_group
def test_check_integrations_security_hub_success(
self, mock_integration_filter, mock_rls, mock_group, mock_security_hub_task
):
# Mock that we have some integrations
mock_integration_filter.return_value.exists.return_value = True
"""Test that SecurityHub integrations are processed correctly."""
# Mock that we have SecurityHub integrations
mock_integrations = MagicMock()
mock_integrations.exists.return_value = True
# Mock SecurityHub integrations to return existing integrations
mock_security_hub_integrations = MagicMock()
mock_security_hub_integrations.exists.return_value = True
# Set up the filter chain
mock_integration_filter.return_value = mock_integrations
mock_integrations.filter.return_value = mock_security_hub_integrations
# Mock the task signature
mock_task_signature = MagicMock()
mock_security_hub_task.s.return_value = mock_task_signature
# Mock group job
mock_job = MagicMock()
mock_group.return_value = mock_job
# Ensure rls_transaction is mocked
mock_rls.return_value.__enter__.return_value = None
# Since the current implementation doesn't actually create tasks yet (TODO comment),
# we test that no tasks are created but the function returns the correct count
# Execute the function
result = check_integrations_task(
tenant_id=self.tenant_id,
provider_id=self.provider_id,
scan_id="test-scan-id",
)
assert result == {"integrations_processed": 0}
# Should process 1 SecurityHub integration
assert result == {"integrations_processed": 1}
# Verify the integration filter was called
mock_integration_filter.assert_called_once_with(
integrationproviderrelationship__provider_id=self.provider_id,
enabled=True,
)
# group should not be called since no integration tasks are created yet
mock_group.assert_not_called()
# Verify SecurityHub integrations were filtered
mock_integrations.filter.assert_called_once_with(
integration_type=Integration.IntegrationChoices.AWS_SECURITY_HUB
)
# Verify SecurityHub task was created with correct parameters
mock_security_hub_task.s.assert_called_once_with(
tenant_id=self.tenant_id,
provider_id=self.provider_id,
scan_id="test-scan-id",
)
# Verify group was called and job was executed
mock_group.assert_called_once_with([mock_task_signature])
mock_job.apply_async.assert_called_once()
@patch("tasks.tasks.rls_transaction")
@patch("tasks.tasks.Integration.objects.filter")
@@ -598,3 +636,33 @@ class TestCheckIntegrationsTask:
mock_upload.assert_called_once_with(
self.tenant_id, self.provider_id, output_directory
)
@patch("tasks.tasks.upload_security_hub_integration")
def test_security_hub_integration_task_success(self, mock_upload):
"""Test successful SecurityHub integration task execution."""
mock_upload.return_value = True
scan_id = "test-scan-123"
result = security_hub_integration_task(
tenant_id=self.tenant_id,
provider_id=self.provider_id,
scan_id=scan_id,
)
assert result is True
mock_upload.assert_called_once_with(self.tenant_id, self.provider_id, scan_id)
@patch("tasks.tasks.upload_security_hub_integration")
def test_security_hub_integration_task_failure(self, mock_upload):
"""Test SecurityHub integration task handling failure."""
mock_upload.return_value = False
scan_id = "test-scan-123"
result = security_hub_integration_task(
tenant_id=self.tenant_id,
provider_id=self.provider_id,
scan_id=scan_id,
)
assert result is False
mock_upload.assert_called_once_with(self.tenant_id, self.provider_id, scan_id)


@@ -19,6 +19,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
### Changed
- Refine KISA ISMS-P compliance mapping [(#8479)](https://github.com/prowler-cloud/prowler/pull/8479)
- Improve AWS Security Hub region check using multiple threads [(#8365)](https://github.com/prowler-cloud/prowler/pull/8365)
### Fixed


@@ -1,4 +1,5 @@
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Optional
@@ -117,7 +118,7 @@ class SecurityHub:
- aws_partition (str): AWS partition (e.g., aws, aws-cn, aws-us-gov) where SecurityHub is deployed.
- findings (list[AWSSecurityFindingFormat]): List of findings to filter and send to Security Hub.
- aws_security_hub_available_regions (list[str]): List of regions where Security Hub is available.
- send_only_fails (bool): Flag indicating whether to send only findings with status 'FAILED'.
- send_only_fails (bool): Flag indicating whether to send only findings with status 'FAIL'.
- role_arn: The ARN of the IAM role to assume.
- session_duration: The duration of the session in seconds, between 900 and 43200.
- external_id: The external ID to use when assuming the IAM role.
@@ -177,7 +178,7 @@ class SecurityHub:
Args:
findings (list[AWSSecurityFindingFormat]): List of findings to filter.
send_only_fails (bool): Flag indicating whether to send only findings with status 'FAILED'.
send_only_fails (bool): Flag indicating whether to send only findings with status 'FAIL'.
Returns:
dict: A dictionary containing findings per region after applying the filtering criteria.
@@ -213,23 +214,24 @@ class SecurityHub:
return findings_per_region
@staticmethod
def verify_enabled_per_region(
aws_security_hub_available_regions: list[str],
def _check_region_security_hub(
region: str,
session: Session,
aws_account_id: str,
aws_partition: str,
) -> dict[str, Session]:
) -> tuple[str, Session | None]:
"""
Filters the given list of regions where AWS Security Hub is enabled and returns a dictionary containing the region and their boto3 client if the region and the Prowler integration is enabled.
Check if Security Hub is enabled in a specific region and if Prowler integration is active.
Args:
aws_security_hub_available_regions (list[str]): List of AWS regions to check for Security Hub integration.
region (str): AWS region to check.
session (Session): AWS session object.
aws_account_id (str): AWS account ID.
aws_partition (str): AWS partition.
Returns:
dict: A dictionary containing enabled regions with SecurityHub clients.
tuple: (region, client or None) - Returns client if enabled, None otherwise.
"""
enabled_regions = {}
for region in aws_security_hub_available_regions:
try:
logger.info(
f"Checking if the {SECURITY_HUB_INTEGRATION_NAME} is enabled in the {region} region."
@@ -246,10 +248,9 @@ class SecurityHub:
logger.warning(
f"Security Hub is enabled in {region} but Prowler integration does not accept findings. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/"
)
return region, None
else:
enabled_regions[region] = session.client(
"securityhub", region_name=region
)
return region, session.client("securityhub", region_name=region)
# Handle all the permissions / configuration errors
except ClientError as client_error:
@@ -268,10 +269,58 @@ class SecurityHub:
logger.error(
f"{client_error.__class__.__name__} -- [{client_error.__traceback__.tb_lineno}]: {client_error}"
)
return region, None
except Exception as error:
logger.error(
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
)
return region, None
@staticmethod
def verify_enabled_per_region(
aws_security_hub_available_regions: list[str],
session: Session,
aws_account_id: str,
aws_partition: str,
) -> dict[str, Session]:
"""
Checks the given regions in parallel and returns a dictionary mapping each region to its boto3 Security Hub client, including only regions where both Security Hub and the Prowler integration are enabled.
Args:
aws_security_hub_available_regions (list[str]): List of AWS regions to check for Security Hub integration.
session (Session): AWS session object used to create the regional clients.
aws_account_id (str): AWS account ID.
aws_partition (str): AWS partition (e.g., aws, aws-cn, aws-us-gov).
Returns:
dict: A dictionary mapping enabled regions to their SecurityHub clients.
"""
enabled_regions = {}
# Use ThreadPoolExecutor to check regions in parallel
with ThreadPoolExecutor(
max_workers=min(len(aws_security_hub_available_regions), 20)
) as executor:
# Submit all region checks
future_to_region = {
executor.submit(
SecurityHub._check_region_security_hub,
region,
session,
aws_account_id,
aws_partition,
): region
for region in aws_security_hub_available_regions
}
# Collect results as they complete
for future in as_completed(future_to_region):
try:
region, client = future.result()
if client is not None:
enabled_regions[region] = client
except Exception as error:
logger.error(
f"Error checking region {future_to_region[future]}: {error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
)
return enabled_regions
def batch_send_to_security_hub(
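A hedged usage sketch of the parallelized region check: candidate regions are gathered with `AwsProvider.get_available_aws_service_regions`, as the API-side integration code earlier in this commit does; the account ID and partition values are hypothetical.

```python
from boto3.session import Session

from prowler.providers.aws.aws_provider import AwsProvider
from prowler.providers.aws.lib.security_hub.security_hub import SecurityHub

session = Session()  # credentials resolved from the environment in this sketch
regions = AwsProvider.get_available_aws_service_regions("securityhub", "aws")

enabled = SecurityHub.verify_enabled_per_region(
    aws_security_hub_available_regions=list(regions),
    session=session,
    aws_account_id="123456789012",  # hypothetical account ID
    aws_partition="aws",
)
# `enabled` maps each region where Security Hub and the Prowler integration are
# active to a boto3 "securityhub" client for that region.
```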