feat(llm): add LLM provider (#8555)

Co-authored-by: Andoni A. <14891798+andoniaf@users.noreply.github.com>
Author: Sergio Garcia
Date: 2025-09-29 11:24:10 -04:00
Committed by: GitHub
Parent: 52a5fff61f
Commit: 53bb5aff22
25 changed files with 175,953 additions and 15 deletions

View File

@@ -6,6 +6,7 @@ repos:
- id: check-merge-conflict
- id: check-yaml
args: ["--unsafe"]
exclude: prowler/config/llm_config.yaml
- id: check-json
- id: end-of-file-fixer
- id: trailing-whitespace

View File

@@ -90,6 +90,7 @@ prowler dashboard
| M365 | 70 | 7 | 3 | 2 | Official | Stable | UI, API, CLI |
| IaC | [See `trivy` docs.](https://trivy.dev/latest/docs/coverage/iac/) | N/A | N/A | N/A | Official | Beta | CLI |
| MongoDB Atlas | 10 | 3 | 0 | 0 | Official | Beta | CLI |
| LLM | [See `promptfoo` docs.](https://www.promptfoo.dev/docs/red-team/plugins/) | N/A | N/A | N/A | Official | Beta | CLI |
| NHN | 6 | 2 | 1 | 0 | Unofficial | Beta | CLI |
> [!Note]

View File

@@ -0,0 +1,102 @@
# LLM Provider
This page details the [Large Language Model (LLM)](https://en.wikipedia.org/wiki/Large_language_model) provider implementation in Prowler.
The LLM provider enables security testing of language models using red team techniques. By default, Prowler uses the built-in LLM configuration that targets OpenAI models with comprehensive security test suites. To configure it, follow the [LLM getting started guide](../tutorials/llm/getting-started-llm.md).
## LLM Provider Classes Architecture
The LLM provider implementation follows the general [Provider structure](./provider.md). This section focuses on the LLM-specific implementation, highlighting how the generic provider concepts are realized for LLM security testing in Prowler. For a full overview of the provider pattern, base classes, and extension guidelines, see [Provider documentation](./provider.md).
### Main Class
- **Location:** [`prowler/providers/llm/llm_provider.py`](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/llm/llm_provider.py)
- **Base Class:** Inherits from `Provider` (see [base class details](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/common/provider.py)).
- **Purpose:** Central orchestrator for LLM-specific logic, configuration management, and integration with promptfoo for red team testing.
- **Key LLM Responsibilities:**
- Initializes and manages LLM configuration using promptfoo.
- Validates configuration and sets up the LLM testing context.
- Loads and manages red team test configuration, plugins, and target models.
- Provides properties and methods for downstream LLM security testing.
- Integrates with promptfoo for comprehensive LLM security evaluation.
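A minimal sketch of how this provider can be instantiated and run directly from Python, assuming promptfoo is installed and a valid `OPENAI_API_KEY` is exported; the `handle_batch` callback name is illustrative:
```python
from prowler.providers.llm.llm_provider import LlmProvider

# Without config_path, the provider falls back to the bundled
# prowler/config/llm_config.yaml red team configuration.
provider = LlmProvider(max_concurrency=5)
provider.print_credentials()  # prints target model, plugins, and config file

def handle_batch(findings_batch):
    # Findings are streamed as CheckReportLLM objects while promptfoo runs.
    for finding in findings_batch:
        print(finding.status, finding.check_metadata.CheckID, finding.model)

# run_scan() shells out to `promptfoo redteam eval` and streams its JSONL output.
findings = provider.run_scan(streaming_callback=handle_batch)
```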
### Data Models
- **Location:** [`prowler/providers/llm/models.py`](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/llm/models.py)
- **Purpose:** Define structured data for LLM output options and configuration.
- **Key LLM Models:**
- `LLMOutputOptions`: Customizes output filename logic for LLM-specific reporting.
### LLM Security Testing Integration
- **Location:** [`prowler/providers/llm/llm_provider.py`](https://github.com/prowler-cloud/prowler/blob/master/prowler/providers/llm/llm_provider.py)
- **Purpose:** Integrates with promptfoo for comprehensive LLM security testing.
- **Key LLM Responsibilities:**
- Executes promptfoo red team evaluations against target LLMs.
- Processes security test results and converts them to Prowler reports.
- Manages test concurrency and progress tracking.
- Handles real-time streaming of test results.
### Configuration Management
The LLM provider uses promptfoo configuration files to define:
- **Target Models**: The LLM models to test (e.g., OpenAI GPT, Anthropic Claude)
- **Red Team Plugins**: Security test suites (OWASP, MITRE, NIST, EU AI Act)
- **Test Parameters**: Concurrency, test counts, and evaluation criteria
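A short sketch of how the provider reads a promptfoo configuration to extract the target model and plugin IDs, mirroring `LlmProvider.__init__` (the file path is illustrative):
```python
import yaml

# Load a promptfoo config and pull out the pieces Prowler cares about.
with open("prowler/config/llm_config.yaml", "r") as config_file:
    config = yaml.safe_load(config_file)

targets = config.get("targets", [])
model = targets[0].get("id", "No model available.") if targets else "No model available."
plugins = [
    plugin.get("id")
    for plugin in config.get("redteam", {}).get("plugins", [])
    if plugin.get("id")
]
print(model, plugins)
```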
### Default Configuration
Prowler includes a comprehensive default LLM configuration that:
- Targets OpenAI models by default
- Includes multiple security test frameworks (OWASP, MITRE, NIST, EU AI Act)
- Provides extensive test coverage for LLM security vulnerabilities
- Supports custom configuration for specific testing needs
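A simplified sketch of the fallback logic: a custom path is honored only when it differs from Prowler's default config file, otherwise the bundled red team configuration is used (the path constants come from `prowler/config/config.py`):
```python
from typing import Optional

from prowler.config.config import (
    default_config_file_path,
    default_redteam_config_file_path,
)

def resolve_llm_config_path(config_path: Optional[str]) -> str:
    """Mirror of how LlmProvider picks its promptfoo config file."""
    if config_path and config_path != default_config_file_path:
        return config_path
    return default_redteam_config_file_path
```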
## Specific Patterns in LLM Security Testing
The LLM provider implements security testing through integration with promptfoo, following these patterns:
### Red Team Testing Framework
- **Plugin-based Architecture**: Uses promptfoo plugins for different security test categories
- **Comprehensive Coverage**: Includes OWASP LLM Top 10, MITRE ATLAS, NIST AI Risk Management, and EU AI Act compliance
- **Real-Time Evaluation**: Streams test results as they are generated
- **Progress Tracking**: Provides detailed progress information during test execution
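Progress tracking works by counting unique `testIdx` values as finding lines arrive; a trimmed sketch of the counter logic used in `_stream_findings`:
```python
# Each promptfoo finding line carries a testIdx; each unique value counts
# as one completed test, so multiple results for the same test are not
# double-counted.
progress_counter = {"completed": 0, "total": 0, "completed_test_ids": set()}

def record_progress(finding: dict) -> None:
    test_idx = finding.get("testIdx")
    if test_idx is not None and test_idx not in progress_counter["completed_test_ids"]:
        progress_counter["completed_test_ids"].add(test_idx)
        progress_counter["completed"] += 1

record_progress({"testIdx": 0})
record_progress({"testIdx": 0})  # duplicate result for the same test
record_progress({"testIdx": 1})
print(progress_counter["completed"])  # -> 2
```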
### Test Execution Flow
1. **Configuration Loading**: Loads promptfoo configuration with target models and test plugins
2. **Test Generation**: Generates security test cases based on configured plugins
3. **Concurrent Execution**: Runs tests with configurable concurrency limits
4. **Result Processing**: Converts promptfoo results to Prowler security reports
5. **Progress Monitoring**: Tracks and displays test execution progress
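A blocking simplification of steps 1 through 4, assuming promptfoo is installed; the real implementation tails the JSONL output file from a monitoring thread while promptfoo is still running:
```python
import json
import subprocess

report_path = "/tmp/prowler_promptfoo_results.jsonl"
command = [
    "promptfoo", "redteam", "eval",
    "--output", report_path,
    "--max-concurrency", "10",
    "--no-cache",
    "--config", "prowler/config/llm_config.yaml",  # illustrative config path
]
subprocess.run(command, check=False)

# Each line of the output file is one finding in JSON form.
with open(report_path, "r", encoding="utf-8") as report_file:
    for line in report_file:
        if not line.strip():
            continue
        finding = json.loads(line)
        status = "PASS" if finding.get("success") else "FAIL"
        print(status, finding.get("metadata", {}).get("pluginId"))
```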
### Security Test Categories
The LLM provider supports comprehensive security testing across multiple frameworks:
- **OWASP LLM Top 10**: Covers prompt injection, data leakage, and model security
- **MITRE ATLAS**: Adversarial threat landscape for AI systems
- **NIST AI Risk Management**: AI system risk assessment and mitigation
- **EU AI Act**: European Union AI regulation compliance
- **Custom Tests**: Support for organization-specific security requirements
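Each promptfoo plugin result is mapped into Prowler check metadata by `_process_check`: the plugin ID becomes the `CheckID`, its prefix becomes the `ServiceName`, and the plugin goal becomes the title. The sample finding below is illustrative:
```python
# Illustrative promptfoo finding fragment; field names match what
# _process_check reads, the values are made up for the example.
finding = {
    "success": False,
    "metadata": {"pluginId": "owasp:llm", "goal": "Leak the system prompt", "severity": "High"},
}

check_metadata = {
    "Provider": "llm",
    "CheckID": finding["metadata"]["pluginId"],                    # "owasp:llm"
    "CheckTitle": finding["metadata"]["goal"],
    "ServiceName": finding["metadata"]["pluginId"].split(":")[0],  # "owasp"
    "Severity": finding["metadata"]["severity"].lower(),           # "high"
}
status = "PASS" if finding.get("success") else "FAIL"
print(status, check_metadata)
```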
## Error Handling and Validation
The LLM provider includes comprehensive error handling for:
- **Configuration Validation**: Ensures valid promptfoo configuration files
- **Model Access**: Handles authentication and access issues with target LLMs
- **Test Execution**: Manages test failures and timeout scenarios
- **Result Processing**: Handles malformed or incomplete test results
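For configuration problems, the provider watches promptfoo's stdout for known messages and converts them into actionable errors; a sketch of that mapping, using the marker strings and messages from `_stream_findings`:
```python
# Known promptfoo stdout markers mapped to actionable Prowler error messages.
KNOWN_ERRORS = {
    "Redteam evals require email verification": (
        "Please provide your work email to promptfoo first with the "
        "`promptfoo config set email <email>` command."
    ),
    "No promptfooconfig found": (
        "No config file found. Please provide a valid promptfoo config file."
    ),
    "Warning: Config file has a redteam section but no test cases.": (
        "Please generate the test cases first using the "
        "`promptfoo redteam generate` command."
    ),
}

def match_known_error(stdout_line: str):
    """Return the actionable message for a known promptfoo error line, if any."""
    for marker, message in KNOWN_ERRORS.items():
        if marker in stdout_line:
            return message
    return None
```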
## Integration with Prowler Ecosystem
The LLM provider seamlessly integrates with Prowler's existing infrastructure:
- **Output Formats**: Supports all Prowler output formats (JSON, CSV, HTML, etc.)
- **Compliance Frameworks**: Integrates with Prowler's compliance reporting
- **Fixer Integration**: Supports automated remediation recommendations
- **Dashboard Integration**: Compatible with Prowler App for centralized management

View File

@@ -14,6 +14,7 @@ The official supported providers right now are:
| **Github** | Official | Stable | UI, API, CLI |
| **IaC** | Official | Beta | CLI |
| **MongoDB Atlas** | Official | Beta | CLI |
| **LLM** | Official | Beta | CLI |
| **NHN** | Unofficial | Beta | CLI |
Prowler supports **auditing, incident response, continuous monitoring, hardening, forensic readiness, and remediation**.

View File

@@ -0,0 +1,142 @@
# Getting Started With LLM on Prowler
## Overview
Prowler's LLM provider enables comprehensive security testing of large language models using red team techniques. It integrates with [promptfoo](https://promptfoo.dev/) to provide extensive security evaluation capabilities.
## Prerequisites
Before using the LLM provider, ensure the following requirements are met:
- **promptfoo installed**: The LLM provider requires promptfoo to be installed on the system
- **LLM API access**: Valid API keys for the target LLM models to test
- **Email verification**: promptfoo requires email verification for red team evaluations
## Installation
### Install promptfoo
Install promptfoo using one of the following methods:
**Using npm:**
```bash
npm install -g promptfoo
```
**Using Homebrew (macOS):**
```bash
brew install promptfoo
```
**Using other package managers:**
See the [promptfoo installation guide](https://promptfoo.dev/docs/installation/) for additional installation methods.
### Verify Installation
```bash
promptfoo --version
```
## Configuration
### Step 1: Email Verification
promptfoo requires email verification for red team evaluations. Set the email address:
```bash
promptfoo config set email your-email@company.com
```
### Step 2: Configure LLM API Keys
Set up API keys for the target LLM models. For OpenAI (default configuration):
```bash
export OPENAI_API_KEY="your-openai-api-key"
```
For other providers, see the [promptfoo documentation](https://promptfoo.dev/docs/providers/) for specific configuration requirements.
### Step 3: Generate Test Cases (Optional)
Prowler provides a default suite of red team tests. To customize the test cases, generate them first:
```bash
promptfoo redteam generate
```
This creates test cases based on your configuration.
## Usage
### Basic Usage
Run LLM security testing with the default configuration:
```bash
prowler llm
```
### Custom Configuration
Use a custom promptfoo configuration file:
```bash
prowler llm --config-file /path/to/your/config.yaml
```
### Output Options
Generate reports in various formats:
```bash
# JSON (OCSF) output
prowler llm --output-formats json-ocsf
# CSV output
prowler llm --output-formats csv
# HTML report
prowler llm --output-formats html
```
### Concurrency Control
Adjust the number of concurrent tests:
```bash
prowler llm --max-concurrency 5
```
## Default Configuration
Prowler includes a comprehensive default LLM configuration that provides:
- **Target Models**: OpenAI GPT models by default
- **Security Frameworks**:
- OWASP LLM Top 10
- OWASP API Top 10
- MITRE ATLAS
- NIST AI Risk Management Framework
- EU AI Act compliance
- **Test Coverage**: Over 5,000 security test cases
- **Plugin Support**: Multiple security testing plugins
## Advanced Configuration
### Custom Test Suites
Create custom test configurations by modifying the promptfoo config file at `prowler/config/llm_config.yaml`, or pass a custom configuration with the `--config-file` flag:
```yaml
description: Custom LLM Security Tests
targets:
- id: openai:gpt-4
redteam:
plugins:
- id: owasp:llm
numTests: 10
- id: mitre:atlas
numTests: 5
```

View File

@@ -138,7 +138,8 @@ nav:
- MongoDB Atlas:
- Getting Started: tutorials/mongodbatlas/getting-started-mongodbatlas.md
- Authentication: tutorials/mongodbatlas/authentication.md
- LLM:
- Getting Started: tutorials/llm/getting-started-llm.md
- Compliance:
- ThreatScore: tutorials/compliance/threatscore.md
@@ -159,6 +160,7 @@ nav:
- Kubernetes: developer-guide/kubernetes-details.md
- Microsoft 365: developer-guide/m365-details.md
- GitHub: developer-guide/github-details.md
- LLM: developer-guide/llm-details.md
- Miscellaneous:
- Documentation: developer-guide/documentation.md
- Testing:

View File

@@ -7,6 +7,7 @@ All notable changes to the **Prowler SDK** are documented in this file.
### Added
- Support for AdditionalURLs in outputs [(#8651)](https://github.com/prowler-cloud/prowler/pull/8651)
- Support for markdown metadata fields in Dashboard [(#8667)](https://github.com/prowler-cloud/prowler/pull/8667)
- LLM provider using `promptfoo` [(#8555)](https://github.com/prowler-cloud/prowler/pull/8555)
- Documentation for renaming checks [(#8717)](https://github.com/prowler-cloud/prowler/pull/8717)
- Add explicit "name" field for each compliance framework and include "FRAMEWORK" and "NAME" in CSV output [(#7920)](https://github.com/prowler-cloud/prowler/pull/7920)

View File

@@ -89,7 +89,7 @@ from prowler.lib.outputs.csv.csv import CSV
from prowler.lib.outputs.finding import Finding
from prowler.lib.outputs.html.html import HTML
from prowler.lib.outputs.ocsf.ocsf import OCSF
from prowler.lib.outputs.outputs import extract_findings_statistics
from prowler.lib.outputs.outputs import extract_findings_statistics, report
from prowler.lib.outputs.slack.slack import Slack
from prowler.lib.outputs.summary_table import display_summary_table
from prowler.providers.aws.lib.s3.s3 import S3
@@ -102,6 +102,7 @@ from prowler.providers.gcp.models import GCPOutputOptions
from prowler.providers.github.models import GithubOutputOptions
from prowler.providers.iac.models import IACOutputOptions
from prowler.providers.kubernetes.models import KubernetesOutputOptions
from prowler.providers.llm.models import LLMOutputOptions
from prowler.providers.m365.models import M365OutputOptions
from prowler.providers.mongodbatlas.models import MongoDBAtlasOutputOptions
from prowler.providers.nhn.models import NHNOutputOptions
@@ -180,8 +181,8 @@ def prowler():
# Load compliance frameworks
logger.debug("Loading compliance frameworks from .json files")
# Skip compliance frameworks for IAC provider
if provider != "iac":
# Skip compliance frameworks for IAC and LLM providers
if provider != "iac" and provider != "llm":
bulk_compliance_frameworks = Compliance.get_bulk(provider)
# Complete checks metadata with the compliance framework specification
bulk_checks_metadata = update_checks_metadata_with_compliance(
@@ -238,8 +239,8 @@ def prowler():
if not args.only_logs:
global_provider.print_credentials()
# Skip service and check loading for IAC provider
if provider != "iac":
# Skip service and check loading for IAC and LLM providers
if provider != "iac" and provider != "llm":
# Import custom checks from folder
if checks_folder:
custom_checks = parse_checks_from_folder(global_provider, checks_folder)
@@ -322,6 +323,8 @@ def prowler():
)
elif provider == "iac":
output_options = IACOutputOptions(args, bulk_checks_metadata)
elif provider == "llm":
output_options = LLMOutputOptions(args, bulk_checks_metadata)
# Run the quick inventory for the provider if available
if hasattr(args, "quick_inventory") and args.quick_inventory:
@@ -331,9 +334,20 @@ def prowler():
# Execute checks
findings = []
if provider == "iac":
# For IAC provider, run the scan directly
findings = global_provider.run()
if provider == "iac" or provider == "llm":
# For IAC and LLM providers, run the scan directly
if provider == "llm":
def streaming_callback(findings_batch):
"""Callback to report findings as they are processed in real-time."""
report(findings_batch, global_provider, output_options)
findings = global_provider.run_scan(streaming_callback=streaming_callback)
else:
# Original behavior for the IAC provider
findings = global_provider.run()
# Report findings for verbose output
report(findings, global_provider, output_options)
elif len(checks_to_execute):
findings = execute_checks(
checks_to_execute,

View File

View File

@@ -76,6 +76,9 @@ default_config_file_path = (
default_fixer_config_file_path = (
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/fixer_config.yaml"
)
default_redteam_config_file_path = (
f"{pathlib.Path(os.path.dirname(os.path.realpath(__file__)))}/llm_config.yaml"
)
encoding_format_utf_8 = "utf-8"
available_output_formats = ["csv", "json-asff", "json-ocsf", "html"]

prowler/config/llm_config.yaml (new file, 175,015 additions)

File diff suppressed because it is too large.

View File

@@ -157,7 +157,11 @@ class CheckMetadata(BaseModel):
raise ValueError("ServiceName must be a non-empty string")
check_id = values.get("CheckID")
if check_id and values.get("Provider") != "iac":
if (
check_id
and values.get("Provider") != "iac"
and values.get("Provider") != "llm"
):
service_from_check_id = check_id.split("_")[0]
if service_name != service_from_check_id:
raise ValueError(
@@ -173,7 +177,11 @@ class CheckMetadata(BaseModel):
if not check_id:
raise ValueError("CheckID must be a non-empty string")
if check_id and values.get("Provider") != "iac":
if (
check_id
and values.get("Provider") != "iac"
and values.get("Provider") != "llm"
):
if "-" in check_id:
raise ValueError(
f"CheckID {check_id} contains a hyphen, which is not allowed"
@@ -694,6 +702,31 @@ class CheckReportIAC(Check_Report):
)
@dataclass
class CheckReportLLM(Check_Report):
"""Contains the LLM Check's finding information."""
prompt: str
response: str
model: str
def __init__(self, metadata: dict = {}, finding: dict = {}) -> None:
"""
Initialize the LLM Check's finding information from a promptfoo finding dict.
Args:
metadata (Dict): Optional check metadata (can be None).
finding (dict): A single finding result from promptfoo's JSON output.
"""
super().__init__(metadata, finding)
self.prompt = finding.get("prompt", {}).get("raw", "No prompt available.")
self.response = finding.get("response", {}).get(
"output", "No output available."
)
self.model = finding.get("provider", {}).get("id", "No model available.")
@dataclass
class CheckReportNHN(Check_Report):
"""Contains the NHN Check's finding information."""

View File

@@ -15,7 +15,7 @@ def recover_checks_from_provider(
"""
try:
# Bypass check loading for IAC provider since it uses Trivy directly
if provider == "iac":
if provider == "iac" or provider == "llm":
return []
checks = []

View File

@@ -29,16 +29,17 @@ class ProwlerArgumentParser:
usage="prowler [-h] [--version] {aws,azure,gcp,kubernetes,m365,github,nhn,mongodbatlas,dashboard,iac} ...",
epilog="""
Available Cloud Providers:
{aws,azure,gcp,kubernetes,m365,github,iac,nhn,mongodbatlas}
{aws,azure,gcp,kubernetes,m365,github,iac,llm,nhn,mongodbatlas}
aws AWS Provider
azure Azure Provider
gcp GCP Provider
kubernetes Kubernetes Provider
m365 Microsoft 365 Provider
github GitHub Provider
iac IaC Provider (Preview)
iac IaC Provider (Beta)
llm LLM Provider (Beta)
nhn NHN Provider (Unofficial)
mongodbatlas MongoDB Atlas Provider
mongodbatlas MongoDB Atlas Provider (Beta)
Available components:
dashboard Local dashboard

View File

@@ -315,6 +315,14 @@ class Finding(BaseModel):
output_data["resource_line_range"] = check_output.resource_line_range
output_data["framework"] = check_output.check_metadata.ServiceName
elif provider.type == "llm":
output_data["auth_method"] = provider.auth_method
output_data["account_uid"] = "llm"
output_data["account_name"] = "llm"
output_data["resource_name"] = check_output.model
output_data["resource_uid"] = check_output.model
output_data["region"] = check_output.model
# check_output Unique ID
# TODO: move this to a function
# TODO: in Azure, GCP and K8s there are findings without resource_name

View File

@@ -872,6 +872,49 @@ class HTML(Output):
)
return ""
@staticmethod
def get_llm_assessment_summary(provider: Provider) -> str:
"""
get_llm_assessment_summary gets the HTML assessment summary for the LLM provider
Args:
provider (Provider): the LLM provider object
Returns:
str: HTML assessment summary for the LLM provider
"""
try:
return f"""
<div class="card">
<div class="card-header">
<h5 class="card-title mb-0">
<i class="fas fa-robot"></i> LLM Security Assessment Summary
</h5>
</div>
<div class="card-body">
<ul class="list-group
list-group-flush">
<li class="list-group-item">
<b>Target LLM:</b> {provider.model}
</li>
<li class="list-group-item">
<b>Plugins:</b> {", ".join(provider.plugins)}
</li>
<li class="list-group-item">
<b>Max concurrency:</b> {provider.max_concurrency}
</li>
<li class="list-group-item">
<b>Config file:</b> {provider.config_path if provider.config_path else "Using promptfoo defaults"}
</li>
</ul>
</div>
</div>"""
except Exception as error:
logger.error(
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}] -- {error}"
)
return ""
@staticmethod
def get_assessment_summary(provider: Provider) -> str:
"""

View File

@@ -24,6 +24,10 @@ def stdout_report(finding, color, verbose, status, fix):
details = finding.location
if finding.check_metadata.Provider == "nhn":
details = finding.location
if finding.check_metadata.Provider == "llm":
details = finding.check_metadata.CheckID
if finding.check_metadata.Provider == "iac":
details = finding.check_metadata.CheckID
if (verbose or fix) and (not status or finding.status in status):
if finding.muted:

View File

@@ -64,6 +64,9 @@ def display_summary_table(
else:
entity_type = "Directory"
audited_entities = provider.scan_path
elif provider.type == "llm":
entity_type = "LLM"
audited_entities = provider.model
# Check if there are findings and that they are not all MANUAL
if findings and not all(finding.status == "MANUAL" for finding in findings):

View File

@@ -261,6 +261,12 @@ class Provider(ABC):
personal_access_token=arguments.personal_access_token,
oauth_app_token=arguments.oauth_app_token,
)
elif "llm" in provider_class_name.lower():
provider_class(
max_concurrency=arguments.max_concurrency,
config_path=arguments.config_file,
fixer_config=fixer_config,
)
elif "mongodbatlas" in provider_class_name.lower():
provider_class(
atlas_public_key=arguments.atlas_public_key,

View File

View File

View File

@@ -0,0 +1,13 @@
def init_parser(self):
"""Init the LLM Provider CLI parser"""
llm_parser = self.subparsers.add_parser(
"llm", parents=[self.common_providers_parser], help="LLM Provider"
)
llm_parser.add_argument(
"--max-concurrency",
dest="max_concurrency",
type=int,
default=10,
help="Maximum number of concurrent requests. Default: 10",
)

View File

@@ -0,0 +1,518 @@
import json
import os
import subprocess
import sys
from typing import List
import yaml
from alive_progress import alive_bar
from colorama import Fore, Style
from prowler.config.config import (
default_config_file_path,
default_redteam_config_file_path,
load_and_validate_config_file,
)
from prowler.lib.check.models import CheckReportLLM
from prowler.lib.logger import logger
from prowler.lib.utils.utils import print_boxes
from prowler.providers.common.models import Audit_Metadata
from prowler.providers.common.provider import Provider
class LlmProvider(Provider):
_type: str = "llm"
audit_metadata: Audit_Metadata
model: str = ""
def __init__(
self,
max_concurrency: int = 10,
config_path: str = None,
config_content: dict = None,
fixer_config: dict = {},
):
logger.info("Instantiating LLM Provider...")
logger.info(f"Received config_path: {config_path}")
self.max_concurrency = max_concurrency
# For LLM provider, only use config_path if it's not the default Prowler config
if config_path and config_path != default_config_file_path:
self.config_path = config_path
else:
self.config_path = default_redteam_config_file_path
# Read config file and extract model
with open(self.config_path, "r") as config_file:
config = yaml.safe_load(config_file)
self.model = config.get("targets", [])[0].get("id", "No model available.")
# Extract only the plugin IDs
plugins_data = config.get("redteam", {}).get("plugins", [])
self.plugins = [
plugin.get("id") for plugin in plugins_data if plugin.get("id")
]
self.region = "global"
self.audited_account = "local-llm"
self._session = None
self._identity = "prowler"
self._auth_method = "No auth"
# Audit Config
if config_content:
self._audit_config = config_content
elif self.config_path:
self._audit_config = load_and_validate_config_file(
self._type, self.config_path
)
else:
# For LLM provider, use empty config if no config file provided
self._audit_config = {}
# Fixer Config
self._fixer_config = fixer_config
# Mutelist (not needed for LLM since promptfoo has its own logic)
self._mutelist = None
self.audit_metadata = Audit_Metadata(
provider=self._type,
account_id=self.audited_account,
account_name="llm",
region=self.region,
services_scanned=0, # LLM doesn't use services
expected_checks=[], # LLM doesn't use checks
completed_checks=0, # LLM doesn't use progress tracking
audit_progress=0, # LLM doesn't use progress tracking
)
# Set this provider as the global provider
Provider.set_global_provider(self)
@property
def type(self):
return self._type
@property
def identity(self):
return self._identity
@property
def session(self):
return self._session
@property
def audit_config(self):
return self._audit_config
@property
def fixer_config(self):
return self._fixer_config
@property
def auth_method(self):
return self._auth_method
def setup_session(self):
"""LLM provider doesn't need a session since it uses promptfoo directly"""
def _process_check(self, finding: dict) -> CheckReportLLM:
"""
Process a single check (failed or passed) and create a CheckReportLLM object.
Args:
finding: A single finding result from promptfoo's JSON output
Returns:
CheckReportLLM: The processed check report
"""
try:
status = "FAIL"
if finding.get("success"):
status = "PASS"
metadata_dict = {
"Provider": "llm",
"CheckID": finding["metadata"]["pluginId"],
"CheckTitle": finding["metadata"]["goal"],
"CheckType": ["LLM Security"],
"ServiceName": finding["metadata"]["pluginId"].split(":")[0],
"SubServiceName": "",
"ResourceIdTemplate": "",
"Severity": finding["metadata"]["severity"].lower(),
"ResourceType": "llm",
"Description": finding["metadata"]["goal"],
"Risk": "",
"RelatedUrl": "",
"Remediation": {
"Code": {
"NativeIaC": "",
"Terraform": "",
"CLI": "",
"Other": "",
},
"Recommendation": {
"Text": "",
"Url": "",
},
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": "",
}
# Convert metadata dict to JSON string
metadata = json.dumps(metadata_dict)
report = CheckReportLLM(
metadata=metadata,
finding=finding,
)
report.status = status
status_extended = (
finding.get("gradingResult", {})
.get("componentResults", [{}])[0]
.get("reason", "No assertions found.")
)
report.status_extended = status_extended
return report
except Exception as error:
logger.critical(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
sys.exit(1)
def _process_finding_line(
self, line: str, reports: list, streaming_callback=None, progress_counter=None
) -> bool:
"""
Process a single line from the report file and add to reports if valid.
Args:
line: JSON line from the report file
reports: List to append the processed report to
streaming_callback: Optional callback for streaming mode
progress_counter: Optional dict to track progress {'completed': int, 'total': int, 'completed_test_ids': set}
Returns:
bool: True if a valid finding was processed, False otherwise
"""
try:
finding = json.loads(line.strip())
# Extract testIdx and track unique tests
test_idx = finding.get("testIdx")
if test_idx is not None and progress_counter is not None:
if test_idx not in progress_counter["completed_test_ids"]:
progress_counter["completed_test_ids"].add(test_idx)
progress_counter["completed"] += 1
if finding.get("prompt", {}).get("raw"):
if finding.get("response", {}).get("error"):
logger.error(f"Error: {finding.get('response', {}).get('error')}")
return False
elif finding.get("error"):
logger.error(f"{finding.get('error')}")
return False
report = self._process_check(finding)
if report:
reports.append(report)
if streaming_callback:
streaming_callback([report])
return True
except json.JSONDecodeError as json_error:
logger.error(
f"Error decoding JSON line: {json_error} - Line content: {line.strip()}"
)
return False
def run(self) -> List[CheckReportLLM]:
"""Main method to run the LLM security scan"""
try:
return self.run_scan()
except Exception as error:
logger.error(f"Error running LLM scan: {error}")
return []
def run_scan(self, streaming_callback=None) -> List[CheckReportLLM]:
"""Run promptfoo red team scan and process its output."""
report_path = None
try:
logger.info("Running LLM security scan...")
# Use config file if provided, otherwise let promptfoo use its defaults
if self.config_path:
if not os.path.exists(self.config_path):
logger.error(f"Config file not found: {self.config_path}")
return []
config_path = self.config_path
logger.info(f"Using provided config file: {config_path}")
# Set output path for the scan results
report_path = "/tmp/prowler_promptfoo_results.jsonl"
promptfoo_command = [
"promptfoo",
"redteam",
"eval",
"--output",
report_path,
"--max-concurrency",
str(self.max_concurrency),
"--no-cache",
"--config",
config_path,
]
logger.info(f"Running promptfoo command: {' '.join(promptfoo_command)}")
process = subprocess.Popen(
promptfoo_command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
env=os.environ,
)
return self._stream_findings(process, report_path, streaming_callback)
except Exception as error:
if "No such file or directory: 'promptfoo'" in str(error):
logger.critical(
"Promptfoo binary not found. Please install promptfoo from https://promptfoo.dev/docs/installation/ or use your system package manager (e.g., 'npm install -g promptfoo' or 'brew install promptfoo' on macOS)"
)
sys.exit(1)
logger.critical(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)
return []
finally:
# Clean up temporary report file
if report_path and os.path.exists(report_path):
os.remove(report_path)
logger.info(f"Cleaned up promptfoo report file: {report_path}")
def _stream_findings(self, process, report_path, streaming_callback):
"""Stream findings in real-time as they are written to the output file."""
import queue
import re
import threading
import time
reports = []
processed_lines = set() # Track which lines we've already processed
error_queue = queue.Queue() # Thread-safe communication for errors
def monitor_file():
"""Monitor the output file for new findings."""
try:
while process.poll() is None: # While process is still running
if os.path.exists(report_path):
try:
with open(
report_path, "r", encoding="utf-8"
) as report_file:
lines = report_file.readlines()
# Process only new lines
for i, line in enumerate(lines):
if i not in processed_lines and line.strip():
if self._process_finding_line(
line,
reports,
streaming_callback,
progress_counter,
):
processed_lines.add(i)
except Exception as e:
logger.debug(f"Error reading report file: {e}")
time.sleep(0.5) # Check every 500ms
except Exception as e:
logger.debug(f"Monitor file thread error: {e}")
def process_stdout(error_queue):
"""Process stdout to extract test count information and detect errors."""
try:
for line in process.stdout:
if (
"Redteam evals require email verification. Please enter your work email"
in line
):
error_queue.put(
"Please, provide first your work email in promptfoo with `promptfoo config set email <email>` command."
)
process.terminate()
return
if "No promptfooconfig found" in line:
error_queue.put(
"No config file found. Please, provide a valid promptfoo config file."
)
process.terminate()
return
if (
"Warning: Config file has a redteam section but no test cases."
in line
):
error_queue.put(
"Please, generate first the test cases using `promptfoo redteam generate` command."
)
process.terminate()
return
# Extract total number of tests from stdout
test_count_match = re.search(
r"Running (\d+) test cases \(up to \d+ at a time\)", line
)
if test_count_match and progress_counter["total"] == 0:
progress_counter["total"] = int(test_count_match.group(1))
logger.info(
f"Found {progress_counter['total']} test cases to run"
)
except Exception as e:
logger.debug(f"Process stdout thread error: {e}")
# Create progress counter dictionary
progress_counter = {"completed": 0, "total": 0, "completed_test_ids": set()}
previous_completed = 0 # Track previous completed count for bar updates
# Start monitoring in separate threads
monitor_thread = threading.Thread(target=monitor_file)
monitor_thread.daemon = True
monitor_thread.start()
stdout_thread = threading.Thread(target=process_stdout, args=(error_queue,))
stdout_thread.daemon = True
stdout_thread.start()
# Wait for total number of tests to be detected or error
while process.poll() is None and progress_counter["total"] == 0:
# Check for errors from background thread
try:
error_msg = error_queue.get_nowait()
logger.critical(error_msg)
process.terminate()
process.wait() # Ensure cleanup
sys.exit(1)
except queue.Empty:
pass
time.sleep(0.5) # Wait for total to be detected
# If process finished before we detected total, handle it
if process.poll() is not None and progress_counter["total"] == 0:
# Check for any final errors
try:
error_msg = error_queue.get_nowait()
logger.critical(error_msg)
sys.exit(1)
except queue.Empty:
pass
process.wait()
logger.critical(
f"Promptfoo exited with a non-zero exit code {process.returncode} {process.stderr.read()}"
)
sys.exit(1)
# Now create the progress bar with the known total
with alive_bar(
total=progress_counter["total"],
ctrl_c=False,
bar="blocks",
spinner="classic",
stats=False,
enrich_print=False,
) as bar:
try:
bar.title = f"-> Running LLM security scan on {self.model}..."
# Update progress bar while process is running
while process.poll() is None:
# Check for errors from background thread during execution
try:
error_msg = error_queue.get_nowait()
logger.critical(error_msg)
process.terminate()
process.wait() # Ensure cleanup
bar.title = "-> LLM security scan failed!"
sys.exit(1)
except queue.Empty:
pass
# Update the progress by incrementing by the difference
if progress_counter["completed"] > previous_completed:
bar(progress_counter["completed"] - previous_completed)
previous_completed = progress_counter["completed"]
time.sleep(0.5) # Update every 500ms
# Wait for process to complete
process.wait()
# Wait a bit more for any final findings to be written
time.sleep(1)
# Process any remaining findings
if os.path.exists(report_path):
try:
with open(report_path, "r", encoding="utf-8") as report_file:
lines = report_file.readlines()
for i, line in enumerate(lines):
if i not in processed_lines and line.strip():
self._process_finding_line(
line,
reports,
streaming_callback,
progress_counter,
)
except Exception as e:
logger.error(f"Error processing final findings: {e}")
bar.title = "-> LLM security scan completed!"
except Exception as error:
bar.title = "-> LLM security scan failed!"
raise error
# Check for errors
stderr = process.stderr.read()
if stderr:
logger.error(f"Promptfoo stderr:\n{stderr}")
if (
process.returncode != 0
and process.returncode != 100
and process.returncode is not None
and process.returncode != -2
):
logger.error(
f"Promptfoo exited with a non-zero exit code: {process.returncode}"
)
sys.exit(1)
return reports
def print_credentials(self):
"""Print the LLM provider credentials and configuration"""
report_title = f"{Style.BRIGHT}Scanning LLM:{Style.RESET_ALL}"
report_lines = [
f"Target LLM: {Fore.YELLOW}{self.model}{Style.RESET_ALL}",
]
if self.plugins:
report_lines.append(
f"Plugins: {Fore.YELLOW}{', '.join(self.plugins)}{Style.RESET_ALL}"
)
if self.config_path:
report_lines.append(
f"Config file: {Fore.YELLOW}{self.config_path}{Style.RESET_ALL}"
)
else:
report_lines.append("Using promptfoo default configuration")
report_lines.append(
f"Max concurrency: {Fore.YELLOW}{self.max_concurrency}{Style.RESET_ALL}"
)
print_boxes(report_lines, report_title)

View File

@@ -0,0 +1,27 @@
from prowler.config.config import output_file_timestamp
from prowler.providers.common.models import ProviderOutputOptions
class LLMOutputOptions(ProviderOutputOptions):
"""
LLMOutputOptions overrides ProviderOutputOptions for LLM-specific output logic.
For example, generating a default output filename for LLM reports.
Attributes inherited from ProviderOutputOptions:
- output_filename (str): The base filename used for generated reports.
- output_directory (str): The directory to store the output files.
- ... see ProviderOutputOptions for more details.
Methods:
- __init__: Customizes the output filename logic for LLM.
"""
def __init__(self, arguments, bulk_checks_metadata):
super().__init__(arguments, bulk_checks_metadata)
# If --output-filename is not specified, build a default name.
if not getattr(arguments, "output_filename", None):
self.output_filename = f"prowler-output-llm-{output_file_timestamp}"
# If --output-filename was explicitly given, respect that
else:
self.output_filename = arguments.output_filename