mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
fix(checks): ensure CheckID is correct in check's metadata (#4522)
Co-authored-by: Pepe Fagoaga <pepe@prowler.com>
This commit is contained in:
@@ -63,7 +63,7 @@ It contains hundreds of controls covering CIS, NIST 800, NIST CSF, CISA, RBI, Fe
|
||||
|
||||
| Provider | Checks | Services | [Compliance Frameworks](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/compliance/) | [Categories](https://docs.prowler.com/projects/prowler-open-source/en/latest/tutorials/misc/#categories) |
|
||||
|---|---|---|---|---|
|
||||
| AWS | 383 | 67 -> `prowler aws --list-services` | 28 -> `prowler aws --list-compliance` | 7 -> `prowler aws --list-categories` |
|
||||
| AWS | 385 | 67 -> `prowler aws --list-services` | 28 -> `prowler aws --list-compliance` | 7 -> `prowler aws --list-categories` |
|
||||
| GCP | 77 | 13 -> `prowler gcp --list-services` | 1 -> `prowler gcp --list-compliance` | 2 -> `prowler gcp --list-categories`|
|
||||
| Azure | 135 | 16 -> `prowler azure --list-services` | 2 -> `prowler azure --list-compliance` | 2 -> `prowler azure --list-categories` |
|
||||
| Kubernetes | 83 | 7 -> `prowler kubernetes --list-services` | 1 -> `prowler kubernetes --list-compliance` | 7 -> `prowler kubernetes --list-categories` |
|
||||
|
||||
@@ -96,6 +96,8 @@ class Check(ABC, Check_Metadata_Model):
|
||||
data = Check_Metadata_Model.parse_file(metadata_file).dict()
|
||||
# Calls parents init function
|
||||
super().__init__(**data)
|
||||
# TODO: verify that the CheckID is the same as the filename and classname
|
||||
# to mimic the test done at test_<provider>_checks_metadata_is_valid
|
||||
|
||||
def metadata(self) -> dict:
|
||||
"""Return the JSON representation of the check's metadata"""
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "documentdb_cluster_backup_retention",
|
||||
"CheckID": "documentdb_cluster_backup_enabled",
|
||||
"CheckTitle": "Check if DocumentDB Clusters have backup enabled.",
|
||||
"CheckType": [],
|
||||
"ServiceName": "DocumentDB",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "documentdb_cluster_deletion_protection_enabled",
|
||||
"CheckID": "documentdb_cluster_deletion_protection",
|
||||
"CheckTitle": "Check if DocumentDB Clusters has deletion protection enabled.",
|
||||
"CheckType": [],
|
||||
"ServiceName": "documentdb",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "ec2_instance_port_cassandra_exposed_to_internet",
|
||||
"CheckID": "ec2_instance_port_elasticsearch_kibana_exposed_to_internet",
|
||||
"CheckTitle": "Ensure no EC2 instances allow ingress from the internet to Elasticsearch and Kibana ports (TCP 9200, 9300, 5601).",
|
||||
"CheckType": [
|
||||
"Infrastructure Security"
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "ec2_instance_port_kafka_exposed_to_internet",
|
||||
"CheckID": "ec2_instance_port_memcached_exposed_to_internet",
|
||||
"CheckTitle": "Ensure no EC2 instances allow ingress from the internet to TCP port 11211 (Memcached).",
|
||||
"CheckType": [
|
||||
"Infrastructure Security"
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "ec2_instance_port_ssh_exposed_to_internet",
|
||||
"CheckID": "ec2_instance_port_mysql_exposed_to_internet",
|
||||
"CheckTitle": "Ensure no EC2 instances allow ingress from the internet to TCP port 3306 (MySQL).",
|
||||
"CheckType": [
|
||||
"Infrastructure Security"
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "neptune_cluster_backup_retention",
|
||||
"CheckID": "neptune_cluster_backup_enabled",
|
||||
"CheckTitle": "Check for Neptune Clusters Backup Retention Period.",
|
||||
"CheckType": [],
|
||||
"ServiceName": "neptune",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "neptune_cluster_deletion_protection_enabled",
|
||||
"CheckID": "neptune_cluster_deletion_protection",
|
||||
"CheckTitle": "Check if Neptune Clusters storage has deletion protection enabled.",
|
||||
"CheckType": [],
|
||||
"ServiceName": "neptune",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "neptune_cluster_multi_az_enabled",
|
||||
"CheckID": "neptune_cluster_multi_az",
|
||||
"CheckTitle": "Check if Neptune Clusters have multi-AZ enabled.",
|
||||
"CheckType": [],
|
||||
"ServiceName": "neptune",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "aws",
|
||||
"CheckID": "neptune_cluster_encrypted",
|
||||
"CheckID": "neptune_cluster_storage_encrypted",
|
||||
"CheckTitle": "Check if Neptune Clusters storage is encrypted at rest.",
|
||||
"CheckType": [],
|
||||
"ServiceName": "neptune",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "azure",
|
||||
"CheckID": "monitor_alert_create_update_policy_assignment",
|
||||
"CheckID": "monitor_alert_create_update_nsg",
|
||||
"CheckTitle": "Ensure that Activity Log Alert exists for Create or Update Network Security Group",
|
||||
"CheckType": [],
|
||||
"ServiceName": "monitor",
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
{
|
||||
"Provider": "azure",
|
||||
"CheckID": "monitor_storage_container_with_activity_logs_is_private",
|
||||
"CheckID": "monitor_storage_account_with_activity_logs_is_private",
|
||||
"CheckTitle": "Ensure the Storage Container Storing the Activity Logs is not Publicly Accessible",
|
||||
"CheckType": [],
|
||||
"ServiceName": "monitor",
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import traceback
|
||||
@@ -8,7 +9,6 @@ from pkgutil import ModuleInfo
|
||||
|
||||
from boto3 import client
|
||||
from colorama import Fore, Style
|
||||
from fixtures.bulk_checks_metadata import test_bulk_checks_metadata
|
||||
from mock import Mock, patch
|
||||
from moto import mock_aws
|
||||
|
||||
@@ -29,6 +29,7 @@ from prowler.lib.check.check import (
|
||||
)
|
||||
from prowler.lib.check.models import load_check_metadata
|
||||
from prowler.providers.aws.aws_provider import AwsProvider
|
||||
from tests.lib.check.fixtures.bulk_checks_metadata import test_bulk_checks_metadata
|
||||
from tests.providers.aws.utils import AWS_REGION_US_EAST_1
|
||||
|
||||
# AWS_ACCOUNT_NUMBER = "123456789012"
|
||||
@@ -876,3 +877,77 @@ class TestCheck:
|
||||
capsys.readouterr().out
|
||||
== f"Something went wrong in {check.CheckID}, please use --log-level ERROR\n"
|
||||
)
|
||||
|
||||
def test_aws_checks_metadata_is_valid(self):
|
||||
# Check if the checkID in the metadata.json of the checks is correct
|
||||
# Define the base directory for the checks
|
||||
base_directory = os.path.abspath(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__), "../../../", "prowler/providers/aws/services"
|
||||
)
|
||||
)
|
||||
self.verify_metadata_check_id(base_directory)
|
||||
|
||||
def test_azure_checks_metadata_is_valid(self):
|
||||
# Check if the checkID in the metadata.json of the checks is correct
|
||||
# Define the base directory for the checks
|
||||
base_directory = os.path.abspath(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"../../../",
|
||||
"prowler/providers/azure/services",
|
||||
)
|
||||
)
|
||||
self.verify_metadata_check_id(base_directory)
|
||||
|
||||
def test_gcp_checks_metadata_is_valid(self):
|
||||
# Check if the checkID in the metadata.json of the checks is correct
|
||||
# Define the base directory for the checks
|
||||
base_directory = os.path.abspath(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__), "../../../", "prowler/providers/gcp/services"
|
||||
)
|
||||
)
|
||||
self.verify_metadata_check_id(base_directory)
|
||||
|
||||
def test_kubernetes_checks_metadata_is_valid(self):
|
||||
# Check if the checkID in the metadata.json of the checks is correct
|
||||
# Define the base directory for the checks
|
||||
base_directory = os.path.abspath(
|
||||
os.path.join(
|
||||
os.path.dirname(__file__),
|
||||
"../../../",
|
||||
"prowler/providers/kubernetes/services",
|
||||
)
|
||||
)
|
||||
self.verify_metadata_check_id(base_directory)
|
||||
|
||||
def verify_metadata_check_id(self, provider_path):
|
||||
# Walk through the base directory to find all service directories
|
||||
for root, dirs, _ in os.walk(provider_path):
|
||||
# We only want to look at directories that are direct children of the base directory
|
||||
if root == provider_path:
|
||||
for service_dir in dirs:
|
||||
service_path = os.path.join(root, service_dir)
|
||||
|
||||
# Walk through each service directory to find check directories
|
||||
for check_root, check_dirs, _ in os.walk(service_path):
|
||||
for check_dir in check_dirs:
|
||||
check_directory = os.path.join(check_root, check_dir)
|
||||
metadata_file_name = f"{check_dir}.metadata.json"
|
||||
metadata_file_path = os.path.join(
|
||||
check_directory, metadata_file_name
|
||||
)
|
||||
|
||||
if os.path.isfile(metadata_file_path):
|
||||
# Read the JSON file
|
||||
with open(metadata_file_path, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
# Extract the CheckID field
|
||||
check_id = data.get("CheckID", None)
|
||||
|
||||
# Compare CheckID to the check name
|
||||
assert (
|
||||
check_id == check_dir
|
||||
), f"CheckID in metadata does not match the check name in {check_directory}. Found CheckID: {check_id}"
|
||||
|
||||
Reference in New Issue
Block a user