refactor(mcp): standardize Prowler Hub and Docs tools format for AI optimization (#9578)

This commit is contained in:
Rubén De la Torre Vico
2025-12-17 17:19:32 +01:00
committed by GitHub
parent 13ec7c13b9
commit 597364fb09
6 changed files with 552 additions and 287 deletions

View File

@@ -84,12 +84,13 @@ Tools for managing finding muting, including pattern-based bulk muting (mutelist
Access Prowler's security check catalog and compliance frameworks. **No authentication required.**
### Check Discovery
Tools follow a **two-tier pattern**: lightweight listing for browsing + detailed retrieval for complete information.
- **`prowler_hub_get_checks`** - List security checks with advanced filtering options
- **`prowler_hub_get_check_filters`** - Return available filter values for checks (providers, services, severities, categories, compliances)
- **`prowler_hub_search_checks`** - Full-text search across check metadata
- **`prowler_hub_get_check_raw_metadata`** - Fetch raw check metadata in JSON format
### Check Discovery and Details
- **`prowler_hub_list_checks`** - List security checks with lightweight data (id, title, severity, provider) and advanced filtering options
- **`prowler_hub_semantic_search_checks`** - Full-text search across check metadata with lightweight results
- **`prowler_hub_get_check_details`** - Get comprehensive details for a specific check including risk, remediation guidance, and compliance mappings
### Check Code
@@ -98,20 +99,21 @@ Access Prowler's security check catalog and compliance frameworks. **No authenti
### Compliance Frameworks
- **`prowler_hub_get_compliance_frameworks`** - List and filter compliance frameworks
- **`prowler_hub_search_compliance_frameworks`** - Full-text search across compliance frameworks
- **`prowler_hub_list_compliances`** - List compliance frameworks with lightweight data (id, name, provider) and filtering options
- **`prowler_hub_semantic_search_compliances`** - Full-text search across compliance frameworks with lightweight results
- **`prowler_hub_get_compliance_details`** - Get comprehensive compliance details including requirements and mapped checks
### Provider Information
### Providers Information
- **`prowler_hub_list_providers`** - List Prowler official providers and their services
- **`prowler_hub_get_artifacts_count`** - Get total count of checks and frameworks in Prowler Hub
- **`prowler_hub_list_providers`** - List Prowler official providers
- **`prowler_hub_get_provider_services`** - Get available services for a specific provider
## Prowler Documentation Tools
Search and access official Prowler documentation. **No authentication required.**
- **`prowler_docs_search`** - Search the official Prowler documentation using full-text search
- **`prowler_docs_get_document`** - Retrieve the full markdown content of a specific documentation file
- **`prowler_docs_search`** - Search the official Prowler documentation using full-text search with the `term` parameter
- **`prowler_docs_get_document`** - Retrieve the full markdown content of a specific documentation file using the path from search results
## Usage Tips

View File

@@ -2,11 +2,12 @@
All notable changes to the **Prowler MCP Server** are documented in this file.
## [0.2.1] (UNRELEASED)
## [0.3.0] (UNRELEASED)
### Changed
- Update API base URL environment variable to include complete path [(#9542)](https://github.com/prowler-cloud/prowler/pull/9300)
- Update API base URL environment variable to include complete path [(#9542)](https://github.com/prowler-cloud/prowler/pull/9542)
- Standardize Prowler Hub and Docs tools format for AI optimization [(#9578)](https://github.com/prowler-cloud/prowler/pull/9578)
## [0.2.0] (Prowler v5.15.0)

View File

@@ -22,7 +22,7 @@ Access to Prowler's comprehensive security knowledge base:
- **Check Implementation**: View the Python code that powers each security check
- **Automated Fixers**: Access remediation scripts for common security issues
- **Compliance Frameworks**: Explore mappings to **over 70 compliance standards and frameworks**
- **Provider Services**: View available services and checks for each cloud provider
- **Provider Services**: View available services and checks for all supported Prowler providers
### Prowler Documentation

View File

@@ -1,5 +1,3 @@
from typing import List, Optional
import httpx
from prowler_mcp_server import __version__
from pydantic import BaseModel, Field
@@ -11,7 +9,7 @@ class SearchResult(BaseModel):
path: str = Field(description="Document path")
title: str = Field(description="Document title")
url: str = Field(description="Documentation URL")
highlights: List[str] = Field(
highlights: list[str] = Field(
description="Highlighted content snippets showing query matches with <mark><b> tags",
default_factory=list,
)
@@ -54,7 +52,7 @@ class ProwlerDocsSearchEngine:
},
)
def search(self, query: str, page_size: int = 5) -> List[SearchResult]:
def search(self, query: str, page_size: int = 5) -> list[SearchResult]:
"""
Search documentation using Mintlify API.
@@ -63,7 +61,7 @@ class ProwlerDocsSearchEngine:
page_size: Maximum number of results to return
Returns:
List of search results
list of search results
"""
try:
# Construct request body
@@ -139,7 +137,7 @@ class ProwlerDocsSearchEngine:
print(f"Search error: {e}")
return []
def get_document(self, doc_path: str) -> Optional[str]:
def get_document(self, doc_path: str) -> str | None:
"""
Get full document content from Mintlify documentation.

View File

@@ -1,6 +1,8 @@
from typing import Any, List
from typing import Any
from fastmcp import FastMCP
from pydantic import Field
from prowler_mcp_server.prowler_documentation.search_engine import (
ProwlerDocsSearchEngine,
)
@@ -12,46 +14,44 @@ prowler_docs_search_engine = ProwlerDocsSearchEngine()
@docs_mcp_server.tool()
def search(
query: str,
page_size: int = 5,
) -> List[dict[str, Any]]:
"""
Search in Prowler documentation.
term: str = Field(description="The term to search for in the documentation"),
page_size: int = Field(
5,
description="Number of top results to return to return. It must be between 1 and 20.",
gt=1,
lt=20,
),
) -> list[dict[str, Any]]:
"""Search in Prowler documentation.
This tool searches through the official Prowler documentation
to find relevant information about security checks, cloud providers,
compliance frameworks, and usage instructions.
to find relevant information about everything related to Prowler.
Uses fulltext search to find the most relevant documentation pages
based on your query.
Args:
query: The search query
page_size: Number of top results to return (default: 5)
Returns:
List of search results with highlights showing matched terms (in <mark><b> tags)
"""
return prowler_docs_search_engine.search(query, page_size)
return prowler_docs_search_engine.search(term, page_size) # type: ignore In the hint we cannot put SearchResult type because JSON API MCP Generator cannot handle Pydantic models yet
@docs_mcp_server.tool()
def get_document(
doc_path: str,
) -> str:
"""
Retrieve the full content of a Prowler documentation file.
doc_path: str = Field(
description="Path to the documentation file to retrieve. It is the same as the 'path' field of the search results. Use `prowler_docs_search` to find the path first."
),
) -> dict[str, str]:
"""Retrieve the full content of a Prowler documentation file.
Use this after searching to get the complete content of a specific
documentation file.
Args:
doc_path: Path to the documentation file. It is the same as the "path" field of the search results.
Returns:
Full content of the documentation file
Full content of the documentation file in markdown format.
"""
content = prowler_docs_search_engine.get_document(doc_path)
content: str | None = prowler_docs_search_engine.get_document(doc_path)
if content is None:
raise ValueError(f"Document not found: {doc_path}")
return content
return {"error": f"Document '{doc_path}' not found."}
else:
return {"content": content}

View File

@@ -4,10 +4,10 @@ Prowler Hub MCP module
Provides access to Prowler Hub API for security checks and compliance frameworks.
"""
from typing import Any, Optional
import httpx
from fastmcp import FastMCP
from pydantic import Field
from prowler_mcp_server import __version__
# Initialize FastMCP for Prowler Hub
@@ -55,109 +55,90 @@ def github_check_path(provider_id: str, check_id: str, suffix: str) -> str:
return f"{GITHUB_RAW_BASE}/{provider_id}/services/{service_id}/{check_id}/{check_id}{suffix}"
@hub_mcp_server.tool()
async def get_check_filters() -> dict[str, Any]:
"""
Get available values for filtering for tool `get_checks`. Recommended to use before calling `get_checks` to get the available values for the filters.
Returns:
Available filter options including providers, types, services, severities,
categories, and compliance frameworks with their respective counts
"""
try:
response = prowler_hub_client.get("/check/filters")
response.raise_for_status()
filters = response.json()
return {"filters": filters}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
# Security Check Tools
@hub_mcp_server.tool()
async def get_checks(
providers: Optional[str] = None,
types: Optional[str] = None,
services: Optional[str] = None,
severities: Optional[str] = None,
categories: Optional[str] = None,
compliances: Optional[str] = None,
ids: Optional[str] = None,
fields: Optional[str] = "id,service,severity,title,description,risk",
) -> dict[str, Any]:
"""
List security Prowler Checks. The list can be filtered by the parameters defined for the tool.
It is recommended to use the tool `get_check_filters` to get the available values for the filters.
A not filtered request will return more than 1000 checks, so it is recommended to use the filters.
async def list_checks(
providers: list[str] = Field(
default=[],
description="Filter by Prowler provider IDs. Example: ['aws', 'azure']. Use `prowler_hub_list_providers` to get available provider IDs.",
),
services: list[str] = Field(
default=[],
description="Filter by provider services. Example: ['s3', 'ec2', 'keyvault']. Use `prowler_hub_get_provider_services` to get available services for a provider.",
),
severities: list[str] = Field(
default=[],
description="Filter by severity levels. Example: ['high', 'critical']. Available: 'low', 'medium', 'high', 'critical'.",
),
categories: list[str] = Field(
default=[],
description="Filter by security categories. Example: ['encryption', 'internet-exposed'].",
),
compliances: list[str] = Field(
default=[],
description="Filter by compliance framework IDs. Example: ['cis_4.0_aws', 'ens_rd2022_azure']. Use `prowler_hub_list_compliances` to get available compliance IDs.",
),
) -> dict:
"""List security Prowler Checks with filtering capabilities.
Args:
providers: Filter by Prowler provider IDs. Example: "aws,azure". Use the tool `list_providers` to get the available providers IDs.
types: Filter by check types.
services: Filter by provider services IDs. Example: "s3,keyvault". Use the tool `list_providers` to get the available services IDs in a provider.
severities: Filter by severity levels. Example: "medium,high". Available values are "low", "medium", "high", "critical".
categories: Filter by categories. Example: "cluster-security,encryption".
compliances: Filter by compliance framework IDs. Example: "cis_4.0_aws,ens_rd2022_azure".
ids: Filter by specific check IDs. Example: "s3_bucket_level_public_access_block".
fields: Specify which fields from checks metadata to return (id is always included). Example: "id,title,description,risk".
Available values are "id", "title", "description", "provider", "type", "service", "subservice", "severity", "risk", "reference", "remediation", "services_required", "aws_arn_template", "notes", "categories", "default_value", "resource_type", "related_url", "depends_on", "related_to", "fixer".
The default parameters are "id,title,description".
If null, all fields will be returned.
IMPORTANT: This tool returns LIGHTWEIGHT check data. Use this for fast browsing and filtering.
For complete details including risk, remediation guidance, and categories use `prowler_hub_get_check_details`.
IMPORTANT: An unfiltered request returns 1000+ checks. Use filters to narrow results.
Returns:
List of security checks matching the filters. The structure is as follows:
{
"count": N,
"checks": [
{"id": "check_id_1", "title": "check_title_1", "description": "check_description_1", ...},
{"id": "check_id_2", "title": "check_title_2", "description": "check_description_2", ...},
{"id": "check_id_3", "title": "check_title_3", "description": "check_description_3", ...},
{
"id": "check_id",
"provider": "provider_id",
"title": "Human-readable check title",
"severity": "critical|high|medium|low",
},
...
]
}
Useful Example Workflow:
1. Use `prowler_hub_list_providers` to see available Prowler providers
2. Use `prowler_hub_get_provider_services` to see services for a provider
3. Use this tool with filters to find relevant checks
4. Use `prowler_hub_get_check_details` to get complete information for a specific check
"""
params: dict[str, str] = {}
# Lightweight fields for listing
lightweight_fields = "id,title,severity,provider"
params: dict[str, str] = {"fields": lightweight_fields}
if providers:
params["providers"] = providers
if types:
params["types"] = types
params["providers"] = ",".join(providers)
if services:
params["services"] = services
params["services"] = ",".join(services)
if severities:
params["severities"] = severities
params["severities"] = ",".join(severities)
if categories:
params["categories"] = categories
params["categories"] = ",".join(categories)
if compliances:
params["compliances"] = compliances
if ids:
params["ids"] = ids
if fields:
params["fields"] = fields
params["compliances"] = ",".join(compliances)
try:
response = prowler_hub_client.get("/check", params=params)
response.raise_for_status()
checks = response.json()
checks_dict = {}
# Return checks as a lightweight list
checks_list = []
for check in checks:
check_data = {}
# Always include the id field as it's mandatory for the response structure
if "id" in check:
check_data["id"] = check["id"]
check_data = {
"id": check["id"],
"provider": check["provider"],
"title": check["title"],
"severity": check["severity"],
}
checks_list.append(check_data)
# Include other requested fields
for field in fields.split(","):
if field != "id" and field in check: # Skip id since it's already added
check_data[field] = check[field]
checks_dict[check["id"]] = check_data
return {"count": len(checks), "checks": checks_dict}
return {"count": len(checks), "checks": checks_list}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
@@ -167,60 +148,220 @@ async def get_checks(
@hub_mcp_server.tool()
async def get_check_raw_metadata(
provider_id: str,
check_id: str,
) -> dict[str, Any]:
"""
Fetch the raw check metadata JSON, this is a low level version of the tool `get_checks`.
It is recommended to use the tool `get_checks` filtering about the `ids` parameter instead of using this tool.
async def semantic_search_checks(
term: str = Field(
description="Search term. Examples: 'public access', 'encryption', 'MFA', 'logging'.",
),
) -> dict:
"""Search for security checks using free-text search across all metadata.
Args:
provider_id: Prowler provider ID (e.g., "aws", "azure").
check_id: Prowler check ID (folder and base filename).
IMPORTANT: This tool returns LIGHTWEIGHT check data. Use this for discovering checks by topic.
For complete details including risk, remediation guidance, and categories use `prowler_hub_get_check_details`.
Searches across check titles, descriptions, risk statements, remediation guidance,
and other text fields. Use this when you don't know the exact check ID or want to
explore checks related to a topic.
Returns:
Raw metadata JSON as stored in Prowler.
"""
if provider_id and check_id:
url = github_check_path(provider_id, check_id, ".metadata.json")
try:
resp = github_raw_client.get(url)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return {
"error": f"Check {check_id} not found in Prowler",
}
else:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {
"error": f"Error fetching check {check_id} from Prowler: {str(e)}",
}
else:
return {
"error": "Provider ID and check ID are required",
{
"count": N,
"checks": [
{
"id": "check_id",
"provider": "provider_id",
"title": "Human-readable check title",
"severity": "critical|high|medium|low",
},
...
]
}
Useful Example Workflow:
1. Use this tool to search for checks by keyword or topic
2. Use `prowler_hub_list_checks` with filters for more targeted browsing
3. Use `prowler_hub_get_check_details` to get complete information for a specific check
"""
try:
response = prowler_hub_client.get("/check/search", params={"term": term})
response.raise_for_status()
checks = response.json()
# Return checks as a lightweight list
checks_list = []
for check in checks:
check_data = {
"id": check["id"],
"provider": check["provider"],
"title": check["title"],
"severity": check["severity"],
}
checks_list.append(check_data)
return {"count": len(checks), "checks": checks_list}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
@hub_mcp_server.tool()
async def get_check_details(
check_id: str = Field(
description="The check ID to retrieve details for. Example: 's3_bucket_level_public_access_block'"
),
) -> dict:
"""Retrieve comprehensive details about a specific security check by its ID.
IMPORTANT: This tool returns COMPLETE check details.
Use this after finding a specific check ID, you can get it via `prowler_hub_list_checks` or `prowler_hub_semantic_search_checks`.
Returns:
{
"id": "string",
"title": "string",
"description": "string",
"provider": "string",
"service": "string",
"severity": "low",
"risk": "string",
"reference": [
"string"
],
"additional_urls": [
"string"
],
"remediation": {
"cli": {
"description": "string"
},
"terraform": {
"description": "string"
},
"nativeiac": {
"description": "string"
},
"other": {
"description": "string"
},
"wui": {
"description": "string",
"reference": "string"
}
},
"services_required": [
"string"
],
"notes": "string",
"compliances": [
{
"name": "string",
"id": "string"
}
],
"categories": [
"string"
],
"resource_type": "string",
"related_url": "string",
"fixer": bool
}
Useful Example Workflow:
1. Use `prowler_hub_list_checks` or `prowler_hub_search_checks` to find check IDs
2. Use this tool with the check 'id' to get complete information including remediation guidance
"""
try:
response = prowler_hub_client.get(f"/check/{check_id}")
response.raise_for_status()
check = response.json()
if not check:
return {"error": f"Check '{check_id}' not found"}
# Build response with only non-empty fields to save tokens
result = {}
# Core fields
result["id"] = check["id"]
if check.get("title"):
result["title"] = check["title"]
if check.get("description"):
result["description"] = check["description"]
if check.get("provider"):
result["provider"] = check["provider"]
if check.get("service"):
result["service"] = check["service"]
if check.get("severity"):
result["severity"] = check["severity"]
if check.get("risk"):
result["risk"] = check["risk"]
if check.get("resource_type"):
result["resource_type"] = check["resource_type"]
# List fields
if check.get("reference"):
result["reference"] = check["reference"]
if check.get("additional_urls"):
result["additional_urls"] = check["additional_urls"]
if check.get("services_required"):
result["services_required"] = check["services_required"]
if check.get("categories"):
result["categories"] = check["categories"]
if check.get("compliances"):
result["compliances"] = check["compliances"]
# Other fields
if check.get("notes"):
result["notes"] = check["notes"]
if check.get("related_url"):
result["related_url"] = check["related_url"]
if check.get("fixer") is not None:
result["fixer"] = check["fixer"]
# Remediation - filter out empty nested values
remediation = check.get("remediation", {})
if remediation:
filtered_remediation = {}
for key, value in remediation.items():
if value and isinstance(value, dict):
# Filter out empty values within nested dict
filtered_value = {k: v for k, v in value.items() if v}
if filtered_value:
filtered_remediation[key] = filtered_value
elif value:
filtered_remediation[key] = value
if filtered_remediation:
result["remediation"] = filtered_remediation
return result
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
@hub_mcp_server.tool()
async def get_check_code(
provider_id: str,
check_id: str,
) -> dict[str, Any]:
"""
Fetch the check implementation Python code from Prowler.
provider_id: str = Field(
description="Prowler Provider ID. Example: 'aws', 'azure', 'gcp', 'kubernetes'. Use `prowler_hub_list_providers` to get available provider IDs.",
),
check_id: str = Field(
description="The check ID. Example: 's3_bucket_public_access'. Get IDs from `prowler_hub_list_checks` or `prowler_hub_search_checks`.",
),
) -> dict:
"""Fetch the Python implementation code of a Prowler security check.
Args:
provider_id: Prowler provider ID (e.g., "aws", "azure").
check_id: Prowler check ID (e.g., "opensearch_service_domains_not_publicly_accessible").
The check code shows exactly how Prowler evaluates resources for security issues.
Use this to understand check logic, customize checks, or create new ones.
Returns:
Dict with the code content as text.
{
"content": "Python source code of the check implementation"
}
"""
if provider_id and check_id:
url = github_check_path(provider_id, check_id, ".py")
@@ -251,18 +392,29 @@ async def get_check_code(
@hub_mcp_server.tool()
async def get_check_fixer(
provider_id: str,
check_id: str,
) -> dict[str, Any]:
"""
Fetch the check fixer Python code from Prowler, if it exists.
provider_id: str = Field(
description="Prowler Provider ID. Example: 'aws', 'azure', 'gcp', 'kubernetes'. Use `prowler_hub_list_providers` to get available provider IDs.",
),
check_id: str = Field(
description="The check ID. Example: 's3_bucket_public_access'. Get IDs from `prowler_hub_list_checks` or `prowler_hub_search_checks`.",
),
) -> dict:
"""Fetch the auto-remediation (fixer) code for a Prowler security check.
Args:
provider_id: Prowler provider ID (e.g., "aws", "azure").
check_id: Prowler check ID (e.g., "opensearch_service_domains_not_publicly_accessible").
IMPORTANT: Not all checks have fixers. A "fixer not found" response means the check
doesn't have auto-remediation code - this is normal for many checks.
Fixer code provides automated remediation that can fix security issues detected by checks.
Use this to understand how to programmatically remediate findings.
Returns:
Dict with fixer content as text if present, existence flag.
{
"content": "Python source code of the auto-remediation implementation"
}
Or if no fixer exists:
{
"error": "Fixer not found for check {check_id}"
}
"""
if provider_id and check_id:
url = github_check_path(provider_id, check_id, "_fixer.py")
@@ -295,95 +447,66 @@ async def get_check_fixer(
}
@hub_mcp_server.tool()
async def search_checks(term: str) -> dict[str, Any]:
"""
Search the term across all text properties of check metadata.
Args:
term: Search term to find in check titles, descriptions, and other text fields
Returns:
List of checks matching the search term
"""
try:
response = prowler_hub_client.get("/check/search", params={"term": term})
response.raise_for_status()
checks = response.json()
return {
"count": len(checks),
"checks": checks,
}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
# Compliance Framework Tools
@hub_mcp_server.tool()
async def get_compliance_frameworks(
provider: Optional[str] = None,
fields: Optional[
str
] = "id,framework,provider,description,total_checks,total_requirements",
) -> dict[str, Any]:
"""
List and filter compliance frameworks. The list can be filtered by the parameters defined for the tool.
async def list_compliances(
provider: list[str] = Field(
default=[],
description="Filter by cloud provider. Example: ['aws']. Use `prowler_hub_list_providers` to get available provider IDs.",
),
) -> dict:
"""List compliance frameworks supported by Prowler.
Args:
provider: Filter by one Prowler provider ID. Example: "aws". Use the tool `list_providers` to get the available providers IDs.
fields: Specify which fields to return (id is always included). Example: "id,provider,description,version".
It is recommended to run with the default parameters because the full response is too large.
Available values are "id", "framework", "provider", "description", "total_checks", "total_requirements", "created_at", "updated_at".
The default parameters are "id,framework,provider,description,total_checks,total_requirements".
If null, all fields will be returned.
IMPORTANT: This tool returns LIGHTWEIGHT compliance data. Use this for fast browsing and filtering.
For complete details including requirements use `prowler_hub_get_compliance_details`.
Compliance frameworks define sets of security requirements that checks map to.
Use this to discover available frameworks for compliance reporting.
WARNING: An unfiltered request may return a large number of frameworks. Use the provider with not more than 3 different providers to make easier the response handling.
Returns:
List of compliance frameworks. The structure is as follows:
{
"count": N,
"frameworks": {
"framework_id": {
"id": "framework_id",
"provider": "provider_id",
"description": "framework_description",
"version": "framework_version"
}
}
"compliances": [
{
"id": "cis_4.0_aws",
"name": "CIS Amazon Web Services Foundations Benchmark v4.0",
"provider": "aws",
},
...
]
}
Useful Example Workflow:
1. Use `prowler_hub_list_providers` to see available cloud providers
2. Use this tool to browse compliance frameworks
3. Use `prowler_hub_get_compliance_details` with the compliance 'id' to get complete information
"""
params = {}
# Lightweight fields for listing
lightweight_fields = "id,name,provider"
params: dict[str, str] = {"fields": lightweight_fields}
if provider:
params["provider"] = provider
if fields:
params["fields"] = fields
params["provider"] = ",".join(provider)
try:
response = prowler_hub_client.get("/compliance", params=params)
response.raise_for_status()
frameworks = response.json()
compliances = response.json()
frameworks_dict = {}
for framework in frameworks:
framework_data = {}
# Always include the id field as it's mandatory for the response structure
if "id" in framework:
framework_data["id"] = framework["id"]
# Return compliances as a lightweight list
compliances_list = []
for compliance in compliances:
compliance_data = {
"id": compliance["id"],
"name": compliance["name"],
"provider": compliance["provider"],
}
compliances_list.append(compliance_data)
# Include other requested fields
for field in fields.split(","):
if (
field != "id" and field in framework
): # Skip id since it's already added
framework_data[field] = framework[field]
frameworks_dict[framework["id"]] = framework_data
return {"count": len(frameworks), "frameworks": frameworks_dict}
return {"count": len(compliances), "compliances": compliances_list}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
@@ -393,26 +516,48 @@ async def get_compliance_frameworks(
@hub_mcp_server.tool()
async def search_compliance_frameworks(term: str) -> dict[str, Any]:
"""
Search compliance frameworks by term.
async def semantic_search_compliances(
term: str = Field(
description="Search term. Examples: 'CIS', 'HIPAA', 'PCI', 'GDPR', 'SOC2', 'NIST'.",
),
) -> dict:
"""Search for compliance frameworks using free-text search.
Args:
term: Search term to find in framework names and descriptions
IMPORTANT: This tool returns LIGHTWEIGHT compliance data. Use this for discovering frameworks by topic.
For complete details including requirements use `prowler_hub_get_compliance_details`.
Searches across framework names, descriptions, and metadata. Use this when you
want to find frameworks related to a specific regulation, standard, or topic.
Returns:
List of compliance frameworks matching the search term
{
"count": N,
"compliances": [
{
"id": "cis_4.0_aws",
"name": "CIS Amazon Web Services Foundations Benchmark v4.0",
"provider": "aws",
},
...
]
}
"""
try:
response = prowler_hub_client.get("/compliance/search", params={"term": term})
response.raise_for_status()
frameworks = response.json()
compliances = response.json()
return {
"count": len(frameworks),
"search_term": term,
"frameworks": frameworks,
}
# Return compliances as a lightweight list
compliances_list = []
for compliance in compliances:
compliance_data = {
"id": compliance["id"],
"name": compliance["name"],
"provider": compliance["provider"],
}
compliances_list.append(compliance_data)
return {"count": len(compliances), "compliances": compliances_list}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
@@ -421,22 +566,121 @@ async def search_compliance_frameworks(term: str) -> dict[str, Any]:
return {"error": str(e)}
@hub_mcp_server.tool()
async def get_compliance_details(
compliance_id: str = Field(
description="The compliance framework ID to retrieve details for. Example: 'cis_4.0_aws'. Use `prowler_hub_list_compliances` or `prowler_hub_semantic_search_compliances` to find available compliance IDs.",
),
) -> dict:
"""Retrieve comprehensive details about a specific compliance framework by its ID.
IMPORTANT: This tool returns COMPLETE compliance details.
Use this after finding a specific compliance via `prowler_hub_list_compliances` or `prowler_hub_semantic_search_compliances`.
Returns:
{
"id": "string",
"name": "string",
"framework": "string",
"provider": "string",
"version": "string",
"description": "string",
"total_checks": int,
"total_requirements": int,
"requirements": [
{
"id": "string",
"name": "string",
"description": "string",
"checks": ["check_id_1", "check_id_2"]
}
]
}
"""
try:
response = prowler_hub_client.get(f"/compliance/{compliance_id}")
response.raise_for_status()
compliance = response.json()
if not compliance:
return {"error": f"Compliance '{compliance_id}' not found"}
# Build response with only non-empty fields to save tokens
result = {}
# Core fields
result["id"] = compliance["id"]
if compliance.get("name"):
result["name"] = compliance["name"]
if compliance.get("framework"):
result["framework"] = compliance["framework"]
if compliance.get("provider"):
result["provider"] = compliance["provider"]
if compliance.get("version"):
result["version"] = compliance["version"]
if compliance.get("description"):
result["description"] = compliance["description"]
# Numeric fields
if compliance.get("total_checks"):
result["total_checks"] = compliance["total_checks"]
if compliance.get("total_requirements"):
result["total_requirements"] = compliance["total_requirements"]
# Requirements - filter out empty nested values
requirements = compliance.get("requirements", [])
if requirements:
filtered_requirements = []
for req in requirements:
filtered_req = {}
if req.get("id"):
filtered_req["id"] = req["id"]
if req.get("name"):
filtered_req["name"] = req["name"]
if req.get("description"):
filtered_req["description"] = req["description"]
if req.get("checks"):
filtered_req["checks"] = req["checks"]
if filtered_req:
filtered_requirements.append(filtered_req)
if filtered_requirements:
result["requirements"] = filtered_requirements
return result
except httpx.HTTPStatusError as e:
if e.response.status_code == 404:
return {"error": f"Compliance '{compliance_id}' not found"}
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
}
except Exception as e:
return {"error": str(e)}
# Provider Tools
@hub_mcp_server.tool()
async def list_providers() -> dict[str, Any]:
"""
Get all available Prowler providers and their associated services.
async def list_providers() -> dict:
"""List all providers supported by Prowler.
This is a reference tool that shows available providers (aws, azure, gcp, kubernetes, etc.)
that can be scanned for finding security issues.
Use the provider IDs from this tool as filter values in other tools.
Returns:
List of Prowler providers with their associated services. The structure is as follows:
{
"count": N,
"providers": {
"provider_id": {
"name": "provider_name",
"services": ["service_id_1", "service_id_2", "service_id_3", ...]
}
}
"providers": [
{
"id": "aws",
"name": "Amazon Web Services"
},
{
"id": "azure",
"name": "Microsoft Azure"
},
...
]
}
"""
try:
@@ -444,14 +688,16 @@ async def list_providers() -> dict[str, Any]:
response.raise_for_status()
providers = response.json()
providers_dict = {}
providers_list = []
for provider in providers:
providers_dict[provider["id"]] = {
"name": provider.get("name", ""),
"services": provider.get("services", []),
}
providers_list.append(
{
"id": provider["id"],
"name": provider.get("name", ""),
}
)
return {"count": len(providers), "providers": providers_dict}
return {"count": len(providers), "providers": providers_list}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",
@@ -460,24 +706,42 @@ async def list_providers() -> dict[str, Any]:
return {"error": str(e)}
# Analytics Tools
@hub_mcp_server.tool()
async def get_artifacts_count() -> dict[str, Any]:
"""
Get total count of security artifacts (checks + compliance frameworks).
async def get_provider_services(
provider_id: str = Field(
description="The provider ID to get services for. Example: 'aws', 'azure', 'gcp', 'kubernetes'. Use `prowler_hub_list_providers` to get available provider IDs.",
),
) -> dict:
"""Get the list of services IDs available for a specific cloud provider.
Services represent the different resources and capabilities that Prowler can scan
within a provider (e.g., s3, ec2, iam for AWS or keyvault, storage for Azure).
Use service IDs from this tool as filter values in other tools.
Returns:
Total number of artifacts in the Prowler Hub.
{
"provider_id": "aws",
"provider_name": "Amazon Web Services",
"count": N,
"services": ["s3", "ec2", "iam", "rds", "lambda", ...]
}
"""
try:
response = prowler_hub_client.get("/n_artifacts")
response = prowler_hub_client.get("/providers")
response.raise_for_status()
data = response.json()
providers = response.json()
return {
"total_artifacts": data.get("n", 0),
"details": "Total count includes both security checks and compliance frameworks",
}
for provider in providers:
if provider["id"] == provider_id:
return {
"provider_id": provider["id"],
"provider_name": provider.get("name", ""),
"count": len(provider.get("services", [])),
"services": provider.get("services", []),
}
return {"error": f"Provider '{provider_id}' not found"}
except httpx.HTTPStatusError as e:
return {
"error": f"HTTP error {e.response.status_code}: {e.response.text}",