feat(ecr_repositories_scan_vulnerabilities_in_latest_image): Minimum severity is configurable (#2736)

This commit is contained in:
Pepe Fagoaga
2023-08-18 09:17:02 +02:00
committed by GitHub
parent ac11c6729b
commit 7c45cb45ae
3 changed files with 279 additions and 12 deletions

View File

@@ -54,6 +54,13 @@ aws:
organizations_enabled_regions: []
organizations_trusted_delegated_administrators: []
# AWS ECR
# ecr_repositories_scan_vulnerabilities_in_latest_image
# CRITICAL
# HIGH
# MEDIUM
ecr_repository_vulnerability_minimum_severity: "MEDIUM"
# Azure Configuration
azure:

View File

@@ -5,6 +5,12 @@ from prowler.providers.aws.services.ecr.ecr_client import ecr_client
class ecr_repositories_scan_vulnerabilities_in_latest_image(Check):
def execute(self):
findings = []
# Get minimun severity to report
minimum_severity = ecr_client.audit_config.get(
"ecr_repository_vulnerability_minimum_severity", "MEDIUM"
)
for registry in ecr_client.registries.values():
for repository in registry.repositories:
# First check if the repository has images
@@ -27,8 +33,23 @@ class ecr_repositories_scan_vulnerabilities_in_latest_image(Check):
report.status_extended = (
f"ECR repository {repository.name} with scan status FAILED."
)
elif image.scan_findings_status != "FAILED":
if image.scan_findings_severity_count and (
elif (
image.scan_findings_status != "FAILED"
and image.scan_findings_severity_count
):
if (
minimum_severity == "CRITICAL"
and image.scan_findings_severity_count.critical
):
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}."
elif minimum_severity == "HIGH" and (
image.scan_findings_severity_count.critical
or image.scan_findings_severity_count.high
):
report.status = "FAIL"
report.status_extended = f"ECR repository {repository.name} has imageTag {image.latest_tag} scanned with findings: CRITICAL->{image.scan_findings_severity_count.critical}, HIGH->{image.scan_findings_severity_count.high}."
elif minimum_severity == "MEDIUM" and (
image.scan_findings_severity_count.critical
or image.scan_findings_severity_count.high
or image.scan_findings_severity_count.medium

View File

@@ -1,5 +1,4 @@
from datetime import datetime
from re import search
from unittest import mock
from prowler.providers.aws.services.ecr.ecr_service import (
@@ -16,6 +15,7 @@ repository_name = "test_repo"
repository_arn = (
f"arn:aws:ecr:eu-west-1:{AWS_ACCOUNT_NUMBER}:repository/{repository_name}"
)
latest_tag = "test-tag"
repo_policy_public = {
"Version": "2012-10-17",
"Statement": [
@@ -118,7 +118,7 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag="test-tag",
latest_tag=latest_tag,
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="COMPLETE",
@@ -145,11 +145,14 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert search("scanned without findings", result[0].status_extended)
assert (
result[0].status_extended
== f"ECR repository {repository_name} has imageTag {latest_tag} scanned without findings."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
def test_image_scanned_with_findings(self):
def test_image_scanned_with_findings_default_severity_MEDIUM(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
@@ -165,7 +168,7 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag="test-tag",
latest_tag=latest_tag,
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="COMPLETE",
@@ -180,6 +183,11 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
rules=[],
)
# Set audit_config
ecr_client.audit_config = {
"ecr_repository_vulnerability_minimum_severity": "MEDIUM"
}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
@@ -192,7 +200,232 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("scanned with findings:", result[0].status_extended)
assert (
result[0].status_extended
== f"ECR repository {repository_name} has imageTag {latest_tag} scanned with findings: CRITICAL->{12}, HIGH->{34}, MEDIUM->{7}."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
def test_image_scanned_with_findings_default_severity_HIGH(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag=latest_tag,
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="COMPLETE",
scan_findings_severity_count=FindingSeverityCounts(
critical=12, high=34, medium=7
),
)
],
lifecycle_policy=None,
)
],
rules=[],
)
# Set audit_config
ecr_client.audit_config = {
"ecr_repository_vulnerability_minimum_severity": "HIGH"
}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_scan_vulnerabilities_in_latest_image.ecr_repositories_scan_vulnerabilities_in_latest_image import (
ecr_repositories_scan_vulnerabilities_in_latest_image,
)
check = ecr_repositories_scan_vulnerabilities_in_latest_image()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECR repository {repository_name} has imageTag {latest_tag} scanned with findings: CRITICAL->{12}, HIGH->{34}."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
def test_image_scanned_with_findings_default_severity_CRITICAL(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag=latest_tag,
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="COMPLETE",
scan_findings_severity_count=FindingSeverityCounts(
critical=12, high=34, medium=7
),
)
],
lifecycle_policy=None,
)
],
rules=[],
)
# Set audit_config
ecr_client.audit_config = {
"ecr_repository_vulnerability_minimum_severity": "CRITICAL"
}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_scan_vulnerabilities_in_latest_image.ecr_repositories_scan_vulnerabilities_in_latest_image import (
ecr_repositories_scan_vulnerabilities_in_latest_image,
)
check = ecr_repositories_scan_vulnerabilities_in_latest_image()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECR repository {repository_name} has imageTag {latest_tag} scanned with findings: CRITICAL->{12}."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
def test_image_scanned_without_CRITICAL_findings_default_severity_CRITICAL(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag=latest_tag,
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="COMPLETE",
scan_findings_severity_count=FindingSeverityCounts(
critical=0, high=34, medium=7
),
)
],
lifecycle_policy=None,
)
],
rules=[],
)
# Set audit_config
ecr_client.audit_config = {
"ecr_repository_vulnerability_minimum_severity": "CRITICAL"
}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_scan_vulnerabilities_in_latest_image.ecr_repositories_scan_vulnerabilities_in_latest_image import (
ecr_repositories_scan_vulnerabilities_in_latest_image,
)
check = ecr_repositories_scan_vulnerabilities_in_latest_image()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECR repository {repository_name} has imageTag {latest_tag} scanned without findings."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
def test_image_scanned_without_CRITICAL_and_HIGH_findings_default_severity_HIGH(
self,
):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION,
scan_on_push=True,
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag=latest_tag,
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="COMPLETE",
scan_findings_severity_count=FindingSeverityCounts(
critical=0, high=0, medium=7
),
)
],
lifecycle_policy=None,
)
],
rules=[],
)
# Set audit_config
ecr_client.audit_config = {
"ecr_repository_vulnerability_minimum_severity": "HIGH"
}
with mock.patch(
"prowler.providers.aws.services.ecr.ecr_service.ECR",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_scan_vulnerabilities_in_latest_image.ecr_repositories_scan_vulnerabilities_in_latest_image import (
ecr_repositories_scan_vulnerabilities_in_latest_image,
)
check = ecr_repositories_scan_vulnerabilities_in_latest_image()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECR repository {repository_name} has imageTag {latest_tag} scanned without findings."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
@@ -212,7 +445,7 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag="test-tag",
latest_tag=latest_tag,
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="FAILED",
@@ -239,7 +472,10 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("with scan status FAILED", result[0].status_extended)
assert (
result[0].status_extended
== f"ECR repository {repository_name} with scan status FAILED."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
@@ -259,7 +495,7 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
policy=repo_policy_public,
images_details=[
ImageDetails(
latest_tag="test-tag",
latest_tag=latest_tag,
latest_digest="test-digest",
image_pushed_at=datetime(2023, 1, 1),
scan_findings_status="",
@@ -286,6 +522,9 @@ class Test_ecr_repositories_scan_vulnerabilities_in_latest_image:
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert search("without a scan", result[0].status_extended)
assert (
result[0].status_extended
== f"ECR repository {repository_name} has imageTag {latest_tag} without a scan."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn