mirror of
https://github.com/prowler-cloud/prowler.git
synced 2026-07-04 19:21:51 +00:00
bcaa6ac488
Co-authored-by: Rubén De la Torre Vico <ruben@prowler.com> Co-authored-by: Daniel Barranquero <danielbo2001@gmail.com>
1215 lines
47 KiB
Python
1215 lines
47 KiB
Python
import json
|
|
import os
|
|
import pathlib
|
|
from importlib.machinery import FileFinder
|
|
from logging import ERROR
|
|
from pkgutil import ModuleInfo
|
|
from unittest import mock
|
|
|
|
from boto3 import client
|
|
from mock import Mock, patch
|
|
from moto import mock_aws
|
|
|
|
from prowler.lib.check.check import (
|
|
exclude_checks_to_run,
|
|
exclude_services_to_run,
|
|
execute,
|
|
execute_checks,
|
|
list_categories,
|
|
list_checks_json,
|
|
list_services,
|
|
load_custom_checks_metadata,
|
|
parse_checks_from_file,
|
|
parse_checks_from_folder,
|
|
remove_custom_checks_module,
|
|
update_audit_metadata,
|
|
)
|
|
from prowler.lib.check.models import CheckMetadata, load_check_metadata
|
|
from prowler.lib.check.utils import (
|
|
list_modules,
|
|
recover_checks_from_provider,
|
|
recover_checks_from_service,
|
|
)
|
|
from prowler.providers.aws.aws_provider import AwsProvider
|
|
from prowler.providers.aws.services.accessanalyzer.accessanalyzer_service import (
|
|
Analyzer,
|
|
)
|
|
from tests.lib.check.fixtures.bulk_checks_metadata import test_bulk_checks_metadata
|
|
from tests.providers.aws.utils import (
|
|
AWS_ACCOUNT_ARN,
|
|
AWS_ACCOUNT_NUMBER,
|
|
AWS_REGION_US_EAST_1,
|
|
set_mocked_aws_provider,
|
|
)
|
|
|
|
# AWS_ACCOUNT_NUMBER = "123456789012"
|
|
# AWS_REGION = "us-east-1"
|
|
|
|
expected_packages = [
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_minimum_tls_version_12"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_minimum_tls_version_12.storage_ensure_minimum_tls_version_12",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_key_rotation_90_days"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_key_rotation_90_days.storage_key_rotation_90_days",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_private_endpoints_in_storage_accounts"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_private_endpoints_in_storage_accounts.storage_ensure_private_endpoints_in_storage_accounts",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_soft_delete_is_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_soft_delete_is_enabled.storage_ensure_soft_delete_is_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder("/root_dir/prowler/providers/azure/services/storage"),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_encryption_with_customer_managed_keys",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_encryption_with_customer_managed_keys"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_encryption_with_customer_managed_keys.storage_ensure_encryption_with_customer_managed_keys",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_tde_encrypted_with_cmk",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_tde_encrypted_with_cmk"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_tde_encrypted_with_cmk.sqlserver_tde_encrypted_with_cmk",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_tde_encryption_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_tde_encryption_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_tde_encryption_enabled.sqlserver_tde_encryption_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_auditing_retention_90_days",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_auditing_retention_90_days"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_auditing_retention_90_days.sqlserver_auditing_retention_90_days",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_vulnerability_assessment_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_vulnerability_assessment_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_vulnerability_assessment_enabled.sqlserver_vulnerability_assessment_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_periodic_recurring_scans_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_periodic_recurring_scans_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_periodic_recurring_scans_enabled.sqlserver_va_periodic_recurring_scans_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_scan_reports_configured",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_scan_reports_configured"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_scan_reports_configured.sqlserver_va_scan_reports_configured",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_emails_notifications_admins_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_emails_notifications_admins_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_emails_notifications_admins_enabled.sqlserver_va_emails_notifications_admins_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/postgresql"
|
|
),
|
|
name="prowler.providers.azure.services.postgresql.postgresql_flexible_server_enforce_ssl_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/postgresql/postgresql_flexible_server_enforce_ssl_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.postgresql.postgresql_flexible_server_enforce_ssl_enabled.postgresql_flexible_server_enforce_ssl_enabled",
|
|
ispkg=False,
|
|
),
|
|
]
|
|
|
|
|
|
def mock_walk_packages(*_):
|
|
return expected_packages
|
|
|
|
|
|
def mock_list_modules(*_):
|
|
modules = [
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_minimum_tls_version_12"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_minimum_tls_version_12.storage_ensure_minimum_tls_version_12",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_key_rotation_90_days"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_key_rotation_90_days.storage_key_rotation_90_days",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_private_endpoints_in_storage_accounts"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_private_endpoints_in_storage_accounts.storage_ensure_private_endpoints_in_storage_accounts",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_soft_delete_is_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_soft_delete_is_enabled.storage_ensure_soft_delete_is_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_encryption_with_customer_managed_keys",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_encryption_with_customer_managed_keys"
|
|
),
|
|
name="prowler.providers.azure.services.storage.storage_ensure_encryption_with_customer_managed_keys.storage_ensure_encryption_with_customer_managed_keys",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_tde_encrypted_with_cmk",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_tde_encrypted_with_cmk"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_tde_encrypted_with_cmk.sqlserver_tde_encrypted_with_cmk",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_tde_encryption_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_tde_encryption_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_tde_encryption_enabled.sqlserver_tde_encryption_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_auditing_retention_90_days",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_auditing_retention_90_days"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_auditing_retention_90_days.sqlserver_auditing_retention_90_days",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_vulnerability_assessment_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_vulnerability_assessment_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_vulnerability_assessment_enabled.sqlserver_vulnerability_assessment_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_periodic_recurring_scans_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_periodic_recurring_scans_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_periodic_recurring_scans_enabled.sqlserver_va_periodic_recurring_scans_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_scan_reports_configured",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_scan_reports_configured"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_scan_reports_configured.sqlserver_va_scan_reports_configured",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_emails_notifications_admins_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_emails_notifications_admins_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.sqlserver.sqlserver_va_emails_notifications_admins_enabled.sqlserver_va_emails_notifications_admins_enabled",
|
|
ispkg=False,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/postgresql"
|
|
),
|
|
name="prowler.providers.azure.services.postgresql.postgresql_flexible_server_enforce_ssl_enabled",
|
|
ispkg=True,
|
|
),
|
|
ModuleInfo(
|
|
module_finder=FileFinder(
|
|
"/root_dir/prowler/providers/azure/services/postgresql/postgresql_flexible_server_enforce_ssl_enabled"
|
|
),
|
|
name="prowler.providers.azure.services.postgresql.postgresql_flexible_server_enforce_ssl_enabled.postgresql_flexible_server_enforce_ssl_enabled",
|
|
ispkg=False,
|
|
),
|
|
]
|
|
return modules
|
|
|
|
|
|
def mock_recover_checks_from_azure_provider(*_):
|
|
return [
|
|
(
|
|
"defender_ensure_defender_for_app_services_is_on",
|
|
"/root_dir/fake_path/defender/defender_ensure_defender_for_app_services_is_on",
|
|
),
|
|
(
|
|
"iam_subscription_roles_owner_custom_not_created",
|
|
"/root_dir/fake_path/iam/iam_subscription_roles_owner_custom_not_created",
|
|
),
|
|
(
|
|
"iam_custom_role_has_permissions_to_administer_resource_locks",
|
|
"/root_dir/fake_path/iam/iam_custom_role_has_permissions_to_administer_resource_locks",
|
|
),
|
|
(
|
|
"storage_default_network_access_rule_is_denied",
|
|
"/root_dir/fake_path/storage/storage_default_network_access_rule_is_denied",
|
|
),
|
|
]
|
|
|
|
|
|
def mock_recover_checks_from_aws_provider(*_):
|
|
return [
|
|
(
|
|
"accessanalyzer_enabled_without_findings",
|
|
"/root_dir/fake_path/accessanalyzer/accessanalyzer_enabled_without_findings",
|
|
),
|
|
(
|
|
"awslambda_function_url_cors_policy",
|
|
"/root_dir/fake_path/awslambda/awslambda_function_url_cors_policy",
|
|
),
|
|
(
|
|
"ec2_securitygroup_allow_ingress_from_internet_to_any_port",
|
|
"/root_dir/fake_path/ec2/ec2_securitygroup_allow_ingress_from_internet_to_any_port",
|
|
),
|
|
]
|
|
|
|
|
|
class TestCheck:
|
|
def test_load_check_metadata(self):
|
|
test_cases = [
|
|
{
|
|
"input": {
|
|
"metadata_path": f"{os.path.dirname(os.path.realpath(__file__))}/fixtures/metadata.json",
|
|
},
|
|
"expected": {
|
|
"CheckID": "iam_user_accesskey_unused",
|
|
"CheckTitle": "Access Keys unused should be disabled",
|
|
"ServiceName": "iam",
|
|
"Severity": "low",
|
|
},
|
|
}
|
|
]
|
|
for test in test_cases:
|
|
metadata_path = test["input"]["metadata_path"]
|
|
check_metadata = load_check_metadata(metadata_path)
|
|
assert check_metadata.CheckID == test["expected"]["CheckID"]
|
|
assert check_metadata.CheckTitle == test["expected"]["CheckTitle"]
|
|
assert check_metadata.ServiceName == test["expected"]["ServiceName"]
|
|
assert check_metadata.Severity == test["expected"]["Severity"]
|
|
|
|
def test_parse_checks_from_file(self):
|
|
test_cases = [
|
|
{
|
|
"input": {
|
|
"path": f"{pathlib.Path().absolute()}/tests/lib/check/fixtures/checklistA.json",
|
|
"provider": "aws",
|
|
},
|
|
"expected": {"check11", "check12", "check7777"},
|
|
}
|
|
]
|
|
for test in test_cases:
|
|
check_file = test["input"]["path"]
|
|
provider = test["input"]["provider"]
|
|
assert parse_checks_from_file(check_file, provider) == test["expected"]
|
|
|
|
@mock_aws
|
|
def test_parse_checks_from_folder(self):
|
|
test_checks_folder = (
|
|
f"{pathlib.Path().absolute()}/tests/lib/check/fixtures/checks_folder"
|
|
)
|
|
# Create bucket and upload checks folder
|
|
s3_client = client("s3", region_name=AWS_REGION_US_EAST_1)
|
|
s3_client.create_bucket(Bucket="test")
|
|
# Iterate through the files in the folder and upload each one
|
|
for subdir, _, files in os.walk(test_checks_folder):
|
|
for file in files:
|
|
check = subdir.split("/")[-1]
|
|
full_path = os.path.join(subdir, file)
|
|
with open(full_path, "rb") as data:
|
|
s3_client.upload_fileobj(
|
|
data, "test", f"checks_folder/{check}/{file}"
|
|
)
|
|
test_cases = [
|
|
{
|
|
"input": {
|
|
"path": test_checks_folder,
|
|
"provider": "aws",
|
|
},
|
|
"expected": {"check11", "check12", "check7777"},
|
|
},
|
|
{
|
|
"input": {
|
|
"path": "s3://test/checks_folder/",
|
|
"provider": "aws",
|
|
},
|
|
"expected": {"check11", "check12", "check7777"},
|
|
},
|
|
]
|
|
|
|
aws_provider = AwsProvider()
|
|
for test in test_cases:
|
|
check_folder = test["input"]["path"]
|
|
provider = test["input"]["provider"]
|
|
assert (
|
|
parse_checks_from_folder(aws_provider, check_folder) == test["expected"]
|
|
)
|
|
remove_custom_checks_module(check_folder, provider)
|
|
|
|
def test_load_custom_checks_metadata(self, tmp_path):
|
|
"""Test loading check metadata from a custom checks folder."""
|
|
check_name = "custom_test_check"
|
|
check_folder = tmp_path / check_name
|
|
check_folder.mkdir()
|
|
|
|
metadata = {
|
|
"Provider": "aws",
|
|
"CheckID": check_name,
|
|
"CheckTitle": "Test Custom Check",
|
|
"CheckType": [],
|
|
"ServiceName": "custom",
|
|
"SubServiceName": "",
|
|
"ResourceIdTemplate": "arn:aws:custom:::resource",
|
|
"Severity": "low",
|
|
"ResourceType": "AwsCustomResource",
|
|
"Description": "A test custom check",
|
|
"Risk": "Test risk",
|
|
"RelatedUrl": "",
|
|
"Remediation": {
|
|
"Code": {"CLI": "", "NativeIaC": "", "Other": "", "Terraform": ""},
|
|
"Recommendation": {"Text": "", "Url": ""},
|
|
},
|
|
"Categories": [],
|
|
"DependsOn": [],
|
|
"RelatedTo": [],
|
|
"Notes": "",
|
|
}
|
|
metadata_file = check_folder / f"{check_name}.metadata.json"
|
|
metadata_file.write_text(json.dumps(metadata))
|
|
|
|
result = load_custom_checks_metadata(str(tmp_path))
|
|
|
|
assert check_name in result
|
|
assert result[check_name].CheckID == check_name
|
|
assert result[check_name].Provider == "aws"
|
|
assert result[check_name].Severity == "low"
|
|
|
|
def test_load_custom_checks_metadata_nonexistent_path(self):
|
|
"""Test that nonexistent paths return empty dict."""
|
|
result = load_custom_checks_metadata("/nonexistent/path/to/checks")
|
|
assert result == {}
|
|
|
|
def test_exclude_checks_to_run(self):
|
|
test_cases = [
|
|
{
|
|
"input": {
|
|
"check_list": {"check12", "check11", "extra72", "check13"},
|
|
"excluded_checks": {"check12", "check13"},
|
|
},
|
|
"expected": {"check11", "extra72"},
|
|
},
|
|
{
|
|
"input": {
|
|
"check_list": {"check112", "check11", "extra72", "check13"},
|
|
"excluded_checks": {"check12", "check13", "check14"},
|
|
},
|
|
"expected": {"check112", "check11", "extra72"},
|
|
},
|
|
]
|
|
for test in test_cases:
|
|
check_list = test["input"]["check_list"]
|
|
excluded_checks = test["input"]["excluded_checks"]
|
|
assert (
|
|
exclude_checks_to_run(check_list, excluded_checks) == test["expected"]
|
|
)
|
|
|
|
def test_exclude_services_to_run(self):
|
|
test_cases = [
|
|
{
|
|
"input": {
|
|
"checks_to_run": {
|
|
"iam_user_console_access_unused",
|
|
"iam_user_accesskey_unused",
|
|
},
|
|
"excluded_services": {"ec2"},
|
|
"provider": "aws",
|
|
},
|
|
"expected": {
|
|
"iam_user_console_access_unused",
|
|
"iam_user_accesskey_unused",
|
|
},
|
|
},
|
|
{
|
|
"input": {
|
|
"checks_to_run": {
|
|
"iam_user_console_access_unused",
|
|
"iam_user_accesskey_unused",
|
|
},
|
|
"excluded_services": {"iam"},
|
|
"provider": "aws",
|
|
},
|
|
"expected": set(),
|
|
},
|
|
]
|
|
for test in test_cases:
|
|
excluded_services = test["input"]["excluded_services"]
|
|
checks_to_run = test["input"]["checks_to_run"]
|
|
provider = test["input"]["provider"]
|
|
assert (
|
|
exclude_services_to_run(checks_to_run, excluded_services, provider)
|
|
== test["expected"]
|
|
)
|
|
|
|
@patch(
|
|
"prowler.lib.check.check.recover_checks_from_provider",
|
|
new=mock_recover_checks_from_azure_provider,
|
|
)
|
|
def test_list_azure_services(self):
|
|
provider = "azure"
|
|
expected_services = {"defender", "iam", "storage"}
|
|
listed_services = list_services(provider)
|
|
assert listed_services == sorted(expected_services)
|
|
|
|
@patch(
|
|
"prowler.lib.check.check.recover_checks_from_provider",
|
|
new=mock_recover_checks_from_aws_provider,
|
|
)
|
|
def test_list_aws_services(self):
|
|
provider = "azure"
|
|
expected_services = {"accessanalyzer", "awslambda", "ec2"}
|
|
listed_services = list_services(provider)
|
|
assert listed_services == sorted(expected_services)
|
|
|
|
def test_list_categories(self):
|
|
expected_categories = {
|
|
"secrets",
|
|
"forensics-ready",
|
|
"encryption",
|
|
"internet-exposed",
|
|
"trust-boundaries",
|
|
}
|
|
listed_categories = list_categories(test_bulk_checks_metadata)
|
|
assert listed_categories == expected_categories
|
|
|
|
@patch("prowler.lib.check.utils.list_modules", new=mock_list_modules)
|
|
def test_recover_checks_from_provider(self):
|
|
provider = "azure"
|
|
service = "storage"
|
|
expected_checks = [
|
|
(
|
|
"storage_ensure_minimum_tls_version_12",
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_minimum_tls_version_12",
|
|
),
|
|
(
|
|
"storage_key_rotation_90_days",
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_key_rotation_90_days",
|
|
),
|
|
(
|
|
"storage_ensure_private_endpoints_in_storage_accounts",
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_private_endpoints_in_storage_accounts",
|
|
),
|
|
(
|
|
"storage_ensure_soft_delete_is_enabled",
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_soft_delete_is_enabled",
|
|
),
|
|
(
|
|
"storage_ensure_encryption_with_customer_managed_keys",
|
|
"/root_dir/prowler/providers/azure/services/storage/storage_ensure_encryption_with_customer_managed_keys",
|
|
),
|
|
(
|
|
"sqlserver_tde_encrypted_with_cmk",
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_tde_encrypted_with_cmk",
|
|
),
|
|
(
|
|
"sqlserver_tde_encryption_enabled",
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_tde_encryption_enabled",
|
|
),
|
|
(
|
|
"sqlserver_auditing_retention_90_days",
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_auditing_retention_90_days",
|
|
),
|
|
(
|
|
"sqlserver_vulnerability_assessment_enabled",
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_vulnerability_assessment_enabled",
|
|
),
|
|
(
|
|
"sqlserver_va_periodic_recurring_scans_enabled",
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_periodic_recurring_scans_enabled",
|
|
),
|
|
(
|
|
"sqlserver_va_scan_reports_configured",
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_scan_reports_configured",
|
|
),
|
|
(
|
|
"sqlserver_va_emails_notifications_admins_enabled",
|
|
"/root_dir/prowler/providers/azure/services/sqlserver/sqlserver_va_emails_notifications_admins_enabled",
|
|
),
|
|
(
|
|
"postgresql_flexible_server_enforce_ssl_enabled",
|
|
"/root_dir/prowler/providers/azure/services/postgresql/postgresql_flexible_server_enforce_ssl_enabled",
|
|
),
|
|
]
|
|
returned_checks = recover_checks_from_provider(provider, service)
|
|
assert returned_checks == expected_checks
|
|
|
|
@patch("prowler.lib.check.utils.walk_packages", new=mock_walk_packages)
|
|
def test_list_modules(self):
|
|
provider = "azure"
|
|
service = "storage"
|
|
expected_modules = list_modules(provider, service)
|
|
assert expected_modules == expected_packages
|
|
|
|
@patch(
|
|
"prowler.lib.check.utils.recover_checks_from_provider",
|
|
new=mock_recover_checks_from_aws_provider,
|
|
)
|
|
def test_recover_checks_from_service(self):
|
|
service_list = ["accessanalyzer", "awslambda", "ec2"]
|
|
provider = "aws"
|
|
expected_checks = {
|
|
"accessanalyzer_enabled_without_findings",
|
|
"awslambda_function_url_cors_policy",
|
|
"ec2_securitygroup_allow_ingress_from_internet_to_any_port",
|
|
}
|
|
recovered_checks = recover_checks_from_service(service_list, provider)
|
|
assert recovered_checks == expected_checks
|
|
|
|
# def test_parse_checks_from_compliance_framework_two(self):
|
|
# test_case = {
|
|
# "input": {"compliance_frameworks": ["cis_v1.4_aws", "ens_v3_aws"]},
|
|
# "expected": {
|
|
# "vpc_flow_logs_enabled",
|
|
# "ec2_ebs_snapshot_encryption",
|
|
# "iam_user_mfa_enabled_console_access",
|
|
# "cloudtrail_multi_region_enabled",
|
|
# "ec2_elbv2_insecure_ssl_ciphers",
|
|
# "guardduty_is_enabled",
|
|
# "s3_bucket_default_encryption",
|
|
# "cloudfront_distributions_https_enabled",
|
|
# "iam_avoid_root_usage",
|
|
# "s3_bucket_secure_transport_policy",
|
|
# },
|
|
# }
|
|
# with mock.patch(
|
|
# "prowler.lib.check.check.compliance_specification_dir_path",
|
|
# new=f"{pathlib.Path().absolute()}/fixtures",
|
|
# ):
|
|
# provider = "aws"
|
|
# bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
|
|
# compliance_frameworks = test_case["input"]["compliance_frameworks"]
|
|
# assert (
|
|
# parse_checks_from_compliance_framework(
|
|
# compliance_frameworks, bulk_compliance_frameworks
|
|
# )
|
|
# == test_case["expected"]
|
|
# )
|
|
|
|
# def test_parse_checks_from_compliance_framework_one(self):
|
|
# test_case = {
|
|
# "input": {"compliance_frameworks": ["cis_v1.4_aws"]},
|
|
# "expected": {
|
|
# "iam_user_mfa_enabled_console_access",
|
|
# "s3_bucket_default_encryption",
|
|
# "iam_avoid_root_usage",
|
|
# },
|
|
# }
|
|
# with mock.patch(
|
|
# "prowler.lib.check.check.compliance_specification_dir",
|
|
# new=f"{pathlib.Path().absolute()}/fixtures",
|
|
# ):
|
|
# provider = "aws"
|
|
# bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
|
|
# compliance_frameworks = test_case["input"]["compliance_frameworks"]
|
|
# assert (
|
|
# parse_checks_from_compliance_framework(
|
|
# compliance_frameworks, bulk_compliance_frameworks
|
|
# )
|
|
# == test_case["expected"]
|
|
# )
|
|
|
|
# def test_parse_checks_from_compliance_framework_no_compliance(self):
|
|
# test_case = {
|
|
# "input": {"compliance_frameworks": []},
|
|
# "expected": set(),
|
|
# }
|
|
# with mock.patch(
|
|
# "prowler.lib.check.check.compliance_specification_dir",
|
|
# new=f"{pathlib.Path().absolute()}/fixtures",
|
|
# ):
|
|
# provider = "aws"
|
|
# bulk_compliance_frameworks = bulk_load_compliance_frameworks(provider)
|
|
# compliance_frameworks = test_case["input"]["compliance_frameworks"]
|
|
# assert (
|
|
# parse_checks_from_compliance_framework(
|
|
# compliance_frameworks, bulk_compliance_frameworks
|
|
# )
|
|
# == test_case["expected"]
|
|
# )
|
|
|
|
def test_update_audit_metadata_complete(self):
|
|
from prowler.providers.common.models import Audit_Metadata
|
|
|
|
# Set the expected checks to run
|
|
expected_checks = ["iam_administrator_access_with_mfa"]
|
|
services_executed = {"iam"}
|
|
checks_executed = {"iam_administrator_access_with_mfa"}
|
|
|
|
# Set an empty Audit_Metadata
|
|
audit_metadata = Audit_Metadata(
|
|
services_scanned=0,
|
|
expected_checks=expected_checks,
|
|
completed_checks=0,
|
|
audit_progress=0,
|
|
)
|
|
|
|
audit_metadata = update_audit_metadata(
|
|
audit_metadata, services_executed, checks_executed
|
|
)
|
|
|
|
assert audit_metadata.audit_progress == float(100)
|
|
assert audit_metadata.services_scanned == 1
|
|
assert audit_metadata.expected_checks == expected_checks
|
|
assert audit_metadata.completed_checks == 1
|
|
|
|
def test_update_audit_metadata_50(self):
|
|
from prowler.providers.common.models import Audit_Metadata
|
|
|
|
# Set the expected checks to run
|
|
expected_checks = [
|
|
"iam_administrator_access_with_mfa",
|
|
"iam_support_role_created",
|
|
]
|
|
services_executed = {"iam"}
|
|
checks_executed = {"iam_administrator_access_with_mfa"}
|
|
|
|
# Set an empty Audit_Metadata
|
|
audit_metadata = Audit_Metadata(
|
|
services_scanned=0,
|
|
expected_checks=expected_checks,
|
|
completed_checks=0,
|
|
audit_progress=0,
|
|
)
|
|
|
|
audit_metadata = update_audit_metadata(
|
|
audit_metadata, services_executed, checks_executed
|
|
)
|
|
|
|
assert audit_metadata.audit_progress == float(50)
|
|
assert audit_metadata.services_scanned == 1
|
|
assert audit_metadata.expected_checks == expected_checks
|
|
assert audit_metadata.completed_checks == 1
|
|
|
|
def test_list_checks_json_aws_lambda_and_s3(self):
|
|
provider = "aws"
|
|
check_list = {
|
|
"awslambda_function_invoke_api_operations_cloudtrail_logging_enabled",
|
|
"awslambda_function_no_secrets_in_code",
|
|
"awslambda_function_no_secrets_in_variables",
|
|
"awslambda_function_not_publicly_accessible",
|
|
"awslambda_function_url_cors_policy",
|
|
"awslambda_function_url_public",
|
|
"awslambda_function_using_supported_runtimes",
|
|
}
|
|
checks_json = list_checks_json(provider, sorted(check_list))
|
|
assert (
|
|
checks_json
|
|
== '{\n "aws": [\n "awslambda_function_invoke_api_operations_cloudtrail_logging_enabled",\n "awslambda_function_no_secrets_in_code",\n "awslambda_function_no_secrets_in_variables",\n "awslambda_function_not_publicly_accessible",\n "awslambda_function_url_cors_policy",\n "awslambda_function_url_public",\n "awslambda_function_using_supported_runtimes"\n ]\n}'
|
|
)
|
|
|
|
def test_execute(self):
|
|
accessanalyzer_client = mock.MagicMock
|
|
accessanalyzer_client.region = AWS_REGION_US_EAST_1
|
|
accessanalyzer_client.analyzers = [
|
|
Analyzer(
|
|
arn=AWS_ACCOUNT_ARN,
|
|
name=AWS_ACCOUNT_NUMBER,
|
|
status="NOT_AVAILABLE",
|
|
tags=[],
|
|
type="",
|
|
region=AWS_REGION_US_EAST_1,
|
|
)
|
|
]
|
|
finding = Mock()
|
|
finding.status = "PASS"
|
|
findings = [finding]
|
|
check = Mock()
|
|
check.CheckID = "test-check"
|
|
check.execute = Mock(return_value=findings)
|
|
|
|
with (
|
|
patch("prowler.lib.check.check.execute", return_value=findings),
|
|
patch(
|
|
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
|
|
accessanalyzer_client,
|
|
),
|
|
):
|
|
findings = execute(
|
|
check=check,
|
|
global_provider=set_mocked_aws_provider(
|
|
expected_checks=["accessanalyzer_enabled"]
|
|
),
|
|
custom_checks_metadata=None,
|
|
output_options=None,
|
|
)
|
|
assert len(findings) == 1
|
|
|
|
def test_execute_with_filtering_status(self):
|
|
accessanalyzer_client = mock.MagicMock
|
|
accessanalyzer_client.region = AWS_REGION_US_EAST_1
|
|
accessanalyzer_client.analyzers = [
|
|
Analyzer(
|
|
arn=AWS_ACCOUNT_ARN,
|
|
name=AWS_ACCOUNT_NUMBER,
|
|
status="NOT_AVAILABLE",
|
|
tags=[],
|
|
type="",
|
|
region=AWS_REGION_US_EAST_1,
|
|
)
|
|
]
|
|
status = ["PASS"]
|
|
check = mock.MagicMock()
|
|
check.CheckID = "accessanalyzer_enabled"
|
|
check.Status = "PASS"
|
|
output_options = mock.MagicMock()
|
|
output_options.status = status
|
|
with mock.patch(
|
|
"prowler.providers.aws.services.accessanalyzer.accessanalyzer_service.AccessAnalyzer",
|
|
accessanalyzer_client,
|
|
):
|
|
findings = execute(
|
|
check=check,
|
|
global_provider=set_mocked_aws_provider(
|
|
expected_checks=["accessanalyzer_enabled"]
|
|
),
|
|
custom_checks_metadata=None,
|
|
output_options=output_options,
|
|
)
|
|
assert len(findings) == 0
|
|
|
|
def test_aws_checks_metadata_is_valid(self):
|
|
# Check if the checkID in the metadata.json of the checks is correct
|
|
# Define the base directory for the checks
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__), "../../../", "prowler/providers/aws/services"
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_azure_checks_metadata_is_valid(self):
|
|
# Check if the checkID in the metadata.json of the checks is correct
|
|
# Define the base directory for the checks
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/azure/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_gcp_checks_metadata_is_valid(self):
|
|
# Check if the checkID in the metadata.json of the checks is correct
|
|
# Define the base directory for the checks
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__), "../../../", "prowler/providers/gcp/services"
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_kubernetes_checks_metadata_is_valid(self):
|
|
# Check if the checkID in the metadata.json of the checks is correct
|
|
# Define the base directory for the checks
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/kubernetes/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_alibabacloud_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/alibabacloud/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_cloudflare_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/cloudflare/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_github_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/github/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_googleworkspace_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/googleworkspace/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_m365_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/m365/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_mongodbatlas_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/mongodbatlas/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_nhn_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/nhn/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_openstack_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/openstack/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_oraclecloud_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/oraclecloud/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_vercel_checks_metadata_is_valid(self):
|
|
base_directory = os.path.abspath(
|
|
os.path.join(
|
|
os.path.dirname(__file__),
|
|
"../../../",
|
|
"prowler/providers/vercel/services",
|
|
)
|
|
)
|
|
self.verify_metadata_check_id(base_directory)
|
|
|
|
def test_vercel_checks_metadata_use_canonical_hub_urls(self):
|
|
base_directory = pathlib.Path(__file__).resolve().parents[3] / "prowler"
|
|
provider_path = base_directory / "providers" / "vercel" / "services"
|
|
|
|
invalid_urls = []
|
|
|
|
for metadata_file_path in provider_path.rglob("*.metadata.json"):
|
|
with metadata_file_path.open("r") as metadata_file:
|
|
data = json.load(metadata_file)
|
|
|
|
recommendation = data.get("Remediation", {}).get("Recommendation", {})
|
|
url = recommendation.get("Url", "")
|
|
|
|
if url.startswith("https://hub.prowler.com/checks/vercel/"):
|
|
invalid_urls.append(f"{metadata_file_path}: {url}")
|
|
|
|
assert not invalid_urls, "\n".join(invalid_urls)
|
|
|
|
def verify_metadata_check_id(self, provider_path):
|
|
errors = []
|
|
# Walk through the base directory to find all service directories
|
|
for root, dirs, _ in os.walk(provider_path):
|
|
# We only want to look at directories that are direct children of the base directory
|
|
if root == provider_path:
|
|
for service_dir in dirs:
|
|
service_path = os.path.join(root, service_dir)
|
|
|
|
# Walk through each service directory to find check directories
|
|
for check_root, check_dirs, _ in os.walk(service_path):
|
|
for check_dir in check_dirs:
|
|
check_directory = os.path.join(check_root, check_dir)
|
|
metadata_file_name = f"{check_dir}.metadata.json"
|
|
metadata_file_path = os.path.join(
|
|
check_directory, metadata_file_name
|
|
)
|
|
|
|
if os.path.isfile(metadata_file_path):
|
|
# Read the JSON file
|
|
with open(metadata_file_path, "r") as f:
|
|
data = json.load(f)
|
|
|
|
# Extract the CheckID field
|
|
check_id = data.get("CheckID", None)
|
|
|
|
# Compare CheckID to the check name
|
|
if check_id != check_dir:
|
|
errors.append(
|
|
f"CheckID in metadata does not match the check name in {check_directory}. Found CheckID: {check_id}"
|
|
)
|
|
|
|
# Validate metadata against Pydantic validators
|
|
try:
|
|
CheckMetadata.parse_file(metadata_file_path)
|
|
except Exception as e:
|
|
errors.append(
|
|
f"Metadata validation failed for {metadata_file_path}: {e}"
|
|
)
|
|
|
|
assert not errors, "\n\n".join(errors)
|
|
|
|
def test_execute_oraclecloud_mutelist_passes_tenancy_id(self):
|
|
"""Test that execute() passes tenancy_id to is_finding_muted for OCI provider."""
|
|
tenancy_id = "ocid1.tenancy.oc1..aaaaaaaexample"
|
|
|
|
finding = Mock()
|
|
finding.status = "PASS"
|
|
finding.muted = False
|
|
|
|
check = Mock()
|
|
check.CheckID = "oci_test_check"
|
|
check.execute = Mock(return_value=[finding])
|
|
|
|
provider = mock.MagicMock()
|
|
provider.type = "oraclecloud"
|
|
provider.identity.tenancy_id = tenancy_id
|
|
provider.mutelist.mutelist = {"Accounts": {tenancy_id: {}}}
|
|
provider.mutelist.is_finding_muted = Mock(return_value=True)
|
|
|
|
findings = execute(
|
|
check=check,
|
|
global_provider=provider,
|
|
custom_checks_metadata=None,
|
|
output_options=None,
|
|
)
|
|
|
|
provider.mutelist.is_finding_muted.assert_called_once_with(
|
|
tenancy_id=tenancy_id,
|
|
finding=finding,
|
|
)
|
|
assert findings[0].muted is True
|
|
|
|
def test_execute_azure_mutelist_passes_subscription_id_and_name(self):
|
|
"""Test that execute() passes Azure subscription ID and display name."""
|
|
subscription_id = "12345678-1234-1234-1234-123456789012"
|
|
subscription_name = "subscription_1"
|
|
|
|
finding = Mock()
|
|
finding.status = "PASS"
|
|
finding.muted = False
|
|
finding.subscription = subscription_id
|
|
|
|
check = Mock()
|
|
check.CheckID = "azure_test_check"
|
|
check.execute = Mock(return_value=[finding])
|
|
|
|
provider = mock.MagicMock()
|
|
provider.type = "azure"
|
|
provider.identity.subscriptions = {subscription_id: subscription_name}
|
|
provider.mutelist.mutelist = {"Accounts": {subscription_name: {}}}
|
|
provider.mutelist.is_finding_muted = Mock(return_value=True)
|
|
|
|
findings = execute(
|
|
check=check,
|
|
global_provider=provider,
|
|
custom_checks_metadata=None,
|
|
output_options=None,
|
|
)
|
|
|
|
provider.mutelist.is_finding_muted.assert_called_once_with(
|
|
subscription_id=subscription_id,
|
|
subscription_name=subscription_name,
|
|
finding=finding,
|
|
)
|
|
assert findings[0].muted is True
|
|
|
|
def test_execute_check_exception_only_logs(self, caplog):
|
|
caplog.set_level(ERROR)
|
|
|
|
findings = []
|
|
check = Mock()
|
|
checks = ["test-check"]
|
|
|
|
provider = mock.MagicMock()
|
|
provider.type = "aws"
|
|
|
|
output_options = mock.MagicMock()
|
|
output_options.only_logs = True
|
|
error = Exception()
|
|
check.execute = Mock(side_effect=error)
|
|
|
|
with patch("prowler.lib.check.check.execute", return_value=findings):
|
|
assert (
|
|
execute_checks(
|
|
checks,
|
|
provider,
|
|
custom_checks_metadata=None,
|
|
config_file=None,
|
|
output_options=output_options,
|
|
)
|
|
== findings
|
|
)
|
|
assert caplog.record_tuples == [
|
|
("root", 40, f"Check '{checks[0]}' was not found for the AWS provider")
|
|
]
|