feat(prowler-check-kreator): ProwlerChecKreator first version (#5099)

Co-authored-by: Sergio <sergio@prowler.com>
This commit is contained in:
Rubén De la Torre Vico
2024-11-12 21:00:09 +01:00
committed by GitHub
parent 9d65b49cb4
commit b8b60e6bc5
9 changed files with 958 additions and 0 deletions

View File

@@ -0,0 +1,36 @@
# Prowler Check Kreator
???+ note
Currently, this tool is only available for creating checks for the AWS provider.
**Prowler Check Kreator** is a utility designed to streamline the creation of new checks for Prowler. This tool generates all necessary files required to add a new check to the Prowler repository. Specifically, it creates:
- A dedicated folder for the check.
- The main check script.
- A metadata file with essential details.
- A folder and file structure for testing the check.
## Usage
To use the tool, execute the main script with the following command:
```bash
python util/prowler_check_kreator/prowler_check_kreator.py <prowler_provider> <check_name>
```
Parameters:
- `<prowler_provider>`: Currently only AWS is supported.
- `<check_name>`: The name you wish to assign to the new check.
## AI integration
This tool optionally integrates AI to assist in generating the check code and metadata file content. When AI assistance is chosen, the tool uses [Gemini](https://gemini.google.com/) to produce preliminary code and metadata.
???+ note
For this feature to work, you must have the library `google-generativeai` installed in your Python environment.
???+ warning
AI-generated code and metadata might contain errors or require adjustments to align with specific Prowler requirements. Carefully review all AI-generated content before committing.
To enable AI assistance, simply confirm when prompted by the tool. Additionally, ensure that the `GEMINI_API_KEY` environment variable is set with a valid Gemini API key. For instructions on obtaining your API key, refer to the [Gemini documentation](https://ai.google.dev/gemini-api/docs/api-key).

View File

@@ -65,6 +65,7 @@ nav:
- Pentesting: tutorials/pentesting.md
- Parallel Execution: tutorials/parallel-execution.md
- Developer Guide: developer-guide/introduction.md
- Prowler Check Kreator: tutorials/prowler-check-kreator.md
- AWS:
- Authentication: tutorials/aws/authentication.md
- Assume Role: tutorials/aws/role-assumption.md

0
util/__init__.py Normal file
View File

View File

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,246 @@
def get_metadata_valid_check_type(provider: str = "aws") -> list:
"""Get the valid check types for the provider
Args:
provider: The Prowler provider.
Returns:
A list of valid check types for the given provider.
"""
check_types = []
if provider == "aws":
check_types = [
{
"namespace": "Software and Configuration Checks",
"children": [
{
"category": "Vulnerabilities",
"children": [{"classifier": "CVE"}],
},
{
"category": "AWS Security Best Practices",
"children": [
{"classifier": "Network Reachability"},
{"classifier": "Runtime Behavior Analysis"},
],
},
{
"category": "Industry and Regulatory Standards",
"children": [
{"classifier": "AWS Foundational Security Best Practices"},
{"classifier": "CIS Host Hardening Benchmarks"},
{"classifier": "CIS AWS Foundations Benchmark"},
{"classifier": "PCI-DSS"},
{"classifier": "Cloud Security Alliance Controls"},
{"classifier": "ISO 90001 Controls"},
{"classifier": "ISO 27001 Controls"},
{"classifier": "ISO 27017 Controls"},
{"classifier": "ISO 27018 Controls"},
{"classifier": "SOC 1"},
{"classifier": "SOC 2"},
{"classifier": "HIPAA Controls (USA)"},
{"classifier": "NIST 800-53 Controls (USA)"},
{"classifier": "NIST CSF Controls (USA)"},
{"classifier": "IRAP Controls (Australia)"},
{"classifier": "K-ISMS Controls (Korea)"},
{"classifier": "MTCS Controls (Singapore)"},
{"classifier": "FISC Controls (Japan)"},
{"classifier": "My Number Act Controls (Japan)"},
{"classifier": "ENS Controls (Spain)"},
{"classifier": "Cyber Essentials Plus Controls (UK)"},
{"classifier": "G-Cloud Controls (UK)"},
{"classifier": "C5 Controls (Germany)"},
{"classifier": "IT-Grundschutz Controls (Germany)"},
{"classifier": "GDPR Controls (Europe)"},
{"classifier": "TISAX Controls (Europe)"},
],
},
{"category": "Patch Management"},
],
},
{
"namespace": "TTPs",
"children": [
{"category": "Initial Access"},
{"category": "Execution"},
{"category": "Persistence"},
{"category": "Privilege Escalation"},
{"category": "Defense Evasion"},
{"category": "Credential Access"},
{"category": "Discovery"},
{"category": "Lateral Movement"},
{"category": "Collection"},
{"category": "Command and Control"},
],
},
{
"namespace": "Effects",
"children": [
{"category": "Data Exposure"},
{"category": "Data Exfiltration"},
{"category": "Data Destruction"},
{"category": "Denial of Service"},
{"category": "Resource Consumption"},
],
},
{
"namespace": "Unusual Behaviors",
"children": [
{"category": "Application"},
{"category": "Network Flow"},
{"category": "IP address"},
{"category": "User"},
{"category": "VM"},
{"category": "Container"},
{"category": "Serverless"},
{"category": "Process"},
{"category": "Database"},
{"category": "Data"},
],
},
{
"namespace": "Sensitive Data Identifications",
"children": [
{"category": "PII"},
{"category": "Passwords"},
{"category": "Legal"},
{"category": "Financial"},
{"category": "Security"},
{"category": "Business"},
],
},
]
return check_types
def get_metadata_valid_resource_type(provider: str = "aws") -> set:
"""Get the valid resource types for the provider
Args:
provider: The Prowler provider.
Returns:
A set of valid resource types for the given provider.
"""
valid_resource_types = set()
if provider == "aws":
valid_resource_types = {
"AwsIamAccessKey",
"AwsElbLoadBalancer",
"AwsRedshiftCluster",
"AwsEventsEndpoint",
"AwsElbv2LoadBalancer",
"AwsAutoScalingLaunchConfiguration",
"AwsWafv2RuleGroup",
"AwsWafRegionalRule",
"AwsCloudFrontDistribution",
"AwsWafRegionalWebAcl",
"AwsWafRateBasedRule",
"AwsCertificateManagerCertificate",
"AwsKmsKey",
"AwsDmsEndpoint",
"AwsLambdaLayerVersion",
"AwsIamRole",
"AwsElasticBeanstalkEnvironment",
"AwsBackupBackupPlan",
"AwsEc2ClientVpnEndpoint",
"AwsEcrContainerImage",
"AwsSqsQueue",
"AwsIamGroup",
"AwsOpenSearchServiceDomain",
"AwsApiGatewayV2Api",
"AwsCloudTrailTrail",
"AwsWafWebAcl",
"AwsEc2Subnet",
"AwsEc2VpcPeeringConnection",
"AwsEc2VpcEndpointService",
"AwsCodeBuildProject",
"AwsLambdaFunction",
"AwsNetworkFirewallRuleGroup",
"AwsDmsReplicationInstance",
"AwsRdsEventSubscription",
"AwsCloudWatchAlarm",
"AwsS3AccountPublicAccessBlock",
"AwsWafRegionalRateBasedRule",
"AwsRdsDbInstance",
"AwsEksCluster",
"AwsXrayEncryptionConfig",
"AwsWafv2WebAcl",
"AwsWafRuleGroup",
"AwsBackupBackupVault",
"AwsKinesisStream",
"AwsNetworkFirewallFirewallPolicy",
"AwsEc2NetworkInterface",
"AwsEcsTaskDefinition",
"AwsMskCluster",
"AwsApiGatewayRestApi",
"AwsS3Object",
"AwsRdsDbSnapshot",
"AwsBackupRecoveryPoint",
"AwsWafRule",
"AwsS3AccessPoint",
"AwsApiGatewayV2Stage",
"AwsGuardDutyDetector",
"AwsEfsAccessPoint",
"AwsEcsContainer",
"AwsEcsTask",
"AwsS3Bucket",
"AwsSageMakerNotebookInstance",
"AwsNetworkFirewallFirewall",
"AwsStepFunctionStateMachine",
"AwsIamUser",
"AwsAppSyncGraphQLApi",
"AwsApiGatewayStage",
"AwsEcrRepository",
"AwsEcsService",
"AwsEc2Vpc",
"AwsAmazonMQBroker",
"AwsWafRegionalRuleGroup",
"AwsEventSchemasRegistry",
"AwsRoute53HostedZone",
"AwsEventsEventbus",
"AwsDmsReplicationTask",
"AwsEc2Instance",
"AwsEcsCluster",
"AwsRdsDbSecurityGroup",
"AwsCloudFormationStack",
"AwsSnsTopic",
"AwsDynamoDbTable",
"AwsRdsDbCluster",
"AwsEc2Eip",
"AwsEc2RouteTable",
"AwsEc2TransitGateway",
"AwsElasticSearchDomain",
"AwsEc2LaunchTemplate",
"AwsEc2Volume",
"AwsAthenaWorkGroup",
"AwsSecretsManagerSecret",
"AwsEc2SecurityGroup",
"AwsIamPolicy",
"AwsSsmPatchCompliance",
"AwsAutoScalingAutoScalingGroup",
"AwsEc2NetworkAcl",
"AwsRdsDbClusterSnapshot",
}
return valid_resource_types
def get_metadata_placeholder_resource_type(provider: str = "aws") -> str:
"""Get the placeholder for the resource type for the provider
Args:
provider: The Prowler provider.
Returns:
A placeholder for the resource type for the given provider.
"""
placeholder = ""
if provider == "aws":
placeholder = "Other"
return placeholder

View File

@@ -0,0 +1,132 @@
def load_check_template(provider: str, service: str, check_name: str) -> str:
"""Load the template for the check file.
Args:
provider (str): The provider of the service.
service (str): The service to check.
check_name (str): The name of the check.
Returns:
A check template used when the user does not want to generate the check with AI.
Raises:
ValueError: If the provider is not implemented yet.
"""
if provider == "aws":
return f"""
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.{service}.{service}_client import {service}_client
from typing import List
class {check_name}(Check):
def execute(self) -> List[Check_Report_AWS]:
findings = []
for <resource_arn>, <resource_to_check> in {service}_client.<resources_dict>.items():
report = Check_Report_AWS(self.metadata())
report.region = <resource_to_check>.region
report.resource_id = <resource_to_check>.name
report.resource_arn = <resource_arn>
report.resource_tags = <resource_to_check>.tags
report.status = "FAIL"
report.status_extended = f"..."
if <check_logic>:
report.status = "PASS"
report.status_extended = f"..."
findings.append(report)
return findings
"""
else:
raise ValueError(f"Template for {provider} not implemented yet")
def load_test_template(provider: str, service: str, check_name: str) -> str:
"""Load the template for the test file.
Args:
provider: The provider of the service (e.g., "aws").
service: The service to check (e.g., "s3").
check_name: The name of the check (e.g., "check_bucket_encryption").
Returns:
A test template used when the user does not want to generate the check with AI.
Raises:
ValueError: If the template for the given provider is not implemented.
"""
if provider == "aws":
return f"""
from unittest import mock
from boto3 import client
from moto import mock_aws
from tests.providers.aws.utils import (
AWS_REGION_EU_WEST_1,
set_mocked_aws_provider,
)
class Test_{check_name}:
@mock_aws
def test_<no_attribute>(self):
from prowler.providers.aws.services.{service}.{service}_service import <service_class_name>
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.{service}.{check_name}.{check_name}.{service}_client",
new=<service_class_name>(aws_provider),
):
# Test Check
from prowler.providers.aws.services.{service}.{check_name}.{check_name} import (
{check_name},
)
check = {check_name}()
result = check.execute()
assert len(result) == 0
@mock_aws
def test_one_compliant_{service}(self):
{service}_client = client("{service}", region_name=AWS_REGION_EU_WEST_1)
# Create a compliant resource
from prowler.providers.aws.services.{service}.{service}_service import <service_class_name>
aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1])
with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.{service}.{check_name}.{check_name}.{service}_client",
new=<service_class_name>(aws_provider),
):
from prowler.providers.aws.services.{service}.{check_name}.{check_name} import (
{check_name},
)
check = {check_name}()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert result[0].status_extended == "..."
assert result[0].region == AWS_REGION_EU_WEST_1
assert result[0].resource_id == <resource>.id
assert (
result[0].resource_arn
== f"arn:(aws_partition):{service}:(region):(account_id):(resource)"
)
assert result[0].resource_tags == <resource>.tags
"""
else:
raise ValueError(f"Template for {provider} not implemented yet")

View File

@@ -0,0 +1,304 @@
#!/usr/bin/env python3
import json
import os
import sys
from util.prowler_check_kreator.lib.templates import (
load_check_template,
load_test_template,
)
class ProwlerCheckKreator:
def __init__(self, provider: str, check_name: str):
# Validate provider
SUPPORTED_PROVIDERS = {"aws"}
if provider in SUPPORTED_PROVIDERS:
self._provider = provider
else:
raise ValueError(
f"Invalid provider. Supported providers: {', '.join(SUPPORTED_PROVIDERS)}"
)
# Find the Prowler folder
self._prowler_folder = os.path.abspath(
os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)
)
# Validate if service exists for the selected provider
service_name = check_name.split("_")[0]
service_path = os.path.join(
self._prowler_folder,
"prowler/providers/",
provider,
"services/",
service_name,
)
if os.path.exists(service_path):
self._service_name = service_name
else:
raise ValueError(
f"Service {service_name} does not exist for {provider}. Please introduce a valid service"
)
# Ask user if want to use Gemini for all the process
user_input = (
input(
"Do you want to use Gemini to create the check and metadata? Type 'yes'/'no' and press enter: "
)
.strip()
.lower()
)
if user_input == "yes":
# Let the user to use the model that he wants
supported_models = [
"gemini-1.5-flash",
"gemini-1.5-pro",
"gemini-1.0-pro",
]
print("Select the model that you want to use:")
for i, model in enumerate(supported_models):
print(f"{i + 1}. {model}")
user_input = input(
"Type the number of the model and press enter (default is 1): "
).strip()
if not user_input:
model_index = 1
else:
model_index = int(user_input)
if model_index < 1 or model_index > len(supported_models):
raise ValueError("Invalid model selected.")
model_name = supported_models[model_index - 1]
if "gemini" in model_name:
from util.prowler_check_kreator.lib.llms.gemini import Gemini
self._model = Gemini(model_name)
# Provide some context about the check to create
self._context = (
input(
"Please provide some context to generate the check and metadata:\n"
)
).strip()
else:
raise ValueError("Invalid model selected.")
elif user_input == "no":
self._model = None
self._context = ""
else:
raise ValueError("Invalid input. Please type 'yes' or 'no'.")
if not self._check_exists(check_name):
self._check_name = check_name
self._check_path = os.path.join(
self._prowler_folder,
"prowler/providers/",
provider,
"services/",
service_name,
check_name,
)
else:
# Check already exists, give the user the possibility to continue or not
user_input = (
input(
f"Some files of {check_name} already exists. Do you want to continue and overwrite it? Type 'yes' if you want to continue: "
)
.strip()
.lower()
)
if user_input == "yes":
self._check_name = check_name
self._check_path = os.path.join(
self._prowler_folder,
"prowler/providers/",
provider,
"services/",
service_name,
check_name,
)
else:
raise ValueError(f"Check {check_name} already exists.")
def kreate_check(self) -> None:
"""Create a new check in Prowler"""
# Create the check
print(f"Creating check {self._check_name} for {self._provider}")
# Inside the check folder, create the check files: __init__.py, check_name.py, and check_name.metadata.json
os.makedirs(self._check_path, exist_ok=True)
with open(os.path.join(self._check_path, "__init__.py"), "w") as f:
f.write("")
self._write_check_file()
self._write_metadata_file()
# Create test directory if it does not exist
test_folder = os.path.join(
self._prowler_folder,
"tests/providers/",
self._provider,
"services/",
self._service_name,
self._check_name,
)
os.makedirs(test_folder, exist_ok=True)
self._write_test_file()
print(f"Check {self._check_name} created successfully")
def _check_exists(self, check_name: str) -> bool:
"""Ensure if any file related to the check already exists.
Args:
check_name: The name of the check.
Returns:
True if the check already exists, False otherwise.
"""
# Get the check path
check_path = os.path.join(
self._prowler_folder,
"prowler/providers/",
self._provider,
"services/",
self._service_name,
check_name,
)
# Get the test path
_test_path = os.path.join(
self._prowler_folder,
"tests/providers/",
self._provider,
"services/",
self._service_name,
check_name,
)
# Check if exits check.py, check_metadata.json or check_test.py
return (
os.path.exists(check_path)
or os.path.exists(os.path.join(check_path, "__init__.py"))
or os.path.exists(os.path.join(check_path, f"{check_name}.py"))
or os.path.exists(os.path.join(check_path, f"{check_name}.metadata.json"))
or os.path.exists(_test_path)
)
def _write_check_file(self) -> None:
"""Write the check file"""
if self._model is None:
check_content = load_check_template(
self._provider, self._service_name, self._check_name
)
else:
check_content = self._model.generate_check(
check_name=self._check_name, context=self._context
)
with open(os.path.join(self._check_path, f"{self._check_name}.py"), "w") as f:
f.write(check_content)
def _write_metadata_file(self) -> None:
"""Write the metadata file"""
metadata_template = {
"Provider": self._provider,
"CheckID": self._check_name,
"CheckTitle": "",
"CheckType": [],
"ServiceName": self._service_name,
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": "<critical, high, medium or low>",
"ResourceType": "",
"Description": "",
"Risk": "",
"RelatedUrl": "",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "",
"Terraform": "",
},
"Recommendation": {"Text": "", "Url": ""},
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
}
if self._model is None:
filled_metadata = metadata_template
else:
filled_metadata = self._model.generate_metadata(
metadata_template, self._context
)
with open(
os.path.join(self._check_path, f"{self._check_name}.metadata.json"), "w"
) as f:
f.write(json.dumps(filled_metadata, indent=2))
def _write_test_file(self) -> None:
"""Write the test file"""
test_folder = os.path.join(
self._prowler_folder,
"tests/providers/",
self._provider,
"services/",
self._service_name,
self._check_name,
)
if self._model is None:
test_template = load_test_template(
self._provider, self._service_name, self._check_name
)
else:
test_template = self._model.generate_test(self._check_name)
with open(os.path.join(test_folder, f"{self._check_name}_test.py"), "w") as f:
f.write(test_template)
if __name__ == "__main__":
try:
if len(sys.argv) != 3:
raise ValueError(
"Invalid arguments. Usage: python prowler_check_kreator.py <cloud_provider> <check_name>"
)
prowler_check_creator = ProwlerCheckKreator(sys.argv[1], sys.argv[2])
sys.exit(prowler_check_creator.kreate_check())
except ValueError as e:
print(f"Error: {e}")
sys.exit(1)
except Exception as e:
print(f"Unexpected error: {e}")
sys.exit(1)