Compare commits

...

8 Commits

Author  SHA1  Message  Date
MrCloudSec  c9531483d5  fix: flake8  2025-11-19 08:56:56 -05:00
Sergio Garcia  0ad7c3ba60  Merge branch 'master' into PRWLR-7853-asff-first-observed-at-is-not-taking-into-account-the-finding-can-exist  2025-11-19 08:52:56 -05:00
MrCloudSec  af069104bf  chore: retrieve findings in batches  2025-09-30 16:33:56 -04:00
MrCloudSec  7566fe6072  chore: add filter for current findings  2025-08-27 14:34:45 +02:00
MrCloudSec  ed75293dad  chore: add changelog  2025-08-27 13:47:01 +02:00
MrCloudSec  4063a94251  chore: delete try catch  2025-08-27 11:37:19 +02:00
MrCloudSec  bfe5c1c770  chore: revision  2025-08-27 11:34:29 +02:00
MrCloudSec  4df5b878ca  fix(securityhub): preserve timestamps of existing findings  2025-08-27 10:58:53 +02:00
6 changed files with 1122 additions and 16 deletions

View File

@@ -72,6 +72,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
- Fix Validation and other errors in Azure provider [(#8915)](https://github.com/prowler-cloud/prowler/pull/8915)
- Update documentation URLs from docs.prowler.cloud to docs.prowler.com [(#9240)](https://github.com/prowler-cloud/prowler/pull/9240)
- Fix file name parsing for checks on Windows [(#9268)](https://github.com/prowler-cloud/prowler/pull/9268)
- Preserve timestamps in existing findings from Security Hub integration [(#8589)](https://github.com/prowler-cloud/prowler/pull/8589)
---

View File

@@ -455,9 +455,9 @@ def prowler():
findings=finding_outputs,
file_path=f"{filename}{json_asff_file_suffix}",
)
generated_outputs["regular"].append(asff_output)
# Write ASFF Finding Object to file
asff_output.batch_write_data_to_file()
if not args.security_hub:
generated_outputs["regular"].append(asff_output)
asff_output.batch_write_data_to_file()
if mode == "json-ocsf":
json_output = OCSF(
@@ -1035,17 +1035,42 @@ def prowler():
send_only_fails=output_options.send_sh_only_fails,
aws_security_hub_available_regions=security_hub_regions,
)
# Send the findings to Security Hub
findings_sent_to_security_hub = security_hub.batch_send_to_security_hub()
if findings_sent_to_security_hub == 0:
if not security_hub._enabled_regions:
print(
f"{Style.BRIGHT}{orange_color}\nNo findings sent to AWS Security Hub.{Style.RESET_ALL}"
f"{Style.BRIGHT}{Fore.RED}\nNo regions with Security Hub enabled with Prowler integration found. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/{Style.RESET_ALL}\n"
)
sys.exit(1)
else:
print(
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent_to_security_hub} findings sent to AWS Security Hub!{Style.RESET_ALL}"
# Get existing findings timestamps to preserve FirstObservedAt and CreatedAt for findings that already exist (UpdatedAt is always refreshed)
existing_findings_timestamps = (
security_hub.get_existing_findings_timestamps()
)
# Create ASFF output with existing timestamps to preserve FirstObservedAt and CreatedAt
asff_output_with_timestamps = ASFF(
findings=finding_outputs,
existing_findings_timestamps=existing_findings_timestamps,
)
# Update Security Hub findings to use the ones with preserved timestamps
security_hub._findings_per_region = security_hub.filter(
asff_output_with_timestamps.data,
output_options.send_sh_only_fails,
)
# Send the findings to Security Hub
findings_sent_to_security_hub = (
security_hub.batch_send_to_security_hub()
)
if findings_sent_to_security_hub == 0:
print(
f"{Style.BRIGHT}{orange_color}\nNo findings sent to AWS Security Hub.{Style.RESET_ALL}"
)
else:
print(
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent_to_security_hub} findings sent to AWS Security Hub!{Style.RESET_ALL}"
)
# Resolve previous fails of Security Hub
if not args.skip_sh_update:
print(
@@ -1063,6 +1088,12 @@ def prowler():
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_archived_in_security_hub} findings archived in AWS Security Hub!{Style.RESET_ALL}"
)
asff_output_with_timestamps.file_path = (
f"{filename}{json_asff_file_suffix}"
)
generated_outputs["regular"].append(asff_output_with_timestamps)
asff_output_with_timestamps.batch_write_data_to_file()
# Display summary table
if not args.only_logs:
display_summary_table(
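
Taken together, the new CLI flow when the Security Hub integration is enabled is: look up the timestamps of findings that already exist in Security Hub, rebuild the ASFF payload with those timestamps, swap in the timestamp-preserving findings per region, and only then send and write the ASFF file. A condensed sketch of that orchestration, assuming the `security_hub`, `finding_outputs`, and `output_options` objects created earlier in the CLI (console output and error handling omitted):

```python
# Condensed sketch of the new send flow; mirrors the hunk above.
existing_timestamps = security_hub.get_existing_findings_timestamps()

# Rebuild ASFF so FirstObservedAt/CreatedAt come from Security Hub for
# findings that already exist there; UpdatedAt is always the current time.
asff_output_with_timestamps = ASFF(
    findings=finding_outputs,
    existing_findings_timestamps=existing_timestamps,
)

# Use the timestamp-preserving findings for the per-region send.
security_hub._findings_per_region = security_hub.filter(
    asff_output_with_timestamps.data,
    output_options.send_sh_only_fails,
)
findings_sent = security_hub.batch_send_to_security_hub()

# The same ASFF object is reused for the on-disk JSON-ASFF output.
asff_output_with_timestamps.file_path = f"{filename}{json_asff_file_suffix}"
asff_output_with_timestamps.batch_write_data_to_file()
```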

View File

@@ -20,6 +20,7 @@ class ASFF(Output):
Attributes:
- _data: A list to store the transformed findings.
- _file_descriptor: A file descriptor to write to file.
- _existing_findings_timestamps: A dictionary mapping finding IDs to their existing timestamps from Security Hub.
Methods:
- transform(findings: list[Finding]) -> None: Transforms a list of findings into ASFF format.
@@ -31,7 +32,30 @@ class ASFF(Output):
- AWS Security Finding Format Syntax: https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-findings-format-syntax.html
"""
def transform(self, findings: list[Finding]) -> None:
def __init__(
self,
findings: list[Finding],
file_path: str = None,
file_extension: str = "",
from_cli: bool = True,
existing_findings_timestamps: dict = None,
) -> None:
"""
Initialize the ASFF output formatter.
Args:
findings: List of findings to transform
file_path: Path to the output file
file_extension: File extension for the output
from_cli: Whether this is being called from CLI
existing_findings_timestamps: Dictionary mapping finding IDs to existing timestamps from Security Hub
"""
self._existing_findings_timestamps = existing_findings_timestamps
super().__init__(findings, file_path, file_extension, from_cli)
def transform(
self, findings: list[Finding], existing_findings_timestamps: dict = None
) -> None:
"""
Transforms a list of findings into AWS Security Finding Format (ASFF).
@@ -39,6 +63,7 @@ class ASFF(Output):
Parameters:
- findings (list[Finding]): A list of Finding objects representing the findings to be transformed.
- existing_findings_timestamps (dict, optional): A dictionary mapping finding IDs to their existing timestamps from Security Hub.
Returns:
- None
@@ -49,18 +74,43 @@ class ASFF(Output):
- It formats timestamps in the required ASFF format.
- It handles compliance frameworks and associated standards for each finding.
- It ensures that the finding status matches the allowed values in ASFF.
- If existing_findings_timestamps is provided, it preserves the original FirstObservedAt and CreatedAt timestamps for findings that already exist in Security Hub.
References:
- AWS Security Hub API Reference: https://docs.aws.amazon.com/securityhub/1.0/APIReference/API_Compliance.html
- AWS Security Finding Format Syntax: https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-findings-format-syntax.html
"""
try:
# Use the parameter if provided, otherwise use the stored value
timestamps_to_use = (
existing_findings_timestamps or self._existing_findings_timestamps
)
for finding in findings:
# MANUAL status is not valid in SecurityHub
# https://docs.aws.amazon.com/securityhub/1.0/APIReference/API_Compliance.html
if finding.status == "MANUAL":
continue
timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
# Generate the finding ID to check if it exists in Security Hub
finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Determine timestamps based on whether the finding already exists in Security Hub
if timestamps_to_use and finding_id in timestamps_to_use:
existing_timestamps = timestamps_to_use[finding_id]
# Preserve original FirstObservedAt and CreatedAt, update UpdatedAt
first_observed_at = existing_timestamps.get(
"FirstObservedAt", current_timestamp
)
created_at = existing_timestamps.get("CreatedAt", current_timestamp)
updated_at = current_timestamp # Always update with current time
else:
# New finding: use current timestamp for all fields
first_observed_at = current_timestamp
created_at = current_timestamp
updated_at = current_timestamp
associated_standards, compliance_summary = ASFF.format_compliance(
finding.compliance
@@ -72,7 +122,7 @@ class ASFF(Output):
AWSSecurityFindingFormat(
# The following line cannot be changed because it is the format we use to generate unique findings for AWS Security Hub
# If changed some findings could be lost because the unique identifier will be different
Id=f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}",
Id=finding_id,
ProductArn=f"arn:{finding.partition}:securityhub:{finding.region}::product/prowler/prowler",
ProductFields=ProductFields(
ProwlerResourceName=finding.resource_uid,
@@ -84,9 +134,9 @@ class ASFF(Output):
if finding.metadata.CheckType
else ["Software and Configuration Checks"]
),
FirstObservedAt=timestamp,
UpdatedAt=timestamp,
CreatedAt=timestamp,
FirstObservedAt=first_observed_at,
UpdatedAt=updated_at,
CreatedAt=created_at,
Severity=Severity(Label=finding.metadata.Severity.value),
Title=finding.metadata.CheckTitle,
Description=(
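
The timestamp handling added to `transform()` reduces to one rule per finding: reuse FirstObservedAt and CreatedAt when the generated finding ID is already known in Security Hub, and always set UpdatedAt to the current time. A minimal standalone sketch of that rule (the `resolve_timestamps` helper is illustrative, not part of the class):

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def resolve_timestamps(
    finding_id: str, existing: Optional[dict]
) -> Tuple[str, str, str]:
    """Illustrative helper mirroring the timestamp logic in ASFF.transform().

    Returns (FirstObservedAt, CreatedAt, UpdatedAt) for a single finding.
    """
    now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    previous = (existing or {}).get(finding_id)
    if previous:
        # Known finding: keep its original observation/creation timestamps.
        first_observed_at = previous.get("FirstObservedAt", now)
        created_at = previous.get("CreatedAt", now)
    else:
        # New finding: every field gets the current timestamp.
        first_observed_at = created_at = now
    # UpdatedAt always reflects the current scan, even for known findings.
    return first_observed_at, created_at, now
```

With this rule, a finding first observed in an earlier scan keeps its original FirstObservedAt and CreatedAt on every later run, while UpdatedAt tracks the latest scan.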

View File

@@ -364,6 +364,70 @@ class SecurityHub:
)
return success_count
def get_existing_findings_timestamps(self) -> dict:
"""
Retrieves existing findings from Security Hub that are relevant to the current scan
and returns their timestamps. To keep retrieval fast, it only requests findings whose
IDs match the finding IDs generated by the current scan.
Returns:
dict: A dictionary mapping finding IDs to their timestamps from Security Hub.
"""
existing_findings_timestamps = {}
logger.info(
"Retrieving existing findings timestamps from Security Hub for current scan."
)
# Get unique finding IDs from current scan findings
current_finding_ids = set()
for findings in self._findings_per_region.values():
for finding in findings:
current_finding_ids.add(finding.Id)
if not current_finding_ids:
logger.info("No current scan findings to check timestamps for.")
return existing_findings_timestamps
logger.info(
f"Checking timestamps for {len(current_finding_ids)} findings from current scan."
)
for region in self._findings_per_region.keys():
try:
# Get findings for that region, filtered by the current scan's finding IDs
findings_filter = {
"ProductName": [{"Value": "Prowler", "Comparison": "EQUALS"}],
"RecordState": [{"Value": "ACTIVE", "Comparison": "EQUALS"}],
"AwsAccountId": [
{"Value": self._aws_account_id, "Comparison": "EQUALS"}
],
"Region": [{"Value": region, "Comparison": "EQUALS"}],
"Id": [
{"Value": finding_id, "Comparison": "EQUALS"}
for finding_id in current_finding_ids
],
}
get_findings_paginator = self._enabled_regions[region].get_paginator(
"get_findings"
)
# Use the same batching pattern as _send_findings_in_batches
self._retrieve_findings_in_batches(
get_findings_paginator,
findings_filter,
existing_findings_timestamps,
)
except Exception as error:
logger.error(
f"Error retrieving existing findings timestamps from region {region}: {error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
)
logger.info(
f"Retrieved timestamps for {len(existing_findings_timestamps)} existing findings from Security Hub relevant to current scan."
)
return existing_findings_timestamps
def archive_previous_findings(self) -> int:
"""
Checks previous findings in Security Hub to archive them.
@@ -418,6 +482,34 @@ class SecurityHub:
)
return success_count
def _retrieve_findings_in_batches(
self, paginator, findings_filter: dict, existing_findings_timestamps: dict
) -> None:
"""
Retrieves findings from AWS Security Hub in batches using the same pattern as _send_findings_in_batches.
Args:
paginator: The paginator object for get_findings.
findings_filter (dict): The filter to apply when retrieving findings.
existing_findings_timestamps (dict): Dictionary to store the retrieved findings timestamps.
"""
try:
for page in paginator.paginate(
Filters=findings_filter,
PaginationConfig={"PageSize": SECURITY_HUB_MAX_BATCH},
):
for finding in page["Findings"]:
finding_id = finding["Id"]
existing_findings_timestamps[finding_id] = {
"FirstObservedAt": finding.get("FirstObservedAt"),
"CreatedAt": finding.get("CreatedAt"),
"UpdatedAt": finding.get("UpdatedAt"),
}
except Exception as error:
logger.error(
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
)
def _send_findings_in_batches(
self, findings: list[AWSSecurityFindingFormat], region: str
) -> int:
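
On the retrieval side, `_retrieve_findings_in_batches` is plain boto3 `get_findings` pagination over an ASFF filter. A self-contained sketch of the same pattern, assuming a configured Security Hub client and a set of finding IDs from the current scan (the client, region, and IDs below are placeholders, not Prowler internals):

```python
import boto3

# Placeholders standing in for Prowler's per-region client and current scan IDs.
client = boto3.client("securityhub", region_name="eu-west-1")
current_finding_ids = {"prowler-example_check-123456789012-eu-west-1-abc123"}

findings_filter = {
    "ProductName": [{"Value": "Prowler", "Comparison": "EQUALS"}],
    "RecordState": [{"Value": "ACTIVE", "Comparison": "EQUALS"}],
    "Id": [
        {"Value": finding_id, "Comparison": "EQUALS"}
        for finding_id in current_finding_ids
    ],
}

existing_findings_timestamps = {}
paginator = client.get_paginator("get_findings")
for page in paginator.paginate(
    Filters=findings_filter, PaginationConfig={"PageSize": 100}
):
    for finding in page["Findings"]:
        existing_findings_timestamps[finding["Id"]] = {
            "FirstObservedAt": finding.get("FirstObservedAt"),
            "CreatedAt": finding.get("CreatedAt"),
            "UpdatedAt": finding.get("UpdatedAt"),
        }
```

The real method additionally scopes the filter by AwsAccountId and Region, and logs errors instead of raising, as shown in the hunk above.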

View File

@@ -592,3 +592,697 @@ class TestASFF:
assert ASFF.generate_status("FAIL") == "FAILED"
assert ASFF.generate_status("FAIL", True) == "WARNING"
assert ASFF.generate_status("SOMETHING ELSE") == "NOT_AVAILABLE"
def test_asff_preserves_existing_timestamps(self):
"""Test that ASFF preserves existing timestamps for findings that already exist in Security Hub."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Mock existing finding timestamps from Security Hub
finding_id = f"prowler-{finding.metadata.CheckID}-{AWS_ACCOUNT_NUMBER}-{AWS_REGION_EU_WEST_1}-{hash_sha512(finding.resource_uid)}"
existing_timestamps = {
finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
associated_standards, compliance_summary = ASFF.format_compliance(
finding.compliance
)
expected = AWSSecurityFindingFormat(
Id=finding_id,
ProductArn=f"arn:{AWS_COMMERCIAL_PARTITION}:securityhub:{AWS_REGION_EU_WEST_1}::product/prowler/prowler",
ProductFields=ProductFields(
ProviderVersion=prowler_version,
ProwlerResourceName=finding.resource_uid,
),
GeneratorId="prowler-" + finding.metadata.CheckID,
AwsAccountId=AWS_ACCOUNT_NUMBER,
Types=finding.metadata.CheckType,
FirstObservedAt="2023-01-01T00:00:00Z", # Should preserve existing timestamp
UpdatedAt=current_timestamp, # Should update with current timestamp
CreatedAt="2023-01-01T00:00:00Z", # Should preserve existing timestamp
Severity=Severity(Label=finding.metadata.Severity.value),
Title=finding.metadata.CheckTitle,
Resources=[
Resource(
Id=finding.resource_uid,
Type=finding.metadata.ResourceType,
Partition=AWS_COMMERCIAL_PARTITION,
Region=AWS_REGION_EU_WEST_1,
Tags={"key1": "value1"},
)
],
Compliance=Compliance(
Status=ASFF.generate_status(status),
RelatedRequirements=compliance_summary,
AssociatedStandards=associated_standards,
),
Remediation=Remediation(
Recommendation=Recommendation(
Text=finding.metadata.Remediation.Recommendation.Text,
Url=finding.metadata.Remediation.Recommendation.Url,
)
),
Description=finding.status_extended,
)
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
assert len(asff.data) == 1
asff_finding = asff.data[0]
assert asff_finding == expected
# Verify that FirstObservedAt and CreatedAt are preserved
assert asff_finding.FirstObservedAt == "2023-01-01T00:00:00Z"
assert asff_finding.CreatedAt == "2023-01-01T00:00:00Z"
# Verify that UpdatedAt uses current timestamp
assert asff_finding.UpdatedAt == current_timestamp
def test_asff_uses_current_timestamps_for_new_findings(self):
"""Test that ASFF uses current timestamps for new findings when no existing timestamps are provided."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
associated_standards, compliance_summary = ASFF.format_compliance(
finding.compliance
)
expected = AWSSecurityFindingFormat(
Id=f"prowler-{finding.metadata.CheckID}-{AWS_ACCOUNT_NUMBER}-{AWS_REGION_EU_WEST_1}-{hash_sha512(finding.resource_uid)}",
ProductArn=f"arn:{AWS_COMMERCIAL_PARTITION}:securityhub:{AWS_REGION_EU_WEST_1}::product/prowler/prowler",
ProductFields=ProductFields(
ProviderVersion=prowler_version,
ProwlerResourceName=finding.resource_uid,
),
GeneratorId="prowler-" + finding.metadata.CheckID,
AwsAccountId=AWS_ACCOUNT_NUMBER,
Types=finding.metadata.CheckType,
FirstObservedAt=current_timestamp, # Should use current timestamp for new findings
UpdatedAt=current_timestamp,
CreatedAt=current_timestamp,
Severity=Severity(Label=finding.metadata.Severity.value),
Title=finding.metadata.CheckTitle,
Resources=[
Resource(
Id=finding.resource_uid,
Type=finding.metadata.ResourceType,
Partition=AWS_COMMERCIAL_PARTITION,
Region=AWS_REGION_EU_WEST_1,
Tags={"key1": "value1"},
)
],
Compliance=Compliance(
Status=ASFF.generate_status(status),
RelatedRequirements=compliance_summary,
AssociatedStandards=associated_standards,
),
Remediation=Remediation(
Recommendation=Recommendation(
Text=finding.metadata.Remediation.Recommendation.Text,
Url=finding.metadata.Remediation.Recommendation.Url,
)
),
Description=finding.status_extended,
)
asff = ASFF(findings=[finding])
assert len(asff.data) == 1
asff_finding = asff.data[0]
assert asff_finding == expected
# Verify that all timestamps use current timestamp for new findings
assert asff_finding.FirstObservedAt == current_timestamp
assert asff_finding.UpdatedAt == current_timestamp
assert asff_finding.CreatedAt == current_timestamp
def test_asff_constructor_with_existing_timestamps(self):
"""Test that ASFF constructor properly stores existing timestamps."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the actual finding ID that will be used
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Existing timestamps with the correct ID
existing_timestamps = {
actual_finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
# Create ASFF output with timestamps in constructor
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
# Verify that timestamps are preserved
assert len(asff.data) == 1
finding_asff = asff.data[0]
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
assert finding_asff.CreatedAt == "2023-01-01T00:00:00Z"
def test_asff_constructor_without_existing_timestamps(self):
"""Test that ASFF constructor works without existing timestamps."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Create ASFF output without timestamps parameter
asff = ASFF(findings=[finding])
# Verify that current timestamps are used
assert len(asff.data) == 1
finding_asff = asff.data[0]
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding_asff.FirstObservedAt == current_timestamp
assert finding_asff.CreatedAt == current_timestamp
assert finding_asff.UpdatedAt == current_timestamp
def test_asff_transform_method_parameter_override(self):
"""Test that transform method parameter overrides constructor parameter."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the actual finding ID
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Constructor timestamps
constructor_timestamps = {
actual_finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
# Transform method timestamps (different)
transform_timestamps = {
actual_finding_id: {
"FirstObservedAt": "2023-02-01T00:00:00Z",
"CreatedAt": "2023-02-01T00:00:00Z",
"UpdatedAt": "2023-02-01T00:00:00Z",
}
}
# Create ASFF output with constructor timestamps
asff = ASFF(
findings=[finding], existing_findings_timestamps=constructor_timestamps
)
# Clear existing data and transform with different timestamps
asff._data = []
asff.transform([finding], transform_timestamps)
# Verify that transform method timestamps are used
assert len(asff.data) == 1
finding_asff = asff.data[0]
assert finding_asff.FirstObservedAt == "2023-02-01T00:00:00Z"
assert finding_asff.CreatedAt == "2023-02-01T00:00:00Z"
def test_asff_transform_method_without_parameter(self):
"""Test that transform method uses constructor timestamps when no parameter is provided."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the actual finding ID
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Constructor timestamps
constructor_timestamps = {
actual_finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
# Create ASFF output with timestamps in constructor
asff = ASFF(
findings=[finding], existing_findings_timestamps=constructor_timestamps
)
# Clear existing data and transform without parameter (should use constructor timestamps)
asff._data = []
asff.transform([finding])
# Verify that constructor timestamps are used
assert len(asff.data) == 1
finding_asff = asff.data[0]
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
assert finding_asff.CreatedAt == "2023-01-01T00:00:00Z"
def test_asff_handles_partial_existing_timestamps(self):
"""Test that ASFF handles cases where only some timestamp fields exist in the existing data."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the actual finding ID
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Mock existing finding with only FirstObservedAt timestamp
existing_timestamps = {
actual_finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
# Missing CreatedAt and UpdatedAt
}
}
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
# Should preserve FirstObservedAt, use current timestamp for missing fields
assert len(asff.data) == 1
finding_asff = asff.data[0]
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding_asff.CreatedAt == current_timestamp
assert finding_asff.UpdatedAt == current_timestamp
def test_asff_handles_missing_timestamp_fields(self):
"""Test that ASFF handles cases where timestamp fields are missing from the dictionary."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the actual finding ID
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Mock existing finding with missing keys (not None values)
existing_timestamps = {
actual_finding_id: {
# Missing FirstObservedAt, CreatedAt, and UpdatedAt keys entirely
}
}
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
# Should use current timestamp for all fields when keys are missing
assert len(asff.data) == 1
finding_asff = asff.data[0]
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding_asff.FirstObservedAt == current_timestamp
assert finding_asff.CreatedAt == current_timestamp
assert finding_asff.UpdatedAt == current_timestamp
def test_asff_handles_empty_existing_timestamps_dict(self):
"""Test that ASFF handles empty existing timestamps dictionary."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Empty existing timestamps
existing_timestamps = {}
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
# Should use current timestamp for all fields
assert len(asff.data) == 1
finding_asff = asff.data[0]
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding_asff.FirstObservedAt == current_timestamp
assert finding_asff.CreatedAt == current_timestamp
assert finding_asff.UpdatedAt == current_timestamp
def test_asff_handles_none_existing_timestamps(self):
"""Test that ASFF handles None existing timestamps parameter."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# None existing timestamps
asff = ASFF(findings=[finding], existing_findings_timestamps=None)
# Should use current timestamp for all fields
assert len(asff.data) == 1
finding_asff = asff.data[0]
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding_asff.FirstObservedAt == current_timestamp
assert finding_asff.CreatedAt == current_timestamp
assert finding_asff.UpdatedAt == current_timestamp
def test_asff_finding_id_generation_consistency(self):
"""Test that finding ID generation is consistent between timestamp lookup and ASFF creation."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the expected finding ID
expected_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Create existing timestamps with the expected ID
existing_timestamps = {
expected_finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
# Verify that the finding ID matches and timestamps are preserved
assert len(asff.data) == 1
finding_asff = asff.data[0]
assert finding_asff.Id == expected_finding_id
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
assert finding_asff.CreatedAt == "2023-01-01T00:00:00Z"
def test_asff_multiple_findings_mixed_timestamps(self):
"""Test that ASFF handles multiple findings with mixed existing and new timestamps."""
# Create multiple findings
finding1 = generate_finding_output(
status="PASS",
status_extended="First finding",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource-1",
resource_uid="test-arn-1",
resource_tags={"key1": "value1"},
)
finding2 = generate_finding_output(
status="FAIL",
status_extended="Second finding",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource-2",
resource_uid="test-arn-2",
resource_tags={"key2": "value2"},
)
# Only first finding has existing timestamps
finding1_id = f"prowler-{finding1.metadata.CheckID}-{finding1.account_uid}-{finding1.region}-{hash_sha512(finding1.resource_uid)}"
existing_timestamps = {
finding1_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
asff = ASFF(
findings=[finding1, finding2],
existing_findings_timestamps=existing_timestamps,
)
# Verify that first finding preserves timestamps
assert len(asff.data) == 2
finding1_asff = asff.data[0]
assert finding1_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
assert finding1_asff.CreatedAt == "2023-01-01T00:00:00Z"
# Verify that second finding uses current timestamps
finding2_asff = asff.data[1]
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding2_asff.FirstObservedAt == current_timestamp
assert finding2_asff.CreatedAt == current_timestamp
assert finding2_asff.UpdatedAt == current_timestamp
def test_asff_updated_at_always_current(self):
"""Test that UpdatedAt is always set to current timestamp regardless of existing data."""
status = "PASS"
finding = generate_finding_output(
status=status,
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the actual finding ID
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Existing timestamps with old UpdatedAt
existing_timestamps = {
actual_finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z", # Old timestamp
}
}
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
# Verify that UpdatedAt is current time, not the old timestamp
assert len(asff.data) == 1
finding_asff = asff.data[0]
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
assert finding_asff.CreatedAt == "2023-01-01T00:00:00Z"
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding_asff.UpdatedAt == current_timestamp
assert finding_asff.UpdatedAt != "2023-01-01T00:00:00Z"
def test_asff_handles_duplicate_finding_ids(self):
"""Test that ASFF handles cases where multiple findings might have the same ID."""
# Create findings with same check ID but different resources
finding1 = generate_finding_output(
status="PASS",
status_extended="First finding",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource-1",
resource_uid="test-arn-1",
resource_tags={"key1": "value1"},
)
finding2 = generate_finding_output(
status="FAIL",
status_extended="Second finding",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource-2",
resource_uid="test-arn-2",
resource_tags={"key2": "value2"},
)
# Create existing timestamps for one finding
finding1_id = f"prowler-{finding1.metadata.CheckID}-{finding1.account_uid}-{finding1.region}-{hash_sha512(finding1.resource_uid)}"
existing_timestamps = {
finding1_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
asff = ASFF(
findings=[finding1, finding2],
existing_findings_timestamps=existing_timestamps,
)
# Verify that findings are processed correctly
assert len(asff.data) == 2
# First finding should preserve timestamps
finding1_asff = asff.data[0]
assert finding1_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
assert finding1_asff.CreatedAt == "2023-01-01T00:00:00Z"
# Second finding should use current timestamps
finding2_asff = asff.data[1]
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding2_asff.FirstObservedAt == current_timestamp
assert finding2_asff.CreatedAt == current_timestamp
def test_asff_handles_manual_status_findings(self):
"""Test that ASFF correctly skips MANUAL status findings even with existing timestamps."""
finding = generate_finding_output(
status="MANUAL",
status_extended="This is a manual finding",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the actual finding ID
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Existing timestamps
existing_timestamps = {
actual_finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
# MANUAL findings should be skipped
assert len(asff.data) == 0
def test_asff_timestamp_format_consistency(self):
"""Test that all timestamps use consistent ISO 8601 format."""
finding = generate_finding_output(
status="PASS",
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Generate the actual finding ID
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
# Existing timestamps already in ISO 8601 format
existing_timestamps = {
actual_finding_id: {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
}
asff = ASFF(
findings=[finding], existing_findings_timestamps=existing_timestamps
)
# Verify that all timestamps use consistent format
assert len(asff.data) == 1
finding_asff = asff.data[0]
# Check format: YYYY-MM-DDTHH:MM:SSZ
import re
timestamp_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$"
assert re.match(timestamp_pattern, finding_asff.FirstObservedAt)
assert re.match(timestamp_pattern, finding_asff.CreatedAt)
assert re.match(timestamp_pattern, finding_asff.UpdatedAt)
def test_asff_backward_compatibility(self):
"""Test that ASFF maintains backward compatibility when no existing timestamps are provided."""
finding = generate_finding_output(
status="PASS",
status_extended="This is a test",
region=AWS_REGION_EU_WEST_1,
resource_details="Test resource details",
resource_name="test-resource",
resource_uid="test-arn",
resource_tags={"key1": "value1"},
)
# Test with no existing timestamps (backward compatibility)
asff = ASFF(findings=[finding])
# Verify that current timestamps are used
assert len(asff.data) == 1
finding_asff = asff.data[0]
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
assert finding_asff.FirstObservedAt == current_timestamp
assert finding_asff.UpdatedAt == current_timestamp
assert finding_asff.CreatedAt == current_timestamp

View File

@@ -5,7 +5,7 @@ import botocore
import pytest
from boto3 import session
from botocore.client import ClientError
from mock import patch
from mock import MagicMock, patch
from prowler.lib.outputs.asff.asff import ASFF
from prowler.providers.aws.lib.security_hub.exceptions.exceptions import (
@@ -1284,6 +1284,244 @@ class TestSecurityHub:
assert len(connection.enabled_regions) == 1
assert len(connection.disabled_regions) == 1
@patch("prowler.providers.aws.lib.security_hub.security_hub.AwsSetUpSession")
def test_get_existing_findings_timestamps(self, mock_aws_setup):
"""Test that get_existing_findings_timestamps correctly retrieves existing findings timestamps."""
# Mock findings per region
mock_findings = [
MagicMock(
Id="prowler-test-check-123456789012-us-east-1-hash123",
Region="us-east-1",
Compliance=MagicMock(Status="FAILED"),
),
MagicMock(
Id="prowler-test-check-123456789012-us-west-2-hash456",
Region="us-west-2",
Compliance=MagicMock(Status="FAILED"),
),
]
# Mock enabled regions
mock_enabled_regions = {
"us-east-1": MagicMock(),
"us-west-2": MagicMock(),
}
# Mock paginator responses
mock_page1 = {
"Findings": [
{
"Id": "prowler-test-check-123456789012-us-east-1-hash123",
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
}
]
}
mock_page2 = {
"Findings": [
{
"Id": "prowler-test-check-123456789012-us-west-2-hash456",
"FirstObservedAt": "2023-01-15T00:00:00Z",
"CreatedAt": "2023-01-15T00:00:00Z",
"UpdatedAt": "2023-01-15T00:00:00Z",
}
]
}
# Mock paginator
mock_paginator = MagicMock()
mock_paginator.paginate.return_value = [mock_page1, mock_page2]
# Mock Security Hub client
mock_client = MagicMock()
mock_client.get_paginator.return_value = mock_paginator
# Create SecurityHub instance with mocked session
mock_session = MagicMock()
mock_aws_setup.return_value._session.current_session = mock_session
security_hub = SecurityHub(
aws_account_id="123456789012",
aws_partition="aws",
findings=mock_findings,
aws_security_hub_available_regions=["us-east-1", "us-west-2"],
)
# Mock the enabled regions
security_hub._enabled_regions = mock_enabled_regions
security_hub._enabled_regions["us-east-1"] = mock_client
security_hub._enabled_regions["us-west-2"] = mock_client
# Mock findings per region
security_hub._findings_per_region = {
"us-east-1": [mock_findings[0]],
"us-west-2": [mock_findings[1]],
}
# Call the method
result = security_hub.get_existing_findings_timestamps()
# Verify the result
expected_result = {
"prowler-test-check-123456789012-us-east-1-hash123": {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": "2023-01-01T00:00:00Z",
"UpdatedAt": "2023-01-01T00:00:00Z",
},
"prowler-test-check-123456789012-us-west-2-hash456": {
"FirstObservedAt": "2023-01-15T00:00:00Z",
"CreatedAt": "2023-01-15T00:00:00Z",
"UpdatedAt": "2023-01-15T00:00:00Z",
},
}
assert result == expected_result
# Verify that the paginator was called correctly
mock_client.get_paginator.assert_called_with("get_findings")
assert mock_paginator.paginate.call_count == 2
@patch("prowler.providers.aws.lib.security_hub.security_hub.AwsSetUpSession")
def test_get_existing_findings_timestamps_empty_regions(self, mock_aws_setup):
"""Test that get_existing_findings_timestamps handles empty regions correctly."""
# Mock session
mock_session = MagicMock()
mock_aws_setup.return_value._session.current_session = mock_session
security_hub = SecurityHub(
aws_account_id="123456789012",
aws_partition="aws",
findings=[],
aws_security_hub_available_regions=[],
)
# Mock empty findings per region
security_hub._findings_per_region = {}
result = security_hub.get_existing_findings_timestamps()
assert result == {}
@patch("prowler.providers.aws.lib.security_hub.security_hub.AwsSetUpSession")
def test_get_existing_findings_timestamps_with_error(self, mock_aws_setup):
"""Test that get_existing_findings_timestamps handles errors gracefully."""
# Mock findings per region
mock_findings = [
MagicMock(
Id="prowler-test-check-123456789012-us-east-1-hash123",
Region="us-east-1",
Compliance=MagicMock(Status="FAILED"),
),
]
# Mock enabled regions
mock_enabled_regions = {
"us-east-1": MagicMock(),
}
# Mock client that raises an exception
mock_client = MagicMock()
mock_client.get_paginator.side_effect = Exception("Test error")
# Mock session
mock_session = MagicMock()
mock_aws_setup.return_value._session.current_session = mock_session
# Create SecurityHub instance
security_hub = SecurityHub(
aws_account_id="123456789012",
aws_partition="aws",
findings=mock_findings,
aws_security_hub_available_regions=["us-east-1"],
)
# Mock the enabled regions
security_hub._enabled_regions = mock_enabled_regions
security_hub._enabled_regions["us-east-1"] = mock_client
# Mock findings per region
security_hub._findings_per_region = {
"us-east-1": [mock_findings[0]],
}
# Call the method - should not raise exception
result = security_hub.get_existing_findings_timestamps()
# Should return empty dict due to error
assert result == {}
@patch("prowler.providers.aws.lib.security_hub.security_hub.AwsSetUpSession")
def test_get_existing_findings_timestamps_partial_data(self, mock_aws_setup):
"""Test that get_existing_findings_timestamps handles partial timestamp data correctly."""
# Mock findings per region
mock_findings = [
MagicMock(
Id="prowler-test-check-123456789012-us-east-1-hash123",
Region="us-east-1",
Compliance=MagicMock(Status="FAILED"),
),
]
# Mock enabled regions
mock_enabled_regions = {
"us-east-1": MagicMock(),
}
# Mock paginator responses with partial data
mock_page = {
"Findings": [
{
"Id": "prowler-test-check-123456789012-us-east-1-hash123",
"FirstObservedAt": "2023-01-01T00:00:00Z",
# Missing CreatedAt and UpdatedAt
}
]
}
# Mock paginator
mock_paginator = MagicMock()
mock_paginator.paginate.return_value = [mock_page]
# Mock Security Hub client
mock_client = MagicMock()
mock_client.get_paginator.return_value = mock_paginator
# Mock session
mock_session = MagicMock()
mock_aws_setup.return_value._session.current_session = mock_session
# Create SecurityHub instance
security_hub = SecurityHub(
aws_account_id="123456789012",
aws_partition="aws",
findings=mock_findings,
aws_security_hub_available_regions=["us-east-1"],
)
# Mock the enabled regions
security_hub._enabled_regions = mock_enabled_regions
security_hub._enabled_regions["us-east-1"] = mock_client
# Mock findings per region
security_hub._findings_per_region = {
"us-east-1": [mock_findings[0]],
}
# Call the method
result = security_hub.get_existing_findings_timestamps()
# Verify the result handles missing fields gracefully
expected_result = {
"prowler-test-check-123456789012-us-east-1-hash123": {
"FirstObservedAt": "2023-01-01T00:00:00Z",
"CreatedAt": None,
"UpdatedAt": None,
},
}
assert result == expected_result
# Tests for _check_region_security_hub static method
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_check_region_security_hub_success(self):