mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-02-09 15:10:36 +00:00
Compare commits
8 Commits
PROWLER-50
...
PRWLR-7853
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c9531483d5 | ||
|
|
0ad7c3ba60 | ||
|
|
af069104bf | ||
|
|
7566fe6072 | ||
|
|
ed75293dad | ||
|
|
4063a94251 | ||
|
|
bfe5c1c770 | ||
|
|
4df5b878ca |
@@ -72,6 +72,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
|
||||
- Fix Validation and other errors in Azure provider [(#8915)](https://github.com/prowler-cloud/prowler/pull/8915)
|
||||
- Update documentation URLs from docs.prowler.cloud to docs.prowler.com [(#9240)](https://github.com/prowler-cloud/prowler/pull/9240)
|
||||
- Fix file name parsing for checks on Windows [(#9268)](https://github.com/prowler-cloud/prowler/pull/9268)
|
||||
- Preserve timestamps in existing findings from Security Hub integration [(#8589)](https://github.com/prowler-cloud/prowler/pull/8589)
|
||||
|
||||
---
|
||||
|
||||
|
||||
@@ -455,9 +455,9 @@ def prowler():
|
||||
findings=finding_outputs,
|
||||
file_path=f"{filename}{json_asff_file_suffix}",
|
||||
)
|
||||
generated_outputs["regular"].append(asff_output)
|
||||
# Write ASFF Finding Object to file
|
||||
asff_output.batch_write_data_to_file()
|
||||
if not args.security_hub:
|
||||
generated_outputs["regular"].append(asff_output)
|
||||
asff_output.batch_write_data_to_file()
|
||||
|
||||
if mode == "json-ocsf":
|
||||
json_output = OCSF(
|
||||
@@ -1035,17 +1035,42 @@ def prowler():
|
||||
send_only_fails=output_options.send_sh_only_fails,
|
||||
aws_security_hub_available_regions=security_hub_regions,
|
||||
)
|
||||
# Send the findings to Security Hub
|
||||
findings_sent_to_security_hub = security_hub.batch_send_to_security_hub()
|
||||
if findings_sent_to_security_hub == 0:
|
||||
if not security_hub._enabled_regions:
|
||||
print(
|
||||
f"{Style.BRIGHT}{orange_color}\nNo findings sent to AWS Security Hub.{Style.RESET_ALL}"
|
||||
f"{Style.BRIGHT}{Fore.RED}\nNo regions with Security Hub enabled with Prowler integration found. More info: https://docs.prowler.cloud/en/latest/tutorials/aws/securityhub/{Style.RESET_ALL}\n"
|
||||
)
|
||||
sys.exit(1)
|
||||
else:
|
||||
print(
|
||||
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent_to_security_hub} findings sent to AWS Security Hub!{Style.RESET_ALL}"
|
||||
# Get existing findings timestamps to preserve FirstObservedAt, CreatedAt, and UpdatedAt for findings that already exist
|
||||
existing_findings_timestamps = (
|
||||
security_hub.get_existing_findings_timestamps()
|
||||
)
|
||||
|
||||
# Create ASFF output with existing timestamps to preserve FirstObservedAt, CreatedAt, and UpdatedAt
|
||||
asff_output_with_timestamps = ASFF(
|
||||
findings=finding_outputs,
|
||||
existing_findings_timestamps=existing_findings_timestamps,
|
||||
)
|
||||
|
||||
# Update Security Hub findings to use the ones with preserved timestamps
|
||||
security_hub._findings_per_region = security_hub.filter(
|
||||
asff_output_with_timestamps.data,
|
||||
output_options.send_sh_only_fails,
|
||||
)
|
||||
|
||||
# Send the findings to Security Hub
|
||||
findings_sent_to_security_hub = (
|
||||
security_hub.batch_send_to_security_hub()
|
||||
)
|
||||
if findings_sent_to_security_hub == 0:
|
||||
print(
|
||||
f"{Style.BRIGHT}{orange_color}\nNo findings sent to AWS Security Hub.{Style.RESET_ALL}"
|
||||
)
|
||||
else:
|
||||
print(
|
||||
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_sent_to_security_hub} findings sent to AWS Security Hub!{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
# Resolve previous fails of Security Hub
|
||||
if not args.skip_sh_update:
|
||||
print(
|
||||
@@ -1063,6 +1088,12 @@ def prowler():
|
||||
f"{Style.BRIGHT}{Fore.GREEN}\n{findings_archived_in_security_hub} findings archived in AWS Security Hub!{Style.RESET_ALL}"
|
||||
)
|
||||
|
||||
asff_output_with_timestamps.file_path = (
|
||||
f"{filename}{json_asff_file_suffix}"
|
||||
)
|
||||
generated_outputs["regular"].append(asff_output_with_timestamps)
|
||||
asff_output_with_timestamps.batch_write_data_to_file()
|
||||
|
||||
# Display summary table
|
||||
if not args.only_logs:
|
||||
display_summary_table(
|
||||
|
||||
@@ -20,6 +20,7 @@ class ASFF(Output):
|
||||
Attributes:
|
||||
- _data: A list to store the transformed findings.
|
||||
- _file_descriptor: A file descriptor to write to file.
|
||||
- _existing_findings_timestamps: A dictionary mapping finding IDs to their existing timestamps from Security Hub.
|
||||
|
||||
Methods:
|
||||
- transform(findings: list[Finding]) -> None: Transforms a list of findings into ASFF format.
|
||||
@@ -31,7 +32,30 @@ class ASFF(Output):
|
||||
- AWS Security Finding Format Syntax: https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-findings-format-syntax.html
|
||||
"""
|
||||
|
||||
def transform(self, findings: list[Finding]) -> None:
|
||||
def __init__(
|
||||
self,
|
||||
findings: list[Finding],
|
||||
file_path: str = None,
|
||||
file_extension: str = "",
|
||||
from_cli: bool = True,
|
||||
existing_findings_timestamps: dict = None,
|
||||
) -> None:
|
||||
"""
|
||||
Initialize the ASFF output formatter.
|
||||
|
||||
Args:
|
||||
findings: List of findings to transform
|
||||
file_path: Path to the output file
|
||||
file_extension: File extension for the output
|
||||
from_cli: Whether this is being called from CLI
|
||||
existing_findings_timestamps: Dictionary mapping finding IDs to existing timestamps from Security Hub
|
||||
"""
|
||||
self._existing_findings_timestamps = existing_findings_timestamps
|
||||
super().__init__(findings, file_path, file_extension, from_cli)
|
||||
|
||||
def transform(
|
||||
self, findings: list[Finding], existing_findings_timestamps: dict = None
|
||||
) -> None:
|
||||
"""
|
||||
Transforms a list of findings into AWS Security Finding Format (ASFF).
|
||||
|
||||
@@ -39,6 +63,7 @@ class ASFF(Output):
|
||||
|
||||
Parameters:
|
||||
- findings (list[Finding]): A list of Finding objects representing the findings to be transformed.
|
||||
- existing_findings_timestamps (dict, optional): A dictionary mapping finding IDs to their existing timestamps from Security Hub.
|
||||
|
||||
Returns:
|
||||
- None
|
||||
@@ -49,18 +74,43 @@ class ASFF(Output):
|
||||
- It formats timestamps in the required ASFF format.
|
||||
- It handles compliance frameworks and associated standards for each finding.
|
||||
- It ensures that the finding status matches the allowed values in ASFF.
|
||||
- If existing_findings_timestamps is provided, it preserves the original FirstObservedAt and CreatedAt timestamps for findings that already exist in Security Hub.
|
||||
|
||||
References:
|
||||
- AWS Security Hub API Reference: https://docs.aws.amazon.com/securityhub/1.0/APIReference/API_Compliance.html
|
||||
- AWS Security Finding Format Syntax: https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-findings-format-syntax.html
|
||||
"""
|
||||
try:
|
||||
# Use the parameter if provided, otherwise use the stored value
|
||||
timestamps_to_use = (
|
||||
existing_findings_timestamps or self._existing_findings_timestamps
|
||||
)
|
||||
|
||||
for finding in findings:
|
||||
# MANUAL status is not valid in SecurityHub
|
||||
# https://docs.aws.amazon.com/securityhub/1.0/APIReference/API_Compliance.html
|
||||
if finding.status == "MANUAL":
|
||||
continue
|
||||
timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
# Generate the finding ID to check if it exists in Security Hub
|
||||
finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Determine timestamps based on whether the finding already exists in Security Hub
|
||||
if timestamps_to_use and finding_id in timestamps_to_use:
|
||||
existing_timestamps = timestamps_to_use[finding_id]
|
||||
# Preserve original FirstObservedAt and CreatedAt, update UpdatedAt
|
||||
first_observed_at = existing_timestamps.get(
|
||||
"FirstObservedAt", current_timestamp
|
||||
)
|
||||
created_at = existing_timestamps.get("CreatedAt", current_timestamp)
|
||||
updated_at = current_timestamp # Always update with current time
|
||||
else:
|
||||
# New finding: use current timestamp for all fields
|
||||
first_observed_at = current_timestamp
|
||||
created_at = current_timestamp
|
||||
updated_at = current_timestamp
|
||||
|
||||
associated_standards, compliance_summary = ASFF.format_compliance(
|
||||
finding.compliance
|
||||
@@ -72,7 +122,7 @@ class ASFF(Output):
|
||||
AWSSecurityFindingFormat(
|
||||
# The following line cannot be changed because it is the format we use to generate unique findings for AWS Security Hub
|
||||
# If changed some findings could be lost because the unique identifier will be different
|
||||
Id=f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}",
|
||||
Id=finding_id,
|
||||
ProductArn=f"arn:{finding.partition}:securityhub:{finding.region}::product/prowler/prowler",
|
||||
ProductFields=ProductFields(
|
||||
ProwlerResourceName=finding.resource_uid,
|
||||
@@ -84,9 +134,9 @@ class ASFF(Output):
|
||||
if finding.metadata.CheckType
|
||||
else ["Software and Configuration Checks"]
|
||||
),
|
||||
FirstObservedAt=timestamp,
|
||||
UpdatedAt=timestamp,
|
||||
CreatedAt=timestamp,
|
||||
FirstObservedAt=first_observed_at,
|
||||
UpdatedAt=updated_at,
|
||||
CreatedAt=created_at,
|
||||
Severity=Severity(Label=finding.metadata.Severity.value),
|
||||
Title=finding.metadata.CheckTitle,
|
||||
Description=(
|
||||
|
||||
@@ -364,6 +364,70 @@ class SecurityHub:
|
||||
)
|
||||
return success_count
|
||||
|
||||
def get_existing_findings_timestamps(self) -> dict:
|
||||
"""
|
||||
Retrieves existing findings from Security Hub that are relevant to the current scan
|
||||
and returns their timestamps. This function only checks for findings that match
|
||||
the current scan's check IDs to improve performance, then filters locally by finding IDs.
|
||||
|
||||
Returns:
|
||||
dict: A dictionary mapping finding IDs to their timestamps from Security Hub.
|
||||
"""
|
||||
existing_findings_timestamps = {}
|
||||
logger.info(
|
||||
"Retrieving existing findings timestamps from Security Hub for current scan."
|
||||
)
|
||||
|
||||
# Get unique check IDs and finding IDs from current scan findings
|
||||
current_finding_ids = set()
|
||||
for findings in self._findings_per_region.values():
|
||||
for finding in findings:
|
||||
current_finding_ids.add(finding.Id)
|
||||
|
||||
if not current_finding_ids:
|
||||
logger.info("No current scan findings to check timestamps for.")
|
||||
return existing_findings_timestamps
|
||||
|
||||
logger.info(
|
||||
f"Checking timestamps for {len(current_finding_ids)} findings from current scan."
|
||||
)
|
||||
|
||||
for region in self._findings_per_region.keys():
|
||||
try:
|
||||
# Get findings of that region, filtered by current scan's finding IDs
|
||||
findings_filter = {
|
||||
"ProductName": [{"Value": "Prowler", "Comparison": "EQUALS"}],
|
||||
"RecordState": [{"Value": "ACTIVE", "Comparison": "EQUALS"}],
|
||||
"AwsAccountId": [
|
||||
{"Value": self._aws_account_id, "Comparison": "EQUALS"}
|
||||
],
|
||||
"Region": [{"Value": region, "Comparison": "EQUALS"}],
|
||||
"Id": [
|
||||
{"Value": finding_id, "Comparison": "EQUALS"}
|
||||
for finding_id in current_finding_ids
|
||||
],
|
||||
}
|
||||
get_findings_paginator = self._enabled_regions[region].get_paginator(
|
||||
"get_findings"
|
||||
)
|
||||
|
||||
# Use the same batching pattern as _send_findings_in_batches
|
||||
self._retrieve_findings_in_batches(
|
||||
get_findings_paginator,
|
||||
findings_filter,
|
||||
existing_findings_timestamps,
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"Error retrieving existing findings timestamps from region {region}: {error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
logger.info(
|
||||
f"Retrieved timestamps for {len(existing_findings_timestamps)} existing findings from Security Hub relevant to current scan."
|
||||
)
|
||||
return existing_findings_timestamps
|
||||
|
||||
def archive_previous_findings(self) -> int:
|
||||
"""
|
||||
Checks previous findings in Security Hub to archive them.
|
||||
@@ -418,6 +482,34 @@ class SecurityHub:
|
||||
)
|
||||
return success_count
|
||||
|
||||
def _retrieve_findings_in_batches(
|
||||
self, paginator, findings_filter: dict, existing_findings_timestamps: dict
|
||||
) -> None:
|
||||
"""
|
||||
Retrieves findings from AWS Security Hub in batches using the same pattern as _send_findings_in_batches.
|
||||
|
||||
Args:
|
||||
paginator: The paginator object for get_findings.
|
||||
findings_filter (dict): The filter to apply when retrieving findings.
|
||||
existing_findings_timestamps (dict): Dictionary to store the retrieved findings timestamps.
|
||||
"""
|
||||
try:
|
||||
for page in paginator.paginate(
|
||||
Filters=findings_filter,
|
||||
PaginationConfig={"PageSize": SECURITY_HUB_MAX_BATCH},
|
||||
):
|
||||
for finding in page["Findings"]:
|
||||
finding_id = finding["Id"]
|
||||
existing_findings_timestamps[finding_id] = {
|
||||
"FirstObservedAt": finding.get("FirstObservedAt"),
|
||||
"CreatedAt": finding.get("CreatedAt"),
|
||||
"UpdatedAt": finding.get("UpdatedAt"),
|
||||
}
|
||||
except Exception as error:
|
||||
logger.error(
|
||||
f"{error.__class__.__name__} -- [{error.__traceback__.tb_lineno}]: {error}"
|
||||
)
|
||||
|
||||
def _send_findings_in_batches(
|
||||
self, findings: list[AWSSecurityFindingFormat], region: str
|
||||
) -> int:
|
||||
|
||||
@@ -592,3 +592,697 @@ class TestASFF:
|
||||
assert ASFF.generate_status("FAIL") == "FAILED"
|
||||
assert ASFF.generate_status("FAIL", True) == "WARNING"
|
||||
assert ASFF.generate_status("SOMETHING ELSE") == "NOT_AVAILABLE"
|
||||
|
||||
def test_asff_preserves_existing_timestamps(self):
|
||||
"""Test that ASFF preserves existing timestamps for findings that already exist in Security Hub."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Mock existing finding timestamps from Security Hub
|
||||
finding_id = f"prowler-{finding.metadata.CheckID}-{AWS_ACCOUNT_NUMBER}-{AWS_REGION_EU_WEST_1}-{hash_sha512(finding.resource_uid)}"
|
||||
existing_timestamps = {
|
||||
finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
associated_standards, compliance_summary = ASFF.format_compliance(
|
||||
finding.compliance
|
||||
)
|
||||
|
||||
expected = AWSSecurityFindingFormat(
|
||||
Id=finding_id,
|
||||
ProductArn=f"arn:{AWS_COMMERCIAL_PARTITION}:securityhub:{AWS_REGION_EU_WEST_1}::product/prowler/prowler",
|
||||
ProductFields=ProductFields(
|
||||
ProviderVersion=prowler_version,
|
||||
ProwlerResourceName=finding.resource_uid,
|
||||
),
|
||||
GeneratorId="prowler-" + finding.metadata.CheckID,
|
||||
AwsAccountId=AWS_ACCOUNT_NUMBER,
|
||||
Types=finding.metadata.CheckType,
|
||||
FirstObservedAt="2023-01-01T00:00:00Z", # Should preserve existing timestamp
|
||||
UpdatedAt=current_timestamp, # Should update with current timestamp
|
||||
CreatedAt="2023-01-01T00:00:00Z", # Should preserve existing timestamp
|
||||
Severity=Severity(Label=finding.metadata.Severity.value),
|
||||
Title=finding.metadata.CheckTitle,
|
||||
Resources=[
|
||||
Resource(
|
||||
Id=finding.resource_uid,
|
||||
Type=finding.metadata.ResourceType,
|
||||
Partition=AWS_COMMERCIAL_PARTITION,
|
||||
Region=AWS_REGION_EU_WEST_1,
|
||||
Tags={"key1": "value1"},
|
||||
)
|
||||
],
|
||||
Compliance=Compliance(
|
||||
Status=ASFF.generate_status(status),
|
||||
RelatedRequirements=compliance_summary,
|
||||
AssociatedStandards=associated_standards,
|
||||
),
|
||||
Remediation=Remediation(
|
||||
Recommendation=Recommendation(
|
||||
Text=finding.metadata.Remediation.Recommendation.Text,
|
||||
Url=finding.metadata.Remediation.Recommendation.Url,
|
||||
)
|
||||
),
|
||||
Description=finding.status_extended,
|
||||
)
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
assert len(asff.data) == 1
|
||||
asff_finding = asff.data[0]
|
||||
|
||||
assert asff_finding == expected
|
||||
# Verify that FirstObservedAt and CreatedAt are preserved
|
||||
assert asff_finding.FirstObservedAt == "2023-01-01T00:00:00Z"
|
||||
assert asff_finding.CreatedAt == "2023-01-01T00:00:00Z"
|
||||
# Verify that UpdatedAt uses current timestamp
|
||||
assert asff_finding.UpdatedAt == current_timestamp
|
||||
|
||||
def test_asff_uses_current_timestamps_for_new_findings(self):
|
||||
"""Test that ASFF uses current timestamps for new findings when no existing timestamps are provided."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
|
||||
associated_standards, compliance_summary = ASFF.format_compliance(
|
||||
finding.compliance
|
||||
)
|
||||
|
||||
expected = AWSSecurityFindingFormat(
|
||||
Id=f"prowler-{finding.metadata.CheckID}-{AWS_ACCOUNT_NUMBER}-{AWS_REGION_EU_WEST_1}-{hash_sha512(finding.resource_uid)}",
|
||||
ProductArn=f"arn:{AWS_COMMERCIAL_PARTITION}:securityhub:{AWS_REGION_EU_WEST_1}::product/prowler/prowler",
|
||||
ProductFields=ProductFields(
|
||||
ProviderVersion=prowler_version,
|
||||
ProwlerResourceName=finding.resource_uid,
|
||||
),
|
||||
GeneratorId="prowler-" + finding.metadata.CheckID,
|
||||
AwsAccountId=AWS_ACCOUNT_NUMBER,
|
||||
Types=finding.metadata.CheckType,
|
||||
FirstObservedAt=current_timestamp, # Should use current timestamp for new findings
|
||||
UpdatedAt=current_timestamp,
|
||||
CreatedAt=current_timestamp,
|
||||
Severity=Severity(Label=finding.metadata.Severity.value),
|
||||
Title=finding.metadata.CheckTitle,
|
||||
Resources=[
|
||||
Resource(
|
||||
Id=finding.resource_uid,
|
||||
Type=finding.metadata.ResourceType,
|
||||
Partition=AWS_COMMERCIAL_PARTITION,
|
||||
Region=AWS_REGION_EU_WEST_1,
|
||||
Tags={"key1": "value1"},
|
||||
)
|
||||
],
|
||||
Compliance=Compliance(
|
||||
Status=ASFF.generate_status(status),
|
||||
RelatedRequirements=compliance_summary,
|
||||
AssociatedStandards=associated_standards,
|
||||
),
|
||||
Remediation=Remediation(
|
||||
Recommendation=Recommendation(
|
||||
Text=finding.metadata.Remediation.Recommendation.Text,
|
||||
Url=finding.metadata.Remediation.Recommendation.Url,
|
||||
)
|
||||
),
|
||||
Description=finding.status_extended,
|
||||
)
|
||||
|
||||
asff = ASFF(findings=[finding])
|
||||
|
||||
assert len(asff.data) == 1
|
||||
asff_finding = asff.data[0]
|
||||
|
||||
assert asff_finding == expected
|
||||
# Verify that all timestamps use current timestamp for new findings
|
||||
assert asff_finding.FirstObservedAt == current_timestamp
|
||||
assert asff_finding.UpdatedAt == current_timestamp
|
||||
assert asff_finding.CreatedAt == current_timestamp
|
||||
|
||||
def test_asff_constructor_with_existing_timestamps(self):
|
||||
"""Test that ASFF constructor properly stores existing timestamps."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the actual finding ID that will be used
|
||||
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Existing timestamps with the correct ID
|
||||
existing_timestamps = {
|
||||
actual_finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
# Create ASFF output with timestamps in constructor
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
# Verify that timestamps are preserved
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
|
||||
assert finding_asff.CreatedAt == "2023-01-01T00:00:00Z"
|
||||
|
||||
def test_asff_constructor_without_existing_timestamps(self):
|
||||
"""Test that ASFF constructor works without existing timestamps."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Create ASFF output without timestamps parameter
|
||||
asff = ASFF(findings=[finding])
|
||||
|
||||
# Verify that current timestamps are used
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding_asff.FirstObservedAt == current_timestamp
|
||||
assert finding_asff.CreatedAt == current_timestamp
|
||||
assert finding_asff.UpdatedAt == current_timestamp
|
||||
|
||||
def test_asff_transform_method_parameter_override(self):
|
||||
"""Test that transform method parameter overrides constructor parameter."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the actual finding ID
|
||||
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Constructor timestamps
|
||||
constructor_timestamps = {
|
||||
actual_finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
# Transform method timestamps (different)
|
||||
transform_timestamps = {
|
||||
actual_finding_id: {
|
||||
"FirstObservedAt": "2023-02-01T00:00:00Z",
|
||||
"CreatedAt": "2023-02-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-02-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
# Create ASFF output with constructor timestamps
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=constructor_timestamps
|
||||
)
|
||||
|
||||
# Clear existing data and transform with different timestamps
|
||||
asff._data = []
|
||||
asff.transform([finding], transform_timestamps)
|
||||
|
||||
# Verify that transform method timestamps are used
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
assert finding_asff.FirstObservedAt == "2023-02-01T00:00:00Z"
|
||||
assert finding_asff.CreatedAt == "2023-02-01T00:00:00Z"
|
||||
|
||||
def test_asff_transform_method_without_parameter(self):
|
||||
"""Test that transform method uses constructor timestamps when no parameter is provided."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the actual finding ID
|
||||
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Constructor timestamps
|
||||
constructor_timestamps = {
|
||||
actual_finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
# Create ASFF output with timestamps in constructor
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=constructor_timestamps
|
||||
)
|
||||
|
||||
# Clear existing data and transform without parameter (should use constructor timestamps)
|
||||
asff._data = []
|
||||
asff.transform([finding])
|
||||
|
||||
# Verify that constructor timestamps are used
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
|
||||
assert finding_asff.CreatedAt == "2023-01-01T00:00:00Z"
|
||||
|
||||
def test_asff_handles_partial_existing_timestamps(self):
|
||||
"""Test that ASFF handles cases where only some timestamp fields exist in the existing data."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the actual finding ID
|
||||
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Mock existing finding with only FirstObservedAt timestamp
|
||||
existing_timestamps = {
|
||||
actual_finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
# Missing CreatedAt and UpdatedAt
|
||||
}
|
||||
}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
# Should preserve FirstObservedAt, use current timestamp for missing fields
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding_asff.CreatedAt == current_timestamp
|
||||
assert finding_asff.UpdatedAt == current_timestamp
|
||||
|
||||
def test_asff_handles_missing_timestamp_fields(self):
|
||||
"""Test that ASFF handles cases where timestamp fields are missing from the dictionary."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the actual finding ID
|
||||
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Mock existing finding with missing keys (not None values)
|
||||
existing_timestamps = {
|
||||
actual_finding_id: {
|
||||
# Missing FirstObservedAt, CreatedAt, and UpdatedAt keys entirely
|
||||
}
|
||||
}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
# Should use current timestamp for all fields when keys are missing
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding_asff.FirstObservedAt == current_timestamp
|
||||
assert finding_asff.CreatedAt == current_timestamp
|
||||
assert finding_asff.UpdatedAt == current_timestamp
|
||||
|
||||
def test_asff_handles_empty_existing_timestamps_dict(self):
|
||||
"""Test that ASFF handles empty existing timestamps dictionary."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Empty existing timestamps
|
||||
existing_timestamps = {}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
# Should use current timestamp for all fields
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding_asff.FirstObservedAt == current_timestamp
|
||||
assert finding_asff.CreatedAt == current_timestamp
|
||||
assert finding_asff.UpdatedAt == current_timestamp
|
||||
|
||||
def test_asff_handles_none_existing_timestamps(self):
|
||||
"""Test that ASFF handles None existing timestamps parameter."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# None existing timestamps
|
||||
asff = ASFF(findings=[finding], existing_findings_timestamps=None)
|
||||
|
||||
# Should use current timestamp for all fields
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding_asff.FirstObservedAt == current_timestamp
|
||||
assert finding_asff.CreatedAt == current_timestamp
|
||||
assert finding_asff.UpdatedAt == current_timestamp
|
||||
|
||||
def test_asff_finding_id_generation_consistency(self):
|
||||
"""Test that finding ID generation is consistent between timestamp lookup and ASFF creation."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the expected finding ID
|
||||
expected_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Create existing timestamps with the expected ID
|
||||
existing_timestamps = {
|
||||
expected_finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
# Verify that the finding ID matches and timestamps are preserved
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
assert finding_asff.Id == expected_finding_id
|
||||
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
|
||||
assert finding_asff.CreatedAt == "2023-01-01T00:00:00Z"
|
||||
|
||||
def test_asff_multiple_findings_mixed_timestamps(self):
|
||||
"""Test that ASFF handles multiple findings with mixed existing and new timestamps."""
|
||||
# Create multiple findings
|
||||
finding1 = generate_finding_output(
|
||||
status="PASS",
|
||||
status_extended="First finding",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource-1",
|
||||
resource_uid="test-arn-1",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
finding2 = generate_finding_output(
|
||||
status="FAIL",
|
||||
status_extended="Second finding",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource-2",
|
||||
resource_uid="test-arn-2",
|
||||
resource_tags={"key2": "value2"},
|
||||
)
|
||||
|
||||
# Only first finding has existing timestamps
|
||||
finding1_id = f"prowler-{finding1.metadata.CheckID}-{finding1.account_uid}-{finding1.region}-{hash_sha512(finding1.resource_uid)}"
|
||||
existing_timestamps = {
|
||||
finding1_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding1, finding2],
|
||||
existing_findings_timestamps=existing_timestamps,
|
||||
)
|
||||
|
||||
# Verify that first finding preserves timestamps
|
||||
assert len(asff.data) == 2
|
||||
|
||||
finding1_asff = asff.data[0]
|
||||
assert finding1_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
|
||||
assert finding1_asff.CreatedAt == "2023-01-01T00:00:00Z"
|
||||
|
||||
# Verify that second finding uses current timestamps
|
||||
finding2_asff = asff.data[1]
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding2_asff.FirstObservedAt == current_timestamp
|
||||
assert finding2_asff.CreatedAt == current_timestamp
|
||||
assert finding2_asff.UpdatedAt == current_timestamp
|
||||
|
||||
def test_asff_updated_at_always_current(self):
|
||||
"""Test that UpdatedAt is always set to current timestamp regardless of existing data."""
|
||||
status = "PASS"
|
||||
finding = generate_finding_output(
|
||||
status=status,
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the actual finding ID
|
||||
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Existing timestamps with old UpdatedAt
|
||||
existing_timestamps = {
|
||||
actual_finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z", # Old timestamp
|
||||
}
|
||||
}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
# Verify that UpdatedAt is current time, not the old timestamp
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
assert finding_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
|
||||
assert finding_asff.CreatedAt == "2023-01-01T00:00:00Z"
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding_asff.UpdatedAt == current_timestamp
|
||||
assert finding_asff.UpdatedAt != "2023-01-01T00:00:00Z"
|
||||
|
||||
def test_asff_handles_duplicate_finding_ids(self):
|
||||
"""Test that ASFF handles cases where multiple findings might have the same ID."""
|
||||
# Create findings with same check ID but different resources
|
||||
finding1 = generate_finding_output(
|
||||
status="PASS",
|
||||
status_extended="First finding",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource-1",
|
||||
resource_uid="test-arn-1",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
finding2 = generate_finding_output(
|
||||
status="FAIL",
|
||||
status_extended="Second finding",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource-2",
|
||||
resource_uid="test-arn-2",
|
||||
resource_tags={"key2": "value2"},
|
||||
)
|
||||
|
||||
# Create existing timestamps for one finding
|
||||
finding1_id = f"prowler-{finding1.metadata.CheckID}-{finding1.account_uid}-{finding1.region}-{hash_sha512(finding1.resource_uid)}"
|
||||
existing_timestamps = {
|
||||
finding1_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding1, finding2],
|
||||
existing_findings_timestamps=existing_timestamps,
|
||||
)
|
||||
|
||||
# Verify that findings are processed correctly
|
||||
assert len(asff.data) == 2
|
||||
|
||||
# First finding should preserve timestamps
|
||||
finding1_asff = asff.data[0]
|
||||
assert finding1_asff.FirstObservedAt == "2023-01-01T00:00:00Z"
|
||||
assert finding1_asff.CreatedAt == "2023-01-01T00:00:00Z"
|
||||
|
||||
# Second finding should use current timestamps
|
||||
finding2_asff = asff.data[1]
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding2_asff.FirstObservedAt == current_timestamp
|
||||
assert finding2_asff.CreatedAt == current_timestamp
|
||||
|
||||
def test_asff_handles_manual_status_findings(self):
|
||||
"""Test that ASFF correctly skips MANUAL status findings even with existing timestamps."""
|
||||
finding = generate_finding_output(
|
||||
status="MANUAL",
|
||||
status_extended="This is a manual finding",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the actual finding ID
|
||||
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Existing timestamps
|
||||
existing_timestamps = {
|
||||
actual_finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
# MANUAL findings should be skipped
|
||||
assert len(asff.data) == 0
|
||||
|
||||
def test_asff_timestamp_format_consistency(self):
|
||||
"""Test that all timestamps use consistent ISO 8601 format."""
|
||||
finding = generate_finding_output(
|
||||
status="PASS",
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Generate the actual finding ID
|
||||
actual_finding_id = f"prowler-{finding.metadata.CheckID}-{finding.account_uid}-{finding.region}-{hash_sha512(finding.resource_uid)}"
|
||||
|
||||
# Existing timestamps in different formats
|
||||
existing_timestamps = {
|
||||
actual_finding_id: {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
}
|
||||
|
||||
asff = ASFF(
|
||||
findings=[finding], existing_findings_timestamps=existing_timestamps
|
||||
)
|
||||
|
||||
# Verify that all timestamps use consistent format
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
|
||||
# Check format: YYYY-MM-DDTHH:MM:SSZ
|
||||
import re
|
||||
|
||||
timestamp_pattern = r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$"
|
||||
|
||||
assert re.match(timestamp_pattern, finding_asff.FirstObservedAt)
|
||||
assert re.match(timestamp_pattern, finding_asff.CreatedAt)
|
||||
assert re.match(timestamp_pattern, finding_asff.UpdatedAt)
|
||||
|
||||
def test_asff_backward_compatibility(self):
|
||||
"""Test that ASFF maintains backward compatibility when no existing timestamps are provided."""
|
||||
finding = generate_finding_output(
|
||||
status="PASS",
|
||||
status_extended="This is a test",
|
||||
region=AWS_REGION_EU_WEST_1,
|
||||
resource_details="Test resource details",
|
||||
resource_name="test-resource",
|
||||
resource_uid="test-arn",
|
||||
resource_tags={"key1": "value1"},
|
||||
)
|
||||
|
||||
# Test with no existing timestamps (backward compatibility)
|
||||
asff = ASFF(findings=[finding])
|
||||
|
||||
# Verify that current timestamps are used
|
||||
assert len(asff.data) == 1
|
||||
finding_asff = asff.data[0]
|
||||
current_timestamp = timestamp_utc.strftime("%Y-%m-%dT%H:%M:%SZ")
|
||||
assert finding_asff.FirstObservedAt == current_timestamp
|
||||
assert finding_asff.UpdatedAt == current_timestamp
|
||||
assert finding_asff.CreatedAt == current_timestamp
|
||||
|
||||
@@ -5,7 +5,7 @@ import botocore
|
||||
import pytest
|
||||
from boto3 import session
|
||||
from botocore.client import ClientError
|
||||
from mock import patch
|
||||
from mock import MagicMock, patch
|
||||
|
||||
from prowler.lib.outputs.asff.asff import ASFF
|
||||
from prowler.providers.aws.lib.security_hub.exceptions.exceptions import (
|
||||
@@ -1284,6 +1284,244 @@ class TestSecurityHub:
|
||||
assert len(connection.enabled_regions) == 1
|
||||
assert len(connection.disabled_regions) == 1
|
||||
|
||||
@patch("prowler.providers.aws.lib.security_hub.security_hub.AwsSetUpSession")
|
||||
def test_get_existing_findings_timestamps(self, mock_aws_setup):
|
||||
"""Test that get_existing_findings_timestamps correctly retrieves existing findings timestamps."""
|
||||
# Mock findings per region
|
||||
mock_findings = [
|
||||
MagicMock(
|
||||
Id="prowler-test-check-123456789012-us-east-1-hash123",
|
||||
Region="us-east-1",
|
||||
Compliance=MagicMock(Status="FAILED"),
|
||||
),
|
||||
MagicMock(
|
||||
Id="prowler-test-check-123456789012-us-west-2-hash456",
|
||||
Region="us-west-2",
|
||||
Compliance=MagicMock(Status="FAILED"),
|
||||
),
|
||||
]
|
||||
|
||||
# Mock enabled regions
|
||||
mock_enabled_regions = {
|
||||
"us-east-1": MagicMock(),
|
||||
"us-west-2": MagicMock(),
|
||||
}
|
||||
|
||||
# Mock paginator responses
|
||||
mock_page1 = {
|
||||
"Findings": [
|
||||
{
|
||||
"Id": "prowler-test-check-123456789012-us-east-1-hash123",
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
}
|
||||
]
|
||||
}
|
||||
mock_page2 = {
|
||||
"Findings": [
|
||||
{
|
||||
"Id": "prowler-test-check-123456789012-us-west-2-hash456",
|
||||
"FirstObservedAt": "2023-01-15T00:00:00Z",
|
||||
"CreatedAt": "2023-01-15T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-15T00:00:00Z",
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# Mock paginator
|
||||
mock_paginator = MagicMock()
|
||||
mock_paginator.paginate.return_value = [mock_page1, mock_page2]
|
||||
|
||||
# Mock Security Hub client
|
||||
mock_client = MagicMock()
|
||||
mock_client.get_paginator.return_value = mock_paginator
|
||||
|
||||
# Create SecurityHub instance with mocked session
|
||||
mock_session = MagicMock()
|
||||
mock_aws_setup.return_value._session.current_session = mock_session
|
||||
|
||||
security_hub = SecurityHub(
|
||||
aws_account_id="123456789012",
|
||||
aws_partition="aws",
|
||||
findings=mock_findings,
|
||||
aws_security_hub_available_regions=["us-east-1", "us-west-2"],
|
||||
)
|
||||
|
||||
# Mock the enabled regions
|
||||
security_hub._enabled_regions = mock_enabled_regions
|
||||
security_hub._enabled_regions["us-east-1"] = mock_client
|
||||
security_hub._enabled_regions["us-west-2"] = mock_client
|
||||
|
||||
# Mock findings per region
|
||||
security_hub._findings_per_region = {
|
||||
"us-east-1": [mock_findings[0]],
|
||||
"us-west-2": [mock_findings[1]],
|
||||
}
|
||||
|
||||
# Call the method
|
||||
result = security_hub.get_existing_findings_timestamps()
|
||||
|
||||
# Verify the result
|
||||
expected_result = {
|
||||
"prowler-test-check-123456789012-us-east-1-hash123": {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": "2023-01-01T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-01T00:00:00Z",
|
||||
},
|
||||
"prowler-test-check-123456789012-us-west-2-hash456": {
|
||||
"FirstObservedAt": "2023-01-15T00:00:00Z",
|
||||
"CreatedAt": "2023-01-15T00:00:00Z",
|
||||
"UpdatedAt": "2023-01-15T00:00:00Z",
|
||||
},
|
||||
}
|
||||
|
||||
assert result == expected_result
|
||||
|
||||
# Verify that the paginator was called correctly
|
||||
mock_client.get_paginator.assert_called_with("get_findings")
|
||||
assert mock_paginator.paginate.call_count == 2
|
||||
|
||||
@patch("prowler.providers.aws.lib.security_hub.security_hub.AwsSetUpSession")
|
||||
def test_get_existing_findings_timestamps_empty_regions(self, mock_aws_setup):
|
||||
"""Test that get_existing_findings_timestamps handles empty regions correctly."""
|
||||
# Mock session
|
||||
mock_session = MagicMock()
|
||||
mock_aws_setup.return_value._session.current_session = mock_session
|
||||
|
||||
security_hub = SecurityHub(
|
||||
aws_account_id="123456789012",
|
||||
aws_partition="aws",
|
||||
findings=[],
|
||||
aws_security_hub_available_regions=[],
|
||||
)
|
||||
|
||||
# Mock empty findings per region
|
||||
security_hub._findings_per_region = {}
|
||||
|
||||
result = security_hub.get_existing_findings_timestamps()
|
||||
|
||||
assert result == {}
|
||||
|
||||
@patch("prowler.providers.aws.lib.security_hub.security_hub.AwsSetUpSession")
|
||||
def test_get_existing_findings_timestamps_with_error(self, mock_aws_setup):
|
||||
"""Test that get_existing_findings_timestamps handles errors gracefully."""
|
||||
# Mock findings per region
|
||||
mock_findings = [
|
||||
MagicMock(
|
||||
Id="prowler-test-check-123456789012-us-east-1-hash123",
|
||||
Region="us-east-1",
|
||||
Compliance=MagicMock(Status="FAILED"),
|
||||
),
|
||||
]
|
||||
|
||||
# Mock enabled regions
|
||||
mock_enabled_regions = {
|
||||
"us-east-1": MagicMock(),
|
||||
}
|
||||
|
||||
# Mock client that raises an exception
|
||||
mock_client = MagicMock()
|
||||
mock_client.get_paginator.side_effect = Exception("Test error")
|
||||
|
||||
# Mock session
|
||||
mock_session = MagicMock()
|
||||
mock_aws_setup.return_value._session.current_session = mock_session
|
||||
|
||||
# Create SecurityHub instance
|
||||
security_hub = SecurityHub(
|
||||
aws_account_id="123456789012",
|
||||
aws_partition="aws",
|
||||
findings=mock_findings,
|
||||
aws_security_hub_available_regions=["us-east-1"],
|
||||
)
|
||||
|
||||
# Mock the enabled regions
|
||||
security_hub._enabled_regions = mock_enabled_regions
|
||||
security_hub._enabled_regions["us-east-1"] = mock_client
|
||||
|
||||
# Mock findings per region
|
||||
security_hub._findings_per_region = {
|
||||
"us-east-1": [mock_findings[0]],
|
||||
}
|
||||
|
||||
# Call the method - should not raise exception
|
||||
result = security_hub.get_existing_findings_timestamps()
|
||||
|
||||
# Should return empty dict due to error
|
||||
assert result == {}
|
||||
|
||||
@patch("prowler.providers.aws.lib.security_hub.security_hub.AwsSetUpSession")
|
||||
def test_get_existing_findings_timestamps_partial_data(self, mock_aws_setup):
|
||||
"""Test that get_existing_findings_timestamps handles partial timestamp data correctly."""
|
||||
# Mock findings per region
|
||||
mock_findings = [
|
||||
MagicMock(
|
||||
Id="prowler-test-check-123456789012-us-east-1-hash123",
|
||||
Region="us-east-1",
|
||||
Compliance=MagicMock(Status="FAILED"),
|
||||
),
|
||||
]
|
||||
|
||||
# Mock enabled regions
|
||||
mock_enabled_regions = {
|
||||
"us-east-1": MagicMock(),
|
||||
}
|
||||
|
||||
# Mock paginator responses with partial data
|
||||
mock_page = {
|
||||
"Findings": [
|
||||
{
|
||||
"Id": "prowler-test-check-123456789012-us-east-1-hash123",
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
# Missing CreatedAt and UpdatedAt
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
# Mock paginator
|
||||
mock_paginator = MagicMock()
|
||||
mock_paginator.paginate.return_value = [mock_page]
|
||||
|
||||
# Mock Security Hub client
|
||||
mock_client = MagicMock()
|
||||
mock_client.get_paginator.return_value = mock_paginator
|
||||
|
||||
# Mock session
|
||||
mock_session = MagicMock()
|
||||
mock_aws_setup.return_value._session.current_session = mock_session
|
||||
|
||||
# Create SecurityHub instance
|
||||
security_hub = SecurityHub(
|
||||
aws_account_id="123456789012",
|
||||
aws_partition="aws",
|
||||
findings=mock_findings,
|
||||
aws_security_hub_available_regions=["us-east-1"],
|
||||
)
|
||||
|
||||
# Mock the enabled regions
|
||||
security_hub._enabled_regions = mock_enabled_regions
|
||||
security_hub._enabled_regions["us-east-1"] = mock_client
|
||||
|
||||
# Mock findings per region
|
||||
security_hub._findings_per_region = {
|
||||
"us-east-1": [mock_findings[0]],
|
||||
}
|
||||
|
||||
# Call the method
|
||||
result = security_hub.get_existing_findings_timestamps()
|
||||
|
||||
# Verify the result handles missing fields gracefully
|
||||
expected_result = {
|
||||
"prowler-test-check-123456789012-us-east-1-hash123": {
|
||||
"FirstObservedAt": "2023-01-01T00:00:00Z",
|
||||
"CreatedAt": None,
|
||||
"UpdatedAt": None,
|
||||
},
|
||||
}
|
||||
|
||||
assert result == expected_result
|
||||
|
||||
# Tests for _check_region_security_hub static method
|
||||
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
|
||||
def test_check_region_security_hub_success(self):
|
||||
|
||||
Reference in New Issue
Block a user